feat(REQ-2752) - 增加同步纬度数据至 ES 的逻辑
This commit is contained in:
parent
8daa649642
commit
b2a4c6b57b
@ -58,6 +58,8 @@ public interface BpmnConstants {
|
||||
String FLOW_SERVER_VERSION = "serverVersion";
|
||||
String FLOW_SERVER_VERSION_121 = "1.2.1";
|
||||
String FLOW_SERVER_VERSION_130 = "1.3.0";
|
||||
// 1.4.2 开始启用新版本日志
|
||||
String FLOW_SERVER_VERSION_142 = "1.4.2";
|
||||
String CONFIG_NOTICE = "noticeConfig";
|
||||
String CONFIG_APPROVE = "approveConfig";
|
||||
String TEMPLATE_NOTICE_MESSAGE_CONFIG = "noticeMessageConfig";
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
package cn.axzo.workflow.common.enums;
|
||||
|
||||
/**
|
||||
* 时间查询方向
|
||||
* <p>
|
||||
* 注意: 该枚举用在查询 flowable 引擎数据时, 都是包含自身时间点的.
|
||||
* 例如, 使用 Before 时,也就是说在某个时间点之前,是包含"某个时间"自身的.
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-29 09:56
|
||||
*/
|
||||
public enum TimeQueryDirection {
|
||||
BEFORE,
|
||||
AFTER,
|
||||
;
|
||||
}
|
||||
@ -1,11 +1,14 @@
|
||||
package cn.axzo.workflow.common.model.dto.es;
|
||||
|
||||
import cn.axzo.workflow.common.enums.TimeQueryDirection;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 历史流程实例的搜索对象
|
||||
*
|
||||
@ -49,5 +52,39 @@ public class HistoricProcessInstanceSearchForEsDTO {
|
||||
*/
|
||||
private String tenantId;
|
||||
|
||||
/**
|
||||
* 实例是否已结束
|
||||
*/
|
||||
private Boolean finished;
|
||||
|
||||
/**
|
||||
* 开始时间
|
||||
*/
|
||||
private Date startTime;
|
||||
|
||||
/**
|
||||
* 控制查询开始时间的方向,包含自身的时间点
|
||||
* <p>
|
||||
* 默认是查询开始时间之后
|
||||
*/
|
||||
private TimeQueryDirection startTimeDirection = TimeQueryDirection.AFTER;
|
||||
|
||||
/**
|
||||
* 结束时间
|
||||
*/
|
||||
private Date endTime;
|
||||
|
||||
/**
|
||||
* 控制查询结束时间的方向,包含自身的时间点
|
||||
* <p>
|
||||
* 默认是查询结束时间点之前
|
||||
*/
|
||||
private TimeQueryDirection endTimeDirection = TimeQueryDirection.BEFORE;
|
||||
|
||||
/**
|
||||
* 用于覆盖同步逻辑中的PageSize,一般不需要传
|
||||
*/
|
||||
private Integer overPageSize = 10;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ package cn.axzo.workflow.core.service;
|
||||
|
||||
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
|
||||
import java.util.List;
|
||||
@ -14,5 +15,9 @@ import java.util.List;
|
||||
*/
|
||||
public interface BpmnProcessInstanceForEsService {
|
||||
|
||||
Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search);
|
||||
|
||||
List<HistoricProcessInstance> queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page);
|
||||
|
||||
BpmnModel queryBpmnModel(String processDefinitionId);
|
||||
}
|
||||
|
||||
@ -0,0 +1,28 @@
|
||||
package cn.axzo.workflow.core.service;
|
||||
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
|
||||
import org.flowable.engine.task.Attachment;
|
||||
import org.flowable.engine.task.Comment;
|
||||
import org.flowable.task.api.history.HistoricTaskInstance;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 专用与对接 ES 的流程任务相关操作
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-29 10:55
|
||||
*/
|
||||
public interface BpmnProcessTaskForEsService {
|
||||
|
||||
List<HistoricTaskInstance> queryHistoricProcessTaskByProcessInstanceId(String processInstanceId);
|
||||
|
||||
List<ExtAxHiTaskInst> queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId);
|
||||
|
||||
List<Comment> queryCommentByProcessInstanceId(String processInstanceId);
|
||||
|
||||
List<Attachment> queryAttachmentByProcessInstanceId(String processInstanceId);
|
||||
|
||||
List<ExtAxProcessLog> queryProcessLogByProcessInstanceId(String processInstanceId);
|
||||
}
|
||||
@ -4,14 +4,20 @@ import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDT
|
||||
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.RepositoryService;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.history.HistoricProcessInstanceQuery;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 专用与对接 ES 的流程实例相关操作
|
||||
@ -24,9 +30,36 @@ import java.util.List;
|
||||
public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceForEsService {
|
||||
@Resource
|
||||
private HistoryService historyService;
|
||||
@Resource
|
||||
private RepositoryService repositoryService;
|
||||
|
||||
@Override
|
||||
public Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search) {
|
||||
if(Objects.isNull(search)){
|
||||
return 0L;
|
||||
}
|
||||
return buildHistoricProcessInstanceQuery(search).count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HistoricProcessInstance> queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page) {
|
||||
if(Objects.isNull(search)){
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return buildHistoricProcessInstanceQuery(search)
|
||||
.includeProcessVariables()
|
||||
.orderByProcessInstanceId()
|
||||
.asc()
|
||||
.listPage(((Long) (page.getPages() * page.getSize())).intValue(),
|
||||
((Long) ((page.getPages() + 1) * page.getSize())).intValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public BpmnModel queryBpmnModel(String processDefinitionId) {
|
||||
return repositoryService.getBpmnModel(processDefinitionId);
|
||||
}
|
||||
|
||||
private HistoricProcessInstanceQuery buildHistoricProcessInstanceQuery(HistoricProcessInstanceSearchForEsDTO search) {
|
||||
HistoricProcessInstanceQuery historicProcessInstanceQuery = historyService.createHistoricProcessInstanceQuery();
|
||||
if (StringUtils.hasText(search.getProcessInstanceId())) {
|
||||
historicProcessInstanceQuery.processInstanceId(search.getProcessInstanceId());
|
||||
@ -46,11 +79,31 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
|
||||
if (StringUtils.hasText(search.getTenantId())) {
|
||||
historicProcessInstanceQuery.processInstanceTenantId(search.getTenantId());
|
||||
}
|
||||
return historicProcessInstanceQuery
|
||||
.includeProcessVariables()
|
||||
.orderByProcessInstanceId()
|
||||
.asc()
|
||||
.listPage(((Long) (page.getPages() * page.getSize())).intValue(),
|
||||
((Long) ((page.getPages() + 1) * page.getSize())).intValue());
|
||||
if (Objects.equals(Boolean.TRUE, search.getFinished())) {
|
||||
historicProcessInstanceQuery.finished();
|
||||
}
|
||||
if (Objects.nonNull(search.getStartTime())) {
|
||||
// 引擎默认仅支撑两种
|
||||
switch (search.getStartTimeDirection()) {
|
||||
case BEFORE:
|
||||
historicProcessInstanceQuery.startedBefore(search.getStartTime());
|
||||
break;
|
||||
default:
|
||||
historicProcessInstanceQuery.startedAfter(search.getStartTime());
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (Objects.nonNull(search.getEndTime())) {
|
||||
// 引擎默认仅支撑两种
|
||||
switch (search.getEndTimeDirection()) {
|
||||
case AFTER:
|
||||
historicProcessInstanceQuery.finishedAfter(search.getEndTime());
|
||||
break;
|
||||
default:
|
||||
historicProcessInstanceQuery.finishedBefore(search.getEndTime());
|
||||
break;
|
||||
}
|
||||
}
|
||||
return historicProcessInstanceQuery;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1061,6 +1061,7 @@ public class BpmnProcessInstanceServiceImpl implements BpmnProcessInstanceServic
|
||||
.orElse(variables.getOrDefault(OLD_INTERNAL_INITIATOR, null))))
|
||||
.tenantId(historicProcessInstance.getTenantId())
|
||||
.agented((Boolean) Optional.ofNullable(variables.get(INTERNAL_PROCESS_AGENT)).orElse(false))
|
||||
// 任务
|
||||
.taskDetails(genericTaskLogVos(historicProcessInstance.getId(), logs, forecasting, dto))
|
||||
.defaultButtonConf(getButtonConfig(bpmnModel.getMainProcess()).orElse(new BpmnButtonConf()))
|
||||
.supportBatchOperation(getProcessApproveConf(bpmnModel.getMainProcess()).orElse(new BpmnApproveConf()).getSupportBatchOperation())
|
||||
|
||||
@ -0,0 +1,85 @@
|
||||
package cn.axzo.workflow.core.service.impl;
|
||||
|
||||
import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
|
||||
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
|
||||
import cn.axzo.workflow.core.service.ExtAxProcessLogService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.TaskService;
|
||||
import org.flowable.engine.task.Attachment;
|
||||
import org.flowable.engine.task.Comment;
|
||||
import org.flowable.task.api.history.HistoricTaskInstance;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 专用与对接 ES 的流程任务相关操作
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024-09-29 10:55
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class BpmnProcessTaskForEsServiceImpl implements BpmnProcessTaskForEsService {
|
||||
@Resource
|
||||
private HistoryService historyService;
|
||||
@Resource
|
||||
private TaskService taskService;
|
||||
@Resource
|
||||
private ExtAxHiTaskInstService extAxHiTaskInstService;
|
||||
@Resource
|
||||
private ExtAxProcessLogService extAxProcessLogService;
|
||||
|
||||
@Override
|
||||
public List<HistoricTaskInstance> queryHistoricProcessTaskByProcessInstanceId(String processInstanceId) {
|
||||
// List<BpmnHistoricTaskInstanceVO> historicTaskListByProcessInstanceId = bpmnProcessTaskService.getHistoricTaskListByProcessInstanceId(processInstanceId, null);
|
||||
if (!StringUtils.hasText(processInstanceId)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return historyService.createHistoricTaskInstanceQuery()
|
||||
.processInstanceId(processInstanceId)
|
||||
.list();
|
||||
}
|
||||
|
||||
public List<ExtAxHiTaskInst> queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId) {
|
||||
if (!StringUtils.hasText(processInstanceId)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
ExtHiTaskSearchDTO searchDTO = new ExtHiTaskSearchDTO();
|
||||
searchDTO.setProcessInstanceId(processInstanceId);
|
||||
return extAxHiTaskInstService.queryList(searchDTO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Comment> queryCommentByProcessInstanceId(String processInstanceId) {
|
||||
if (!StringUtils.hasText(processInstanceId)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return taskService.getProcessInstanceComments(processInstanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attachment> queryAttachmentByProcessInstanceId(String processInstanceId) {
|
||||
if (!StringUtils.hasText(processInstanceId)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return taskService.getProcessInstanceAttachments(processInstanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ExtAxProcessLog> queryProcessLogByProcessInstanceId(String processInstanceId) {
|
||||
if(!StringUtils.hasText(processInstanceId)){
|
||||
return Collections.emptyList();
|
||||
}
|
||||
ExtAxProcessLog queryLog = new ExtAxProcessLog();
|
||||
queryLog.setProcessInstanceId(processInstanceId);
|
||||
return extAxProcessLogService.genericQuery(queryLog);
|
||||
}
|
||||
}
|
||||
@ -14,7 +14,7 @@ import org.springframework.context.annotation.Configuration;
|
||||
* @author wangli
|
||||
* @since 2024/4/28 14:02
|
||||
*/
|
||||
@OnlyPodsEnvironment
|
||||
//@OnlyPodsEnvironment
|
||||
@Configuration
|
||||
public class XxlJobConfiguration {
|
||||
Logger logger = LoggerFactory.getLogger(XxlJobConfiguration.class);
|
||||
|
||||
@ -2,11 +2,17 @@ package cn.axzo.workflow.server.xxljob;
|
||||
|
||||
import cn.axzo.workflow.common.enums.WorkspaceType;
|
||||
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
|
||||
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
|
||||
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
|
||||
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
|
||||
import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
|
||||
import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter;
|
||||
import cn.axzo.workflow.es.model.EsProcessInstance;
|
||||
import cn.axzo.workflow.es.model.EsProcessTask;
|
||||
import cn.axzo.workflow.es.service.EsProcessInstanceService;
|
||||
import cn.axzo.workflow.es.service.EsProcessTaskService;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
@ -15,18 +21,43 @@ import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import com.xxl.job.core.log.XxlJobLogger;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.FlowElement;
|
||||
import org.flowable.engine.history.HistoricProcessInstance;
|
||||
import org.flowable.engine.impl.util.ProcessDefinitionUtil;
|
||||
import org.flowable.engine.impl.persistence.entity.CommentEntityImpl;
|
||||
import org.flowable.engine.task.Attachment;
|
||||
import org.flowable.engine.task.Comment;
|
||||
import org.flowable.task.api.history.HistoricTaskInstance;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT;
|
||||
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
|
||||
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END;
|
||||
import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED;
|
||||
|
||||
/**
|
||||
* 流程实例数据同步
|
||||
@ -39,15 +70,45 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VER
|
||||
@Slf4j
|
||||
public class ProcessInstanceSyncJobHandler extends IJobHandler {
|
||||
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
|
||||
private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService;
|
||||
private final BpmnHistoricAttachmentConverter attachmentConverter;
|
||||
private final EsProcessInstanceService esProcessInstanceService;
|
||||
private final EsProcessTaskService esProcessTaskService;
|
||||
|
||||
@XxlJob("processInstanceToEs")
|
||||
public ReturnT<String> execute(String s) {
|
||||
log.debug("start exec process instance data sync... ");
|
||||
XxlJobLogger.log("start exec process instance data sync... ");
|
||||
|
||||
HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO();
|
||||
IPage page = new Page(0, 10);
|
||||
search.setFinished(true);
|
||||
search.setEndTime(new Date());
|
||||
try {
|
||||
if (StringUtils.hasText(s)) {
|
||||
search = JSON.parseObject(s, search.getClass());
|
||||
}
|
||||
log.info("入参为空, 将以默认条件执行");
|
||||
XxlJobLogger.log("入参为空, 将以默认条件执行");
|
||||
} catch (Exception e) {
|
||||
log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
|
||||
XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
|
||||
return ReturnT.FAIL;
|
||||
}
|
||||
|
||||
Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
|
||||
log.info("查询到待同步数据量: {} 条", totalCount);
|
||||
XxlJobLogger.log("查询到待同步数据量: {} 条", totalCount);
|
||||
Integer overPageSize = search.getOverPageSize();
|
||||
HistoricProcessInstanceSearchForEsDTO finalSearch = search;
|
||||
IntStream.iterate(0, i -> i + overPageSize).limit((totalCount + overPageSize - 1) / overPageSize)
|
||||
.forEach(pageNumber -> {
|
||||
log.info("处理进度: {} %, current pageNo: {}, pageSize: {}", pageNumber == 0 ? 0 : (double) (totalCount / (pageNumber * overPageSize)) * 100,
|
||||
totalCount, overPageSize);
|
||||
executeSync(finalSearch, new Page<>(pageNumber, overPageSize));
|
||||
});
|
||||
return ReturnT.SUCCESS;
|
||||
}
|
||||
|
||||
private void executeSync(HistoricProcessInstanceSearchForEsDTO search, IPage page) {
|
||||
List<HistoricProcessInstance> historicProcessInstances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, page);
|
||||
historicProcessInstances.forEach(processInstance -> {
|
||||
EsProcessInstance esProcessInstance = new EsProcessInstance();
|
||||
@ -68,20 +129,135 @@ public class ProcessInstanceSyncJobHandler extends IJobHandler {
|
||||
esProcessInstance.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc());
|
||||
esProcessInstance.setAgentTenantId(String.valueOf(variables.getOrDefault(INTERNAL_PROCESS_AGENT, "")));
|
||||
|
||||
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(processInstance.getProcessDefinitionId());
|
||||
BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(processInstance.getProcessDefinitionId());
|
||||
BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> {
|
||||
esProcessInstance.setSupportBatch(cfg.getSupportBatchOperation());
|
||||
esProcessInstance.setUserAgreeSignature(cfg.getUserAgreeSignature());
|
||||
});
|
||||
String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, ""));
|
||||
|
||||
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
|
||||
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
|
||||
if (supportVersion.compareTo(version) >= 0) {
|
||||
log.info("通过新日志表进行任务纬度的数据同步");
|
||||
// 用新日志表数据任务同步
|
||||
executeTaskSyncForNew(processInstance);
|
||||
} else {
|
||||
log.info("通过引擎的任务表进行任务纬度的数据同步");
|
||||
// 用引擎的任务表处理任务同步
|
||||
executeTaskSyncForOld(processInstance, instanceVersion, bpmnModel);
|
||||
}
|
||||
|
||||
Integer inserted = esProcessInstanceService.insert(esProcessInstance);
|
||||
|
||||
if (inserted > 0) {
|
||||
log.debug("write processInstance to es success! id: {}", processInstance.getId());
|
||||
XxlJobLogger.log("write processInstance to es success! id: {}", processInstance.getId());
|
||||
} else {
|
||||
log.warn("write to es caught exception! id: {}", processInstance.getId());
|
||||
XxlJobLogger.log("write to es caught exception! id: {}", processInstance.getId());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return ReturnT.SUCCESS;
|
||||
private void executeTaskSyncForNew(HistoricProcessInstance processInstance) {
|
||||
bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(processInstance.getId());
|
||||
}
|
||||
|
||||
|
||||
private void executeTaskSyncForOld(HistoricProcessInstance processInstance, String instanceVersion, BpmnModel bpmnModel) {
|
||||
List<HistoricTaskInstance> tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(processInstance.getId());
|
||||
|
||||
Map<String, ExtAxHiTaskInst> extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(processInstance.getId())
|
||||
.stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s));
|
||||
|
||||
Map<String, List<Comment>> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(processInstance.getId())
|
||||
.stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
Map<String, List<Attachment>> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(processInstance.getId())
|
||||
.stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
|
||||
|
||||
Map<String, Object> variables = processInstance.getProcessVariables();
|
||||
|
||||
// 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理
|
||||
filterEffectiveTasks(tasks, instanceVersion);
|
||||
|
||||
tasks.forEach(task -> {
|
||||
log.debug("task: {}", task.getId());
|
||||
ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst());
|
||||
List<Comment> comments = commentMap.getOrDefault(task.getId(), Collections.emptyList());
|
||||
List<Attachment> attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList());
|
||||
|
||||
EsProcessTask esProcessTask = new EsProcessTask();
|
||||
esProcessTask.setId(task.getId());
|
||||
esProcessTask.setProcessInstanceId(task.getProcessInstanceId());
|
||||
esProcessTask.setProcessDefinitionId(task.getProcessDefinitionId());
|
||||
esProcessTask.setTaskDefinitionKey(task.getTaskDefinitionKey());
|
||||
esProcessTask.setName(task.getName());
|
||||
esProcessTask.setStatus(extTask.getStatus());
|
||||
// 处理 advice
|
||||
esProcessTask.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE))
|
||||
.findFirst().orElse(new CommentEntityImpl()).getFullMessage());
|
||||
// 处理 operationDesc
|
||||
esProcessTask.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType()))
|
||||
.max(Comparator.comparing(Comment::getTime))
|
||||
.orElse(new CommentEntityImpl()).getFullMessage());
|
||||
esProcessTask.setStartTime(task.getCreateTime());
|
||||
esProcessTask.setEndTime(task.getEndTime());
|
||||
esProcessTask.setDuration(task.getDurationInMillis());
|
||||
// esProcessTask.setLastUpdateTime(task);
|
||||
esProcessTask.setTenantId(task.getTenantId());
|
||||
esProcessTask.setAssigner(BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null)));
|
||||
esProcessTask.setAttachments(attachmentConverter.toVos(attachments));
|
||||
if (Objects.nonNull(bpmnModel)) {
|
||||
FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey());
|
||||
BpmnMetaParserHelper.getApprovalMethod(flowElement)
|
||||
.ifPresent(e -> esProcessTask.setApprovalMethod(e.getDesc()));
|
||||
BpmnMetaParserHelper.getNodeType(flowElement)
|
||||
.ifPresent(e -> esProcessTask.setNodeType(e.getDesc()));
|
||||
// esProcessTask.setNodeMode();
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private List<HistoricTaskInstance> filterEffectiveTasks(List<HistoricTaskInstance> tasks, String instanceVersion) {
|
||||
List<HistoricTaskInstance> effectiveTasks = new ArrayList<>();
|
||||
if (CollectionUtils.isEmpty(tasks)) {
|
||||
return effectiveTasks;
|
||||
}
|
||||
Stream<HistoricTaskInstance> taskInstanceStream = tasks.stream()
|
||||
.filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID))
|
||||
.filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID))
|
||||
.filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus()));
|
||||
if (Objects.isNull(instanceVersion)) {
|
||||
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
|
||||
} else {
|
||||
if (StringUtils.hasText(instanceVersion)) {
|
||||
DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
|
||||
DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130);
|
||||
if (version.compareTo(supportVersion) < 0) {
|
||||
compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
|
||||
} else {
|
||||
taskInstanceStream.forEach(effectiveTasks::add);
|
||||
}
|
||||
} else {
|
||||
taskInstanceStream.forEach(effectiveTasks::add);
|
||||
}
|
||||
}
|
||||
return effectiveTasks;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 兼容 1.2.1 版本的审批日志
|
||||
*
|
||||
* @param stream
|
||||
* @return
|
||||
*/
|
||||
private Stream<HistoricTaskInstance> compatibleVersion(Stream<HistoricTaskInstance> stream) {
|
||||
return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
|
||||
|| (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
|
||||
).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason())));
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user