diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/code/AsyncJobRespCode.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/code/AsyncJobRespCode.java new file mode 100644 index 000000000..97e0fedff --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/code/AsyncJobRespCode.java @@ -0,0 +1,39 @@ +package cn.axzo.workflow.core.common.code; + +import cn.axzo.framework.domain.web.code.IModuleRespCode; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 异步任务相关响应码 + * + * @author wangli + * @since 2024/4/29 20:30 + */ +@Getter +@AllArgsConstructor +public enum AsyncJobRespCode implements IModuleRespCode { + DATA_NOT_EXISTS("001", "ExtTaskInst 数据不存在, instId: {}, taskId: {}"), + ; + private String code; + private String message; + + @Override + public String getModuleCode() { + return "11"; + } + + @Override + public String getProjectCode() { + return "998"; + } + + public void setCode(String code) { + this.code = code; + } + + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java index d8d9a7131..a4ea13dbb 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java @@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf; import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory; import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator; import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler; +import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler; import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler; import cn.axzo.workflow.core.engine.job.exception.handler.CustomAsyncRunnableExecutionExceptionHandler; import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager; @@ -49,6 +50,7 @@ public class FlowableConfiguration { // 自定义的异步任务处理器 configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler()); configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService)); + configuration.addCustomJobHandler(new AsyncExtTaskInstJobHandler(extAxHiTaskInstService)); // 异步任务异常重试时间间隔 configuration.setDefaultFailedJobWaitTime(30); configuration.setAsyncFailedJobWaitTime(30); diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java index dcb420a30..ee9713065 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ExtTaskInstEvent.java @@ -2,6 +2,7 @@ package cn.axzo.workflow.core.engine.event; import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum; import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEventType; /** * 接收任务的扩展任务记录表创建的事件 @@ -10,6 +11,8 @@ import org.flowable.common.engine.api.delegate.event.FlowableEvent; * @since 2024/2/5 18:26 */ public interface ExtTaskInstEvent extends FlowableEvent { + @Override + FlowableEventType getType(); String getProcessInstanceId(); diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncExtTaskInstJobHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncExtTaskInstJobHandler.java new file mode 100644 index 000000000..e509e85ee --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncExtTaskInstJobHandler.java @@ -0,0 +1,58 @@ +package cn.axzo.workflow.core.engine.job; + +import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum; +import cn.axzo.workflow.core.common.exception.WorkflowEngineException; +import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent; +import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; +import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; +import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.job.service.JobHandler; +import org.flowable.job.service.impl.persistence.entity.JobEntity; +import org.flowable.variable.api.delegate.VariableScope; + +import java.util.Objects; + +import static cn.axzo.workflow.core.common.code.AsyncJobRespCode.DATA_NOT_EXISTS; + +/** + * 异步操作扩展任务表的处理器 + * + * @author wangli + * @since 2024/4/29 20:22 + */ +@Slf4j +public class AsyncExtTaskInstJobHandler implements JobHandler { + public static final String TYPE = "async-update-ext-task-inst"; + private final ExtAxHiTaskInstService extAxHiTaskInstService; + + public AsyncExtTaskInstJobHandler(ExtAxHiTaskInstService extAxHiTaskInstService) { + this.extAxHiTaskInstService = extAxHiTaskInstService; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { + ExtTaskInstUpdateEvent event = JSONUtil.toBean(job.getCustomValues(), ExtTaskInstUpdateEvent.class); + String taskId = event.getTaskId(); + String processInstanceId = event.getProcessInstanceId(); + String assignee = event.getAssignee(); + BpmnProcessInstanceResultEnum resultEnum = event.getResultEnum(); + + ExtAxHiTaskInst existence = extAxHiTaskInstService.getByTaskId(taskId, processInstanceId); + if (Objects.nonNull(existence)) { + log.info("更新扩展任务实例表数据: taskId:{}, instanceId:{}, currentStatus:{}, anticipateStatus: {}", + taskId, processInstanceId, existence.getStatus(), resultEnum.getStatus()); + } else { + log.warn("无法正确更新扩展任务表数据: taskId:{}, instanceId:{}, anticipateStatus: {}", + taskId, processInstanceId, resultEnum.getStatus()); + throw new WorkflowEngineException(DATA_NOT_EXISTS, processInstanceId, taskId); + } + extAxHiTaskInstService.updateByTaskIdAndInstanceId(taskId, processInstanceId, assignee, resultEnum); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java index 7b7ed73ea..4b5f35abb 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/impl/ExtTaskInstEventListener.java @@ -4,6 +4,7 @@ import cn.axzo.workflow.core.engine.event.ExtTaskInstCreateEvent; import cn.axzo.workflow.core.engine.event.ExtTaskInstEvent; import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent; import cn.axzo.workflow.core.engine.event.ReceiveTaskEventType; +import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler; import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst; import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; import cn.hutool.json.JSONUtil; @@ -12,10 +13,21 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener; import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.impl.interceptor.Command; +import org.flowable.common.engine.impl.interceptor.CommandExecutor; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.job.api.Job; +import org.flowable.job.service.JobService; +import org.flowable.job.service.JobServiceConfiguration; +import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor; +import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity; +import org.flowable.spring.SpringProcessEngineConfiguration; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; +import java.util.Calendar; +import java.util.GregorianCalendar; import java.util.Set; import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.CREATE; @@ -34,6 +46,7 @@ import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE; @Component @Slf4j public class ExtTaskInstEventListener extends AbstractFlowableEventListener { + private final SpringProcessEngineConfiguration processEngineConfiguration; private final ExtAxHiTaskInstService extAxHiTaskInstService; public static final Set SUPPORT_EVENTS = ImmutableSet.builder() @@ -45,7 +58,7 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener { @Override public void onEvent(FlowableEvent flowableEvent) { if (flowableEvent instanceof ExtTaskInstEvent) { - log.info("Ext Task Inst Event : {}", JSONUtil.toJsonStr(flowableEvent)); + log.info("Ext Task Inst Event : {}, eventType: {}", JSONUtil.toJsonStr(flowableEvent), flowableEvent.getType()); ExtTaskInstEvent event = (ExtTaskInstEvent) flowableEvent; ReceiveTaskEventType eventType = (ReceiveTaskEventType) flowableEvent.getType(); if (SUPPORT_EVENTS.contains(eventType)) { @@ -80,9 +93,27 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener { } private void updateExtTaskInst(ExtTaskInstUpdateEvent event) { - extAxHiTaskInstService.updateByTaskIdAndInstanceId(event.getTaskId(), - event.getProcessInstanceId(), - event.getAssignee(), - event.getResultEnum()); + CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor(); + commandExecutor.execute((Command) commandContext -> { + JobService jobService = CommandContextUtil.getJobService(commandContext); + JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration(); + ExternalWorkerJobEntity workerJob = jobService.createExternalWorkerJob(); + workerJob.setJobType(Job.JOB_TYPE_EXTERNAL_WORKER); + workerJob.setRetries(jobServiceConfiguration.getAsyncExecutorNumberOfRetries()); + workerJob.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE); + workerJob.setProcessInstanceId(event.getProcessInstanceId()); + + GregorianCalendar gregorianCalendar = new GregorianCalendar(); + gregorianCalendar.setTime(jobServiceConfiguration.getClock().getCurrentTime()); + AsyncExecutor asyncExecutor = jobServiceConfiguration.getAsyncExecutor(); + gregorianCalendar.add(Calendar.MILLISECOND, asyncExecutor.getAsyncJobLockTimeInMillis()); + workerJob.setLockExpirationTime(gregorianCalendar.getTime()); + workerJob.setLockOwner(asyncExecutor.getLockOwner()); + + workerJob.setCustomValues(JSONUtil.toJsonStr(event)); + jobService.insertExternalWorkerJob(workerJob); + return null; + }); + } } diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java index 699e97c53..44575776a 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/task/AutoOperatorEvent_101_Listener.java @@ -268,6 +268,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener