diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java
index 49661639a..0697ef5ba 100644
--- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java
+++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java
@@ -58,6 +58,8 @@ public interface BpmnConstants {
String FLOW_SERVER_VERSION = "serverVersion";
String FLOW_SERVER_VERSION_121 = "1.2.1";
String FLOW_SERVER_VERSION_130 = "1.3.0";
+ // 1.4.2 开始启用新版本日志
+ String FLOW_SERVER_VERSION_142 = "1.4.2";
String CONFIG_NOTICE = "noticeConfig";
String CONFIG_APPROVE = "approveConfig";
String TEMPLATE_NOTICE_MESSAGE_CONFIG = "noticeMessageConfig";
diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java
new file mode 100644
index 000000000..114403b07
--- /dev/null
+++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java
@@ -0,0 +1,16 @@
+package cn.axzo.workflow.common.enums;
+
+/**
+ * 时间查询方向
+ *
+ * 注意: 该枚举用在查询 flowable 引擎数据时, 都是包含自身时间点的.
+ * 例如, 使用 Before 时,也就是说在某个时间点之前,是包含"某个时间"自身的.
+ *
+ * @author wangli
+ * @since 2024-09-29 09:56
+ */
+public enum TimeQueryDirection {
+ BEFORE,
+ AFTER,
+ ;
+}
diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java
index 36c8283ae..fea89403d 100644
--- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java
+++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java
@@ -1,11 +1,14 @@
package cn.axzo.workflow.common.model.dto.es;
+import cn.axzo.workflow.common.enums.TimeQueryDirection;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
+import java.util.Date;
+
/**
* 历史流程实例的搜索对象
*
@@ -49,5 +52,39 @@ public class HistoricProcessInstanceSearchForEsDTO {
*/
private String tenantId;
+ /**
+ * 实例是否已结束
+ */
+ private Boolean finished;
+
+ /**
+ * 开始时间
+ */
+ private Date startTime;
+
+ /**
+ * 控制查询开始时间的方向,包含自身的时间点
+ *
+ * 默认是查询开始时间之后
+ */
+ private TimeQueryDirection startTimeDirection = TimeQueryDirection.AFTER;
+
+ /**
+ * 结束时间
+ */
+ private Date endTime;
+
+ /**
+ * 控制查询结束时间的方向,包含自身的时间点
+ *
+ * 默认是查询结束时间点之前
+ */
+ private TimeQueryDirection endTimeDirection = TimeQueryDirection.BEFORE;
+
+ /**
+ * 用于覆盖同步逻辑中的PageSize,一般不需要传
+ */
+ private Integer overPageSize = 10;
+
}
diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java
index ecd2206f6..8386d297a 100644
--- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java
+++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java
@@ -2,6 +2,7 @@ package cn.axzo.workflow.core.service;
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
import com.baomidou.mybatisplus.core.metadata.IPage;
+import org.flowable.bpmn.model.BpmnModel;
import org.flowable.engine.history.HistoricProcessInstance;
import java.util.List;
@@ -14,5 +15,9 @@ import java.util.List;
*/
public interface BpmnProcessInstanceForEsService {
+ Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search);
+
List queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page);
+
+ BpmnModel queryBpmnModel(String processDefinitionId);
}
diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java
new file mode 100644
index 000000000..de8ad282c
--- /dev/null
+++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java
@@ -0,0 +1,28 @@
+package cn.axzo.workflow.core.service;
+
+import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
+import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
+import org.flowable.engine.task.Attachment;
+import org.flowable.engine.task.Comment;
+import org.flowable.task.api.history.HistoricTaskInstance;
+
+import java.util.List;
+
+/**
+ * 专用与对接 ES 的流程任务相关操作
+ *
+ * @author wangli
+ * @since 2024-09-29 10:55
+ */
+public interface BpmnProcessTaskForEsService {
+
+ List queryHistoricProcessTaskByProcessInstanceId(String processInstanceId);
+
+ List queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId);
+
+ List queryCommentByProcessInstanceId(String processInstanceId);
+
+ List queryAttachmentByProcessInstanceId(String processInstanceId);
+
+ List queryProcessLogByProcessInstanceId(String processInstanceId);
+}
diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java
index a1f536fdc..9d1030db2 100644
--- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java
+++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java
@@ -4,14 +4,20 @@ import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDT
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.extern.slf4j.Slf4j;
+import org.flowable.bpmn.model.BpmnModel;
import org.flowable.engine.HistoryService;
+import org.flowable.engine.RepositoryService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.history.HistoricProcessInstanceQuery;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
/**
* 专用与对接 ES 的流程实例相关操作
@@ -24,9 +30,36 @@ import java.util.List;
public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceForEsService {
@Resource
private HistoryService historyService;
+ @Resource
+ private RepositoryService repositoryService;
+
+ @Override
+ public Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search) {
+ if(Objects.isNull(search)){
+ return 0L;
+ }
+ return buildHistoricProcessInstanceQuery(search).count();
+ }
@Override
public List queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page) {
+ if(Objects.isNull(search)){
+ return Collections.emptyList();
+ }
+ return buildHistoricProcessInstanceQuery(search)
+ .includeProcessVariables()
+ .orderByProcessInstanceId()
+ .asc()
+ .listPage(((Long) (page.getPages() * page.getSize())).intValue(),
+ ((Long) ((page.getPages() + 1) * page.getSize())).intValue());
+ }
+
+ @Override
+ public BpmnModel queryBpmnModel(String processDefinitionId) {
+ return repositoryService.getBpmnModel(processDefinitionId);
+ }
+
+ private HistoricProcessInstanceQuery buildHistoricProcessInstanceQuery(HistoricProcessInstanceSearchForEsDTO search) {
HistoricProcessInstanceQuery historicProcessInstanceQuery = historyService.createHistoricProcessInstanceQuery();
if (StringUtils.hasText(search.getProcessInstanceId())) {
historicProcessInstanceQuery.processInstanceId(search.getProcessInstanceId());
@@ -46,11 +79,31 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF
if (StringUtils.hasText(search.getTenantId())) {
historicProcessInstanceQuery.processInstanceTenantId(search.getTenantId());
}
- return historicProcessInstanceQuery
- .includeProcessVariables()
- .orderByProcessInstanceId()
- .asc()
- .listPage(((Long) (page.getPages() * page.getSize())).intValue(),
- ((Long) ((page.getPages() + 1) * page.getSize())).intValue());
+ if (Objects.equals(Boolean.TRUE, search.getFinished())) {
+ historicProcessInstanceQuery.finished();
+ }
+ if (Objects.nonNull(search.getStartTime())) {
+ // 引擎默认仅支撑两种
+ switch (search.getStartTimeDirection()) {
+ case BEFORE:
+ historicProcessInstanceQuery.startedBefore(search.getStartTime());
+ break;
+ default:
+ historicProcessInstanceQuery.startedAfter(search.getStartTime());
+ break;
+ }
+ }
+ if (Objects.nonNull(search.getEndTime())) {
+ // 引擎默认仅支撑两种
+ switch (search.getEndTimeDirection()) {
+ case AFTER:
+ historicProcessInstanceQuery.finishedAfter(search.getEndTime());
+ break;
+ default:
+ historicProcessInstanceQuery.finishedBefore(search.getEndTime());
+ break;
+ }
+ }
+ return historicProcessInstanceQuery;
}
}
diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java
index b099d99ef..5dabaf217 100644
--- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java
+++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java
@@ -1061,6 +1061,7 @@ public class BpmnProcessInstanceServiceImpl implements BpmnProcessInstanceServic
.orElse(variables.getOrDefault(OLD_INTERNAL_INITIATOR, null))))
.tenantId(historicProcessInstance.getTenantId())
.agented((Boolean) Optional.ofNullable(variables.get(INTERNAL_PROCESS_AGENT)).orElse(false))
+ // 任务
.taskDetails(genericTaskLogVos(historicProcessInstance.getId(), logs, forecasting, dto))
.defaultButtonConf(getButtonConfig(bpmnModel.getMainProcess()).orElse(new BpmnButtonConf()))
.supportBatchOperation(getProcessApproveConf(bpmnModel.getMainProcess()).orElse(new BpmnApproveConf()).getSupportBatchOperation())
diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java
new file mode 100644
index 000000000..9e6b4cba1
--- /dev/null
+++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java
@@ -0,0 +1,85 @@
+package cn.axzo.workflow.core.service.impl;
+
+import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO;
+import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
+import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog;
+import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
+import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
+import cn.axzo.workflow.core.service.ExtAxProcessLogService;
+import lombok.extern.slf4j.Slf4j;
+import org.flowable.engine.HistoryService;
+import org.flowable.engine.TaskService;
+import org.flowable.engine.task.Attachment;
+import org.flowable.engine.task.Comment;
+import org.flowable.task.api.history.HistoricTaskInstance;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * 专用与对接 ES 的流程任务相关操作
+ *
+ * @author wangli
+ * @since 2024-09-29 10:55
+ */
+@Service
+@Slf4j
+public class BpmnProcessTaskForEsServiceImpl implements BpmnProcessTaskForEsService {
+ @Resource
+ private HistoryService historyService;
+ @Resource
+ private TaskService taskService;
+ @Resource
+ private ExtAxHiTaskInstService extAxHiTaskInstService;
+ @Resource
+ private ExtAxProcessLogService extAxProcessLogService;
+
+ @Override
+ public List queryHistoricProcessTaskByProcessInstanceId(String processInstanceId) {
+// List historicTaskListByProcessInstanceId = bpmnProcessTaskService.getHistoricTaskListByProcessInstanceId(processInstanceId, null);
+ if (!StringUtils.hasText(processInstanceId)) {
+ return Collections.emptyList();
+ }
+ return historyService.createHistoricTaskInstanceQuery()
+ .processInstanceId(processInstanceId)
+ .list();
+ }
+
+ public List queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId) {
+ if (!StringUtils.hasText(processInstanceId)) {
+ return Collections.emptyList();
+ }
+ ExtHiTaskSearchDTO searchDTO = new ExtHiTaskSearchDTO();
+ searchDTO.setProcessInstanceId(processInstanceId);
+ return extAxHiTaskInstService.queryList(searchDTO);
+ }
+
+ @Override
+ public List queryCommentByProcessInstanceId(String processInstanceId) {
+ if (!StringUtils.hasText(processInstanceId)) {
+ return Collections.emptyList();
+ }
+ return taskService.getProcessInstanceComments(processInstanceId);
+ }
+
+ @Override
+ public List queryAttachmentByProcessInstanceId(String processInstanceId) {
+ if (!StringUtils.hasText(processInstanceId)) {
+ return Collections.emptyList();
+ }
+ return taskService.getProcessInstanceAttachments(processInstanceId);
+ }
+
+ @Override
+ public List queryProcessLogByProcessInstanceId(String processInstanceId) {
+ if(!StringUtils.hasText(processInstanceId)){
+ return Collections.emptyList();
+ }
+ ExtAxProcessLog queryLog = new ExtAxProcessLog();
+ queryLog.setProcessInstanceId(processInstanceId);
+ return extAxProcessLogService.genericQuery(queryLog);
+ }
+}
diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java
index 42c4500b3..a4b786ce3 100644
--- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java
+++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java
@@ -14,7 +14,7 @@ import org.springframework.context.annotation.Configuration;
* @author wangli
* @since 2024/4/28 14:02
*/
-@OnlyPodsEnvironment
+//@OnlyPodsEnvironment
@Configuration
public class XxlJobConfiguration {
Logger logger = LoggerFactory.getLogger(XxlJobConfiguration.class);
diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java
index f3f43a44b..e58cfe60e 100644
--- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java
+++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java
@@ -2,11 +2,17 @@ package cn.axzo.workflow.server.xxljob;
import cn.axzo.workflow.common.enums.WorkspaceType;
import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO;
+import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
+import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService;
+import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService;
+import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter;
import cn.axzo.workflow.es.model.EsProcessInstance;
+import cn.axzo.workflow.es.model.EsProcessTask;
import cn.axzo.workflow.es.service.EsProcessInstanceService;
import cn.axzo.workflow.es.service.EsProcessTaskService;
+import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xxl.job.core.biz.model.ReturnT;
@@ -15,18 +21,43 @@ import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.flowable.bpmn.model.BpmnModel;
+import org.flowable.bpmn.model.FlowElement;
import org.flowable.engine.history.HistoricProcessInstance;
-import org.flowable.engine.impl.util.ProcessDefinitionUtil;
+import org.flowable.engine.impl.persistence.entity.CommentEntityImpl;
+import org.flowable.engine.task.Attachment;
+import org.flowable.engine.task.Comment;
+import org.flowable.task.api.history.HistoricTaskInstance;
import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE;
+import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC;
import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121;
+import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130;
+import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142;
+import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE;
+import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO;
+import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT;
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
+import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END;
+import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED;
/**
* 流程实例数据同步
@@ -39,15 +70,45 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VER
@Slf4j
public class ProcessInstanceSyncJobHandler extends IJobHandler {
private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService;
+ private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService;
+ private final BpmnHistoricAttachmentConverter attachmentConverter;
private final EsProcessInstanceService esProcessInstanceService;
private final EsProcessTaskService esProcessTaskService;
@XxlJob("processInstanceToEs")
public ReturnT execute(String s) {
+ log.debug("start exec process instance data sync... ");
XxlJobLogger.log("start exec process instance data sync... ");
-
HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO();
- IPage page = new Page(0, 10);
+ search.setFinished(true);
+ search.setEndTime(new Date());
+ try {
+ if (StringUtils.hasText(s)) {
+ search = JSON.parseObject(s, search.getClass());
+ }
+ log.info("入参为空, 将以默认条件执行");
+ XxlJobLogger.log("入参为空, 将以默认条件执行");
+ } catch (Exception e) {
+ log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
+ XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型");
+ return ReturnT.FAIL;
+ }
+
+ Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search);
+ log.info("查询到待同步数据量: {} 条", totalCount);
+ XxlJobLogger.log("查询到待同步数据量: {} 条", totalCount);
+ Integer overPageSize = search.getOverPageSize();
+ HistoricProcessInstanceSearchForEsDTO finalSearch = search;
+ IntStream.iterate(0, i -> i + overPageSize).limit((totalCount + overPageSize - 1) / overPageSize)
+ .forEach(pageNumber -> {
+ log.info("处理进度: {} %, current pageNo: {}, pageSize: {}", pageNumber == 0 ? 0 : (double) (totalCount / (pageNumber * overPageSize)) * 100,
+ totalCount, overPageSize);
+ executeSync(finalSearch, new Page<>(pageNumber, overPageSize));
+ });
+ return ReturnT.SUCCESS;
+ }
+
+ private void executeSync(HistoricProcessInstanceSearchForEsDTO search, IPage page) {
List historicProcessInstances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, page);
historicProcessInstances.forEach(processInstance -> {
EsProcessInstance esProcessInstance = new EsProcessInstance();
@@ -68,20 +129,135 @@ public class ProcessInstanceSyncJobHandler extends IJobHandler {
esProcessInstance.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc());
esProcessInstance.setAgentTenantId(String.valueOf(variables.getOrDefault(INTERNAL_PROCESS_AGENT, "")));
- BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(processInstance.getProcessDefinitionId());
+ BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(processInstance.getProcessDefinitionId());
BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> {
esProcessInstance.setSupportBatch(cfg.getSupportBatchOperation());
esProcessInstance.setUserAgreeSignature(cfg.getUserAgreeSignature());
});
+ String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, ""));
+
+ DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
+ DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142);
+ if (supportVersion.compareTo(version) >= 0) {
+ log.info("通过新日志表进行任务纬度的数据同步");
+ // 用新日志表数据任务同步
+ executeTaskSyncForNew(processInstance);
+ } else {
+ log.info("通过引擎的任务表进行任务纬度的数据同步");
+ // 用引擎的任务表处理任务同步
+ executeTaskSyncForOld(processInstance, instanceVersion, bpmnModel);
+ }
Integer inserted = esProcessInstanceService.insert(esProcessInstance);
+
if (inserted > 0) {
+ log.debug("write processInstance to es success! id: {}", processInstance.getId());
XxlJobLogger.log("write processInstance to es success! id: {}", processInstance.getId());
} else {
+ log.warn("write to es caught exception! id: {}", processInstance.getId());
XxlJobLogger.log("write to es caught exception! id: {}", processInstance.getId());
}
});
+ }
- return ReturnT.SUCCESS;
+ private void executeTaskSyncForNew(HistoricProcessInstance processInstance) {
+ bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(processInstance.getId());
+ }
+
+
+ private void executeTaskSyncForOld(HistoricProcessInstance processInstance, String instanceVersion, BpmnModel bpmnModel) {
+ List tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(processInstance.getId());
+
+ Map extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(processInstance.getId())
+ .stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s));
+
+ Map> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(processInstance.getId())
+ .stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
+
+ Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(processInstance.getId())
+ .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList())));
+
+ Map variables = processInstance.getProcessVariables();
+
+ // 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理
+ filterEffectiveTasks(tasks, instanceVersion);
+
+ tasks.forEach(task -> {
+ log.debug("task: {}", task.getId());
+ ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst());
+ List comments = commentMap.getOrDefault(task.getId(), Collections.emptyList());
+ List attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList());
+
+ EsProcessTask esProcessTask = new EsProcessTask();
+ esProcessTask.setId(task.getId());
+ esProcessTask.setProcessInstanceId(task.getProcessInstanceId());
+ esProcessTask.setProcessDefinitionId(task.getProcessDefinitionId());
+ esProcessTask.setTaskDefinitionKey(task.getTaskDefinitionKey());
+ esProcessTask.setName(task.getName());
+ esProcessTask.setStatus(extTask.getStatus());
+ // 处理 advice
+ esProcessTask.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE))
+ .findFirst().orElse(new CommentEntityImpl()).getFullMessage());
+ // 处理 operationDesc
+ esProcessTask.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType()))
+ .max(Comparator.comparing(Comment::getTime))
+ .orElse(new CommentEntityImpl()).getFullMessage());
+ esProcessTask.setStartTime(task.getCreateTime());
+ esProcessTask.setEndTime(task.getEndTime());
+ esProcessTask.setDuration(task.getDurationInMillis());
+// esProcessTask.setLastUpdateTime(task);
+ esProcessTask.setTenantId(task.getTenantId());
+ esProcessTask.setAssigner(BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null)));
+ esProcessTask.setAttachments(attachmentConverter.toVos(attachments));
+ if (Objects.nonNull(bpmnModel)) {
+ FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey());
+ BpmnMetaParserHelper.getApprovalMethod(flowElement)
+ .ifPresent(e -> esProcessTask.setApprovalMethod(e.getDesc()));
+ BpmnMetaParserHelper.getNodeType(flowElement)
+ .ifPresent(e -> esProcessTask.setNodeType(e.getDesc()));
+// esProcessTask.setNodeMode();
+ }
+
+ });
+ }
+
+ private List filterEffectiveTasks(List tasks, String instanceVersion) {
+ List effectiveTasks = new ArrayList<>();
+ if (CollectionUtils.isEmpty(tasks)) {
+ return effectiveTasks;
+ }
+ Stream taskInstanceStream = tasks.stream()
+ .filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID))
+ .filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID))
+ .filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus()));
+ if (Objects.isNull(instanceVersion)) {
+ compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
+ } else {
+ if (StringUtils.hasText(instanceVersion)) {
+ DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion);
+ DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130);
+ if (version.compareTo(supportVersion) < 0) {
+ compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add);
+ } else {
+ taskInstanceStream.forEach(effectiveTasks::add);
+ }
+ } else {
+ taskInstanceStream.forEach(effectiveTasks::add);
+ }
+ }
+ return effectiveTasks;
+ }
+
+
+ /**
+ * 兼容 1.2.1 版本的审批日志
+ *
+ * @param stream
+ * @return
+ */
+ private Stream compatibleVersion(Stream stream) {
+ return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
+ || (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason()))
+ ).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason())));
}
}