diff --git a/pom.xml b/pom.xml index fb2acc5e9..e3676b268 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,11 @@ easy-es-boot-starter ${easy-es.version} + + org.dromara.easy-es + easy-es-annotation + ${easy-es.version} + diff --git a/workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/es/EsProcessInstanceApi.java b/workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/es/EsProcessInstanceApi.java new file mode 100644 index 000000000..a373d1089 --- /dev/null +++ b/workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/es/EsProcessInstanceApi.java @@ -0,0 +1,16 @@ +package cn.axzo.workflow.client.feign.es; + +import cn.axzo.workflow.client.annotation.WorkflowEngineFeignClient; +import cn.axzo.workflow.common.annotation.Manageable; + +/** + * 操作 ES 的流程实例 API + * + * @author wangli + * @since 2024-10-07 21:12 + */ +@WorkflowEngineFeignClient +@Manageable +public interface EsProcessInstanceApi { + +} diff --git a/workflow-engine-common/pom.xml b/workflow-engine-common/pom.xml index fcb362248..f02bb046b 100644 --- a/workflow-engine-common/pom.xml +++ b/workflow-engine-common/pom.xml @@ -29,6 +29,10 @@ org.projectlombok lombok + + org.dromara.easy-es + easy-es-annotation + diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java index 49661639a..faf4b8b02 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java @@ -58,6 +58,8 @@ public interface BpmnConstants { String FLOW_SERVER_VERSION = "serverVersion"; String FLOW_SERVER_VERSION_121 = "1.2.1"; String FLOW_SERVER_VERSION_130 = "1.3.0"; + // 1.4.2 开始启用新版本日志 + String FLOW_SERVER_VERSION_142 = "1.4.2"; String CONFIG_NOTICE = "noticeConfig"; String CONFIG_APPROVE = "approveConfig"; String TEMPLATE_NOTICE_MESSAGE_CONFIG = "noticeMessageConfig"; @@ -187,4 +189,9 @@ public interface BpmnConstants { * 回退操作次数上限 */ Integer MAX_BACKED_OPERATE_COUNT = 20; + String LATEST_SYNC_TO_ELASTICSEARCH_TIME = "latest.sync.to.elasticsearch.time"; + /** + * 固定父子文档在相同分片 + */ + String ES_FIXED_ROUTING = "routing"; } diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java new file mode 100644 index 000000000..114403b07 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/TimeQueryDirection.java @@ -0,0 +1,16 @@ +package cn.axzo.workflow.common.enums; + +/** + * 时间查询方向 + *

+ * 注意: 该枚举用在查询 flowable 引擎数据时, 都是包含自身时间点的. + * 例如, 使用 Before 时,也就是说在某个时间点之前,是包含"某个时间"自身的. + * + * @author wangli + * @since 2024-09-29 09:56 + */ +public enum TimeQueryDirection { + BEFORE, + AFTER, + ; +} diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/DataSyncSummaryDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/DataSyncSummaryDTO.java new file mode 100644 index 000000000..39a18fb86 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/DataSyncSummaryDTO.java @@ -0,0 +1,25 @@ +package cn.axzo.workflow.common.model.dto.es; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +/** + * 数据同步 ES 的结果统计 + * + * @author wangli + * @since 2024-09-30 14:42 + */ +@Data +@Accessors(chain = true) +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class DataSyncSummaryDTO { + + private Long processInstanceCount; + + private Long processTaskCount; +} diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java index 36c8283ae..a68adccc7 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/dto/es/HistoricProcessInstanceSearchForEsDTO.java @@ -1,11 +1,14 @@ package cn.axzo.workflow.common.model.dto.es; +import cn.axzo.workflow.common.enums.TimeQueryDirection; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import java.util.Date; + /** * 历史流程实例的搜索对象 * @@ -49,5 +52,40 @@ public class HistoricProcessInstanceSearchForEsDTO { */ private String tenantId; + /** + * 实例是否已结束 + * + */ + private Boolean finished; + + /** + * 开始时间 + */ + private Date startTime; + + /** + * 控制查询开始时间的方向,包含自身的时间点 + *

+ * 默认是查询开始时间之后 + */ + private TimeQueryDirection startTimeDirection = TimeQueryDirection.AFTER; + + /** + * 结束时间 + */ + private Date endTime; + + /** + * 控制查询结束时间的方向,包含自身的时间点 + *

+ * 默认是查询结束时间点之前 + */ + private TimeQueryDirection endTimeDirection = TimeQueryDirection.BEFORE; + + /** + * 用于覆盖同步逻辑中的PageSize,一般不需要传 + */ + private Integer overPageSize = 100; + } diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/AttachmentDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/AttachmentDTO.java index 6fd66819b..0aa49fac4 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/AttachmentDTO.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/AttachmentDTO.java @@ -7,6 +7,8 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import org.dromara.easyes.annotation.IndexField; +import org.dromara.easyes.annotation.rely.FieldType; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -29,6 +31,7 @@ public class AttachmentDTO implements Serializable { /** * 附件 ID */ + @IndexField(fieldType = FieldType.KEYWORD) private String id; /** @@ -36,6 +39,7 @@ public class AttachmentDTO implements Serializable { */ @ApiModelProperty(value = "附件类型") @NotNull(message = "附件类型不能为空") + @IndexField(fieldType = FieldType.KEYWORD) private AttachmentTypeEnum type; /** @@ -43,12 +47,14 @@ public class AttachmentDTO implements Serializable { */ @ApiModelProperty(value = "文件名称不能为空") @NotBlank(message = "文件名称不能为空") + @IndexField(fieldType = FieldType.KEYWORD) private String name; /** * 文件描述 */ @ApiModelProperty(value = "文件描述") + @IndexField(exist = false) private String description; /** @@ -56,6 +62,7 @@ public class AttachmentDTO implements Serializable { */ @ApiModelProperty(value = "附件地址") @NotBlank(message = "附件地址不能为空") + @IndexField(fieldType = FieldType.KEYWORD) private String url; } diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/BpmnTaskDelegateAssigner.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/BpmnTaskDelegateAssigner.java index 6243c4a06..440ce176e 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/BpmnTaskDelegateAssigner.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/task/BpmnTaskDelegateAssigner.java @@ -8,6 +8,8 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.dromara.easyes.annotation.IndexField; +import org.dromara.easyes.annotation.rely.FieldType; import org.springframework.util.StringUtils; import org.springframework.validation.annotation.Validated; @@ -50,6 +52,7 @@ public class BpmnTaskDelegateAssigner implements Serializable { * 安心筑:身份 ID */ @Deprecated + @IndexField(fieldType = FieldType.KEYWORD) private String assignee; /** @@ -59,6 +62,7 @@ public class BpmnTaskDelegateAssigner implements Serializable { * 安心筑:身份 Type 应该必传 */ @Deprecated + @IndexField(fieldType = FieldType.KEYWORD) private String assigneeType; /** @@ -67,6 +71,7 @@ public class BpmnTaskDelegateAssigner implements Serializable { * 枢智:用户姓名 * 安心筑:用户姓名 */ + @IndexField(fieldType = FieldType.KEYWORD) private String assignerName; /** @@ -74,6 +79,7 @@ public class BpmnTaskDelegateAssigner implements Serializable { *

* 仅安心筑使用, 应该必传 */ + @IndexField(fieldType = FieldType.KEYWORD) private String personId; /** @@ -82,18 +88,21 @@ public class BpmnTaskDelegateAssigner implements Serializable { * 枢智: 企业 ID * 安心筑: 工作台 ID */ + @IndexField(fieldType = FieldType.KEYWORD) private String tenantId; /** * 人所在的单位 ID * 仅安心筑使用, 工人可以没有, 其他身份一定需要 */ + @IndexField(fieldType = FieldType.KEYWORD) private String ouId; /** * 该属性业务接入时无需关心. 尽量保证其他属性都设值. * 头像 */ + @IndexField(exist = false) private String avatar; public final String buildAssigneeId_1_2_1() { diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/es/InstanceSearchReqDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/es/InstanceSearchReqDTO.java new file mode 100644 index 000000000..201684bf4 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/es/InstanceSearchReqDTO.java @@ -0,0 +1,47 @@ +package cn.axzo.workflow.common.model.request.es; + +import cn.axzo.workflow.common.model.request.BpmPageParam; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import javax.validation.constraints.NotBlank; +import java.util.List; + +/** + * 搜索 Es 中实例纬度的数据 + * + * @author wangli + * @since 2024-10-08 14:03 + */ +@EqualsAndHashCode(callSuper = true) +@ApiModel("搜索 Es 中实例纬度的数据") +@Data +public class InstanceSearchReqDTO extends BpmPageParam { + + @ApiModelProperty(value = "审批人所属租户 ID") + @NotBlank(message = "租户 ID 不能为空") + private String tenantId; + + @ApiModelProperty(value = "审批人所属单位 ID") + private String ouId; + + @ApiModelProperty(value = "审批人自然人 ID") + private String personId; + + @ApiModelProperty(value = "审批人姓名") + private String assigneeName; + + @ApiModelProperty(value = "流程实例状态", notes = "参考 BpmnProcessInstanceResultEnum 枚举") + private String businessStatus; + + @ApiModelProperty(value = "流程实例名称") + private String processInstanceName; + + /** + * 流程实例 ID, 与其它所有的属性互斥 + */ + @ApiModelProperty(value = "流程实例 ID 集合") + private List processInstanceIds; +} diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/es/TaskSearchReqDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/es/TaskSearchReqDTO.java new file mode 100644 index 000000000..c695ecc44 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/es/TaskSearchReqDTO.java @@ -0,0 +1,55 @@ +package cn.axzo.workflow.common.model.request.es; + +import cn.axzo.workflow.common.model.request.BpmPageParam; +import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.List; + +/** + * 搜索 Es 中任务纬度的数据 + * + * @author wangli + * @since 2024-10-08 09:49 + */ +@EqualsAndHashCode(callSuper = true) +@ApiModel("搜索 Es 中任务纬度的数据") +@Data +public class TaskSearchReqDTO extends BpmPageParam { + private static final long serialVersionUID = -1L; + + /** + * 搜索含有指定该租户的任务 + */ + @ApiModelProperty(value = "租户 ID 集合") + private List tenantIds; + + /** + * 搜索含有指定单位的任务 + */ + @ApiModelProperty(value = "单位 ID 集合") + private List ouIds; + + /** + * 搜索含有指定自然人的任务 + */ + @ApiModelProperty(value = "自然人 ID") + private String personId; + + /** + * 搜索含有指定自然人名称的任务 + */ + @ApiModelProperty(value = "自然人姓名集合") + private String name; + + /** + * 与上面三个属性互斥 + *

+ * 成对是数据项 + */ + @ApiModelProperty(value = "审批人信息集合") + private List assigners; +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomClearPropertyCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomClearPropertyCmd.java new file mode 100644 index 000000000..a64903e12 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomClearPropertyCmd.java @@ -0,0 +1,34 @@ +package cn.axzo.workflow.core.engine.cmd; + +import org.flowable.common.engine.impl.interceptor.Command; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.common.engine.impl.persistence.entity.PropertyEntity; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; + +import java.util.Objects; + +/** + * 操作 ACT_GE_PROPERTY 表的删除指定属性的命令 + * + * @author wangli + * @since 2024-09-30 15:15 + */ +public class CustomClearPropertyCmd implements Command { + private final String propertyName; + + public CustomClearPropertyCmd(String propertyName) { + this.propertyName = propertyName; + } + + @Override + public Void execute(CommandContext commandContext) { + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(commandContext); + PropertyEntity entity = processEngineConfiguration.getPropertyEntityManager().findById(propertyName); + if (Objects.nonNull(entity)) { + processEngineConfiguration.getPropertyEntityManager().delete(entity.getId()); + } + return null; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomInsertPropertyCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomInsertPropertyCmd.java new file mode 100644 index 000000000..2e1441f17 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomInsertPropertyCmd.java @@ -0,0 +1,43 @@ +package cn.axzo.workflow.core.engine.cmd; + +import org.flowable.common.engine.impl.interceptor.Command; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.common.engine.impl.persistence.entity.PropertyEntity; +import org.flowable.common.engine.impl.persistence.entity.PropertyEntityImpl; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; + +import java.util.Objects; + +/** + * 操作 ACT_GE_PROPERTY 表的写命令 + * + * @author wangli + * @since 2024-09-30 15:15 + */ +public class CustomInsertPropertyCmd implements Command { + private final String propertyName; + private final String propertyValue; + + public CustomInsertPropertyCmd(String propertyName, String propertyValue) { + this.propertyName = propertyName; + this.propertyValue = propertyValue; + } + + @Override + public Void execute(CommandContext commandContext) { + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(commandContext); + PropertyEntity entity = processEngineConfiguration.getPropertyEntityManager().findById(propertyName); + if (Objects.nonNull(entity)) { + entity.setValue(propertyValue); + processEngineConfiguration.getPropertyEntityManager().update(entity); + } else { + entity = new PropertyEntityImpl(); + entity.setName(propertyName); + entity.setValue(propertyValue); + processEngineConfiguration.getPropertyEntityManager().insert(entity); + } + return null; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java index 7dd04b9e3..a8fca56e6 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EngineEntityEventListener.java @@ -21,7 +21,7 @@ import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventT import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ENTITY_UPDATED; /** - * TODO + * 引擎内的 Entity 事件监听处理 * * @author wangli * @since 2024-09-02 15:34 @@ -33,20 +33,19 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener { private final List handles; public static final Set SUPPORTED = - ImmutableSet.builder() - .add(ENTITY_CREATED) - .add(ENTITY_INITIALIZED) - .add(ENTITY_UPDATED) - .add(ENTITY_DELETED) - .add(ENTITY_SUSPENDED) - .add(ENTITY_ACTIVATED) - .build(); + ImmutableSet.builder() + .add(ENTITY_CREATED) + .add(ENTITY_INITIALIZED) + .add(ENTITY_UPDATED) + .add(ENTITY_DELETED) + .add(ENTITY_SUSPENDED) + .add(ENTITY_ACTIVATED) + .build(); @Override public void onEvent(FlowableEvent event) { if (event instanceof FlowableEntityEvent && SUPPORTED.contains(event.getType())) { FlowableEntityEvent entityEvent = (FlowableEntityEvent) event; -// log.warn("entity event type: {}, class: {}",entityEvent.getType(), entityEvent.getEntity().getClass()); handles.forEach(handle -> { Object entity = entityEvent.getEntity(); if (handle.support(entity)) { @@ -66,48 +65,10 @@ public class EngineEntityEventListener extends AbstractFlowableEventListener { } } }); -// if (entityEvent.getEntity() instanceof TaskEntity) { -// TaskEntity taskEntity = (TaskEntity) entityEvent.getEntity(); -// log.error("event taskId :{}, taskDefKey: {}", taskEntity.getId(), taskEntity.getTaskDefinitionKey()); -// -// if (Objects.equals(event.getType(), ENTITY_CREATED)) { -// onCreate(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) { -// onInitialized(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) { -// onUpdated(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) { -// onDeleted(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) { -// onSuspended(taskEntity); -// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) { -// onActivated(taskEntity); -// } -// } else if(entityEvent.getEntity() instanceof CommentEntity) { -// CommentEntity commentEntity = (CommentEntity) entityEvent.getEntity(); -// log.error("event taskId :{}", commentEntity.getId()); -// -// if (Objects.equals(event.getType(), ENTITY_CREATED)) { -// onCreate(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_INITIALIZED)) { -// onInitialized(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_UPDATED)) { -// onUpdated(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_DELETED)) { -// onDeleted(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_SUSPENDED)) { -// onSuspended(commentEntity); -// } else if (Objects.equals(event.getType(), ENTITY_ACTIVATED)) { -// onActivated(commentEntity); -// } -// } - } } - - @Override public boolean isFailOnException() { return true; diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EntityEventHandle.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EntityEventHandle.java index e3ae053ff..9c1e71731 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EntityEventHandle.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/EntityEventHandle.java @@ -1,7 +1,7 @@ package cn.axzo.workflow.core.engine.listener.entity; /** - * TODO + * Entity life cycle * * @author wangli * @since 2024-09-06 00:03 @@ -12,16 +12,22 @@ public interface EntityEventHandle { T convert(Object entity); - void onCreate(T entity); + default void onCreate(T entity) { + } - void onInitialized(T entity); + default void onInitialized(T entity) { + } - void onUpdated(T entity); + default void onUpdated(T entity) { + } - void onDeleted(T entity); + default void onDeleted(T entity) { + } - void onSuspended(T entity); + default void onSuspended(T entity) { + } - void onActivated(T entity); + default void onActivated(T entity) { + } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java index c7e03e86d..e83553c89 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/entity/type/CommentEntityEventHandle.java @@ -22,7 +22,6 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERAT //@Component @AllArgsConstructor public class CommentEntityEventHandle implements EntityEventHandle { - private final ExtAxProcessLogService processLogService; @Override public boolean support(Object entity) { @@ -37,45 +36,30 @@ public class CommentEntityEventHandle implements EntityEventHandle { } @Override - public void onActivated(TaskEntity taskEntity) { - log.debug("onActivated"); + public void onCreate(TaskEntity taskEntity) { + // 记录发起人 + boolean isNodeStarter = Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType()); + + BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(taskEntity.getProcessDefinitionId()); + FlowElement flowElement = bpmnModel.getFlowElement(taskEntity.getTaskDefinitionKey()); + + ExtAxProcessLog log = new ExtAxProcessLog(); + log.setProcessInstanceId(taskEntity.getProcessInstanceId()); + log.setTenantId(taskEntity.getTenantId()); + log.setActivityId(taskEntity.getTaskDefinitionKey()); + log.setActivityName(taskEntity.getName()); + log.setApprovalMethod((isNodeStarter ? nobody : getApprovalMethod(flowElement).orElse(nobody)).getType()); + log.setNodeType((getNodeType(flowElement).orElse(BpmnFlowNodeType.NODE_EMPTY)).getType()); + log.setNodeMode((isNodeStarter ? BpmnFlowNodeMode.GENERAL : getNodeMode(flowElement)).getType()); + log.setTaskId(taskEntity.getId()); + log.setOperationDesc(PENDING.getDesc()); + log.setStartTime(taskEntity.getCreateTime()); + log.setStatus(PROCESSING.getStatus()); + + processLogService.insert(log); } - public void onSuspended(TaskEntity taskEntity) { - log.debug("onSuspended"); + @Override + public void onInitialized(TaskEntity taskEntity) { + BpmnMetaParserHelper.getButtonConfig(ProcessDefinitionUtil.getProcess(taskEntity.getProcessDefinitionId()), taskEntity.getTaskDefinitionKey()) + .ifPresent(buttons -> { + ExtAxProcessLog queryLog = new ExtAxProcessLog(); + queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); + queryLog.setTaskId(taskEntity.getId()); + + ExtAxProcessLog updateLog = new ExtAxProcessLog(); + updateLog.setButtonConf(buttons); + processLogService.update(queryLog, updateLog); + }); } + @Override + public void onUpdated(TaskEntity taskEntity) { + if (Objects.equals(HIDDEN_ASSIGNEE_ID, taskEntity.getAssignee())) { + ExtAxProcessLog queryLog = new ExtAxProcessLog(); + queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); + queryLog.setTaskId(taskEntity.getId()); + processLogService.delete(queryLog); + } else { + ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); + RuntimeService runtimeService = processEngineConfiguration.getRuntimeService(); + List assigneeList = runtimeService.getVariable(taskEntity.getProcessInstanceId(), + INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class); + ListUtils.emptyIfNull(assigneeList).stream().filter(e -> Objects.equals(e.buildAssigneeId(), taskEntity.getAssignee())).findAny() + .ifPresent(assignee -> { + ExtAxProcessLog queryLog = new ExtAxProcessLog(); + queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); + queryLog.setTaskId(taskEntity.getId()); + processLogService.updateAssignee(queryLog, assignee); + }); + } + } + + @Override public void onDeleted(TaskEntity taskEntity) { - log.debug("onDeleted"); ExtAxProcessLog queryLog = new ExtAxProcessLog(); queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); queryLog.setTaskId(taskEntity.getId()); @@ -109,23 +158,18 @@ public class TaskEntityEventHandle implements EntityEventHandle { Object advice = taskEntity.getTransientVariableLocal(COMMENT_TYPE_ADVICE); if (Objects.nonNull(advice) && StringUtils.hasText(advice.toString())) { - log.info("COMMENT_TYPE_ADVICE: {}", advice); update.setAdvice(advice.toString()); } Object operationDesc = taskEntity.getTransientVariableLocal(COMMENT_TYPE_OPERATION_DESC); if (Objects.nonNull(operationDesc) && StringUtils.hasText(operationDesc.toString())) { - log.info("COMMENT_TYPE_OPERATION_DESC: {}", operationDesc); update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() + operationDesc : operationDesc.toString()); } else { update.setOperationDesc(Objects.nonNull(assignee) ? assignee.getAssignerName() : ""); - // 评论节点会给 operationDesc 赋 COMMENTED 的值,所以注释 -// update.setOperationDesc(BpmnProcessInstanceResultEnum.valueOfStatus(completionType).getDesc()); } String completionType = taskEntity.getVariable(TASK_COMPLETE_OPERATION_TYPE + taskEntity.getId(), String.class); if (StringUtils.hasText(completionType) && !Objects.equals(DELETED.getStatus(), completionType)) { - log.info("TASK_COMPLETE_OPERATION_TYPE: {}", completionType); update.setStatus(completionType); } else { // 多实例除操作人以外的任务,直接删除日志, 例如一个节点有两个人或签,A 人驳回了,那么 B 人不再需要操作,任务自动删除。而会签也同理 @@ -162,69 +206,6 @@ public class TaskEntityEventHandle implements EntityEventHandle { return Objects.equals(logs.get(0).getNodeType(), BpmnFlowNodeType.NODE_CARBON_COPY.getType()); } - public void onUpdated(TaskEntity taskEntity) { - log.debug("onUpdated"); - if (Objects.equals(HIDDEN_ASSIGNEE_ID, taskEntity.getAssignee())) { - ExtAxProcessLog queryLog = new ExtAxProcessLog(); - queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); - queryLog.setTaskId(taskEntity.getId()); - processLogService.delete(queryLog); - } else { - ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); - RuntimeService runtimeService = processEngineConfiguration.getRuntimeService(); - List assigneeList = runtimeService.getVariable(taskEntity.getProcessInstanceId(), - INTERNAL_ACTIVITY_RELATION_ASSIGNEE_LIST_INFO_SNAPSHOT + taskEntity.getTaskDefinitionKey(), List.class); - ListUtils.emptyIfNull(assigneeList).stream().filter(e -> Objects.equals(e.buildAssigneeId(), taskEntity.getAssignee())).findAny() - .ifPresent(assignee -> { - log.debug("审批人: {}", JSON.toJSONString(assignee)); - ExtAxProcessLog queryLog = new ExtAxProcessLog(); - queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); - queryLog.setTaskId(taskEntity.getId()); - processLogService.updateAssignee(queryLog, assignee); - }); - } - } - - public void onInitialized(TaskEntity taskEntity) { - log.debug("onInitialized"); - BpmnMetaParserHelper.getButtonConfig(ProcessDefinitionUtil.getProcess(taskEntity.getProcessDefinitionId()), taskEntity.getTaskDefinitionKey()) - .ifPresent(buttons -> { - ExtAxProcessLog queryLog = new ExtAxProcessLog(); - queryLog.setProcessInstanceId(taskEntity.getProcessInstanceId()); - queryLog.setTaskId(taskEntity.getId()); - - ExtAxProcessLog updateLog = new ExtAxProcessLog(); - updateLog.setButtonConf(buttons); - processLogService.update(queryLog, updateLog); - }); - } - - public void onCreate(TaskEntity taskEntity) { - log.debug("onCreate"); - ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); - // 记录发起人 - boolean isNodeStarter = Objects.equals(taskEntity.getTaskDefinitionKey(), NODE_STARTER.getType()); - - RepositoryService repositoryService = processEngineConfiguration.getRepositoryService(); - BpmnModel bpmnModel = repositoryService.getBpmnModel(taskEntity.getProcessDefinitionId()); - FlowElement flowElement = bpmnModel.getFlowElement(taskEntity.getTaskDefinitionKey()); - - ExtAxProcessLog log = new ExtAxProcessLog(); - log.setProcessInstanceId(taskEntity.getProcessInstanceId()); - log.setTenantId(taskEntity.getTenantId()); - log.setActivityId(taskEntity.getTaskDefinitionKey()); - log.setActivityName(taskEntity.getName()); - log.setApprovalMethod((isNodeStarter ? nobody : getApprovalMethod(flowElement).orElse(nobody)).getType()); - log.setNodeType((getNodeType(flowElement).orElse(BpmnFlowNodeType.NODE_EMPTY)).getType()); - log.setNodeMode((isNodeStarter ? BpmnFlowNodeMode.GENERAL : getNodeMode(flowElement)).getType()); - log.setTaskId(taskEntity.getId()); - log.setOperationDesc(PENDING.getDesc()); - log.setStartTime(taskEntity.getCreateTime()); - log.setStatus(PROCESSING.getStatus()); - - processLogService.insert(log); - } - private BpmnFlowNodeMode getNodeMode(FlowElement flowElement) { BpmnFlowNodeMode node = GENERAL; if (flowElement instanceof UserTask) { diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java index ecd2206f6..a62bb3e79 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessInstanceForEsService.java @@ -2,6 +2,7 @@ package cn.axzo.workflow.core.service; import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; import com.baomidou.mybatisplus.core.metadata.IPage; +import org.flowable.bpmn.model.BpmnModel; import org.flowable.engine.history.HistoricProcessInstance; import java.util.List; @@ -14,5 +15,10 @@ import java.util.List; */ public interface BpmnProcessInstanceForEsService { + Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search); + List queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page); + + BpmnModel queryBpmnModel(String processDefinitionId); + } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java new file mode 100644 index 000000000..de8ad282c --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessTaskForEsService.java @@ -0,0 +1,28 @@ +package cn.axzo.workflow.core.service; + +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; +import org.flowable.engine.task.Attachment; +import org.flowable.engine.task.Comment; +import org.flowable.task.api.history.HistoricTaskInstance; + +import java.util.List; + +/** + * 专用与对接 ES 的流程任务相关操作 + * + * @author wangli + * @since 2024-09-29 10:55 + */ +public interface BpmnProcessTaskForEsService { + + List queryHistoricProcessTaskByProcessInstanceId(String processInstanceId); + + List queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId); + + List queryCommentByProcessInstanceId(String processInstanceId); + + List queryAttachmentByProcessInstanceId(String processInstanceId); + + List queryProcessLogByProcessInstanceId(String processInstanceId); +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java index e4238d230..9f89cfdb5 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceForEsServiceImpl.java @@ -4,14 +4,19 @@ import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDT import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; import com.baomidou.mybatisplus.core.metadata.IPage; import lombok.extern.slf4j.Slf4j; +import org.flowable.bpmn.model.BpmnModel; import org.flowable.engine.HistoryService; +import org.flowable.engine.RepositoryService; import org.flowable.engine.history.HistoricProcessInstance; import org.flowable.engine.history.HistoricProcessInstanceQuery; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import javax.annotation.Resource; +import java.util.Collections; import java.util.List; +import java.util.Objects; /** * 专用与对接 ES 的流程实例相关操作 @@ -23,10 +28,44 @@ import java.util.List; @Slf4j public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceForEsService { @Resource + @Lazy private HistoryService historyService; + @Resource + @Lazy + private RepositoryService repositoryService; + + @Override + public Long queryHistoricProcessInstanceTotalCount(HistoricProcessInstanceSearchForEsDTO search) { + if (Objects.isNull(search)) { + return 0L; + } + return buildHistoricProcessInstanceQuery(search).count(); + } @Override public List queryHistoricProcessInstance(HistoricProcessInstanceSearchForEsDTO search, IPage page) { + if (Objects.isNull(search)) { + return Collections.emptyList(); + } + HistoricProcessInstanceQuery query = buildHistoricProcessInstanceQuery(search) + .includeProcessVariables() + .orderByProcessInstanceId() + .asc(); + if (Objects.nonNull(page)) { + int firstResult = Math.toIntExact((page.getCurrent() - 1) * page.getSize()); + int maxResult = Math.toIntExact(page.getSize()); + log.debug("search pageable info, firstResult: {}, maxResult: {}", firstResult, maxResult); + return query.listPage(firstResult, maxResult); + } + return query.list(); + } + + @Override + public BpmnModel queryBpmnModel(String processDefinitionId) { + return repositoryService.getBpmnModel(processDefinitionId); + } + + private HistoricProcessInstanceQuery buildHistoricProcessInstanceQuery(HistoricProcessInstanceSearchForEsDTO search) { HistoricProcessInstanceQuery historicProcessInstanceQuery = historyService.createHistoricProcessInstanceQuery(); if (StringUtils.hasText(search.getProcessInstanceId())) { historicProcessInstanceQuery.processInstanceId(search.getProcessInstanceId()); @@ -46,10 +85,34 @@ public class BpmnProcessInstanceForEsServiceImpl implements BpmnProcessInstanceF if (StringUtils.hasText(search.getTenantId())) { historicProcessInstanceQuery.processInstanceTenantId(search.getTenantId()); } - return historicProcessInstanceQuery - .orderByProcessInstanceId() - .asc() - .listPage(((Long) (page.getPages() * page.getSize())).intValue(), - ((Long) ((page.getPages() + 1) * page.getSize())).intValue()); + // 只有传值后, 才接入该数据, 否则不控制 + if (Objects.equals(Boolean.TRUE, search.getFinished())) { + historicProcessInstanceQuery.finished(); + } else if (Objects.equals(Boolean.FALSE, search.getFinished())) { + historicProcessInstanceQuery.unfinished(); + } + if (Objects.nonNull(search.getStartTime())) { + // 引擎默认仅支撑两种 + switch (search.getStartTimeDirection()) { + case BEFORE: + historicProcessInstanceQuery.startedBefore(search.getStartTime()); + break; + default: + historicProcessInstanceQuery.startedAfter(search.getStartTime()); + break; + } + } + if (Objects.nonNull(search.getEndTime())) { + // 引擎默认仅支撑两种 + switch (search.getEndTimeDirection()) { + case AFTER: + historicProcessInstanceQuery.finishedAfter(search.getEndTime()); + break; + default: + historicProcessInstanceQuery.finishedBefore(search.getEndTime()); + break; + } + } + return historicProcessInstanceQuery; } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java index b099d99ef..5dabaf217 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessInstanceServiceImpl.java @@ -1061,6 +1061,7 @@ public class BpmnProcessInstanceServiceImpl implements BpmnProcessInstanceServic .orElse(variables.getOrDefault(OLD_INTERNAL_INITIATOR, null)))) .tenantId(historicProcessInstance.getTenantId()) .agented((Boolean) Optional.ofNullable(variables.get(INTERNAL_PROCESS_AGENT)).orElse(false)) + // 任务 .taskDetails(genericTaskLogVos(historicProcessInstance.getId(), logs, forecasting, dto)) .defaultButtonConf(getButtonConfig(bpmnModel.getMainProcess()).orElse(new BpmnButtonConf())) .supportBatchOperation(getProcessApproveConf(bpmnModel.getMainProcess()).orElse(new BpmnApproveConf()).getSupportBatchOperation()) diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java new file mode 100644 index 000000000..cca82ba97 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessTaskForEsServiceImpl.java @@ -0,0 +1,88 @@ +package cn.axzo.workflow.core.service.impl; + +import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO; +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; +import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService; +import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; +import cn.axzo.workflow.core.service.ExtAxProcessLogService; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.HistoryService; +import org.flowable.engine.TaskService; +import org.flowable.engine.task.Attachment; +import org.flowable.engine.task.Comment; +import org.flowable.task.api.history.HistoricTaskInstance; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.util.Collections; +import java.util.List; + +/** + * 专用与对接 ES 的流程任务相关操作 + * + * @author wangli + * @since 2024-09-29 10:55 + */ +@Service +@Slf4j +public class BpmnProcessTaskForEsServiceImpl implements BpmnProcessTaskForEsService { + @Resource + @Lazy + private HistoryService historyService; + @Resource + @Lazy + private TaskService taskService; + @Resource + private ExtAxHiTaskInstService extAxHiTaskInstService; + @Resource + private ExtAxProcessLogService extAxProcessLogService; + + @Override + public List queryHistoricProcessTaskByProcessInstanceId(String processInstanceId) { +// List historicTaskListByProcessInstanceId = bpmnProcessTaskService.getHistoricTaskListByProcessInstanceId(processInstanceId, null); + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + return historyService.createHistoricTaskInstanceQuery() + .processInstanceId(processInstanceId) + .list(); + } + + public List queryExtAxHiTaskInstByProcessInstanceId(String processInstanceId) { + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + ExtHiTaskSearchDTO searchDTO = new ExtHiTaskSearchDTO(); + searchDTO.setProcessInstanceId(processInstanceId); + return extAxHiTaskInstService.queryList(searchDTO); + } + + @Override + public List queryCommentByProcessInstanceId(String processInstanceId) { + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + return taskService.getProcessInstanceComments(processInstanceId); + } + + @Override + public List queryAttachmentByProcessInstanceId(String processInstanceId) { + if (!StringUtils.hasText(processInstanceId)) { + return Collections.emptyList(); + } + return taskService.getProcessInstanceAttachments(processInstanceId); + } + + @Override + public List queryProcessLogByProcessInstanceId(String processInstanceId) { + if(!StringUtils.hasText(processInstanceId)){ + return Collections.emptyList(); + } + ExtAxProcessLog queryLog = new ExtAxProcessLog(); + queryLog.setProcessInstanceId(processInstanceId); + return extAxProcessLogService.genericQuery(queryLog); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxAxReModelServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxReModelServiceImpl.java similarity index 97% rename from workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxAxReModelServiceImpl.java rename to workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxReModelServiceImpl.java index d825a654b..aac4766eb 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxAxReModelServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxReModelServiceImpl.java @@ -27,7 +27,7 @@ import static cn.axzo.workflow.core.common.code.BpmnModelRespCode.MODEL_ID_NOT_E @Service @RequiredArgsConstructor @Slf4j -public class ExtAxAxReModelServiceImpl implements ExtAxReModelService { +public class ExtAxReModelServiceImpl implements ExtAxReModelService { @Resource private ExtAxReModelMapper extAxReModelMapper; diff --git a/workflow-engine-elasticsearch/pom.xml b/workflow-engine-elasticsearch/pom.xml index 712da88e9..588687a96 100644 --- a/workflow-engine-elasticsearch/pom.xml +++ b/workflow-engine-elasticsearch/pom.xml @@ -16,15 +16,14 @@ - - cn.axzo.workflow - workflow-engine-axzo-ext - - cn.axzo.workflow workflow-engine-core + + cn.axzo.framework.data + axzo-data-mybatis-plus + org.elasticsearch.client @@ -40,6 +39,5 @@ org.dromara.easy-es easy-es-boot-starter - diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessInstanceMapper.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessInstanceMapper.java index eedf8bb79..1e200c59f 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessInstanceMapper.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessInstanceMapper.java @@ -1,6 +1,6 @@ package cn.axzo.workflow.es.mapper; -import cn.axzo.workflow.es.model.EsProcessInstance; +import cn.axzo.workflow.es.model.ProcessInstanceDocument; import org.dromara.easyes.core.kernel.BaseEsMapper; /** @@ -9,5 +9,5 @@ import org.dromara.easyes.core.kernel.BaseEsMapper; * @author wangli * @since 2024-09-23 10:52 */ -public interface EsProcessInstanceMapper extends BaseEsMapper { +public interface EsProcessInstanceMapper extends BaseEsMapper { } diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessTaskMapper.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessTaskMapper.java index 972b48b47..aacf0dc2f 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessTaskMapper.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/mapper/EsProcessTaskMapper.java @@ -1,6 +1,6 @@ package cn.axzo.workflow.es.mapper; -import cn.axzo.workflow.es.model.EsProcessTask; +import cn.axzo.workflow.es.model.ProcessTaskDocument; import org.dromara.easyes.core.kernel.BaseEsMapper; /** @@ -9,5 +9,5 @@ import org.dromara.easyes.core.kernel.BaseEsMapper; * @author wangli * @since 2024-09-23 10:52 */ -public interface EsProcessTaskMapper extends BaseEsMapper { +public interface EsProcessTaskMapper extends BaseEsMapper { } diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessInstance.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/ProcessInstanceDocument.java similarity index 65% rename from workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessInstance.java rename to workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/ProcessInstanceDocument.java index ab0cb5165..9ddbd13e0 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessInstance.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/ProcessInstanceDocument.java @@ -4,8 +4,11 @@ import lombok.Data; import org.dromara.easyes.annotation.IndexField; import org.dromara.easyes.annotation.IndexId; import org.dromara.easyes.annotation.IndexName; +import org.dromara.easyes.annotation.Join; +import org.dromara.easyes.annotation.Node; import org.dromara.easyes.annotation.rely.FieldType; import org.dromara.easyes.annotation.rely.IdType; +import org.dromara.easyes.annotation.rely.RefreshPolicy; import java.util.Date; @@ -16,8 +19,9 @@ import java.util.Date; * @since 2024-09-25 20:32 */ @Data -@IndexName("process_instance") -public class EsProcessInstance { +@IndexName(value = "process_instance_document", keepGlobalPrefix = true, refreshPolicy = RefreshPolicy.IMMEDIATE) +@Join(nodes = {@Node(parentClass = ProcessInstanceDocument.class ,parentAlias = "process_instance_document", childClasses = {ProcessTaskDocument.class}, childAliases = {"process_task_document"})}) +public class ProcessInstanceDocument { /** * 流程实例 ID @@ -28,7 +32,7 @@ public class EsProcessInstance { /** * 流程实例名称 */ - @IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true) + @IndexField(fieldType = FieldType.KEYWORD) private String name; /** @@ -44,6 +48,7 @@ public class EsProcessInstance { /** * 流程对应业务分类的类型,项目/单位/监管/OMS */ + @IndexField(fieldType = FieldType.KEYWORD) private String processCategoryType; /** @@ -59,32 +64,33 @@ public class EsProcessInstance { /** * 流程实例的持续时间 ms */ - private Long duration; + private Long durationInMillis; /** - * 该流程中上次操作完成的时间 + * 该流程中上次操作完成的时间, 如果是已完成的实例, 就清空该值 */ private Date lastOperationTime; /** * 租户 ID */ - @IndexField(fieldData = true) + @IndexField(fieldType = FieldType.KEYWORD) private String tenantId; /** * 流程实例业务状态 */ - @IndexField(fieldType = FieldType.KEYWORD_TEXT) + @IndexField(fieldType = FieldType.KEYWORD) private String businessStatus; /** * 实例对应的流程引擎服务端迭代版本 */ + @IndexField(fieldType = FieldType.KEYWORD) private String workflowEngineVersion; /** - * 是否使用代运营 + * 是否代运营 */ private Boolean agent; diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessTask.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/ProcessTaskDocument.java similarity index 62% rename from workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessTask.java rename to workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/ProcessTaskDocument.java index 70a1332f0..40447643c 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/EsProcessTask.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/model/ProcessTaskDocument.java @@ -1,17 +1,12 @@ package cn.axzo.workflow.es.model; -import cn.axzo.workflow.common.model.request.bpmn.task.AttachmentDTO; -import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; import lombok.Data; import org.dromara.easyes.annotation.IndexField; import org.dromara.easyes.annotation.IndexId; -import org.dromara.easyes.annotation.IndexName; -import org.dromara.easyes.annotation.Settings; import org.dromara.easyes.annotation.rely.FieldType; import org.dromara.easyes.annotation.rely.IdType; import java.util.Date; -import java.util.List; /** * 流程任务文档模型 @@ -20,9 +15,8 @@ import java.util.List; * @since 2024-09-25 20:32 */ @Data -@IndexName("process_task") -@Settings -public class EsProcessTask { +//@IndexName("process_task_document") +public class ProcessTaskDocument { /** * 任务 ID @@ -33,29 +27,31 @@ public class EsProcessTask { /** * 归属的流程实例 ID */ - @IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true) + @IndexField(fieldType = FieldType.KEYWORD) private String processInstanceId; /** * 归属的流程实例定义 ID */ + @IndexField(fieldType = FieldType.KEYWORD) private String processDefinitionId; /** * 任务定义 KEY,对应节点 ID */ + @IndexField(fieldType = FieldType.KEYWORD) private String taskDefinitionKey; /** * 任务名称 */ - @IndexField(fieldType = FieldType.KEYWORD_TEXT) + @IndexField(fieldType = FieldType.KEYWORD) private String name; /** * 任务状态:审批中/通过/驳回/转交/加签... */ - @IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true) + @IndexField(fieldType = FieldType.KEYWORD) private String status; /** @@ -83,40 +79,58 @@ public class EsProcessTask { */ private Long duration; - /** - * 上次操作时间 - */ - private Date lastUpdateTime; - /** * 归属租户,与 processInstance#tenantId 一致 */ + @IndexField(fieldType = FieldType.KEYWORD) private String tenantId; /** - * 审批人相关信息 + * 审批人姓名 */ - @IndexField(fieldType = FieldType.KEYWORD_TEXT, fieldData = true) - private BpmnTaskDelegateAssigner assigner; + @IndexField(fieldType = FieldType.KEYWORD) + private String assigneeName; + + /** + * 审批人自然人 ID + */ + @IndexField(fieldType = FieldType.KEYWORD) + private String assigneePersonId; + + /** + * 审批人所属单位 ID + */ + @IndexField(fieldType = FieldType.KEYWORD) + private String assigneeOuId; + + /** + * 审批人所属租户 ID + */ + @IndexField(fieldType = FieldType.KEYWORD) + private String assigneeTenantId; /** * 任务关联的附件(含文件,图片,签名) */ - private List attachments; +// @IndexField(fieldType = FieldType.NESTED, nestedClass = AttachmentDTO.class) +// private List attachments; /** * 审批方式:配置审批人/业务指定/业务触发(不含人) */ + @IndexField(fieldType = FieldType.KEYWORD) private String approvalMethod; /** * 节点类型:审批节点/业务节点/评论节点/抄送节点 */ + @IndexField(fieldType = FieldType.KEYWORD) private String nodeType; /** * 节点模式:会签/或签 */ + @IndexField(fieldType = FieldType.KEYWORD) private String nodeMode; } diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessInstanceService.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessInstanceService.java index 3f23370c0..8f9a32931 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessInstanceService.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessInstanceService.java @@ -1,6 +1,8 @@ package cn.axzo.workflow.es.service; -import cn.axzo.workflow.es.model.EsProcessInstance; +import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO; +import cn.axzo.workflow.common.model.response.BpmPageResult; +import cn.axzo.workflow.es.model.ProcessInstanceDocument; import java.util.Collection; @@ -12,19 +14,44 @@ import java.util.Collection; */ public interface EsProcessInstanceService { + /** + * 删除流程实例索引 + * + * @return + */ + Boolean deleteIndex(); + /** * 新增流程实例文档 * - * @param esProcessInstance + * @param processInstanceDocument * @return 成功的条数 */ - Integer insert(EsProcessInstance esProcessInstance); + Integer insert(String routing, ProcessInstanceDocument processInstanceDocument); /** * 批量新增流程实例文档 * - * @param esProcessInstances + * @param processInstanceDocuments * @return */ - Integer insertBatch(Collection esProcessInstances); + Integer insertBatch(String routing, Collection processInstanceDocuments); + + /** + * 更新流程实例文档 + * + * @param routing + * @param processInstanceDocument + */ + Integer update(String routing, ProcessInstanceDocument processInstanceDocument); + + /** + * 删除指定文档 + * + * @param processInstanceId + * @return + */ + Integer delete(String processInstanceId); + + BpmPageResult search(InstanceSearchReqDTO dto); } diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessTaskService.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessTaskService.java index 88f9e60a3..83d13b18d 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessTaskService.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/EsProcessTaskService.java @@ -1,5 +1,11 @@ package cn.axzo.workflow.es.service; +import cn.axzo.workflow.common.model.request.es.TaskSearchReqDTO; +import cn.axzo.workflow.es.model.ProcessTaskDocument; +import org.dromara.easyes.core.biz.EsPageInfo; + +import java.util.Collection; + /** * 操作流程任务的 ES Service * @@ -7,4 +13,17 @@ package cn.axzo.workflow.es.service; * @since 2024-09-27 11:07 */ public interface EsProcessTaskService { + + /** + * 删除流程实例索引 + * + * @return + */ + Boolean deleteIndex(); + + Integer insert(String routing, String parentId, ProcessTaskDocument processTaskDocument); + + Integer insertBatch(String routing, String parentId, Collection processTaskDocuments); + + EsPageInfo search(TaskSearchReqDTO dto); } diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessInstanceService.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessInstanceService.java new file mode 100644 index 000000000..97b8eacd4 --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessInstanceService.java @@ -0,0 +1,149 @@ +package cn.axzo.workflow.es.service.aggregation; + +import cn.axzo.workflow.common.enums.WorkspaceType; +import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO; +import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; +import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO; +import cn.axzo.workflow.common.model.response.BpmPageResult; +import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; +import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; +import cn.axzo.workflow.es.model.ProcessInstanceDocument; +import cn.axzo.workflow.es.model.ProcessTaskDocument; +import cn.axzo.workflow.es.service.EsProcessInstanceService; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.maven.artifact.versioning.DefaultArtifactVersion; +import org.flowable.bpmn.model.BpmnModel; +import org.flowable.engine.history.HistoricProcessInstance; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_121; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142; +import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_AGENT; +import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_PROCESS_WORKSPACE_TYPE; +import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION; +import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.PROCESSING; + +/** + * 实例纬度的聚合操作 + * + * @author wangli + * @since 2024-09-30 11:46 + */ +@Component +@AllArgsConstructor +@Slf4j +public class AggregateProcessInstanceService { + private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; + private final EsProcessInstanceService esProcessInstanceService; + private final AggregateProcessTaskService aggregateProcessTaskService; + + /** + * 同步指定查询结果的数据至 ES + *

+ * 内部自动分页循环同步 + * + * @param search + * @return 应同步至 ES 的统计数据 + */ + public DataSyncSummaryDTO syncProcessInstanceForSearch(HistoricProcessInstanceSearchForEsDTO search) { + Long totalProcessInstanceCount = bpmnProcessInstanceForEsService.queryHistoricProcessInstanceTotalCount(search); + log.info("查询到待同步数据量: {} 条", totalProcessInstanceCount); + AtomicLong totalProcessTaskCount = new AtomicLong(0); + IntStream.iterate(0, i -> i + search.getOverPageSize()).limit((totalProcessInstanceCount + search.getOverPageSize() - 1) / search.getOverPageSize()) + .forEach(skipRows -> { + log.info("处理进度: {} %, current startRow: {}, endRow: {}", String.format("%.2f", (double) skipRows / totalProcessInstanceCount * 100), + skipRows, skipRows + search.getOverPageSize()); + int pageNo = skipRows / search.getOverPageSize() + 1; + List instances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, new Page(pageNo, search.getOverPageSize())); + instances.forEach(hpi -> { + if (Objects.nonNull(hpi.getEndTime())) { + // 已结束的实例 + List processTaskDocuments = syncProcessInstance(hpi); + totalProcessTaskCount.getAndSet(totalProcessTaskCount.get() + processTaskDocuments.size()); + } else { + // 未结束的实例 + } + }); + }); + return new DataSyncSummaryDTO(totalProcessInstanceCount, totalProcessTaskCount.get()); + } + + /** + * 同步结束审批的数据至 ES + */ + public List syncProcessInstance(HistoricProcessInstance hpi) { + if (Objects.isNull(hpi)) { + return Collections.emptyList(); + } + ProcessInstanceDocument processInstanceDocument = new ProcessInstanceDocument(); + processInstanceDocument.setId(hpi.getId()); + processInstanceDocument.setName(hpi.getName()); + processInstanceDocument.setBusinessKey(hpi.getBusinessKey()); + processInstanceDocument.setProcessDefinitionId(hpi.getProcessDefinitionId()); + processInstanceDocument.setStartTime(hpi.getStartTime()); + processInstanceDocument.setEndTime(hpi.getEndTime()); + processInstanceDocument.setDurationInMillis(hpi.getDurationInMillis()); + processInstanceDocument.setTenantId(hpi.getTenantId()); + processInstanceDocument.setBusinessStatus(hpi.getBusinessStatus()); + + Map variables = hpi.getProcessVariables(); + + processInstanceDocument.setWorkflowEngineVersion(String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121))); + processInstanceDocument.setProcessCategoryType(WorkspaceType.getType((Integer) variables.getOrDefault(INTERNAL_PROCESS_WORKSPACE_TYPE, WorkspaceType.UN_KNOW.getCode())).getDesc()); + processInstanceDocument.setAgent((Boolean) variables.getOrDefault(INTERNAL_PROCESS_AGENT, false)); + + BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId()); + BpmnMetaParserHelper.getProcessApproveConf(bpmnModel.getMainProcess()).ifPresent(cfg -> { + processInstanceDocument.setSupportBatch(cfg.getSupportBatchOperation()); + processInstanceDocument.setUserAgreeSignature(cfg.getUserAgreeSignature()); + }); + + // 实例纬度数据同步 ES + esProcessInstanceService.insert(ES_FIXED_ROUTING, processInstanceDocument); + + String instanceVersion = String.valueOf(variables.getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)); + DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); + DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142); + List toEsProcessTaskDocuments = new ArrayList<>(); + if (version.compareTo(supportVersion) >= 0) { + log.info("通过新日志表进行任务纬度的数据同步"); + // 用新日志表数据任务同步 + toEsProcessTaskDocuments.addAll(aggregateProcessTaskService.syncProcessTaskForNew(hpi, instanceVersion)); + } else { + log.info("通过引擎的任务表进行任务纬度的数据同步"); + // 用引擎的任务表处理任务同步 + toEsProcessTaskDocuments.addAll(aggregateProcessTaskService.syncProcessTaskForOld(hpi, instanceVersion, bpmnModel)); + } + + //更新实例上的最后操作时间 + if (Objects.equals(PROCESSING.getStatus(), hpi.getBusinessStatus())) { + toEsProcessTaskDocuments.stream() + .filter(i -> Objects.equals(i.getStatus(), PROCESSING.getStatus())) + .max(Comparator.comparing(ProcessTaskDocument::getEndTime)) + .ifPresent(processTaskDocument -> { + processInstanceDocument.setLastOperationTime(processTaskDocument.getEndTime()); + esProcessInstanceService.update(ES_FIXED_ROUTING, processInstanceDocument); + }); + } else { + processInstanceDocument.setLastOperationTime(null); + } + return toEsProcessTaskDocuments; + } + + + public BpmPageResult search(InstanceSearchReqDTO dto) { + return esProcessInstanceService.search(dto); + } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessTaskService.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessTaskService.java new file mode 100644 index 000000000..5423cf03c --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/aggregation/AggregateProcessTaskService.java @@ -0,0 +1,231 @@ +package cn.axzo.workflow.es.service.aggregation; + +import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; +import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.repository.entity.ExtAxProcessLog; +import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; +import cn.axzo.workflow.core.service.BpmnProcessTaskForEsService; +import cn.axzo.workflow.core.service.converter.BpmnHistoricAttachmentConverter; +import cn.axzo.workflow.es.model.ProcessTaskDocument; +import cn.axzo.workflow.es.service.EsProcessTaskService; +import cn.hutool.core.date.DateUtil; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.maven.artifact.versioning.DefaultArtifactVersion; +import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.FlowElement; +import org.flowable.engine.history.HistoricProcessInstance; +import org.flowable.engine.impl.persistence.entity.CommentEntityImpl; +import org.flowable.engine.task.Comment; +import org.flowable.task.api.history.HistoricTaskInstance; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_ADVICE; +import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC; +import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_130; +import static cn.axzo.workflow.common.constant.BpmnConstants.FLOW_SERVER_VERSION_142; +import static cn.axzo.workflow.common.constant.BpmnConstants.HIDDEN_ASSIGNEE_ID; +import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO; +import static cn.axzo.workflow.common.constant.BpmnConstants.OLD_TASK_ASSIGNEE_SKIP_FLAT; +import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.MI_END; +import static cn.axzo.workflow.core.common.enums.BpmnProcessTaskResultEnum.REJECTION_AUTO_COMPLETED; + +/** + * 任务纬度的聚合操作 + * + * @author wangli + * @since 2024-09-30 11:46 + */ +@Component +@AllArgsConstructor +@Slf4j +public class AggregateProcessTaskService { + private final static DefaultArtifactVersion SUPPORT_VERSION = new DefaultArtifactVersion(FLOW_SERVER_VERSION_142); + private final EsProcessTaskService esProcessTaskService; + private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; + private final BpmnProcessTaskForEsService bpmnProcessTaskForEsService; + private final BpmnHistoricAttachmentConverter attachmentConverter; + + /** + * 通过新的日志表进行任务纬度的数据同步至 ES + */ + public List syncProcessTaskForNew(HistoricProcessInstance hpi, String instanceVersion) { + DefaultArtifactVersion currentVersion = new DefaultArtifactVersion(instanceVersion); + if (currentVersion.compareTo(SUPPORT_VERSION) < 0) { + return Collections.emptyList(); + } + + List logs = bpmnProcessTaskForEsService.queryProcessLogByProcessInstanceId(hpi.getId()); +// Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId()) +// .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + List toEsProcessTaskDocuments = new ArrayList<>(); + logs.forEach(log -> { + ProcessTaskDocument processTaskDocument = new ProcessTaskDocument(); + processTaskDocument.setId(log.getTaskId()); + processTaskDocument.setProcessInstanceId(log.getProcessInstanceId()); + processTaskDocument.setProcessDefinitionId(hpi.getProcessDefinitionId()); + processTaskDocument.setTaskDefinitionKey(log.getActivityId()); + processTaskDocument.setName(log.getActivityName()); + processTaskDocument.setStatus(log.getStatus()); + processTaskDocument.setAdvice(log.getAdvice()); + processTaskDocument.setOperationDesc(log.getOperationDesc()); + processTaskDocument.setStartTime(log.getStartTime()); + processTaskDocument.setEndTime(log.getEndTime()); + if (Objects.nonNull(log.getEndTime())) { + processTaskDocument.setDuration(DateUtil.betweenMs(log.getEndTime(), log.getStartTime())); + } + processTaskDocument.setTenantId(log.getTenantId()); + BpmnTaskDelegateAssigner assigner = CollectionUtils.isEmpty(log.getAssigneeFull()) ? null : log.getAssigneeFull().get(0); + if(Objects.nonNull(assigner)) { + processTaskDocument.setAssigneeName(assigner.getAssignerName()); + processTaskDocument.setAssigneeOuId(assigner.getOuId()); + processTaskDocument.setAssigneeTenantId(assigner.getTenantId()); + } +// esProcessTask.setAttachments(attachmentConverter.toVos(attachmentMap.getOrDefault(log.getTaskId(), Collections.emptyList()))); + processTaskDocument.setApprovalMethod(log.getApprovalMethod()); + processTaskDocument.setNodeType(log.getNodeType()); + processTaskDocument.setNodeMode(log.getNodeMode()); + toEsProcessTaskDocuments.add(processTaskDocument); + }); + + esProcessTaskService.insertBatch(ES_FIXED_ROUTING, hpi.getId(), toEsProcessTaskDocuments); + return toEsProcessTaskDocuments; + } + + /** + * 优先使用三个参数的同步方法. + *

+ * 通过原本引擎表进行任务纬度的数据同步至 ES + */ + public List syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion) { + BpmnModel bpmnModel = bpmnProcessInstanceForEsService.queryBpmnModel(hpi.getProcessDefinitionId()); + return syncProcessTaskForOld(hpi, instanceVersion, bpmnModel); + } + + /** + * 通过原本引擎表进行任务纬度的数据同步至 ES + * + * @param hpi + * @param instanceVersion + * @param bpmnModel + */ + public List syncProcessTaskForOld(HistoricProcessInstance hpi, String instanceVersion, BpmnModel bpmnModel) { + + List tasks = bpmnProcessTaskForEsService.queryHistoricProcessTaskByProcessInstanceId(hpi.getId()); + + Map extTaskMap = bpmnProcessTaskForEsService.queryExtAxHiTaskInstByProcessInstanceId(hpi.getId()) + .stream().collect(Collectors.toMap(ExtAxHiTaskInst::getTaskId, Function.identity(), (s, t) -> s)); + + Map> commentMap = bpmnProcessTaskForEsService.queryCommentByProcessInstanceId(hpi.getId()) + .stream().collect(Collectors.groupingBy(Comment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + // 父子文档中子文档不支持 Nested 类型字段 +// Map> attachmentMap = bpmnProcessTaskForEsService.queryAttachmentByProcessInstanceId(hpi.getId()) +// .stream().collect(Collectors.groupingBy(Attachment::getTaskId, Collectors.mapping(Function.identity(), Collectors.toList()))); + + Map variables = hpi.getProcessVariables(); + + // 过滤出有效任务, 比如同节点的多实例下或签, 一个人操作了同节点的一个任务,会把同节点的其他任务处理了,此时被处理的任务认为非人工处理 + List filteredEffectiveTasks = filterEffectiveTasks(tasks, instanceVersion); + + List toEsProcessTaskDocuments = new ArrayList<>(); + for (HistoricTaskInstance task : filteredEffectiveTasks) { + ExtAxHiTaskInst extTask = extTaskMap.getOrDefault(task.getId(), new ExtAxHiTaskInst()); + List comments = commentMap.getOrDefault(task.getId(), Collections.emptyList()); +// List attachments = attachmentMap.getOrDefault(task.getId(), Collections.emptyList()); + + ProcessTaskDocument processTaskDocument = new ProcessTaskDocument(); + processTaskDocument.setId(task.getId()); + processTaskDocument.setProcessInstanceId(task.getProcessInstanceId()); + processTaskDocument.setProcessDefinitionId(task.getProcessDefinitionId()); + processTaskDocument.setTaskDefinitionKey(task.getTaskDefinitionKey()); + processTaskDocument.setName(task.getName()); + processTaskDocument.setStatus(extTask.getStatus()); + // 处理 advice + processTaskDocument.setAdvice(comments.stream().filter(i -> Objects.equals(i.getType(), COMMENT_TYPE_ADVICE)) + .findFirst().orElse(new CommentEntityImpl()).getFullMessage()); + // 处理 operationDesc + processTaskDocument.setOperationDesc(comments.stream().filter(i -> Objects.equals(COMMENT_TYPE_OPERATION_DESC, i.getType())) + .max(Comparator.comparing(Comment::getTime)) + .orElse(new CommentEntityImpl()).getFullMessage()); + processTaskDocument.setStartTime(task.getCreateTime()); + processTaskDocument.setEndTime(task.getEndTime()); + processTaskDocument.setDuration(task.getDurationInMillis()); +// esProcessTask.setLastUpdateTime(task); + processTaskDocument.setTenantId(task.getTenantId()); + BpmnTaskDelegateAssigner assigner = BpmnTaskDelegateAssigner.toObjectCompatible(variables.getOrDefault(INTERNAL_TASK_RELATION_ASSIGNEE_INFO + task.getId(), null)); + if(Objects.nonNull(assigner)) { + processTaskDocument.setAssigneeName(assigner.getAssignerName()); + processTaskDocument.setAssigneeOuId(assigner.getOuId()); + processTaskDocument.setAssigneeTenantId(assigner.getTenantId()); + } +// esProcessTask.setAttachments(attachmentConverter.toVos(attachments)); + if (Objects.nonNull(bpmnModel)) { + FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey()); + BpmnMetaParserHelper.getApprovalMethod(flowElement) + .ifPresent(e -> processTaskDocument.setApprovalMethod(e.getType())); + BpmnMetaParserHelper.getNodeType(flowElement) + .ifPresent(e -> processTaskDocument.setNodeType(e.getType())); +// esProcessTask.setNodeMode(); + } + toEsProcessTaskDocuments.add(processTaskDocument); + } + esProcessTaskService.insertBatch(ES_FIXED_ROUTING, hpi.getId(), toEsProcessTaskDocuments); + return toEsProcessTaskDocuments; + } + + private List filterEffectiveTasks(List tasks, String instanceVersion) { + List effectiveTasks = new ArrayList<>(); + if (CollectionUtils.isEmpty(tasks)) { + return effectiveTasks; + } + Stream taskInstanceStream = tasks.stream() + .filter(i -> !Objects.equals(i.getAssignee(), HIDDEN_ASSIGNEE_ID)) + .filter(i -> !Objects.equals(i.getDeleteReason(), HIDDEN_ASSIGNEE_ID)) + .filter(i -> !Objects.equals(i.getDeleteReason(), MI_END.getStatus())); + if (Objects.isNull(instanceVersion)) { + compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); + } else { + if (StringUtils.hasText(instanceVersion)) { + DefaultArtifactVersion version = new DefaultArtifactVersion(instanceVersion); + DefaultArtifactVersion supportVersion = new DefaultArtifactVersion(FLOW_SERVER_VERSION_130); + if (version.compareTo(supportVersion) < 0) { + compatibleVersion(taskInstanceStream).forEach(effectiveTasks::add); + } else { + taskInstanceStream.forEach(effectiveTasks::add); + } + } else { + taskInstanceStream.forEach(effectiveTasks::add); + } + } + return effectiveTasks; + } + + + /** + * 兼容 1.2.1 版本的审批日志 + * + * @param stream + * @return + */ + private Stream compatibleVersion(Stream stream) { + return stream.filter(i -> (!Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) + || (Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(REJECTION_AUTO_COMPLETED.getDesc(), i.getDeleteReason())) + ).filter(i -> !(!Objects.equals(i.getAssignee(), OLD_TASK_ASSIGNEE_SKIP_FLAT) && Objects.equals(MI_END.getStatus(), i.getDeleteReason()))); + } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java index 67d01f99a..6053c1484 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessInstanceServiceImpl.java @@ -1,13 +1,26 @@ package cn.axzo.workflow.es.service.impl; +import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO; +import cn.axzo.workflow.common.model.response.BpmPageResult; import cn.axzo.workflow.es.mapper.EsProcessInstanceMapper; -import cn.axzo.workflow.es.model.EsProcessInstance; +import cn.axzo.workflow.es.model.ProcessInstanceDocument; +import cn.axzo.workflow.es.model.ProcessTaskDocument; import cn.axzo.workflow.es.service.EsProcessInstanceService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; +import org.dromara.easyes.annotation.IndexName; +import org.dromara.easyes.core.biz.EsPageInfo; +import org.dromara.easyes.core.conditions.select.LambdaEsQueryWrapper; +import org.dromara.easyes.core.toolkit.FieldUtils; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import java.util.Collection; +import java.util.List; + +import static cn.axzo.workflow.common.constant.BpmnConstants.ES_FIXED_ROUTING; /** * 操作流程实例的 ES Service 实现 @@ -15,26 +28,77 @@ import java.util.Collection; * @author wangli * @since 2024-09-27 11:09 */ -@Component +@Service @AllArgsConstructor @Slf4j public class EsProcessInstanceServiceImpl implements EsProcessInstanceService { private final EsProcessInstanceMapper esProcessInstanceMapper; + /** + * 删除流程实例索引 + * + * @return + */ + @Override + public Boolean deleteIndex() { + IndexName annotation = AnnotationUtils.findAnnotation(ProcessInstanceDocument.class, IndexName.class); + return esProcessInstanceMapper.deleteIndex(annotation.value()); + } + /** * 新增流程实例文档文档 * - * @param esProcessInstance + * @param processInstanceDocument * @return 成功的条数 */ @Override - public Integer insert(EsProcessInstance esProcessInstance) { - return esProcessInstanceMapper.insert(esProcessInstance); + public Integer insert(String routing, ProcessInstanceDocument processInstanceDocument) { + return esProcessInstanceMapper.insert(routing, processInstanceDocument); } @Override - public Integer insertBatch(Collection esProcessInstances) { - return esProcessInstanceMapper.insertBatch(esProcessInstances); + public Integer insertBatch(String routing, Collection processInstanceDocuments) { + return esProcessInstanceMapper.insertBatch(routing, processInstanceDocuments); + } + + /** + * 更新流程实例文档 + * + * @param routing + * @param processInstanceDocument + */ + @Override + public Integer update(String routing, ProcessInstanceDocument processInstanceDocument) { + return esProcessInstanceMapper.updateById(ES_FIXED_ROUTING, processInstanceDocument); + } + + @Override + public Integer delete(String processInstanceId) { + return esProcessInstanceMapper.deleteById(ES_FIXED_ROUTING, processInstanceId); + } + + @Override + public BpmPageResult search(InstanceSearchReqDTO dto) { + if (!CollectionUtils.isEmpty(dto.getProcessInstanceIds())) { + List processInstanceDocuments = esProcessInstanceMapper + .selectList(new LambdaEsQueryWrapper() + .in(FieldUtils.val(ProcessInstanceDocument::getId), dto.getProcessInstanceIds())); + return new BpmPageResult<>(processInstanceDocuments, (long) processInstanceDocuments.size()); + } + + LambdaEsQueryWrapper wrapper = new LambdaEsQueryWrapper<>(); + wrapper.hasChild("process_task_document", + w -> w.like(StringUtils.hasText(dto.getAssigneeName()), FieldUtils.val(ProcessTaskDocument::getAssigneeName), dto.getAssigneeName()) + .eq(StringUtils.hasText(dto.getTenantId()), FieldUtils.val(ProcessTaskDocument::getAssigneeTenantId), dto.getTenantId()) + .eq(StringUtils.hasText(dto.getOuId()), FieldUtils.val(ProcessTaskDocument::getAssigneeOuId), dto.getOuId()) + .eq(StringUtils.hasText(dto.getPersonId()), FieldUtils.val(ProcessTaskDocument::getAssigneePersonId), dto.getPersonId()) + ) + .eq(StringUtils.hasText(dto.getBusinessStatus()), FieldUtils.val(ProcessInstanceDocument::getBusinessStatus), dto.getBusinessStatus()) + .like(StringUtils.hasText(dto.getProcessInstanceName()), FieldUtils.val(ProcessInstanceDocument::getName), dto.getProcessInstanceName()) + .orderByDesc(FieldUtils.val(ProcessInstanceDocument::getStartTime)) + ; + EsPageInfo pageInfo = esProcessInstanceMapper.pageQuery(wrapper, dto.getPageNo(), dto.getPageSize()); + return new BpmPageResult<>(pageInfo.getList(), pageInfo.getTotal()); } } diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java index b92481fd7..bfc8cc7dc 100644 --- a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/service/impl/EsProcessTaskServiceImpl.java @@ -1,14 +1,62 @@ package cn.axzo.workflow.es.service.impl; +import cn.axzo.workflow.common.model.request.es.TaskSearchReqDTO; +import cn.axzo.workflow.es.mapper.EsProcessTaskMapper; +import cn.axzo.workflow.es.model.ProcessTaskDocument; import cn.axzo.workflow.es.service.EsProcessTaskService; -import org.springframework.stereotype.Component; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.easyes.annotation.IndexName; +import org.dromara.easyes.core.biz.EsPageInfo; +import org.dromara.easyes.core.conditions.select.LambdaEsQueryWrapper; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.stereotype.Service; + +import java.util.Collection; /** - * 操作流程任务的 ES Service + * 操作流程任务的 ES Service 实现 * * @author wangli * @since 2024-09-27 11:09 */ -@Component +@Service +@AllArgsConstructor +@Slf4j public class EsProcessTaskServiceImpl implements EsProcessTaskService { + + private final EsProcessTaskMapper esProcessTaskMapper; + + /** + * 删除流程实例索引 + * + * @return + */ + @Override + public Boolean deleteIndex() { + IndexName annotation = AnnotationUtils.findAnnotation(ProcessTaskDocument.class, IndexName.class); + return esProcessTaskMapper.deleteIndex(annotation.value()); + } + + /** + * 新增流程实例文档文档 + * + * @param processTaskDocument + * @return 成功的条数 + */ + public Integer insert(String routing, String parentId, ProcessTaskDocument processTaskDocument) { + return esProcessTaskMapper.insert(routing, parentId, processTaskDocument); + } + + @Override + public Integer insertBatch(String routing, String parentId, Collection processTaskDocuments) { + return esProcessTaskMapper.insertBatch(routing, parentId, processTaskDocuments); + } + + @Override + public EsPageInfo search(TaskSearchReqDTO dto) { + LambdaEsQueryWrapper wrapper = new LambdaEsQueryWrapper<>(); + + return esProcessTaskMapper.pageQuery(wrapper, dto.getPageNo(), dto.getPageSize()); + } } diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/listener/SyncEsTaskEntityEventHandle.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/listener/SyncEsTaskEntityEventHandle.java new file mode 100644 index 000000000..05e8227c7 --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/listener/SyncEsTaskEntityEventHandle.java @@ -0,0 +1,49 @@ +package cn.axzo.workflow.listener; + +import cn.axzo.workflow.core.engine.listener.entity.EntityEventHandle; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.HistoryService; +import org.flowable.engine.history.HistoricProcessInstance; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.task.service.impl.persistence.entity.TaskEntity; +import org.springframework.stereotype.Component; + +/** + * 用于处理非结束的实例的数据同步至 ES + * + * @author wangli + * @since 2024-09-06 00:02 + */ +@Slf4j +@Component +@AllArgsConstructor +public class SyncEsTaskEntityEventHandle implements EntityEventHandle { + private final AggregateProcessInstanceService aggregateProcessInstanceService; + + @Override + public boolean support(Object entity) { + return entity instanceof TaskEntity; + } + + @Override + public TaskEntity convert(Object entity) { + return (TaskEntity) entity; + } + + + @Override + public void onDeleted(TaskEntity taskEntity) { + log.info("SyncEsTaskEntityEventHandle onDeleted processInstanceId:{}, taskId: {}", taskEntity.getProcessInstanceId(), taskEntity.getId()); + ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); + HistoryService historyService = processEngineConfiguration.getHistoryService(); + HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() + .processInstanceId(taskEntity.getProcessInstanceId()) + .includeProcessVariables() + .singleResult(); + aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance); + } + +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java index 7d34b5eb4..42c4500b3 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/config/XxlJobConfiguration.java @@ -34,7 +34,7 @@ public class XxlJobConfiguration { @Value("") private String accessToken; - @Value("") + @Value("${xxl.job.executor.logpath:}") private String logPath; @Value("-1") diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/es/EsIndexController.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/es/EsIndexController.java deleted file mode 100644 index c9ce04eae..000000000 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/es/EsIndexController.java +++ /dev/null @@ -1,36 +0,0 @@ -package cn.axzo.workflow.server.controller.es; - -import cn.axzo.workflow.es.mapper.EsProcessInstanceMapper; -import cn.axzo.workflow.es.mapper.EsProcessTaskMapper; -import lombok.extern.slf4j.Slf4j; -import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import javax.annotation.Resource; - -/** - * ES 相关操作 - * - * @author wangli - * @since 2024-09-26 14:47 - */ -@Slf4j -@RequestMapping("/web/v1/api/es") -@RestController -@Validated -public class EsIndexController { - - @Resource - public EsProcessInstanceMapper esProcessInstanceMapper; - - @Resource - private EsProcessTaskMapper esProcessTaskMapper; - - @GetMapping("/index/create") - public void init() { - esProcessInstanceMapper.createIndex(); - esProcessTaskMapper.createIndex(); - } -} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/es/ElasticSearchController.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/es/ElasticSearchController.java new file mode 100644 index 000000000..407c51b3c --- /dev/null +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/es/ElasticSearchController.java @@ -0,0 +1,55 @@ +package cn.axzo.workflow.server.controller.web.es; + +import cn.axzo.workflow.client.feign.es.EsProcessInstanceApi; +import cn.axzo.workflow.common.model.request.es.InstanceSearchReqDTO; +import cn.axzo.workflow.common.model.response.BpmPageResult; +import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; +import cn.axzo.workflow.es.model.ProcessInstanceDocument; +import cn.axzo.workflow.es.service.EsProcessInstanceService; +import cn.axzo.workflow.es.service.EsProcessTaskService; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; +import cn.azxo.framework.common.model.CommonResponse; +import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * ES 相关操作 + * + * @author wangli + * @since 2024-09-26 14:47 + */ +@Slf4j +@RequestMapping("/web/v1/api/es") +@RestController +@Validated +public class ElasticSearchController implements EsProcessInstanceApi { + @Resource + private AggregateProcessInstanceService aggregateProcessInstanceService; + @Resource + private BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; + + @Resource + private EsProcessInstanceService esProcessInstanceService; + @Resource + private EsProcessTaskService esProcessTaskService; + + /** + * 审批数据搜索 + * + * @param dto + * @return + */ + @PostMapping("/instance/search") + public CommonResponse> processInstanceSearch(@Validated @RequestBody InstanceSearchReqDTO dto) { + log.info("审批数据搜索 processInstanceSearch===>>>参数:{}", JSONUtil.toJsonStr(dto)); + return CommonResponse.success(aggregateProcessInstanceService.search(dto)); + } + +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncEsJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncEsJobHandler.java new file mode 100644 index 000000000..c8f8d8b0b --- /dev/null +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncEsJobHandler.java @@ -0,0 +1,94 @@ +package cn.axzo.workflow.server.xxljob; + +import cn.axzo.workflow.common.model.dto.es.DataSyncSummaryDTO; +import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; +import cn.axzo.workflow.core.engine.cmd.CustomInsertPropertyCmd; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; +import cn.axzo.workflow.listener.SyncEsTaskEntityEventHandle; +import cn.hutool.core.date.DateUtil; +import com.alibaba.fastjson.JSON; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.annotation.XxlJob; +import com.xxl.job.core.log.XxlJobLogger; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.impl.interceptor.CommandExecutor; +import org.flowable.spring.SpringProcessEngineConfiguration; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import java.util.Date; + +import static cn.axzo.workflow.common.constant.BpmnConstants.LATEST_SYNC_TO_ELASTICSEARCH_TIME; + +/** + * 流程实例数据同步至 ES + *

+ * 在途的,或新发起的流程实例,均通过 {@link SyncEsTaskEntityEventHandle} 进行实时同步 + * + * @author wangli + * @since 2024-09-27 11:03 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class ProcessInstanceSyncEsJobHandler extends IJobHandler { + private final AggregateProcessInstanceService aggregateProcessInstanceService; + private final SpringProcessEngineConfiguration springProcessEngineConfiguration; + + @XxlJob("processInstanceSyncToEs") + public ReturnT execute(String s) { + log.debug("start exec finished process instance data sync... "); + XxlJobLogger.log("start exec finished process instance data sync... "); + + HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO(); + search.setFinished(true); + Date endTime = new Date(); + search.setEndTime(endTime); + + try { + if (StringUtils.hasText(s)) { + search = JSON.parseObject(s, HistoricProcessInstanceSearchForEsDTO.class); + log.info("根据入参转换后的查询入参:{}", JSON.toJSONString(search)); + XxlJobLogger.log("根据入参转换后的查询入参:{}", JSON.toJSONString(search)); + } else { + log.info("入参为空, 将以默认条件执行"); + XxlJobLogger.log("入参为空, 将以默认条件执行"); + } + } catch (Exception e) { + log.warn("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型"); + XxlJobLogger.log("无法解析传参, 入参必须是 JSON 格式, 入参可参考 HistoricProcessInstanceSearchForEsDTO 模型"); + return ReturnT.FAIL; + } + + // 同步前的一些额外操作 + beforeSync(endTime); + + // 开始同步完成的实例到 ES + DataSyncSummaryDTO summary = doSync(search); + + log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); + XxlJobLogger.log("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", summary.getProcessInstanceCount(), summary.getProcessTaskCount()); + return ReturnT.SUCCESS; + } + + /** + * 1. 记录本次操作的时间点 + */ + private void beforeSync(Date endTime) { + //1. 记录本次操作的时间点 + CommandExecutor commandExecutor = springProcessEngineConfiguration.getCommandExecutor(); + commandExecutor.execute(new CustomInsertPropertyCmd(LATEST_SYNC_TO_ELASTICSEARCH_TIME, DateUtil.formatDateTime(endTime))); + } + + /** + * 开始处理的入口 + * + * @param search + */ + private DataSyncSummaryDTO doSync(HistoricProcessInstanceSearchForEsDTO search) { + return aggregateProcessInstanceService.syncProcessInstanceForSearch(search); + } + +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java deleted file mode 100644 index dabb8ed36..000000000 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/xxljob/ProcessInstanceSyncJobHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -package cn.axzo.workflow.server.xxljob; - -import cn.axzo.workflow.common.model.dto.es.HistoricProcessInstanceSearchForEsDTO; -import cn.axzo.workflow.core.service.BpmnProcessInstanceForEsService; -import cn.axzo.workflow.es.model.EsProcessInstance; -import cn.axzo.workflow.es.service.EsProcessInstanceService; -import cn.axzo.workflow.es.service.EsProcessTaskService; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.handler.annotation.XxlJob; -import com.xxl.job.core.log.XxlJobLogger; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.flowable.engine.history.HistoricProcessInstance; -import org.springframework.stereotype.Component; - -import java.util.List; - -/** - * 流程实例数据同步 - * - * @author wangli - * @since 2024-09-27 11:03 - */ -@Component -@RequiredArgsConstructor -@Slf4j -public class ProcessInstanceSyncJobHandler extends IJobHandler { - private final BpmnProcessInstanceForEsService bpmnProcessInstanceForEsService; - private final EsProcessInstanceService esProcessInstanceService; - private final EsProcessTaskService esProcessTaskService; - - @XxlJob("processInstanceToEs") - public ReturnT execute(String s) { - XxlJobLogger.log("start exec process instance data sync... "); - - HistoricProcessInstanceSearchForEsDTO search = new HistoricProcessInstanceSearchForEsDTO(); - IPage page = new Page(0, 10); - List historicProcessInstances = bpmnProcessInstanceForEsService.queryHistoricProcessInstance(search, page); - historicProcessInstances.forEach(processInstance -> { - EsProcessInstance esProcessInstance = new EsProcessInstance(); - Integer inserted = esProcessInstanceService.insert(esProcessInstance); - if (inserted > 0) { - XxlJobLogger.log("write processInstance to es success! id: {}", processInstance.getId()); - } else { - XxlJobLogger.log("write to es caught exception! id: {}", processInstance.getId()); - } - }); - - return ReturnT.SUCCESS; - } -}