fix - 利用引擎 AsyncCmd 的能力解决通过监听事务状态不能正确发送 MQ 的问题

This commit is contained in:
wangli 2024-08-08 18:08:20 +08:00
parent 248ce361d1
commit 37ed9a594d
4 changed files with 24 additions and 18 deletions

View File

@ -4,7 +4,6 @@ import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.TaskService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
@ -19,8 +18,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask;
@ -31,7 +28,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask
* @author wangli
* @since 2024/1/4 15:50
*/
public class CustomApproveTaskAsyncCmd extends AbstractCommand<Void> implements Serializable {
public class CustomApproveTaskAsyncCmd extends AbstractCommand<String> implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CustomApproveTaskAsyncCmd.class);
private final BpmnTaskAuditDTO dto;
@ -46,7 +43,7 @@ public class CustomApproveTaskAsyncCmd extends AbstractCommand<Void> implements
}
@Override
public Void execute(CommandContext commandContext) {
public String execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoricTaskInstanceQuery taskQuery =
@ -61,11 +58,10 @@ public class CustomApproveTaskAsyncCmd extends AbstractCommand<Void> implements
}
validTask(historicTaskInstance, (TaskEntity) task, dto.getApprover(), null);
startAsync(processEngineConfiguration, task);
return null;
return startAsync(processEngineConfiguration, task);
}
private void startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, Task task) {
private String startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, Task task) {
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
JobEntity job = jobService.createJob();
@ -84,6 +80,7 @@ public class CustomApproveTaskAsyncCmd extends AbstractCommand<Void> implements
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
return job.getId();
}
}

View File

@ -8,10 +8,14 @@ import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionListener;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.engine.RuntimeService;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.asyncexecutor.ResetExpiredJobsCmd;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.task.service.delegate.DelegateTask;
import java.util.Collections;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELATION_ASSIGNEE_INFO;
/**
@ -33,7 +37,7 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_TASK_RELAT
@Slf4j
@AllArgsConstructor
public class AutoPassTransactionListener implements TransactionListener {
private final ProcessEngineConfiguration processEngineConfiguration;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
private final DelegateTask delegateTask;
private final String advice;
@ -41,17 +45,20 @@ public class AutoPassTransactionListener implements TransactionListener {
public void execute(CommandContext commandContext) {
log.info("exec auto pass transaction listener start, processInstanceId: {}, taskId: {}", delegateTask.getProcessInstanceId(), delegateTask.getId());
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
RuntimeService runtimeService = springProcessEngineConfiguration.getRuntimeService();
BpmnTaskDelegateAssigner assigner = BpmnTaskDelegateAssigner.toObjectCompatible(
runtimeService.getVariable(delegateTask.getProcessInstanceId(), INTERNAL_TASK_RELATION_ASSIGNEE_INFO + delegateTask.getId()));
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
CommandExecutor commandExecutor = springProcessEngineConfiguration.getCommandExecutor();
BpmnTaskAuditDTO pass = new BpmnTaskAuditDTO();
pass.setTaskId(delegateTask.getId());
pass.setAdvice(advice);
pass.setApprover(assigner);
pass.setOperationDesc("自动通过");
commandExecutor.execute(new CustomApproveTaskAsyncCmd(pass));
String jobId = commandExecutor.execute(new CustomApproveTaskAsyncCmd(pass));
JobServiceConfiguration jobServiceConfiguration = springProcessEngineConfiguration.getJobServiceConfiguration();
commandExecutor.execute(new ResetExpiredJobsCmd(Collections.singletonList(jobId), jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration));
log.info("exec auto pass transaction listener end");
}

View File

@ -9,7 +9,7 @@ import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.TransactionListener;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.task.service.delegate.DelegateTask;
/**
@ -26,7 +26,7 @@ import org.flowable.task.service.delegate.DelegateTask;
@Slf4j
@AllArgsConstructor
public class AutoRejectTransactionListener implements TransactionListener {
private final ProcessEngineConfiguration processEngineConfiguration;
private final SpringProcessEngineConfiguration springProcessEngineConfiguration;
private final DelegateTask delegateTask;
private final ExtAxHiTaskInstService extAxHiTaskInstService;
@ -35,7 +35,7 @@ public class AutoRejectTransactionListener implements TransactionListener {
log.info("exec auto reject transaction listener start, processInstanceId: {}, taskId: {}",
delegateTask.getProcessInstanceId(), delegateTask.getId());
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
CommandExecutor commandExecutor = springProcessEngineConfiguration.getCommandExecutor();
BpmnTaskAuditDTO reject = new BpmnTaskAuditDTO();
reject.setTaskId(delegateTask.getId());
reject.setApprover(new BpmnTaskDelegateAssigner("系统", "system", delegateTask.getTenantId()));

View File

@ -28,6 +28,7 @@ import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.context.annotation.Scope;
import org.springframework.core.Ordered;
@ -63,6 +64,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<Ta
private final RuntimeService runtimeService;
private final RepositoryService repositoryService;
private final ExtAxHiTaskInstService extAxHiTaskInstService;
private final SpringProcessEngineConfiguration processEngineConfiguration;
@Override
public void onCreated(DelegateTask delegateTask) {
@ -188,7 +190,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<Ta
*/
private void autoReject(DelegateTask delegateTask, TransactionState transactionState) {
Context.getTransactionContext().addTransactionListener(transactionState,
new AutoRejectTransactionListener(CommandContextUtil.getProcessEngineConfiguration(), delegateTask, extAxHiTaskInstService));
new AutoRejectTransactionListener(processEngineConfiguration, delegateTask, extAxHiTaskInstService));
}
/**
@ -201,7 +203,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<Ta
*/
private void autoPass(DelegateTask delegateTask, TransactionState transactionState, @Nullable String advice) {
Context.getTransactionContext().addTransactionListener(transactionState,
new AutoPassTransactionListener(CommandContextUtil.getProcessEngineConfiguration(), delegateTask, advice));
new AutoPassTransactionListener(processEngineConfiguration, delegateTask, advice));
}
/**