diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java index 195d53b6e..1d0b96682 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java @@ -5,9 +5,9 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory; import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory; import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator; import cn.axzo.workflow.core.engine.interceptor.CustomRetryInterceptor; +import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler; import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler; import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler; -import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler; import cn.axzo.workflow.core.engine.job.AsyncActivitySetAssigneeJobHandler; import cn.axzo.workflow.core.engine.job.AsyncActivityTriggerJobHandler; import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler; @@ -40,7 +40,6 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.lang.Nullable; import java.time.Duration; @@ -70,7 +69,8 @@ public class FlowableConfiguration { List jobProcessors, NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties, - SupportRefreshProperties refreshProperties) { + SupportRefreshProperties refreshProperties, + List configurers) { return configuration -> { configuration.setEnableHistoricTaskLogging(true); configuration.setHistoryLevel(HistoryLevel.AUDIT); @@ -100,6 +100,7 @@ public class FlowableConfiguration { configuration.addCustomJobHandler(new AsyncCountersignUserTaskJobHandler(extAxHiTaskInstService)); configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService)); configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler()); + configurers.forEach(i-> configuration.addCustomJobHandler(i.getJobHandler())); // 异步任务异常重试时间间隔 configuration.setDefaultFailedJobWaitTime(30); configuration.setAsyncFailedJobWaitTime(30); diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/ProcessExtConfigurer.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/ProcessExtConfigurer.java new file mode 100644 index 000000000..11accfb0b --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/ProcessExtConfigurer.java @@ -0,0 +1,13 @@ +package cn.axzo.workflow.core.conf; + +import org.flowable.job.service.JobHandler; + +/** + * TODO + * + * @author wangli + * @since 2024-11-04 10:58 + */ +public interface ProcessExtConfigurer { + JobHandler getJobHandler(); +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AbstractExecuteWithLockJobHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AbstractExecuteWithLockJobHandler.java index a714d31d2..b6eecf9a7 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AbstractExecuteWithLockJobHandler.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AbstractExecuteWithLockJobHandler.java @@ -70,5 +70,5 @@ public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandl extAxPropertyService.delete(processInstanceId, jobId); } - abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext); + public abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext); } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncAbortProcessInstanceJobHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncAbortProcessInstanceJobHandler.java index 979f305fd..a8cd71bf3 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncAbortProcessInstanceJobHandler.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncAbortProcessInstanceJobHandler.java @@ -27,7 +27,7 @@ public class AsyncAbortProcessInstanceJobHandler extends AbstractExecuteWithLock } @Override - void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { + public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { log.info("AsyncAbortProcessInstanceHandler executing...,jobInfo:{}", JSONUtil.toJsonStr(job)); log(job); ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext); diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/conf/FlowableElasticSearchConfiguration.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/conf/FlowableElasticSearchConfiguration.java new file mode 100644 index 000000000..985dd6fe0 --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/conf/FlowableElasticSearchConfiguration.java @@ -0,0 +1,32 @@ +package cn.axzo.workflow.es.conf; + +import cn.axzo.workflow.core.conf.ProcessExtConfigurer; +import cn.axzo.workflow.es.flowable.ext.AsyncElasticSearchSyncJobHandler; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; +import org.flowable.job.service.JobHandler; +import org.flowable.spring.SpringProcessEngineConfiguration; +import org.flowable.spring.boot.EngineConfigurationConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +/** + * ES 相关对 Flowable 扩展的配置 + * + * @author wangli + * @since 2024-11-04 10:39 + */ +@Component +public class FlowableElasticSearchConfiguration implements ProcessExtConfigurer { + private final AggregateProcessInstanceService aggregateProcessInstanceService; + + public FlowableElasticSearchConfiguration(AggregateProcessInstanceService aggregateProcessInstanceService) { + this.aggregateProcessInstanceService = aggregateProcessInstanceService; + } + + @Override + public JobHandler getJobHandler() { + return new AsyncElasticSearchSyncJobHandler(aggregateProcessInstanceService); + } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/AsyncElasticSearchSyncJobHandler.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/AsyncElasticSearchSyncJobHandler.java new file mode 100644 index 000000000..3e64cf27f --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/AsyncElasticSearchSyncJobHandler.java @@ -0,0 +1,44 @@ +package cn.axzo.workflow.es.flowable.ext; + +import cn.axzo.workflow.core.engine.job.AbstractExecuteWithLockJobHandler; +import cn.axzo.workflow.core.engine.job.AbstractJobHandler; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; +import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.job.service.JobHandler; +import org.flowable.job.service.impl.persistence.entity.JobEntity; +import org.flowable.variable.api.delegate.VariableScope; + +/** + * 同步 ElasticSearch 数据 + * + * @author wangli + * @since 2024-11-04 10:09 + */ +@Slf4j +public class AsyncElasticSearchSyncJobHandler extends AbstractExecuteWithLockJobHandler implements JobHandler { + public static final String TYPE = "async-es-sync"; + private final AggregateProcessInstanceService aggregateProcessInstanceService; + + public AsyncElasticSearchSyncJobHandler(AggregateProcessInstanceService aggregateProcessInstanceService) { + super(); + this.aggregateProcessInstanceService = aggregateProcessInstanceService; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { + log.info("AsyncElasticSearchSyncJobHandler exec start..."); + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(commandContext); + String processInstanceId = JSONUtil.toBean(job.getCustomValues(), String.class); + processEngineConfiguration.getCommandExecutor().execute(new CustomElasticSearchCmd(processInstanceId, aggregateProcessInstanceService)); + } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/CustomElasticSearchAsyncCmd.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/CustomElasticSearchAsyncCmd.java new file mode 100644 index 000000000..c8595ac9b --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/CustomElasticSearchAsyncCmd.java @@ -0,0 +1,70 @@ +package cn.axzo.workflow.es.flowable.ext; + +import cn.axzo.workflow.core.engine.cmd.AbstractCommand; +import com.alibaba.fastjson.JSON; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.engine.HistoryService; +import org.flowable.engine.history.HistoricProcessInstance; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.job.service.JobService; +import org.flowable.job.service.impl.persistence.entity.JobEntity; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * 自定义同步处理器 + * + * @author wangli + * @since 2024-11-04 10:00 + */ +public class CustomElasticSearchAsyncCmd extends AbstractCommand implements Serializable { + private final String processInstanceId; + + public CustomElasticSearchAsyncCmd(String processInstanceId) { + this.processInstanceId = processInstanceId; + } + + @Override + public String paramToJsonString() { + Map params = new HashMap<>(); + params.put("processInstanceId", processInstanceId); + return JSON.toJSONString(params); + } + + @Override + public Void executeInternal(CommandContext commandContext) { + ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext); + + HistoryService historyService = processEngineConfiguration.getHistoryService(); + HistoricProcessInstance instance = historyService.createHistoricProcessInstanceQuery() + .processInstanceId(processInstanceId) + .singleResult(); + + startAsync(processEngineConfiguration, instance); + return null; + } + + private void startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, HistoricProcessInstance instance) { + JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService(); + + JobEntity job = jobService.createJob(); + // 这里的 executionId 可为 null + job.setExecutionId(instance.getId()); + job.setProcessInstanceId(instance.getId()); + job.setProcessDefinitionId(instance.getProcessDefinitionId()); + job.setElementId(AsyncElasticSearchSyncJobHandler.TYPE); + job.setElementName(instance.getName()); + job.setJobHandlerType(AsyncElasticSearchSyncJobHandler.TYPE); + job.setTenantId(instance.getTenantId()); + + // 携带自定义的数据 + job.setCustomValues(processInstanceId); + + // 创建异步任务并调度 + jobService.createAsyncJob(job, false); + jobService.scheduleAsyncJob(job); + } +} diff --git a/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/CustomElasticSearchCmd.java b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/CustomElasticSearchCmd.java new file mode 100644 index 000000000..ffa95fd6c --- /dev/null +++ b/workflow-engine-elasticsearch/src/main/java/cn/axzo/workflow/es/flowable/ext/CustomElasticSearchCmd.java @@ -0,0 +1,58 @@ +package cn.axzo.workflow.es.flowable.ext; + +import cn.axzo.workflow.core.engine.cmd.AbstractCommand; +import cn.axzo.workflow.es.model.ProcessTaskDocument; +import cn.axzo.workflow.es.service.aggregation.AggregateProcessInstanceService; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.engine.HistoryService; +import org.flowable.engine.history.HistoricProcessInstance; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 自定义同步处理器 + * + * @author wangli + * @since 2024-11-04 10:00 + */ +@Slf4j +public class CustomElasticSearchCmd extends AbstractCommand implements Serializable { + private final String processInstanceId; + private final AggregateProcessInstanceService aggregateProcessInstanceService; + + public CustomElasticSearchCmd(String processInstanceId, + AggregateProcessInstanceService aggregateProcessInstanceService) { + this.processInstanceId = processInstanceId; + this.aggregateProcessInstanceService = aggregateProcessInstanceService; + } + + @Override + public String paramToJsonString() { + Map params = new HashMap<>(); + params.put("processInstanceId", processInstanceId); + return JSON.toJSONString(params); + } + + @Override + public Void executeInternal(CommandContext commandContext) { + ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext); + HistoryService historyService = processEngineConfiguration.getHistoryService(); + HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() + .processInstanceId(processInstanceId) + .singleResult(); + // 删除指定实例的父子文档 + log.info("delete document processInstanceId: {}", processInstanceId); + aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId); + log.info("reInsert document processInstanceId: {}", processInstanceId); + List processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null); + log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size()); + return null; + } +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/process/SyncToEsProcessEventListener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/process/SyncToEsProcessEventListener.java index 442833128..aaafc03a6 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/process/SyncToEsProcessEventListener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/process/SyncToEsProcessEventListener.java @@ -27,7 +27,6 @@ import org.springframework.stereotype.Component; @Scope("prototype") @AllArgsConstructor public class SyncToEsProcessEventListener extends AbstractBpmnEventListener implements BpmnProcessEventListener, Ordered { - private final AggregateProcessInstanceService aggregateProcessInstanceService; @Override public int getOrder() { @@ -79,6 +78,6 @@ public class SyncToEsProcessEventListener extends AbstractBpmnEventListener implements BpmnTaskEventListener, Ordered { - private final AggregateProcessInstanceService aggregateProcessInstanceService; @Override public int getOrder() { @@ -42,6 +41,6 @@ public class SyncToEsTaskEvent_104_Listener extends AbstractBpmnEventListener processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null); - log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size()); +// ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext); +// HistoryService historyService = processEngineConfiguration.getHistoryService(); +// HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() +// .processInstanceId(processInstanceId) +// .singleResult(); +// // 删除指定实例的父子文档 +// log.info("delete document processInstanceId: {}", processInstanceId); +// aggregateProcessInstanceService.deleteDocumentParentAndChild(processInstanceId); +// log.info("reInsert document processInstanceId: {}", processInstanceId); +// List processTaskDocuments = aggregateProcessInstanceService.syncProcessInstance(historicProcessInstance, null); +// log.info("Insert Summary: ProcessInstance Count: {}, ProcessTask Count: {}", 1, processTaskDocuments.size()); + + CommandContextUtil.getProcessEngineConfiguration().getCommandExecutor() + .execute(new CustomElasticSearchAsyncCmd(processInstanceId)); } }