feat(REQ-2752) - 处理兼容流程任务的审批人可能没人名的情况

This commit is contained in:
wangli 2024-10-15 17:43:36 +08:00
parent 0a5e939fce
commit 4a171e17e7
5 changed files with 109 additions and 11 deletions

View File

@ -27,6 +27,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
@ -55,7 +56,7 @@ public class AggregateProcessInstanceService {
/**
* 同步结束审批的数据至 ES
*/
public List<ProcessTaskDocument> syncProcessInstance(HistoricProcessInstance hpi) {
public List<ProcessTaskDocument> syncProcessInstance(HistoricProcessInstance hpi, Function<List<BpmnTaskDelegateAssigner>, Map<Long, BpmnTaskDelegateAssigner>> function) {
if (Objects.isNull(hpi)) {
return Collections.emptyList();
}
@ -105,7 +106,7 @@ public class AggregateProcessInstanceService {
} else {
log.info("通过引擎的任务表进行任务纬度的数据同步");
// 用引擎的任务表处理任务同步
toEsProcessTaskDocuments.addAll(aggregateProcessTaskService.syncProcessTaskForOld(hpi, instanceVersion, bpmnModel));
toEsProcessTaskDocuments.addAll(aggregateProcessTaskService.syncProcessTaskForOld(hpi, instanceVersion, bpmnModel, function));
}
//更新实例上的最后操作时间

View File

@ -13,6 +13,7 @@ import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.EsProcessTaskService;
import cn.hutool.core.date.DateUtil;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
@ -92,7 +93,7 @@ public class AggregateProcessTaskService {
}
processTaskDocument.setTaskTenantId(log.getTenantId());
BpmnTaskDelegateAssigner assigner = CollectionUtils.isEmpty(log.getAssigneeFull()) ? null : log.getAssigneeFull().get(0);
if(Objects.nonNull(assigner)) {
if (Objects.nonNull(assigner)) {
processTaskDocument.setAssigneeName(assigner.getAssignerName());
processTaskDocument.setAssigneeOuId(assigner.getOuId());
processTaskDocument.setAssigneePersonId(assigner.getPersonId());
@ -116,7 +117,7 @@ public class AggregateProcessTaskService {
*/
public List<ProcessTaskDocument> syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion) {
BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId());
return syncProcessTaskForOld(hpi, instanceVersion, bpmnModel);
return syncProcessTaskForOld(hpi, instanceVersion, bpmnModel, null);
}
/**
@ -125,8 +126,11 @@ public class AggregateProcessTaskService {
* @param hpi
* @param instanceVersion
* @param bpmnModel
* @param function
*/
public List<ProcessTaskDocument> syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion, BpmnModel bpmnModel) {
public List<ProcessTaskDocument> syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion,
BpmnModel bpmnModel,
Function<List<BpmnTaskDelegateAssigner>, Map<Long, BpmnTaskDelegateAssigner>> function) {
List<HistoricTaskInstance> tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(hpi.getId());
@ -143,7 +147,7 @@ public class AggregateProcessTaskService {
// 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理
List<HistoricTaskInstance> filteredEffectiveTasks = filterEffectiveTasks(tasks, instanceVersion);
List<String> taskIds = filteredEffectiveTasks.stream().map(i-> INTERNAL_TASK_RELATION_ASSIGNEE_INFO + i.getId()).distinct().collect(Collectors.toList());
List<String> taskIds = filteredEffectiveTasks.stream().map(i -> INTERNAL_TASK_RELATION_ASSIGNEE_INFO + i.getId()).distinct().collect(Collectors.toList());
Map<String, Object> taskAssigneeMap = bpmnProcessInstanceForEsService.queryInstanceVariables(hpi.getId(), taskIds);
List<ProcessTaskDocument> toEsProcessTaskDocuments = new ArrayList<>();
@ -169,8 +173,12 @@ public class AggregateProcessTaskService {
processTaskDocument.setDuration(task.getDurationInMillis());
processTaskDocument.setTaskTenantId(task.getTenantId());
BpmnTaskDelegateAssigner assigner = BpmnTaskDelegateAssigner.toObjectCompatible(taskAssigneeMap.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null));
if(Objects.nonNull(assigner)) {
processTaskDocument.setAssigneeName(assigner.getAssignerName());
if (Objects.nonNull(assigner)) {
BpmnTaskDelegateAssigner fullAssigner = null;
if (Objects.nonNull(function) && !StringUtils.hasText(assigner.getAssignerName())) {
fullAssigner = function.apply(Lists.newArrayList(assigner)).getOrDefault(Long.parseLong(assigner.getPersonId()), null);
}
processTaskDocument.setAssigneeName(Objects.nonNull(fullAssigner) ? fullAssigner.getAssignerName() : StringUtils.hasText(assigner.getAssignerName()) ? assigner.getAssignerName() : "未知");
processTaskDocument.setAssigneeOuId(assigner.getOuId());
processTaskDocument.setAssigneePersonId(assigner.getPersonId());
processTaskDocument.setAssigneeTenantId(assigner.getTenantId());

View File

@ -42,7 +42,7 @@ public class SyncEsTaskEntityEventHandle implements EntityEventHandle<TaskEntity
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(taskEntity.getProcessInstanceId())
.singleResult();
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance);
aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null);
}
}

View File

@ -1,15 +1,22 @@
package cn.axzo.workflow.server.xxljob;
import cn.axzo.karma.client.feign.FlowSupportApi;
import cn.axzo.karma.client.model.request.PersonProfileQueryReq;
import cn.axzo.karma.client.model.response.PersonProfileResp;
import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO;
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.engine.cmd.CustomInsertPropertyCmd;
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.axzo.workflow.listener.SyncEsTaskEntityEventHandle;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
@ -20,16 +27,24 @@ import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static cn.axzo.workflow.common.constant.BpmnConstants.LATEST_SYNC_TO_ELASTICSEARCH_TIME;
import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssigneeSelector.parseApiResult;
/**
* 审批结束的流程实例数据同步至 ES
@ -47,6 +62,9 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
private final ApplicationContext context;
private final SupportRefreshProperties refreshProperties;
private final FlowSupportApi flowSupportApi;
@XxlJob("completedProcessInstanceSyncToEs")
public ReturnT<String> execute(String s) {
@ -134,11 +152,38 @@ public class CompletedProcessInstanceSyncEsJobHandler extends IJobHandler {
// 如果配置的非分片广播的方式那么 shardingVO 将是 null
return true;
}).forEach(hpi -> {
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi, this::queryPersonByIds);
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size());
});
});
return new DataSyncSummaryDTO(totalProcessInstanceCount, totalProcessTaskCount.get());
}
private Map<Long, BpmnTaskDelegateAssigner> queryPersonByIds(List<BpmnTaskDelegateAssigner> users) {
Map<Long, BpmnTaskDelegateAssigner> result = new HashMap<>();
if (CollectionUtils.isEmpty(users)) {
return result;
}
Map<Long, BpmnTaskDelegateAssigner> map = users.stream()
.filter(i -> StringUtils.hasText(i.getPersonId()) && !Objects.equals("null", i.getPersonId()) && NumberUtil.isNumber(i.getPersonId()))
.collect(Collectors.toMap(i -> NumberUtil.parseLong(i.getPersonId()), Function.identity(), (s, t) -> s));
PersonProfileQueryReq query = new PersonProfileQueryReq();
ArrayList<Long> personIds = Lists.newArrayList(map.keySet());
query.setPersonIds(personIds);
Map<Long, PersonProfileResp> personProfileMap = parseApiResult(() -> flowSupportApi.listPersons(PersonProfileQueryReq.builder().personIds(personIds).build()),
"根据 PersonId 查询自然人档案", "cn.axzo.karma.client.feign.FlowSupportApi.listPersons", refreshProperties, context, personIds)
.stream().collect(Collectors.toMap(PersonProfileResp::getId, Function.identity(), (s, t) -> s));
personProfileMap.forEach((k, v) -> {
result.put(k, BpmnTaskDelegateAssigner.builder()
.personId(String.valueOf(v.getId()))
.assignerName(v.getRealName())
.avatar(v.getAvatarUrl())
.build());
});
return result;
}
}

View File

@ -1,11 +1,18 @@
package cn.axzo.workflow.server.xxljob;
import cn.axzo.karma.client.feign.FlowSupportApi;
import cn.axzo.karma.client.model.request.PersonProfileQueryReq;
import cn.axzo.karma.client.model.response.PersonProfileResp;
import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.conf.SupportRefreshProperties;
import cn.axzo.workflow.core.engine.cmd.CustomGetPropertyCmd;
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import cn.axzo.workflow.es.model.ProcessTaskDocument;
import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService;
import cn.hutool.core.util.NumberUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
@ -16,17 +23,25 @@ import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static cn.axzo.workflow.common.constant.BpmnConstants.LATEST_SYNC_TO_ELASTICSEARCH_TIME;
import static cn.axzo.workflow.server.controller.delegate.AbstractBpmnTaskAssigneeSelector.parseApiResult;
/**
* 在途的和新发起的流程实例数据同步至 ES
@ -41,6 +56,9 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
private final AggregateProcessInstanceService aggregateProcessInstanceService;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
private final ApplicationContext context;
private final SupportRefreshProperties refreshProperties;
private final FlowSupportApi flowSupportApi;
@XxlJob("otherProcessInstanceSyncToEs")
public ReturnT<String> execute(String param) {
@ -114,11 +132,37 @@ public class OtherProcessInstanceSyncEsJobHandler extends IJobHandler {
// 如果配置的非分片广播的方式那么 shardingVO 将是 null
return true;
}).forEach(hpi -> {
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi);
List<ProcessTaskDocument> processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(hpi, this::queryPersonByIds);
totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size());
});
});
return new DataSyncSummaryDTO(totalCount, totalProcessTaskCount.get());
}
private Map<Long, BpmnTaskDelegateAssigner> queryPersonByIds(List<BpmnTaskDelegateAssigner> users) {
Map<Long, BpmnTaskDelegateAssigner> result = new HashMap<>();
if (CollectionUtils.isEmpty(users)) {
return result;
}
Map<Long, BpmnTaskDelegateAssigner> map = users.stream()
.filter(i -> StringUtils.hasText(i.getPersonId()) && !Objects.equals("null", i.getPersonId()) && NumberUtil.isNumber(i.getPersonId()))
.collect(Collectors.toMap(i -> NumberUtil.parseLong(i.getPersonId()), Function.identity(), (s, t) -> s));
PersonProfileQueryReq query = new PersonProfileQueryReq();
ArrayList<Long> personIds = Lists.newArrayList(map.keySet());
query.setPersonIds(personIds);
Map<Long, PersonProfileResp> personProfileMap = parseApiResult(() -> flowSupportApi.listPersons(PersonProfileQueryReq.builder().personIds(personIds).build()),
"根据 PersonId 查询自然人档案", "cn.axzo.karma.client.feign.FlowSupportApi.listPersons", refreshProperties, context, personIds)
.stream().collect(Collectors.toMap(PersonProfileResp::getId, Function.identity(), (s, t) -> s));
personProfileMap.forEach((k, v) -> {
result.put(k, BpmnTaskDelegateAssigner.builder()
.personId(String.valueOf(v.getId()))
.assignerName(v.getRealName())
.avatar(v.getAvatarUrl())
.build());
});
return result;
}
}