diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/code/AsyncJobRespCode.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/code/AsyncJobRespCode.java new file mode 100644 index 000000000..97e0fedff --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/code/AsyncJobRespCode.java @@ -0,0 +1,39 @@ +package cn.axzo.workflow.core.common.code; + +import cn.axzo.framework.domain.web.code.IModuleRespCode; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 异步任务相关响应码 + * + * @author wangli + * @since 2024/4/29 20:30 + */ +@Getter +@AllArgsConstructor +public enum AsyncJobRespCode implements IModuleRespCode { + DATA_NOT_EXISTS("001", "ExtTaskInst 数据不存在, instId: {}, taskId: {}"), + ; + private String code; + private String message; + + @Override + public String getModuleCode() { + return "11"; + } + + @Override + public String getProjectCode() { + return "998"; + } + + public void setCode(String code) { + this.code = code; + } + + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/CommonContext.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/CommonContext.java index 10d89e31e..49cba93f3 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/CommonContext.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/CommonContext.java @@ -12,7 +12,7 @@ import java.util.Objects; import java.util.function.Supplier; /** - * TODO + * 事件处理器中通用的上下文 * * @author wangli * @since 2024/4/9 21:00 diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/OperationContext.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/OperationContext.java index d60d6de30..25f74bb2a 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/OperationContext.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/context/OperationContext.java @@ -2,7 +2,7 @@ package cn.axzo.workflow.core.common.context; /** - * TODO + * 操作上下文接口 * * @author wangli * @since 2024/4/9 10:05 diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java index af95fb6bb..06397e071 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java @@ -146,7 +146,6 @@ public final class BpmnJsonConverterUtil { ExtensionAttribute serverVersion = new ExtensionAttribute(); serverVersion.setName(FLOW_SERVER_VERSION); - //FIXME: 尽量在每次版本迭代时, 都修改这里的版本号, 用于以后特殊场景下消息流程相关数据时, 能区别不同时期的处理办法,还需要动态起来 serverVersion.setValue(serverVersionStr); ExtensionAttribute jsonMetaValue = new ExtensionAttribute(); jsonMetaValue.setName(FLOW_NODE_JSON); diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java index e3f70de97..95d95b9fd 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java @@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf; import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory; import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator; import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler; +import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler; import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler; import cn.axzo.workflow.core.engine.job.exception.handle.CustomAsyncRunnableExceptionExceptionHandler; import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager; @@ -44,11 +45,13 @@ public class FlowableConfiguration { configuration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_TRUE); configuration.setEnableSafeBpmnXml(false); // configuration.setCreateDiagramOnDeploy(false); +// configuration.setIdGenerator(new TimeBasedIdGenerator()); configuration.setIdGenerator(new DistributedTimeBasedIdGenerator(stringRedisTemplate)); configuration.setHistoricProcessInstanceDataManager(new CustomMybatisHistoricProcessInstanceDataManager(configuration)); // 自定义的异步任务处理器 configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler()); configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService)); + configuration.addCustomJobHandler(new AsyncExtTaskInstJobHandler(extAxHiTaskInstService)); // 异步任务异常重试时间间隔 configuration.setDefaultFailedJobWaitTime(30); configuration.setAsyncFailedJobWaitTime(30); diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SpringAsyncConfigurer.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SpringAsyncConfigurer.java index 9735f9f39..abd77bdd9 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SpringAsyncConfigurer.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SpringAsyncConfigurer.java @@ -12,7 +12,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** - * TODO + * 自定义线程配置器 * * @author wangli * @since 2024/4/9 23:10 diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java index dcb420a30..ee9713065 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java @@ -2,6 +2,7 @@ package cn.axzo.workflow.core.engine.event; import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum; import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEventType; /** * 接收任务的扩展任务记录表创建的事件 @@ -10,6 +11,8 @@ import org.flowable.common.engine.api.delegate.event.FlowableEvent; * @since 2024/2/5 18:26 */ public interface ExtTaskInstEvent extends FlowableEvent { + @Override + FlowableEventType getType(); String getProcessInstanceId(); diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstUpdateEvent.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstUpdateEvent.java index 1756a0e75..1f71a42ff 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstUpdateEvent.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstUpdateEvent.java @@ -1,6 +1,7 @@ package cn.axzo.workflow.core.engine.event; import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum; +import lombok.Data; import org.flowable.common.engine.api.delegate.event.FlowableEventType; import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE; @@ -11,6 +12,7 @@ import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE; * @author wangli * @since 27/03/2024 15:02 */ +@Data public class ExtTaskInstUpdateEvent implements ExtTaskInstEvent { private final String processInstanceId; private final String activityId; @@ -35,31 +37,6 @@ public class ExtTaskInstUpdateEvent implements ExtTaskInstEvent { this.resultEnum = resultEnum; } - @Override - public String getProcessInstanceId() { - return processInstanceId; - } - - @Override - public String getActivityId() { - return activityId; - } - - @Override - public String getTaskId() { - return taskId; - } - - @Override - public String getAssignee() { - return assignee; - } - - @Override - public BpmnProcessInstanceResultEnum getResultEnum() { - return resultEnum; - } - @Override public FlowableEventType getType() { return UPDATE; diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncExtTaskInstJobHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncExtTaskInstJobHandler.java new file mode 100644 index 000000000..dd984a9e6 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncExtTaskInstJobHandler.java @@ -0,0 +1,58 @@ +package cn.axzo.workflow.core.engine.job; + +import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum; +import cn.axzo.workflow.core.common.exception.WorkflowEngineException; +import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent; +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.job.service.JobHandler; +import org.flowable.job.service.impl.persistence.entity.JobEntity; +import org.flowable.variable.api.delegate.VariableScope; + +import java.util.Objects; + +import static cn.axzo.workflow.core.common.code.AsyncJobRespCode.DATA_NOT_EXISTS; + +/** + * 异步操作扩展任务表的处理器 + * + * @author wangli + * @since 2024/4/29 20:22 + */ +@Slf4j +public class AsyncExtTaskInstJobHandler implements JobHandler { + public static final String TYPE = "async-update-ext-task-inst"; + private final ExtAxHiTaskInstService extAxHiTaskInstService; + + public AsyncExtTaskInstJobHandler(ExtAxHiTaskInstService extAxHiTaskInstService) { + this.extAxHiTaskInstService = extAxHiTaskInstService; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { + ExtTaskInstUpdateEvent event = JSONObject.parseObject(job.getCustomValues(), ExtTaskInstUpdateEvent.class); + String taskId = event.getTaskId(); + String processInstanceId = event.getProcessInstanceId(); + String assignee = event.getAssignee(); + BpmnProcessInstanceResultEnum resultEnum = event.getResultEnum(); + + ExtAxHiTaskInst existence = extAxHiTaskInstService.getByTaskId(taskId, processInstanceId); + if (Objects.nonNull(existence)) { + log.info("更新扩展任务实例表数据: taskId:{}, instanceId:{}, currentStatus:{}, anticipateStatus: {}", + taskId, processInstanceId, existence.getStatus(), resultEnum.getStatus()); + } else { + log.warn("无法正确更新扩展任务表数据: taskId:{}, instanceId:{}, anticipateStatus: {}", + taskId, processInstanceId, resultEnum.getStatus()); + throw new WorkflowEngineException(DATA_NOT_EXISTS, processInstanceId, taskId); + } + extAxHiTaskInstService.updateByTaskIdAndInstanceId(taskId, processInstanceId, assignee, resultEnum); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineTaskEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineTaskEventListener.java index 5cfaa0d7d..dd1a4c509 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineTaskEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineTaskEventListener.java @@ -57,8 +57,8 @@ public class EngineTaskEventListener implements TaskListener { default: } }); - stopWatch.stop(); log.info("StopWatch '" + stopWatch.currentTaskName() + "': running time = " + stopWatch.getTotalTimeSeconds() + " s"); + stopWatch.stop(); } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/AbstractBpmnEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/AbstractBpmnEventListener.java index 9dcbc7595..63399b2ec 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/AbstractBpmnEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/AbstractBpmnEventListener.java @@ -7,7 +7,7 @@ import org.slf4j.MDC; import static cn.azxo.framework.common.constatns.Constants.CTX_LOG_ID_MDC; /** - * TODO + * 操作上下文的抽象实现 * * @author wangli * @since 2024/4/9 14:21 diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java index 7b7ed73ea..2c65e9038 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java @@ -4,6 +4,7 @@ import cn.axzo.workflow.core.engine.event.ExtTaskInstCreateEvent; import cn.axzo.workflow.core.engine.event.ExtTaskInstEvent; import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent; import cn.axzo.workflow.core.engine.event.ReceiveTaskEventType; +import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler; import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; import cn.hutool.json.JSONUtil; @@ -12,7 +13,10 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener; import org.flowable.common.engine.api.delegate.event.FlowableEvent; -import org.springframework.scheduling.annotation.Async; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.job.service.JobService; +import org.flowable.job.service.impl.persistence.entity.JobEntity; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -41,11 +45,10 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener { .add(UPDATE) .build(); - @Async @Override public void onEvent(FlowableEvent flowableEvent) { if (flowableEvent instanceof ExtTaskInstEvent) { - log.info("Ext Task Inst Event : {}", JSONUtil.toJsonStr(flowableEvent)); + log.info("Ext Task Inst Event : {}, eventType: {}", JSONUtil.toJsonStr(flowableEvent), flowableEvent.getType()); ExtTaskInstEvent event = (ExtTaskInstEvent) flowableEvent; ReceiveTaskEventType eventType = (ReceiveTaskEventType) flowableEvent.getType(); if (SUPPORT_EVENTS.contains(eventType)) { @@ -80,9 +83,52 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener { } private void updateExtTaskInst(ExtTaskInstUpdateEvent event) { - extAxHiTaskInstService.updateByTaskIdAndInstanceId(event.getTaskId(), - event.getProcessInstanceId(), - event.getAssignee(), - event.getResultEnum()); +// externalJob(event); + // 利用引擎异步任务的重试能力, 去正确更新扩展任务表的状态 + createAsyncJob(event); } + + private void createAsyncJob(ExtTaskInstUpdateEvent event) { + ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(); + processEngineConfiguration.getCommandExecutor().execute(commandContext -> { + JobService jobService = CommandContextUtil.getJobService(commandContext); + JobEntity job = jobService.createJob(); +// job.setExecutionId(event.getExecutionId()); + job.setProcessInstanceId(event.getProcessInstanceId()); + job.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE); + + // 携带自定义的数据 + job.setCustomValues(JSONUtil.toJsonStr(event)); + + // 创建异步任务并调度 + jobService.createAsyncJob(job, false); + jobService.scheduleAsyncJob(job); + return null; + }); + } + + /*private void externalJob(ExtTaskInstUpdateEvent event) { + CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor(); + ManagementService managementService = processEngineConfiguration.getManagementService(); + commandExecutor.execute((Command) commandContext -> { + JobService jobService = CommandContextUtil.getJobService(commandContext); + JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration(); + ExternalWorkerJobEntity workerJob = jobService.createExternalWorkerJob(); + workerJob.setJobHandlerConfiguration("extHiTaskInst"); + workerJob.setJobType(Job.JOB_TYPE_EXTERNAL_WORKER); + workerJob.setRetries(jobServiceConfiguration.getAsyncExecutorNumberOfRetries()); + workerJob.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE); + workerJob.setProcessInstanceId(event.getProcessInstanceId()); + workerJob.setCustomValues(JSONUtil.toJsonStr(event)); + jobService.insertExternalWorkerJob(workerJob); + return null; + }); + + List acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder() + .topic("extHiTaskInst", Duration.ofMinutes(30)) + .acquireAndLock(3, "testWorker"); + acquiredJobs.forEach(i -> { + managementService.createExternalWorkerCompletionBuilder(i.getId(), "testWorker").complete(); + }); + }*/ } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/repository/mapper/CommonMapper.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/repository/mapper/CommonMapper.java index ddb73069d..e433da08c 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/repository/mapper/CommonMapper.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/repository/mapper/CommonMapper.java @@ -7,7 +7,7 @@ import java.util.List; import java.util.Map; /** - * TODO + * 适用于通用的操作 mapper * * @author wangli * @since 2024/4/28 14:44 diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java index 699e97c53..44575776a 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java @@ -268,6 +268,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener