From b2a4c6b57bcdb74f409fda4a3c1a24f12ad8e9e7 Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Sun, 29 Sep 2024 16:27:36 +0800 Subject: [PATCH] =?UTF-8?q?feat(REQ-2752)=20-=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E7=BA=AC=E5=BA=A6=E6=95=B0=E6=8D=AE=E8=87=B3?= =?UTF-8?q?=20ES=20=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/BpmnConstants.java | 2 + .../common/enums/TimeQueryDirection.java | 16 ++ ...HistoricProcessInstanceSearchForEsDTO.java | 37 ++++ .../BpmnProcessInstanceForEsService.java | 5 + .../service/BpmnProcessTaskForEsService.java | 28 +++ .../BpmnProcessInstanceForEsServiceImpl.java | 65 +++++- .../impl/BpmnProcessInstanceServiceImpl.java | 1 + .../impl/BpmnProcessTaskForEsServiceImpl.java | 85 ++++++++ .../common/config/XxlJobConfiguration.java | 2 +- .../xxljob/ProcessInstanceSyncJobHandler.java | 186 +++++++++++++++++- 10 files changed, 415 insertions(+), 12 deletions(-) create mode 100644 workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java create mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java create mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java index 49661639a..0697ef5ba 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java @@ -58,6 +58,8 @@ public interface BpmnConstants { String FLOW_SERVER_VERSION = "serverVersion"; String FLOW_SERVER_VERSION_121 = "1.2.1"; String FLOW_SERVER_VERSION_130 = "1.3.0"; + // 1.4.2 开始启用新版本日志 + String FLOW_SERVER_VERSION_142 = "1.4.2"; String CONFIG_NOTICE = "noticeConfig"; String CONFIG_APPROVE = "approveConfig"; String TEMPLATE_NOTICE_MESSAGE_CONFIG = "noticeMessageConfig"; diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java new file mode 100644 index 000000000..114403b07 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java @@ -0,0 +1,16 @@ +package cn.axzo.workflow.common.enums; + +/** + * 时间查询方向 + *

+ * 注意: 该枚举用在查询 flowable 引擎数据时, 都是包含自身时间点的. + * 例如, 使用 Before 时,也就是说在某个时间点之前,是包含"某个时间"自身的. + * + * @author wangli + * @since 2024-09-29 09:56 + */ +public enum TimeQueryDirection { + BEFORE, + AFTER, + ; +} diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java index 36c8283ae..fea89403d 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java @@ -1,11 +1,14 @@ package cn.axzo.workflow.common.model.dto.es; +import cn.axzo.workflow.common.enums.TimeQueryDirection; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import java.util.Date; + /** * 历史流程实例的搜索对象 * @@ -49,5 +52,39 @@ public class HistoricProcessInstanceSearchForEsDTO { */ private String tenantId; + /** + * 实例是否已结束 + */ + private Boolean finished; + + /** + * 开始时间 + */ + private Date startTime; + + /** + * 控制查询开始时间的方向,包含自身的时间点 + *

+ * 默认是查询开始时间之后 + */ + private TimeQueryDirection startTimeDirection = TimeQueryDirection.AFTER; + + /** + * 结束时间 + */ + private Date endTime; + + /** + * 控制查询结束时间的方向,包含自身的时间点 + *

+ * 默认是查询结束时间点之前 + */ + private TimeQueryDirection endTimeDirection = TimeQueryDirection.BEFORE; + + /** + * 用于覆盖同步逻辑中的PageSize,一般不需要传 + */ + private Integer overPageSize = 10; + } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java index ecd2206f6..8386d297a 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java @@ -2,6 +2,7 @@ package cn.axzo.workflow.core.service; import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; import com.baomidou.mybatisplus.core.metadata.IPage; +import org.flowable.bpmn.model.BpmnModel; import org.flowable.engine.history.HistoricProcessInstance; import java.util.List; @@ -14,5 +15,9 @@ import java.util.List; */ public interface BpmnProcessInstanceForEsService { + Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search); + List queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page); + + BpmnModel queryBpmnModel(String processDefinitionId); } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java new file mode 100644 index 000000000..de8ad282c --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java @@ -0,0 +1,28 @@ +package cn.axzo.workflow.core.service; + +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; +import org.flowable.engine.task.Attachment; +import org.flowable.engine.task.Comment; +import org.flowable.task.api.history.HistoricTaskInstance; + +import java.util.List; + +/** + * 专用与对接 ES 的流程任务相关操作 + * + * @author wangli + * @since 2024-09-29 10:55 + */ +public interface BpmnProcessTaskForEsService { + + List queryHistoricProcessTaskByProcessInstanceId(String processInstanceId); + + List queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId); + + List queryCommentByProcessInstanceId(String processInstanceId); + + List queryAttachmentByProcessInstanceId(String processInstanceId); + + List queryProcessLogByProcessInstanceId(String processInstanceId); +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java index a1f536fdc..9d1030db2 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java @@ -4,14 +4,20 @@ import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDT import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; import com.baomidou.mybatisplus.core.metadata.IPage; import lombok.extern.slf4j.Slf4j; +import org.flowable.bpmn.model.BpmnModel; import org.flowable.engine.HistoryService; +import org.flowable.engine.RepositoryService; import org.flowable.engine.history.HistoricProcessInstance; import org.flowable.engine.history.HistoricProcessInstanceQuery; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import javax.annotation.Resource; +import java.util.Collections; import java.util.List; +import java.util.Objects; /** * 专用与对接 ES 的流程实例相关操作 @@ -24,9 +30,36 @@ import java.util.List; public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceForEsService { @Resource private HistoryService historyService; + @Resource + private RepositoryService repositoryService; + + @Override + public Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search) { + if(Objects.isNull(search)){ + return 0L; + } + return buildHistoricProcessInstanceQuery(search).count(); + } @Override public List queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page) { + if(Objects.isNull(search)){ + return Collections.emptyList(); + } + return buildHistoricProcessInstanceQuery(search) + .includeProcessVariables() + .orderByProcessInstanceId() + .asc() + .listPage(((Long) (page.getPages() * page.getSize())).intValue(), + ((Long) ((page.getPages() + 1) * page.getSize())).intValue()); + } + + @Override + public BpmnModel queryBpmnModel(String processDefinitionId) { + return repositoryService.getBpmnModel(processDefinitionId); + } + + private HistoricProcessInstanceQuery buildHistoricProcessInstanceQuery(HistoricProcessInstanceSearchForEsDTO search) { HistoricProcessInstanceQuery historicProcessInstanceQuery = historyService.createHistoricProcessInstanceQuery(); if (StringUtils.hasText(search.getProcessInstanceId())) { historicProcessInstanceQuery.processInstanceId(search.getProcessInstanceId()); @@ -46,11 +79,31 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF if (StringUtils.hasText(search.getTenantId())) { historicProcessInstanceQuery.processInstanceTenantId(search.getTenantId()); } - return historicProcessInstanceQuery - .includeProcessVariables() - .orderByProcessInstanceId() - .asc() - .listPage(((Long) (page.getPages() * page.getSize())).intValue(), - ((Long) ((page.getPages() + 1) * page.getSize())).intValue()); + if (Objects.equals(Boolean.TRUE, search.getFinished())) { + historicProcessInstanceQuery.finished(); + } + if (Objects.nonNull(search.getStartTime())) { + // 引擎默认仅支撑两种 + switch (search.getStartTimeDirection()) { + case BEFORE: + historicProcessInstanceQuery.startedBefore(search.getStartTime()); + break; + default: + historicProcessInstanceQuery.startedAfter(search.getStartTime()); + break; + } + } + if (Objects.nonNull(search.getEndTime())) { + // 引擎默认仅支撑两种 + switch (search.getEndTimeDirection()) { + case AFTER: + historicProcessInstanceQuery.finishedAfter(search.getEndTime()); + break; + default: + historicProcessInstanceQuery.finishedBefore(search.getEndTime()); + break; + } + } + return historicProcessInstanceQuery; } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java index b099d99ef..5dabaf217 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java @@ -1061,6 +1061,7 @@ public class BpmnProcessInstanceServiceImpl implements BpmnProcessInstanceServic .orElse(variables.getOrDefault(OLD_INTERNAL_INITIATOR, null)))) .tenantId(historicProcessInstance.getTenantId()) .agented((Boolean) Optional.ofNullable(variables.get(INTERNAL_PROCESS_AGENT)).orElse(false)) + // 任务 .taskDetails(genericTaskLogVos(historicProcessInstance.getId(), logs, forecasting, dto)) .defaultButtonConf(getButtonConfig(bpmnModel.getMainProcess()).orElse(new BpmnButtonConf())) .supportBatchOperation(getProcessApproveConf(bpmnModel.getMainProcess()).orElse(new BpmnApproveConf()).getSupportBatchOperation()) diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java new file mode 100644 index 000000000..9e6b4cba1 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java @@ -0,0 +1,85 @@ +package cn.axzo.workflow.core.service.impl; + +import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO; +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; +import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService; +import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; +import cn.axzo.workflow.core.service.ExtAxProcessLogService; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.HistoryService; +import org.flowable.engine.TaskService; +import org.flowable.engine.task.Attachment; +import org.flowable.engine.task.Comment; +import org.flowable.task.api.history.HistoricTaskInstance; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.util.Collections; +import java.util.List; + +/** + * 专用与对接 ES 的流程任务相关操作 + * + * @author wangli + * @since 2024-09-29 10:55 + */ +@Service +@Slf4j +public class BpmnProcessTaskForEsServiceImpl implements BpmnProcessTaskForEsService { + @Resource + private HistoryService historyService; + @Resource + private TaskService taskService; + @Resource + private ExtAxHiTaskInstService extAxHiTaskInstService; + @Resource + private ExtAxProcessLogService extAxProcessLogService; + + @Override + public List queryHistoricProcessTaskByProcessInstanceId(String processInstanceId) { +// List historicTaskListByProcessInstanceId = bpmnProcessTaskService.getHistoricTaskListByProcessInstanceId(processInstanceId, null); + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + return historyService.createHistoricTaskInstanceQuery() + .processInstanceId(processInstanceId) + .list(); + } + + public List queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId) { + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + ExtHiTaskSearchDTO searchDTO = new ExtHiTaskSearchDTO(); + searchDTO.setProcessInstanceId(processInstanceId); + return extAxHiTaskInstService.queryList(searchDTO); + } + + @Override + public List queryCommentByProcessInstanceId(String processInstanceId) { + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + return taskService.getProcessInstanceComments(processInstanceId); + } + + @Override + public List queryAttachmentByProcessInstanceId(String processInstanceId) { + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + return taskService.getProcessInstanceAttachments(processInstanceId); + } + + @Override + public List queryProcessLogByProcessInstanceId(String processInstanceId) { + if(!StringUtils.hasText(processInstanceId)){ + return Collections.emptyList(); + } + ExtAxProcessLog queryLog = new ExtAxProcessLog(); + queryLog.setProcessInstanceId(processInstanceId); + return extAxProcessLogService.genericQuery(queryLog); + } +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java index 42c4500b3..a4b786ce3 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java @@ -14,7 +14,7 @@ import org.springframework.context.annotation.Configuration; * @author wangli * @since 2024/4/28 14:02 */ -@OnlyPodsEnvironment +//@OnlyPodsEnvironment @Configuration public class XxlJobConfiguration { Logger logger = LoggerFactory.getLogger(XxlJobConfiguration.class); diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java index f3f43a44b..e58cfe60e 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java @@ -2,11 +2,17 @@ package cn.axzo.workflow.server.xxljob; import cn.axzo.workflow.common.enums.WorkspaceType; import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; +import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; +import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService; +import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter; import cn.axzo.workflow.es.model.EsProcessInstance; +import cn.axzo.workflow.es.model.EsProcessTask; import cn.axzo.workflow.es.service.EsProcessInstanceService; import cn.axzo.workflow.es.service.EsProcessTaskService; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.xxl.job.core.biz.model.ReturnT; @@ -15,18 +21,43 @@ import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.maven.artifact.versioning.DefaultArtifactVersion; import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.FlowElement; import org.flowable.engine.history.HistoricProcessInstance; -import org.flowable.engine.impl.util.ProcessDefinitionUtil; +import org.flowable.engine.impl.persistence.entity.CommentEntityImpl; +import org.flowable.engine.task.Attachment; +import org.flowable.engine.task.Comment; +import org.flowable.task.api.history.HistoricTaskInstance; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE; +import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC; import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142; +import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID; import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT; import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE; +import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO; +import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT; import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION; +import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END; +import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED; /** * 流程实例数据同步 @@ -39,15 +70,45 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VER @Slf4j public class ProcessInstanceSyncJobHandler extends IJobHandler { private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; + private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService; + private final BpmnHistoricAttachmentConverter attachmentConverter; private final EsProcessInstanceService esProcessInstanceService; private final EsProcessTaskService esProcessTaskService; @XxlJob("processInstanceToEs") public ReturnT execute(String s) { + log.debug("start exec process instance data sync... "); XxlJobLogger.log("start exec process instance data sync... "); - HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO(); - IPage page = new Page(0, 10); + search.setFinished(true); + search.setEndTime(new Date()); + try { + if (StringUtils.hasText(s)) { + search = JSON.parseObject(s, search.getClass()); + } + log.info("入参为空, 将以默认条件执行"); + XxlJobLogger.log("入参为空, 将以默认条件执行"); + } catch (Exception e) { + log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型"); + XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型"); + return ReturnT.FAIL; + } + + Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search); + log.info("查询到待同步数据量: {} 条", totalCount); + XxlJobLogger.log("查询到待同步数据量: {} 条", totalCount); + Integer overPageSize = search.getOverPageSize(); + HistoricProcessInstanceSearchForEsDTO finalSearch = search; + IntStream.iterate(0, i -> i + overPageSize).limit((totalCount + overPageSize - 1) / overPageSize) + .forEach(pageNumber -> { + log.info("处理进度: {} %, current pageNo: {}, pageSize: {}", pageNumber == 0 ? 0 : (double) (totalCount / (pageNumber * overPageSize)) * 100, + totalCount, overPageSize); + executeSync(finalSearch, new Page<>(pageNumber, overPageSize)); + }); + return ReturnT.SUCCESS; + } + + private void executeSync(HistoricProcessInstanceSearchForEsDTO search, IPage page) { List historicProcessInstances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, page); historicProcessInstances.forEach(processInstance -> { EsProcessInstance esProcessInstance = new EsProcessInstance(); @@ -68,20 +129,135 @@ public class ProcessInstanceSyncJobHandler extends IJobHandler { esProcessInstance.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc()); esProcessInstance.setAgentTenantId(String.valueOf(variables.getOrDefault(INTERNAL_PROCESS_AGENT, ""))); - BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(processInstance.getProcessDefinitionId()); + BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(processInstance.getProcessDefinitionId()); BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> { esProcessInstance.setSupportBatch(cfg.getSupportBatchOperation()); esProcessInstance.setUserAgreeSignature(cfg.getUserAgreeSignature()); }); + String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, "")); + + DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); + DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142); + if (supportVersion.compareTo(version) >= 0) { + log.info("通过新日志表进行任务纬度的数据同步"); + // 用新日志表数据任务同步 + executeTaskSyncForNew(processInstance); + } else { + log.info("通过引擎的任务表进行任务纬度的数据同步"); + // 用引擎的任务表处理任务同步 + executeTaskSyncForOld(processInstance, instanceVersion, bpmnModel); + } Integer inserted = esProcessInstanceService.insert(esProcessInstance); + if (inserted > 0) { + log.debug("write processInstance to es success! id: {}", processInstance.getId()); XxlJobLogger.log("write processInstance to es success! id: {}", processInstance.getId()); } else { + log.warn("write to es caught exception! id: {}", processInstance.getId()); XxlJobLogger.log("write to es caught exception! id: {}", processInstance.getId()); } }); + } - return ReturnT.SUCCESS; + private void executeTaskSyncForNew(HistoricProcessInstance processInstance) { + bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(processInstance.getId()); + } + + + private void executeTaskSyncForOld(HistoricProcessInstance processInstance, String instanceVersion, BpmnModel bpmnModel) { + List tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(processInstance.getId()); + + Map extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(processInstance.getId()) + .stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s)); + + Map> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(processInstance.getId()) + .stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(processInstance.getId()) + .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + Map variables = processInstance.getProcessVariables(); + + // 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理 + filterEffectiveTasks(tasks, instanceVersion); + + tasks.forEach(task -> { + log.debug("task: {}", task.getId()); + ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst()); + List comments = commentMap.getOrDefault(task.getId(), Collections.emptyList()); + List attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList()); + + EsProcessTask esProcessTask = new EsProcessTask(); + esProcessTask.setId(task.getId()); + esProcessTask.setProcessInstanceId(task.getProcessInstanceId()); + esProcessTask.setProcessDefinitionId(task.getProcessDefinitionId()); + esProcessTask.setTaskDefinitionKey(task.getTaskDefinitionKey()); + esProcessTask.setName(task.getName()); + esProcessTask.setStatus(extTask.getStatus()); + // 处理 advice + esProcessTask.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE)) + .findFirst().orElse(new CommentEntityImpl()).getFullMessage()); + // 处理 operationDesc + esProcessTask.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType())) + .max(Comparator.comparing(Comment::getTime)) + .orElse(new CommentEntityImpl()).getFullMessage()); + esProcessTask.setStartTime(task.getCreateTime()); + esProcessTask.setEndTime(task.getEndTime()); + esProcessTask.setDuration(task.getDurationInMillis()); +// esProcessTask.setLastUpdateTime(task); + esProcessTask.setTenantId(task.getTenantId()); + esProcessTask.setAssigner(BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null))); + esProcessTask.setAttachments(attachmentConverter.toVos(attachments)); + if (Objects.nonNull(bpmnModel)) { + FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey()); + BpmnMetaParserHelper.getApprovalMethod(flowElement) + .ifPresent(e -> esProcessTask.setApprovalMethod(e.getDesc())); + BpmnMetaParserHelper.getNodeType(flowElement) + .ifPresent(e -> esProcessTask.setNodeType(e.getDesc())); +// esProcessTask.setNodeMode(); + } + + }); + } + + private List filterEffectiveTasks(List tasks, String instanceVersion) { + List effectiveTasks = new ArrayList<>(); + if (CollectionUtils.isEmpty(tasks)) { + return effectiveTasks; + } + Stream taskInstanceStream = tasks.stream() + .filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID)) + .filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID)) + .filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus())); + if (Objects.isNull(instanceVersion)) { + compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); + } else { + if (StringUtils.hasText(instanceVersion)) { + DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); + DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130); + if (version.compareTo(supportVersion) < 0) { + compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); + } else { + taskInstanceStream.forEach(effectiveTasks::add); + } + } else { + taskInstanceStream.forEach(effectiveTasks::add); + } + } + return effectiveTasks; + } + + + /** + * 兼容 1.2.1 版本的审批日志 + * + * @param stream + * @return + */ + private Stream compatibleVersion(Stream stream) { + return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) + || (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) + ).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason()))); } }