diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java index 0697ef5ba..812b77d10 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java @@ -189,4 +189,5 @@ public interface BpmnConstants { * 回退操作次数上限 */ Integer MAX_BACKED_OPERATE_COUNT = 20; + String LATEST_SYNC_TO_ELASTICSEARCH_TIME = "latest.sync.to.elasticsearch.time"; } diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/DataSyncSummaryDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/DataSyncSummaryDTO.java new file mode 100644 index 000000000..39a18fb86 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/DataSyncSummaryDTO.java @@ -0,0 +1,25 @@ +package cn.axzo.workflow.common.model.dto.es; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +/** + * 数据同步 ES 的结果统计 + * + * @author wangli + * @since 2024-09-30 14:42 + */ +@Data +@Accessors(chain = true) +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class DataSyncSummaryDTO { + + private Long processInstanceCount; + + private Long processTaskCount; +} diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java index fea89403d..a68adccc7 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java @@ -54,6 +54,7 @@ public class HistoricProcessInstanceSearchForEsDTO { /** * 实例是否已结束 + * */ private Boolean finished; @@ -84,7 +85,7 @@ public class HistoricProcessInstanceSearchForEsDTO { /** * 用于覆盖同步逻辑中的PageSize,一般不需要传 */ - private Integer overPageSize = 10; + private Integer overPageSize = 100; } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomInsertPropertyCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomInsertPropertyCmd.java new file mode 100644 index 000000000..2e1441f17 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomInsertPropertyCmd.java @@ -0,0 +1,43 @@ +package cn.axzo.workflow.core.engine.cmd; + +import org.flowable.common.engine.impl.interceptor.Command; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.common.engine.impl.persistence.entity.PropertyEntity; +import org.flowable.common.engine.impl.persistence.entity.PropertyEntityImpl; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; + +import java.util.Objects; + +/** + * 操作 ACT_GE_PROPERTY 表的写命令 + * + * @author wangli + * @since 2024-09-30 15:15 + */ +public class CustomInsertPropertyCmd implements Command { + private final String propertyName; + private final String propertyValue; + + public CustomInsertPropertyCmd(String propertyName, String propertyValue) { + this.propertyName = propertyName; + this.propertyValue = propertyValue; + } + + @Override + public Void execute(CommandContext commandContext) { + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(commandContext); + PropertyEntity entity = processEngineConfiguration.getPropertyEntityManager().findById(propertyName); + if (Objects.nonNull(entity)) { + entity.setValue(propertyValue); + processEngineConfiguration.getPropertyEntityManager().update(entity); + } else { + entity = new PropertyEntityImpl(); + entity.setName(propertyName); + entity.setValue(propertyValue); + processEngineConfiguration.getPropertyEntityManager().insert(entity); + } + return null; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java index 7dd04b9e3..a8fca56e6 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java @@ -21,7 +21,7 @@ import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventT import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ENTITY_UPDATED; /** - * TODO + * 引擎内的 Entity 事件监听处理 * * @author wangli * @since 2024-09-02 15:34 @@ -33,20 +33,19 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener { private final List handles; public static final Set SUPPORTED = - ImmutableSet.builder() - .add(ENTITY_CREATED) - .add(ENTITY_INITIALIZED) - .add(ENTITY_UPDATED) - .add(ENTITY_DELETED) - .add(ENTITY_SUSPENDED) - .add(ENTITY_ACTIVATED) - .build(); + ImmutableSet.builder() + .add(ENTITY_CREATED) + .add(ENTITY_INITIALIZED) + .add(ENTITY_UPDATED) + .add(ENTITY_DELETED) + .add(ENTITY_SUSPENDED) + .add(ENTITY_ACTIVATED) + .build(); @Override public void onEvent(FlowableEvent event) { if (event instanceof FlowableEntityEvent && SUPPORTED.contains(event.getType())) { FlowableEntityEvent entityEvent = (FlowableEntityEvent) event; -// log.warn("entity event type: {}, class: {}",entityEvent.getType(), entityEvent.getEntity().getClass()); handles.forEach(handle -> { Object entity = entityEvent.getEntity(); if (handle.support(entity)) { @@ -66,48 +65,10 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener { } } }); -// if (entityEvent.getEntity() instanceof TaskEntity) { -// TaskEntity taskEntity = (TaskEntity) entityEvent.getEntity(); -// log.error("event taskId :{}, taskDefKey: {}", taskEntity.getId(), taskEntity.getTaskDefinitionKey()); -// -// if (Objects.equals(event.getType(), ENTITY_CREATED)) { -// onCreate(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) { -// onInitialized(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) { -// onUpdated(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) { -// onDeleted(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) { -// onSuspended(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) { -// onActivated(taskEntity); -// } -// } else if(entityEvent.getEntity() instanceof CommentEntity) { -// CommentEntity commentEntity = (CommentEntity) entityEvent.getEntity(); -// log.error("event taskId :{}", commentEntity.getId()); -// -// if (Objects.equals(event.getType(), ENTITY_CREATED)) { -// onCreate(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) { -// onInitialized(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) { -// onUpdated(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) { -// onDeleted(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) { -// onSuspended(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) { -// onActivated(commentEntity); -// } -// } - } } - - @Override public boolean isFailOnException() { return true; diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java index c7e03e86d..e83553c89 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java @@ -22,7 +22,6 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERAT //@Component @AllArgsConstructor public class CommentEntityEventHandle implements EntityEventHandle { - private final ExtAxProcessLogService processLogService; @Override public boolean support(Object entity) { @@ -37,45 +36,30 @@ public class CommentEntityEventHandle implements EntityEventHandle queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page); BpmnModel queryBpmnModel(String processDefinitionId); + } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java index 9d1030db2..92deca9f7 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java @@ -9,8 +9,6 @@ import org.flowable.engine.HistoryService; import org.flowable.engine.RepositoryService; import org.flowable.engine.history.HistoricProcessInstance; import org.flowable.engine.history.HistoricProcessInstanceQuery; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @@ -35,7 +33,7 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF @Override public Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search) { - if(Objects.isNull(search)){ + if (Objects.isNull(search)) { return 0L; } return buildHistoricProcessInstanceQuery(search).count(); @@ -43,15 +41,20 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF @Override public List queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page) { - if(Objects.isNull(search)){ + if (Objects.isNull(search)) { return Collections.emptyList(); } - return buildHistoricProcessInstanceQuery(search) + HistoricProcessInstanceQuery query = buildHistoricProcessInstanceQuery(search) .includeProcessVariables() .orderByProcessInstanceId() - .asc() - .listPage(((Long) (page.getPages() * page.getSize())).intValue(), - ((Long) ((page.getPages() + 1) * page.getSize())).intValue()); + .asc(); + if (Objects.nonNull(page)) { + int firstResult = Math.toIntExact((page.getCurrent() - 1) * page.getSize()); + int maxResult = Math.toIntExact(page.getSize()); + log.debug("search pageable info, firstResult: {}, maxResult: {}", firstResult, maxResult); + return query.listPage(firstResult, maxResult); + } + return query.list(); } @Override @@ -79,8 +82,11 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF if (StringUtils.hasText(search.getTenantId())) { historicProcessInstanceQuery.processInstanceTenantId(search.getTenantId()); } + // 只有传值后, 才接入该数据, 否则不控制 if (Objects.equals(Boolean.TRUE, search.getFinished())) { historicProcessInstanceQuery.finished(); + } else if (Objects.equals(Boolean.FALSE, search.getFinished())) { + historicProcessInstanceQuery.unfinished(); } if (Objects.nonNull(search.getStartTime())) { // 引擎默认仅支撑两种 diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxAxReModelServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxReModelServiceImpl.java similarity index 97% rename from workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxAxReModelServiceImpl.java rename to workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxReModelServiceImpl.java index d825a654b..aac4766eb 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxAxReModelServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxReModelServiceImpl.java @@ -27,7 +27,7 @@ import static cn.axzo.workflow.core.common.code.BpmnModelRespCode.MODEL_ID_NOT_E @Service @RequiredArgsConstructor @Slf4j -public class ExtAxAxReModelServiceImpl implements ExtAxReModelService { +public class ExtAxReModelServiceImpl implements ExtAxReModelService { @Resource private ExtAxReModelMapper extAxReModelMapper; diff --git a/workflow-engine-core/src/main/resources/sql/upgrade_to_1.4.9.sql b/workflow-engine-core/src/main/resources/sql/upgrade_to_1.4.9.sql new file mode 100644 index 000000000..3d54eb94b --- /dev/null +++ b/workflow-engine-core/src/main/resources/sql/upgrade_to_1.4.9.sql @@ -0,0 +1,9 @@ +create table ext_ax_unfinished_process +( + id bigint auto_increment comment '主键' + primary key, + process_instance_id varchar(60) default '' not null comment '实例编号', + create_at datetime default CURRENT_TIMESTAMP not null comment '创建时间', + update_at datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间', + is_delete int default 0 not null comment '是否删除' +) comment '未完成实例的记录'; \ No newline at end of file diff --git a/workflow-engine-elasticsearch/pom.xml b/workflow-engine-elasticsearch/pom.xml index 712da88e9..588687a96 100644 --- a/workflow-engine-elasticsearch/pom.xml +++ b/workflow-engine-elasticsearch/pom.xml @@ -16,15 +16,14 @@ - - cn.axzo.workflow - workflow-engine-axzo-ext - - cn.axzo.workflow workflow-engine-core + + cn.axzo.framework.data + axzo-data-mybatis-plus + org.elasticsearch.client @@ -40,6 +39,5 @@ org.dromara.easy-es easy-es-boot-starter - diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/repository/entity/ExtAxUnfinishedProcess.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/repository/entity/ExtAxUnfinishedProcess.java new file mode 100644 index 000000000..b8020699e --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/repository/entity/ExtAxUnfinishedProcess.java @@ -0,0 +1,26 @@ +package cn.axzo.workflow.core.repository.entity; + +import cn.axzo.framework.data.mybatisplus.model.BaseEntity; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * 未完成的实例记录 + * + * @author wangli + * @since 2024-09-30 15:48 + */ +@EqualsAndHashCode(callSuper = true) +@TableName(value = "ext_ax_unfinished_process", autoResultMap = true) +@Data +@ToString(callSuper = true) +public class ExtAxUnfinishedProcess extends BaseEntity { + private static final long serialVersionUID = 1L; + + /** + * 流程实例 ID + */ + private String processInstanceId; +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/repository/mapper/ExtAxUnfinishedProcessMapper.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/repository/mapper/ExtAxUnfinishedProcessMapper.java new file mode 100644 index 000000000..dc234d0af --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/repository/mapper/ExtAxUnfinishedProcessMapper.java @@ -0,0 +1,8 @@ +package cn.axzo.workflow.core.repository.mapper; + +import cn.axzo.workflow.core.repository.entity.ExtAxUnfinishedProcess; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface ExtAxUnfinishedProcessMapper extends BaseMapperX { +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/service/ExtAxUnfinishedProcessService.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/service/ExtAxUnfinishedProcessService.java new file mode 100644 index 000000000..34cae7c01 --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/service/ExtAxUnfinishedProcessService.java @@ -0,0 +1,13 @@ +package cn.axzo.workflow.core.service; + +import java.util.List; + +/** + * Api Log 表操作服务 + * + * @author wangli + * @since 2024/4/3 10:40 + */ +public interface ExtAxUnfinishedProcessService { + void bulkInsertUnfinishedProcess(List unfinishedIds); +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxUnfinishedProcessServiceImpl.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxUnfinishedProcessServiceImpl.java new file mode 100644 index 000000000..6ed3a9b1f --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxUnfinishedProcessServiceImpl.java @@ -0,0 +1,36 @@ +package cn.axzo.workflow.core.service.impl; + +import cn.axzo.workflow.core.repository.entity.ExtAxUnfinishedProcess; +import cn.axzo.workflow.core.repository.mapper.ExtAxUnfinishedProcessMapper; +import cn.axzo.workflow.core.service.ExtAxUnfinishedProcessService; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 用于记录 ES 数据同步的时间点的未处理实例 ID + * + * @author wangli + * @since 2024-09-30 16:19 + */ +@Service +public class ExtAxUnfinishedProcessServiceImpl implements ExtAxUnfinishedProcessService { + @Resource + private ExtAxUnfinishedProcessMapper unfinishedProcessMapper; + + @Override + public void bulkInsertUnfinishedProcess(List unfinishedIds) { + if(CollectionUtils.isEmpty(unfinishedIds)){ + return; + } + List entities = unfinishedIds.stream().map(i -> { + ExtAxUnfinishedProcess unfinishedProcess = new ExtAxUnfinishedProcess(); + unfinishedProcess.setProcessInstanceId(i); + return unfinishedProcess; + }).collect(Collectors.toList()); + unfinishedProcessMapper.insertBatch(entities); + } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessInstance.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessInstance.java index 5a8bb86d7..41f9e9ad3 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessInstance.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessInstance.java @@ -16,7 +16,8 @@ import java.util.Date; * @since 2024-09-25 20:32 */ @Data -@IndexName("process_instance") +@IndexName("process_instance_test") +//@Join(nodes = {@Node(parentClass = EsProcessInstance.class, childClasses = EsProcessTask.class)}) public class EsProcessInstance { /** @@ -62,7 +63,7 @@ public class EsProcessInstance { private Long durationInMillis; /** - * 该流程中上次操作完成的时间 + * 该流程中上次操作完成的时间, 如果是已完成的实例, 就清空该值 */ private Date lastOperationTime; @@ -84,9 +85,9 @@ public class EsProcessInstance { private String workflowEngineVersion; /** - * 代运营的 WorkspaceId + * 是否代运营 */ - private String agentTenantId; + private Boolean agent; /** * 是否支持批量操作任务 diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessTask.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessTask.java index 7b1ad03e3..202eedfdf 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessTask.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessTask.java @@ -20,7 +20,7 @@ import java.util.List; * @since 2024-09-25 20:32 */ @Data -@IndexName("process_task") +@IndexName("process_task_test") public class EsProcessTask { /** diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessInstanceService.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessInstanceService.java new file mode 100644 index 000000000..668cfac57 --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessInstanceService.java @@ -0,0 +1,145 @@ +package cn.axzo.workflow.es.service.aggregation; + +import cn.axzo.workflow.common.enums.WorkspaceType; +import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO; +import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; +import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; +import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; +import cn.axzo.workflow.core.service.ExtAxUnfinishedProcessService; +import cn.axzo.workflow.es.model.EsProcessInstance; +import cn.axzo.workflow.es.model.EsProcessTask; +import cn.axzo.workflow.es.service.EsProcessInstanceService; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.maven.artifact.versioning.DefaultArtifactVersion; +import org.flowable.bpmn.model.BpmnModel; +import org.flowable.engine.HistoryService; +import org.flowable.engine.history.HistoricProcessInstance; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142; +import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT; +import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE; +import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION; + +/** + * 实例纬度的聚合操作 + * + * @author wangli + * @since 2024-09-30 11:46 + */ +@Component +@AllArgsConstructor +@Slf4j +public class AggregateProcessInstanceService { + private HistoryService historyService; + private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; + private final EsProcessInstanceService esProcessInstanceService; + private final AggregateProcessTaskService aggregateProcessTaskService; + private final ExtAxUnfinishedProcessService unfinishedProcessService; + + /** + * 同步指定查询结果的数据至 ES + *

+ * 内部自动分页循环同步 + * + * @param search + * @return 应同步至 ES 的统计数据 + */ + public DataSyncSummaryDTO syncProcessInstanceForSearch(HistoricProcessInstanceSearchForEsDTO search) { + Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search); + log.info("查询到待同步数据量: {} 条", totalProcessInstanceCount); + AtomicLong totalProcessTaskCount = new AtomicLong(0); + IntStream.iterate(0, i -> i + search.getOverPageSize()).limit((totalProcessInstanceCount + search.getOverPageSize() - 1) / search.getOverPageSize()) + .forEach(skipRows -> { + log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100), + skipRows, skipRows + search.getOverPageSize()); + int pageNo = skipRows / search.getOverPageSize() + 1; + List instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page(pageNo, search.getOverPageSize())); + instances.forEach(hpi -> { + if (Objects.nonNull(hpi.getEndTime())) { + // 已结束的实例 + List tasks = syncFinishProcessInstance(hpi); + totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + tasks.size()); + } else { + // 未结束的实例 + } + }); + }); + return new DataSyncSummaryDTO(totalProcessInstanceCount, totalProcessTaskCount.get()); + } + + /** + * 同步结束审批的数据至 ES + */ + public List syncFinishProcessInstance(HistoricProcessInstance hpi) { + if (Objects.isNull(hpi)) { + return Collections.emptyList(); + } + EsProcessInstance esProcessInstance = new EsProcessInstance(); + esProcessInstance.setId(hpi.getId()); + esProcessInstance.setName(hpi.getName()); + esProcessInstance.setBusinessKey(hpi.getBusinessKey()); + esProcessInstance.setProcessDefinitionId(hpi.getProcessDefinitionId()); + esProcessInstance.setStartTime(hpi.getStartTime()); + esProcessInstance.setEndTime(hpi.getEndTime()); + esProcessInstance.setDurationInMillis(hpi.getDurationInMillis()); + esProcessInstance.setTenantId(hpi.getTenantId()); + esProcessInstance.setBusinessStatus(hpi.getBusinessStatus()); + + Map variables = hpi.getProcessVariables(); + + esProcessInstance.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121))); + esProcessInstance.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc()); + esProcessInstance.setAgent((Boolean) variables.getOrDefault(INTERNAL_PROCESS_AGENT, false)); + + BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId()); + BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> { + esProcessInstance.setSupportBatch(cfg.getSupportBatchOperation()); + esProcessInstance.setUserAgreeSignature(cfg.getUserAgreeSignature()); + }); + + // 实例纬度数据同步 ES + esProcessInstanceService.insert(esProcessInstance); + + String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)); + DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); + DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142); + List toEsTasks = new ArrayList<>(); + if (version.compareTo(supportVersion) >= 0) { + log.info("通过新日志表进行任务纬度的数据同步"); + // 用新日志表数据任务同步 + toEsTasks.addAll(aggregateProcessTaskService.syncProcessTaskForNew(hpi, instanceVersion)); + } else { + log.info("通过引擎的任务表进行任务纬度的数据同步"); + // 用引擎的任务表处理任务同步 + toEsTasks.addAll(aggregateProcessTaskService.syncProcessTaskForOld(hpi, instanceVersion, bpmnModel)); + } + return toEsTasks; + } + +// public List recordUnfinishedProcessInstance(HistoricProcessInstanceSearchForEsDTO unfinishedSearch) { +// List unfinishedInstances = historyService.createHistoricProcessInstanceQuery() +// .or() +// .unfinished() +// .finishedAfter(unfinishedSearch.getEndTime()) +// .endOr() +// .list(); +// List unfinishedIds = unfinishedInstances.stream().map(HistoricProcessInstance::getId).collect(Collectors.toList()); +// log.info("未完成的实例数: {}", unfinishedIds.size()); +// +// unfinishedProcessService.bulkInsertUnfinishedProcess(unfinishedIds); +// return null; +// } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessTaskService.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessTaskService.java new file mode 100644 index 000000000..4c081e7ab --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessTaskService.java @@ -0,0 +1,221 @@ +package cn.axzo.workflow.es.service.aggregation; + +import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; +import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; +import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; +import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService; +import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter; +import cn.axzo.workflow.es.model.EsProcessTask; +import cn.axzo.workflow.es.service.EsProcessTaskService; +import cn.hutool.core.date.DateUtil; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.maven.artifact.versioning.DefaultArtifactVersion; +import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.FlowElement; +import org.flowable.engine.history.HistoricProcessInstance; +import org.flowable.engine.impl.persistence.entity.CommentEntityImpl; +import org.flowable.engine.task.Attachment; +import org.flowable.engine.task.Comment; +import org.flowable.task.api.history.HistoricTaskInstance; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE; +import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142; +import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID; +import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO; +import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT; +import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END; +import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED; + +/** + * 任务纬度的聚合操作 + * + * @author wangli + * @since 2024-09-30 11:46 + */ +@Component +@AllArgsConstructor +@Slf4j +public class AggregateProcessTaskService { + private final static DefaultArtifactVersion SUPPORT_VERSION = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142); + private final EsProcessTaskService esProcessTaskService; + private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; + private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService; + private final BpmnHistoricAttachmentConverter attachmentConverter; + + /** + * 通过新的日志表进行任务纬度的数据同步至 ES + */ + public List syncProcessTaskForNew(HistoricProcessInstance hpi, String instanceVersion) { + DefaultArtifactVersion currentVersion = new DefaultArtifactVersion(instanceVersion); + if (currentVersion.compareTo(SUPPORT_VERSION) < 0) { + return Collections.emptyList(); + } + + List logs = bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(hpi.getId()); + Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId()) + .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + List toEsTasks = new ArrayList<>(); + logs.forEach(log -> { + EsProcessTask esProcessTask = new EsProcessTask(); + esProcessTask.setId(log.getTaskId()); + esProcessTask.setProcessInstanceId(log.getProcessInstanceId()); + esProcessTask.setProcessDefinitionId(hpi.getProcessDefinitionId()); + esProcessTask.setTaskDefinitionKey(log.getActivityId()); + esProcessTask.setName(log.getActivityName()); + esProcessTask.setStatus(log.getStatus()); + esProcessTask.setAdvice(log.getAdvice()); + esProcessTask.setOperationDesc(log.getOperationDesc()); + esProcessTask.setStartTime(log.getStartTime()); + esProcessTask.setEndTime(log.getEndTime()); + if (Objects.nonNull(log.getEndTime())) { + esProcessTask.setDuration(DateUtil.betweenMs(log.getEndTime(), log.getStartTime())); + } + esProcessTask.setLastUpdateTime(log.getEndTime()); + esProcessTask.setTenantId(log.getTenantId()); + esProcessTask.setAssigner(CollectionUtils.isEmpty(log.getAssigneeFull()) ? null : log.getAssigneeFull().get(0)); + esProcessTask.setAttachments(attachmentConverter.toVos(attachmentMap.getOrDefault(log.getTaskId(), Collections.emptyList()))); + esProcessTask.setApprovalMethod(log.getApprovalMethod()); + esProcessTask.setNodeType(log.getNodeType()); + esProcessTask.setNodeMode(log.getNodeMode()); + toEsTasks.add(esProcessTask); + }); + + esProcessTaskService.insertBatch(toEsTasks); + return toEsTasks; + } + + /** + * 优先使用三个参数的同步方法. + *

+ * 通过原本引擎表进行任务纬度的数据同步至 ES + */ + public List syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion) { + BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId()); + return syncProcessTaskForOld(hpi, instanceVersion, bpmnModel); + } + + /** + * 通过原本引擎表进行任务纬度的数据同步至 ES + * + * @param hpi + * @param instanceVersion + * @param bpmnModel + */ + public List syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion, BpmnModel bpmnModel) { + + List tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(hpi.getId()); + + Map extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(hpi.getId()) + .stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s)); + + Map> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(hpi.getId()) + .stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId()) + .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + Map variables = hpi.getProcessVariables(); + + // 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理 + List filteredEffectiveTasks = filterEffectiveTasks(tasks, instanceVersion); + + List toEsTasks = new ArrayList<>(); + for (HistoricTaskInstance task : filteredEffectiveTasks) { + ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst()); + List comments = commentMap.getOrDefault(task.getId(), Collections.emptyList()); + List attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList()); + + EsProcessTask esProcessTask = new EsProcessTask(); + esProcessTask.setId(task.getId()); + esProcessTask.setProcessInstanceId(task.getProcessInstanceId()); + esProcessTask.setProcessDefinitionId(task.getProcessDefinitionId()); + esProcessTask.setTaskDefinitionKey(task.getTaskDefinitionKey()); + esProcessTask.setName(task.getName()); + esProcessTask.setStatus(extTask.getStatus()); + // 处理 advice + esProcessTask.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE)) + .findFirst().orElse(new CommentEntityImpl()).getFullMessage()); + // 处理 operationDesc + esProcessTask.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType())) + .max(Comparator.comparing(Comment::getTime)) + .orElse(new CommentEntityImpl()).getFullMessage()); + esProcessTask.setStartTime(task.getCreateTime()); + esProcessTask.setEndTime(task.getEndTime()); + esProcessTask.setDuration(task.getDurationInMillis()); +// esProcessTask.setLastUpdateTime(task); + esProcessTask.setTenantId(task.getTenantId()); + esProcessTask.setAssigner(BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null))); + esProcessTask.setAttachments(attachmentConverter.toVos(attachments)); + if (Objects.nonNull(bpmnModel)) { + FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey()); + BpmnMetaParserHelper.getApprovalMethod(flowElement) + .ifPresent(e -> esProcessTask.setApprovalMethod(e.getType())); + BpmnMetaParserHelper.getNodeType(flowElement) + .ifPresent(e -> esProcessTask.setNodeType(e.getType())); +// esProcessTask.setNodeMode(); + } + toEsTasks.add(esProcessTask); + } + esProcessTaskService.insertBatch(toEsTasks); + return toEsTasks; + } + + private List filterEffectiveTasks(List tasks, String instanceVersion) { + List effectiveTasks = new ArrayList<>(); + if (CollectionUtils.isEmpty(tasks)) { + return effectiveTasks; + } + Stream taskInstanceStream = tasks.stream() + .filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID)) + .filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID)) + .filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus())); + if (Objects.isNull(instanceVersion)) { + compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); + } else { + if (StringUtils.hasText(instanceVersion)) { + DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); + DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130); + if (version.compareTo(supportVersion) < 0) { + compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); + } else { + taskInstanceStream.forEach(effectiveTasks::add); + } + } else { + taskInstanceStream.forEach(effectiveTasks::add); + } + } + return effectiveTasks; + } + + + /** + * 兼容 1.2.1 版本的审批日志 + * + * @param stream + * @return + */ + private Stream compatibleVersion(Stream stream) { + return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) + || (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) + ).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason()))); + } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java index 67d01f99a..c4067c68e 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java @@ -6,6 +6,7 @@ import cn.axzo.workflow.es.service.EsProcessInstanceService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; import java.util.Collection; @@ -15,7 +16,7 @@ import java.util.Collection; * @author wangli * @since 2024-09-27 11:09 */ -@Component +@Service @AllArgsConstructor @Slf4j public class EsProcessInstanceServiceImpl implements EsProcessInstanceService { diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java index 4c1e12060..87322df6c 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java @@ -6,16 +6,17 @@ import cn.axzo.workflow.es.service.EsProcessTaskService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; import java.util.Collection; /** - * 操作流程任务的 ES Service + * 操作流程任务的 ES Service 实现 * * @author wangli * @since 2024-09-27 11:09 */ -@Component +@Service @AllArgsConstructor @Slf4j public class EsProcessTaskServiceImpl implements EsProcessTaskService { diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/listener/SyncEsTaskEntityEventHandle.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/listener/SyncEsTaskEntityEventHandle.java new file mode 100644 index 000000000..b86ffc898 --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/listener/SyncEsTaskEntityEventHandle.java @@ -0,0 +1,207 @@ +package cn.axzo.workflow.listener; + +import cn.axzo.workflow.common.enums.BpmnFlowNodeMode; +import cn.axzo.workflow.core.engine.listener.entity.EntityEventHandle; +import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.flowable.bpmn.model.FlowElement; +import org.flowable.bpmn.model.UserTask; +import org.flowable.engine.impl.bpmn.behavior.MultiInstanceActivityBehavior; +import org.flowable.task.service.impl.persistence.entity.TaskEntity; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +import static cn.axzo.workflow.common.constant.BpmnConstants.AND_SIGN_EXPRESSION; +import static cn.axzo.workflow.common.enums.BpmnFlowNodeMode.AND; +import static cn.axzo.workflow.common.enums.BpmnFlowNodeMode.GENERAL; +import static cn.axzo.workflow.common.enums.BpmnFlowNodeMode.OR; + +/** + * 用于处理非结束的实例的数据同步至 ES + * + * @author wangli + * @since 2024-09-06 00:02 + */ +@Slf4j +@Component +@AllArgsConstructor +public class SyncEsTaskEntityEventHandle implements EntityEventHandle { + + @Override + public boolean support(Object entity) { + return entity instanceof TaskEntity; + } + + @Override + public TaskEntity convert(Object entity) { + return (TaskEntity) entity; + } + + @Override + public void onActivated(TaskEntity taskEntity) { + log.debug("onActivated"); + } + + public void onSuspended(TaskEntity taskEntity) { + log.debug("onSuspended"); + } + + public void onDeleted(TaskEntity taskEntity) { + log.debug("onDeleted"); +// ExtAxProcessLog queryLog = new ExtAxProcessLog(); +// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); +// queryLog.setTaskId(taskEntity.getId()); +// queryLog.setOperationDesc(PENDING.getDesc()); +// ExtAxProcessLog update = new ExtAxProcessLog(); +// +// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); +// RuntimeService runtimeService = processEngineConfiguration.getRuntimeService(); +// BpmnTaskDelegateAssigner assignee = BpmnTaskDelegateAssigner.toObjectCompatible(taskEntity.getVariable(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + taskEntity.getId())); +// if (Objects.nonNull(assignee) && !Objects.equals(NO_ASSIGNEE, assignee.buildAssigneeId())) { +// update.setAssigneeFull(Lists.newArrayList(assignee)); +// update.setAssigneeId(Long.valueOf(assignee.getPersonId())); +// update.setAssigneeTenantId(assignee.getTenantId()); +// update.setAssigneeName(assignee.getAssignerName()); +// update.setAssigneeOuId(assignee.getOuId()); +// } +// +// boolean needDelete = false; +// if (Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType())) { +// update.setStatus(APPROVED.getStatus()); +// } else { +// +// Object advice = taskEntity.getTransientVariableLocal(COMMENT_TYPE_ADVICE); +// if (Objects.nonNull(advice) && StringUtils.hasText(advice.toString())) { +// log.info("COMMENT_TYPE_ADVICE: {}", advice); +// update.setAdvice(advice.toString()); +// } +// Object operationDesc = taskEntity.getTransientVariableLocal(COMMENT_TYPE_OPERATION_DESC); +// if (Objects.nonNull(operationDesc) && StringUtils.hasText(operationDesc.toString())) { +// log.info("COMMENT_TYPE_OPERATION_DESC: {}", operationDesc); +// update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() + operationDesc : operationDesc.toString()); +// } else { +// update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() : ""); +// // 评论节点会给 operationDesc 赋 COMMENTED 的值,所以注释 +//// update.setOperationDesc(BpmnProcessInstanceResultEnum.valueOfStatus(completionType).getDesc()); +// } +// +// +// String completionType = taskEntity.getVariable(TASK_COMPLETE_OPERATION_TYPE + taskEntity.getId(), String.class); +// if (StringUtils.hasText(completionType) && !Objects.equals(DELETED.getStatus(), completionType)) { +// log.info("TASK_COMPLETE_OPERATION_TYPE: {}", completionType); +// update.setStatus(completionType); +// } else { +// // 多实例除操作人以外的任务,直接删除日志, 例如一个节点有两个人或签,A 人驳回了,那么 B 人不再需要操作,任务自动删除。而会签也同理 +// update.setStatus(DELETED.getStatus());// delete标志着是多实例删除 +// needDelete = true; +// } +// } +// update.setEndTime(new Date()); +// +// // 判断是否抄送节点,如果是的话,需要将抄送人集合放入对应字段 +// if (isCarbonCopyNode(queryLog)) { +// // 抄送人集合 +// List carbonCopies = runtimeService.getVariable(taskEntity.getProcessInstanceId(), INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class); +// update.setAssigneeFull(carbonCopies); +// update.setOperationDesc("抄送" + carbonCopies.size() + "人"); +// } +// +// processLogService.update(queryLog, update); +// +// if (needDelete) { +// // 再逻辑删除该记录 +// ExtAxProcessLog deleteLog = new ExtAxProcessLog(); +// deleteLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); +// deleteLog.setTaskId(taskEntity.getId()); +// processLogService.delete(deleteLog); +// } + } + + private boolean isCarbonCopyNode(ExtAxProcessLog queryLog) { +// List logs = processLogService.genericQuery(queryLog); +// if (CollectionUtils.isEmpty(logs) || logs.size() != 1) { +// return false; +// } +// return Objects.equals(logs.get(0).getNodeType(), BpmnFlowNodeType.NODE_CARBON_COPY.getType()); + return false; + } + + public void onUpdated(TaskEntity taskEntity) { + log.debug("onUpdated"); +// if (Objects.equals(HIDDEN_ASSIGNEE_ID, taskEntity.getAssignee())) { +// ExtAxProcessLog queryLog = new ExtAxProcessLog(); +// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); +// queryLog.setTaskId(taskEntity.getId()); +// processLogService.delete(queryLog); +// } else { +// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); +// RuntimeService runtimeService = processEngineConfiguration.getRuntimeService(); +// List assigneeList = runtimeService.getVariable(taskEntity.getProcessInstanceId(), +// INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class); +// ListUtils.emptyIfNull(assigneeList).stream().filter(e -> Objects.equals(e.buildAssigneeId(), taskEntity.getAssignee())).findAny() +// .ifPresent(assignee -> { +// log.debug("审批人: {}", JSON.toJSONString(assignee)); +// ExtAxProcessLog queryLog = new ExtAxProcessLog(); +// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); +// queryLog.setTaskId(taskEntity.getId()); +// processLogService.updateAssignee(queryLog, assignee); +// }); +// } + } + + public void onInitialized(TaskEntity taskEntity) { + log.debug("onInitialized"); +// BpmnMetaParserHelper.getButtonConfig(ProcessDefinitionUtil.getProcess(taskEntity.getProcessDefinitionId()), taskEntity.getTaskDefinitionKey()) +// .ifPresent(buttons -> { +// ExtAxProcessLog queryLog = new ExtAxProcessLog(); +// queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); +// queryLog.setTaskId(taskEntity.getId()); +// +// ExtAxProcessLog updateLog = new ExtAxProcessLog(); +// updateLog.setButtonConf(buttons); +// processLogService.update(queryLog, updateLog); +// }); + } + + public void onCreate(TaskEntity taskEntity) { + log.debug("onCreate"); +// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); +// // 记录发起人 +// boolean isNodeStarter = Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType()); +// +// RepositoryService repositoryService = processEngineConfiguration.getRepositoryService(); +// BpmnModel bpmnModel = repositoryService.getBpmnModel(taskEntity.getProcessDefinitionId()); +// FlowElement flowElement = bpmnModel.getFlowElement(taskEntity.getTaskDefinitionKey()); +// +// ExtAxProcessLog log = new ExtAxProcessLog(); +// log.setProcessInstanceId(taskEntity.getProcessInstanceId()); +// log.setTenantId(taskEntity.getTenantId()); +// log.setActivityId(taskEntity.getTaskDefinitionKey()); +// log.setActivityName(taskEntity.getName()); +// log.setApprovalMethod((isNodeStarter ? nobody : getApprovalMethod(flowElement).orElse(nobody)).getType()); +// log.setNodeType((getNodeType(flowElement).orElse(BpmnFlowNodeType.NODE_EMPTY)).getType()); +// log.setNodeMode((isNodeStarter ? BpmnFlowNodeMode.GENERAL : getNodeMode(flowElement)).getType()); +// log.setTaskId(taskEntity.getId()); +// log.setOperationDesc(PENDING.getDesc()); +// log.setStartTime(taskEntity.getCreateTime()); +// log.setStatus(PROCESSING.getStatus()); +// +// processLogService.insert(log); + } + + private BpmnFlowNodeMode getNodeMode(FlowElement flowElement) { + BpmnFlowNodeMode node = GENERAL; + if (flowElement instanceof UserTask) { + UserTask userTask = (UserTask) flowElement; + if (userTask.getBehavior() instanceof MultiInstanceActivityBehavior) { + MultiInstanceActivityBehavior behavior = + (MultiInstanceActivityBehavior) userTask.getBehavior(); + node = Objects.equals(AND_SIGN_EXPRESSION, behavior.getCompletionCondition()) ? AND : OR; + } + } + return node; + } + +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java index b131e546f..90325fc16 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java @@ -1,65 +1,25 @@ package cn.axzo.workflow.server.xxljob; -import cn.axzo.workflow.common.enums.WorkspaceType; +import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO; import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; -import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; -import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; -import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; -import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; -import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; -import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService; -import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter; -import cn.axzo.workflow.es.model.EsProcessInstance; -import cn.axzo.workflow.es.model.EsProcessTask; -import cn.axzo.workflow.es.service.EsProcessInstanceService; -import cn.axzo.workflow.es.service.EsProcessTaskService; +import cn.axzo.workflow.core.engine.cmd.CustomInsertPropertyCmd; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.maven.artifact.versioning.DefaultArtifactVersion; -import org.flowable.bpmn.model.BpmnModel; -import org.flowable.bpmn.model.FlowElement; -import org.flowable.engine.history.HistoricProcessInstance; -import org.flowable.engine.impl.persistence.entity.CommentEntityImpl; -import org.flowable.engine.task.Attachment; -import org.flowable.engine.task.Comment; -import org.flowable.task.api.history.HistoricTaskInstance; +import org.flowable.common.engine.impl.interceptor.CommandExecutor; +import org.flowable.spring.SpringProcessEngineConfiguration; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE; -import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC; -import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121; -import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130; -import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142; -import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID; -import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT; -import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE; -import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO; -import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT; -import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION; -import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END; -import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED; +import static cn.axzo.workflow.common.constant.BpmnConstants.LATEST_SYNC_TO_ELASTICSEARCH_TIME; /** * 流程实例数据同步 @@ -71,19 +31,19 @@ import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJEC @RequiredArgsConstructor @Slf4j public class ProcessInstanceSyncJobHandler extends IJobHandler { - private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; - private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService; - private final BpmnHistoricAttachmentConverter attachmentConverter; - private final EsProcessInstanceService esProcessInstanceService; - private final EsProcessTaskService esProcessTaskService; + private final AggregateProcessInstanceService aggregateProcessInstanceService; + private final SpringProcessEngineConfiguration springProcessEngineConfiguration; @XxlJob("processInstanceToEs") public ReturnT execute(String s) { - log.debug("start exec process instance data sync... "); - XxlJobLogger.log("start exec process instance data sync... "); + log.debug("start exec finished process instance data sync... "); + XxlJobLogger.log("start exec finished process instance data sync... "); + HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO(); search.setFinished(true); - search.setEndTime(new Date()); + Date endTime = new Date(); + search.setEndTime(endTime); + try { if (StringUtils.hasText(s)) { search = JSON.parseObject(s, search.getClass()); @@ -96,211 +56,39 @@ public class ProcessInstanceSyncJobHandler extends IJobHandler { return ReturnT.FAIL; } - Long totalCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search); - log.info("查询到待同步数据量: {} 条", totalCount); - XxlJobLogger.log("查询到待同步数据量: {} 条", totalCount); - Integer overPageSize = search.getOverPageSize(); - HistoricProcessInstanceSearchForEsDTO finalSearch = search; - IntStream.iterate(0, i -> i + overPageSize).limit((totalCount + overPageSize - 1) / overPageSize) - .forEach(skipRows -> { - log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalCount * 100), - skipRows, skipRows + overPageSize); - int pageNo = skipRows / overPageSize; - log.info("pageNo: {}", pageNo); - executeSync(finalSearch, new Page<>(pageNo, overPageSize)); - }); + // 同步前的一些额外操作 + beforeSync(endTime); + + // 开始同步完成的实例到 ES + DataSyncSummaryDTO summary = doSync(search); + + log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); + XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); return ReturnT.SUCCESS; } - private void executeSync(HistoricProcessInstanceSearchForEsDTO search, IPage page) { - List historicProcessInstances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, page); - historicProcessInstances.forEach(processInstance -> { - EsProcessInstance esProcessInstance = new EsProcessInstance(); - esProcessInstance.setId(processInstance.getId()); - esProcessInstance.setName(processInstance.getName()); - esProcessInstance.setBusinessKey(processInstance.getBusinessKey()); - esProcessInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); - esProcessInstance.setStartTime(processInstance.getStartTime()); - esProcessInstance.setEndTime(processInstance.getEndTime()); - esProcessInstance.setDurationInMillis(processInstance.getDurationInMillis()); - esProcessInstance.setTenantId(processInstance.getTenantId()); - esProcessInstance.setBusinessStatus(processInstance.getBusinessStatus()); + /** + * 1. 记录本次操作的时间点 + */ + private void beforeSync(Date endTime) { + //1. 记录本次操作的时间点 + CommandExecutor commandExecutor = springProcessEngineConfiguration.getCommandExecutor(); + commandExecutor.execute(new CustomInsertPropertyCmd(LATEST_SYNC_TO_ELASTICSEARCH_TIME, DateUtil.formatDateTime(endTime))); -// esProcessInstance.setLastOperationTime(); - Map variables = processInstance.getProcessVariables(); - - esProcessInstance.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121))); - esProcessInstance.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc()); - esProcessInstance.setAgentTenantId(String.valueOf(variables.getOrDefault(INTERNAL_PROCESS_AGENT, ""))); - - BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(processInstance.getProcessDefinitionId()); - BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> { - esProcessInstance.setSupportBatch(cfg.getSupportBatchOperation()); - esProcessInstance.setUserAgreeSignature(cfg.getUserAgreeSignature()); - }); - - Integer inserted = esProcessInstanceService.insert(esProcessInstance); - if (inserted > 0) { - log.debug("write processInstance to es success! id: {}", processInstance.getId()); - XxlJobLogger.log("write processInstance to es success! id: {}", processInstance.getId()); - } else { - log.warn("write processInstance to es caught exception! id: {}", processInstance.getId()); - XxlJobLogger.log("write processInstance to es caught exception! id: {}", processInstance.getId()); - } - - String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)); - DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); - DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142); - List toEsTasks = new ArrayList<>(); - if (version.compareTo(supportVersion) >= 0) { - log.info("通过新日志表进行任务纬度的数据同步"); - // 用新日志表数据任务同步 - toEsTasks.addAll(executeTaskSyncForNew(processInstance)); - } else { - log.info("通过引擎的任务表进行任务纬度的数据同步"); - // 用引擎的任务表处理任务同步 - toEsTasks.addAll(executeTaskSyncForOld(processInstance, instanceVersion, bpmnModel)); - } - Integer insertedTask = esProcessTaskService.insertBatch(toEsTasks); - if (insertedTask > 0) { - log.debug("write processTask to es success! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask); - XxlJobLogger.log("write processTask to es success! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask); - } else { - log.warn("write processTask to es caught exception! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask); - XxlJobLogger.log("write processTask to es caught exception! source size: {}, inserted size: {}", toEsTasks.size(), insertedTask); - } - }); +// // 2. 记录次时间点未完成的实例 ID 数据 +// HistoricProcessInstanceSearchForEsDTO unfinishedSearch = new HistoricProcessInstanceSearchForEsDTO(); +// // 特殊的查询逻辑,底层实现只依赖该 endTime,其他参数设置无效 +// unfinishedSearch.setEndTime(endTime); +// aggregateProcessInstanceService.recordUnfinishedProcessInstance(unfinishedSearch); } - private List executeTaskSyncForNew(HistoricProcessInstance processInstance) { - List logs = bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(processInstance.getId()); - Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(processInstance.getId()) - .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); - - List toEsTasks = new ArrayList<>(); - logs.forEach(log -> { - EsProcessTask esProcessTask = new EsProcessTask(); - esProcessTask.setId(log.getTaskId()); - esProcessTask.setProcessInstanceId(log.getProcessInstanceId()); - esProcessTask.setProcessDefinitionId(processInstance.getProcessDefinitionId()); - esProcessTask.setTaskDefinitionKey(log.getActivityId()); - esProcessTask.setName(log.getActivityName()); - esProcessTask.setStatus(log.getStatus()); - esProcessTask.setAdvice(log.getAdvice()); - esProcessTask.setOperationDesc(log.getOperationDesc()); - esProcessTask.setStartTime(log.getStartTime()); - esProcessTask.setEndTime(log.getEndTime()); - if (Objects.nonNull(log.getEndTime())) { - esProcessTask.setDuration(DateUtil.betweenMs(log.getEndTime(), log.getStartTime())); - } - esProcessTask.setLastUpdateTime(log.getEndTime()); - esProcessTask.setTenantId(log.getTenantId()); - esProcessTask.setAssigner(log.getAssigneeFull().get(0)); - esProcessTask.setAttachments(attachmentConverter.toVos(attachmentMap.getOrDefault(log.getTaskId(), Collections.emptyList()))); - esProcessTask.setApprovalMethod(log.getApprovalMethod()); - esProcessTask.setNodeType(log.getNodeType()); - esProcessTask.setNodeMode(log.getNodeMode()); - toEsTasks.add(esProcessTask); - }); - return toEsTasks; - } - - - private List executeTaskSyncForOld(HistoricProcessInstance processInstance, String instanceVersion, BpmnModel bpmnModel) { - List tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(processInstance.getId()); - - Map extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(processInstance.getId()) - .stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s)); - - Map> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(processInstance.getId()) - .stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); - - Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(processInstance.getId()) - .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); - - Map variables = processInstance.getProcessVariables(); - - // 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理 - filterEffectiveTasks(tasks, instanceVersion); - - List toEsTasks = new ArrayList<>(); - tasks.forEach(task -> { - log.debug("task: {}", task.getId()); - ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst()); - List comments = commentMap.getOrDefault(task.getId(), Collections.emptyList()); - List attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList()); - - EsProcessTask esProcessTask = new EsProcessTask(); - esProcessTask.setId(task.getId()); - esProcessTask.setProcessInstanceId(task.getProcessInstanceId()); - esProcessTask.setProcessDefinitionId(task.getProcessDefinitionId()); - esProcessTask.setTaskDefinitionKey(task.getTaskDefinitionKey()); - esProcessTask.setName(task.getName()); - esProcessTask.setStatus(extTask.getStatus()); - // 处理 advice - esProcessTask.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE)) - .findFirst().orElse(new CommentEntityImpl()).getFullMessage()); - // 处理 operationDesc - esProcessTask.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType())) - .max(Comparator.comparing(Comment::getTime)) - .orElse(new CommentEntityImpl()).getFullMessage()); - esProcessTask.setStartTime(task.getCreateTime()); - esProcessTask.setEndTime(task.getEndTime()); - esProcessTask.setDuration(task.getDurationInMillis()); -// esProcessTask.setLastUpdateTime(task); - esProcessTask.setTenantId(task.getTenantId()); - esProcessTask.setAssigner(BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null))); - esProcessTask.setAttachments(attachmentConverter.toVos(attachments)); - if (Objects.nonNull(bpmnModel)) { - FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey()); - BpmnMetaParserHelper.getApprovalMethod(flowElement) - .ifPresent(e -> esProcessTask.setApprovalMethod(e.getType())); - BpmnMetaParserHelper.getNodeType(flowElement) - .ifPresent(e -> esProcessTask.setNodeType(e.getType())); -// esProcessTask.setNodeMode(); - } - toEsTasks.add(esProcessTask); - }); - return toEsTasks; - } - - private List filterEffectiveTasks(List tasks, String instanceVersion) { - List effectiveTasks = new ArrayList<>(); - if (CollectionUtils.isEmpty(tasks)) { - return effectiveTasks; - } - Stream taskInstanceStream = tasks.stream() - .filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID)) - .filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID)) - .filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus())); - if (Objects.isNull(instanceVersion)) { - compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); - } else { - if (StringUtils.hasText(instanceVersion)) { - DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); - DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130); - if (version.compareTo(supportVersion) < 0) { - compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); - } else { - taskInstanceStream.forEach(effectiveTasks::add); - } - } else { - taskInstanceStream.forEach(effectiveTasks::add); - } - } - return effectiveTasks; - } - - /** - * 兼容 1.2.1 版本的审批日志 + * 开始处理的入口 * - * @param stream - * @return + * @param search */ - private Stream compatibleVersion(Stream stream) { - return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) - || (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) - ).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason()))); + private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search) { + return aggregateProcessInstanceService.syncProcessInstanceForSearch(search); } + }