update - 将更新 extHiTaskInst 表数据的逻辑,改成使用引擎的异步处理,结合重试的能力
This commit is contained in:
parent
47f306a4f3
commit
5a631bf6f1
@ -0,0 +1,39 @@
|
|||||||
|
package cn.axzo.workflow.core.common.code;
|
||||||
|
|
||||||
|
import cn.axzo.framework.domain.web.code.IModuleRespCode;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 异步任务相关响应码
|
||||||
|
*
|
||||||
|
* @author wangli
|
||||||
|
* @since 2024/4/29 20:30
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum AsyncJobRespCode implements IModuleRespCode {
|
||||||
|
DATA_NOT_EXISTS("001", "ExtTaskInst 数据不存在, instId: {}, taskId: {}"),
|
||||||
|
;
|
||||||
|
private String code;
|
||||||
|
private String message;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getModuleCode() {
|
||||||
|
return "11";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getProjectCode() {
|
||||||
|
return "998";
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCode(String code) {
|
||||||
|
this.code = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void setMessage(String message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf;
|
|||||||
import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
|
import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
|
||||||
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
|
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
|
||||||
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
|
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
|
||||||
|
import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
|
||||||
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
|
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
|
||||||
import cn.axzo.workflow.core.engine.job.exception.handler.CustomAsyncRunnableExecutionExceptionHandler;
|
import cn.axzo.workflow.core.engine.job.exception.handler.CustomAsyncRunnableExecutionExceptionHandler;
|
||||||
import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager;
|
import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager;
|
||||||
@ -49,6 +50,7 @@ public class FlowableConfiguration {
|
|||||||
// 自定义的异步任务处理器
|
// 自定义的异步任务处理器
|
||||||
configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler());
|
configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler());
|
||||||
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
|
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
|
||||||
|
configuration.addCustomJobHandler(new AsyncExtTaskInstJobHandler(extAxHiTaskInstService));
|
||||||
// 异步任务异常重试时间间隔
|
// 异步任务异常重试时间间隔
|
||||||
configuration.setDefaultFailedJobWaitTime(30);
|
configuration.setDefaultFailedJobWaitTime(30);
|
||||||
configuration.setAsyncFailedJobWaitTime(30);
|
configuration.setAsyncFailedJobWaitTime(30);
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package cn.axzo.workflow.core.engine.event;
|
|||||||
|
|
||||||
import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum;
|
import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum;
|
||||||
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
||||||
|
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 接收任务的扩展任务记录表创建的事件
|
* 接收任务的扩展任务记录表创建的事件
|
||||||
@ -10,6 +11,8 @@ import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
|||||||
* @since 2024/2/5 18:26
|
* @since 2024/2/5 18:26
|
||||||
*/
|
*/
|
||||||
public interface ExtTaskInstEvent extends FlowableEvent {
|
public interface ExtTaskInstEvent extends FlowableEvent {
|
||||||
|
@Override
|
||||||
|
FlowableEventType getType();
|
||||||
|
|
||||||
String getProcessInstanceId();
|
String getProcessInstanceId();
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,58 @@
|
|||||||
|
package cn.axzo.workflow.core.engine.job;
|
||||||
|
|
||||||
|
import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum;
|
||||||
|
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
|
||||||
|
import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent;
|
||||||
|
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||||
|
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.flowable.common.engine.impl.interceptor.CommandContext;
|
||||||
|
import org.flowable.job.service.JobHandler;
|
||||||
|
import org.flowable.job.service.impl.persistence.entity.JobEntity;
|
||||||
|
import org.flowable.variable.api.delegate.VariableScope;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import static cn.axzo.workflow.core.common.code.AsyncJobRespCode.DATA_NOT_EXISTS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 异步操作扩展任务表的处理器
|
||||||
|
*
|
||||||
|
* @author wangli
|
||||||
|
* @since 2024/4/29 20:22
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class AsyncExtTaskInstJobHandler implements JobHandler {
|
||||||
|
public static final String TYPE = "async-update-ext-task-inst";
|
||||||
|
private final ExtAxHiTaskInstService extAxHiTaskInstService;
|
||||||
|
|
||||||
|
public AsyncExtTaskInstJobHandler(ExtAxHiTaskInstService extAxHiTaskInstService) {
|
||||||
|
this.extAxHiTaskInstService = extAxHiTaskInstService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType() {
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
|
||||||
|
ExtTaskInstUpdateEvent event = JSONUtil.toBean(job.getCustomValues(), ExtTaskInstUpdateEvent.class);
|
||||||
|
String taskId = event.getTaskId();
|
||||||
|
String processInstanceId = event.getProcessInstanceId();
|
||||||
|
String assignee = event.getAssignee();
|
||||||
|
BpmnProcessInstanceResultEnum resultEnum = event.getResultEnum();
|
||||||
|
|
||||||
|
ExtAxHiTaskInst existence = extAxHiTaskInstService.getByTaskId(taskId, processInstanceId);
|
||||||
|
if (Objects.nonNull(existence)) {
|
||||||
|
log.info("更新扩展任务实例表数据: taskId:{}, instanceId:{}, currentStatus:{}, anticipateStatus: {}",
|
||||||
|
taskId, processInstanceId, existence.getStatus(), resultEnum.getStatus());
|
||||||
|
} else {
|
||||||
|
log.warn("无法正确更新扩展任务表数据: taskId:{}, instanceId:{}, anticipateStatus: {}",
|
||||||
|
taskId, processInstanceId, resultEnum.getStatus());
|
||||||
|
throw new WorkflowEngineException(DATA_NOT_EXISTS, processInstanceId, taskId);
|
||||||
|
}
|
||||||
|
extAxHiTaskInstService.updateByTaskIdAndInstanceId(taskId, processInstanceId, assignee, resultEnum);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -4,6 +4,7 @@ import cn.axzo.workflow.core.engine.event.ExtTaskInstCreateEvent;
|
|||||||
import cn.axzo.workflow.core.engine.event.ExtTaskInstEvent;
|
import cn.axzo.workflow.core.engine.event.ExtTaskInstEvent;
|
||||||
import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent;
|
import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent;
|
||||||
import cn.axzo.workflow.core.engine.event.ReceiveTaskEventType;
|
import cn.axzo.workflow.core.engine.event.ReceiveTaskEventType;
|
||||||
|
import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
|
||||||
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
|
||||||
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
|
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
@ -12,10 +13,21 @@ import lombok.AllArgsConstructor;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener;
|
import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener;
|
||||||
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
||||||
|
import org.flowable.common.engine.impl.interceptor.Command;
|
||||||
|
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
|
||||||
|
import org.flowable.engine.impl.util.CommandContextUtil;
|
||||||
|
import org.flowable.job.api.Job;
|
||||||
|
import org.flowable.job.service.JobService;
|
||||||
|
import org.flowable.job.service.JobServiceConfiguration;
|
||||||
|
import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor;
|
||||||
|
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity;
|
||||||
|
import org.flowable.spring.SpringProcessEngineConfiguration;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
|
import java.util.Calendar;
|
||||||
|
import java.util.GregorianCalendar;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.CREATE;
|
import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.CREATE;
|
||||||
@ -34,6 +46,7 @@ import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE;
|
|||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
|
public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
|
||||||
|
private final SpringProcessEngineConfiguration processEngineConfiguration;
|
||||||
private final ExtAxHiTaskInstService extAxHiTaskInstService;
|
private final ExtAxHiTaskInstService extAxHiTaskInstService;
|
||||||
public static final Set<ReceiveTaskEventType> SUPPORT_EVENTS =
|
public static final Set<ReceiveTaskEventType> SUPPORT_EVENTS =
|
||||||
ImmutableSet.<ReceiveTaskEventType>builder()
|
ImmutableSet.<ReceiveTaskEventType>builder()
|
||||||
@ -45,7 +58,7 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
|
|||||||
@Override
|
@Override
|
||||||
public void onEvent(FlowableEvent flowableEvent) {
|
public void onEvent(FlowableEvent flowableEvent) {
|
||||||
if (flowableEvent instanceof ExtTaskInstEvent) {
|
if (flowableEvent instanceof ExtTaskInstEvent) {
|
||||||
log.info("Ext Task Inst Event : {}", JSONUtil.toJsonStr(flowableEvent));
|
log.info("Ext Task Inst Event : {}, eventType: {}", JSONUtil.toJsonStr(flowableEvent), flowableEvent.getType());
|
||||||
ExtTaskInstEvent event = (ExtTaskInstEvent) flowableEvent;
|
ExtTaskInstEvent event = (ExtTaskInstEvent) flowableEvent;
|
||||||
ReceiveTaskEventType eventType = (ReceiveTaskEventType) flowableEvent.getType();
|
ReceiveTaskEventType eventType = (ReceiveTaskEventType) flowableEvent.getType();
|
||||||
if (SUPPORT_EVENTS.contains(eventType)) {
|
if (SUPPORT_EVENTS.contains(eventType)) {
|
||||||
@ -80,9 +93,27 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void updateExtTaskInst(ExtTaskInstUpdateEvent event) {
|
private void updateExtTaskInst(ExtTaskInstUpdateEvent event) {
|
||||||
extAxHiTaskInstService.updateByTaskIdAndInstanceId(event.getTaskId(),
|
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
|
||||||
event.getProcessInstanceId(),
|
commandExecutor.execute((Command<Void>) commandContext -> {
|
||||||
event.getAssignee(),
|
JobService jobService = CommandContextUtil.getJobService(commandContext);
|
||||||
event.getResultEnum());
|
JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration();
|
||||||
|
ExternalWorkerJobEntity workerJob = jobService.createExternalWorkerJob();
|
||||||
|
workerJob.setJobType(Job.JOB_TYPE_EXTERNAL_WORKER);
|
||||||
|
workerJob.setRetries(jobServiceConfiguration.getAsyncExecutorNumberOfRetries());
|
||||||
|
workerJob.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE);
|
||||||
|
workerJob.setProcessInstanceId(event.getProcessInstanceId());
|
||||||
|
|
||||||
|
GregorianCalendar gregorianCalendar = new GregorianCalendar();
|
||||||
|
gregorianCalendar.setTime(jobServiceConfiguration.getClock().getCurrentTime());
|
||||||
|
AsyncExecutor asyncExecutor = jobServiceConfiguration.getAsyncExecutor();
|
||||||
|
gregorianCalendar.add(Calendar.MILLISECOND, asyncExecutor.getAsyncJobLockTimeInMillis());
|
||||||
|
workerJob.setLockExpirationTime(gregorianCalendar.getTime());
|
||||||
|
workerJob.setLockOwner(asyncExecutor.getLockOwner());
|
||||||
|
|
||||||
|
workerJob.setCustomValues(JSONUtil.toJsonStr(event));
|
||||||
|
jobService.insertExternalWorkerJob(workerJob);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -268,6 +268,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<Ta
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void autoReject(DelegateTask delegateTask) {
|
private void autoReject(DelegateTask delegateTask) {
|
||||||
|
// TODO 应该用异步任务实现
|
||||||
log.info("AutoOperatorEventListener#autoReject...{}", delegateTask.getTaskDefinitionKey());
|
log.info("AutoOperatorEventListener#autoReject...{}", delegateTask.getTaskDefinitionKey());
|
||||||
taskService.addComment(delegateTask.getId(), delegateTask.getProcessInstanceId(), COMMENT_TYPE_OPERATION_DESC
|
taskService.addComment(delegateTask.getId(), delegateTask.getProcessInstanceId(), COMMENT_TYPE_OPERATION_DESC
|
||||||
, "自动驳回");
|
, "自动驳回");
|
||||||
@ -294,6 +295,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<Ta
|
|||||||
|
|
||||||
|
|
||||||
private void autoPass(DelegateTask delegateTask) {
|
private void autoPass(DelegateTask delegateTask) {
|
||||||
|
// TODO 用异步任务实现
|
||||||
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
|
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
|
||||||
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
|
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
|
||||||
BpmnTaskDelegateAssigner assigner = BpmnTaskDelegateAssigner.toObjectCompatible(
|
BpmnTaskDelegateAssigner assigner = BpmnTaskDelegateAssigner.toObjectCompatible(
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user