update - 工作流的主要动作添加异步接口支持

This commit is contained in:
wangli 2024-04-16 11:30:41 +08:00
parent 1201a48498
commit 8f5368494d
8 changed files with 235 additions and 68 deletions

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf;
import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import com.google.common.collect.Lists;
@ -31,7 +32,8 @@ public class FlowableConfiguration {
public EngineConfigurationConfigurer<SpringProcessEngineConfiguration> processEngineConfigurer(
ObjectProvider<FlowableEventListener> listeners,
CustomActivityBehaviorFactory customActivityBehaviorFactory,
StringRedisTemplate stringRedisTemplate) {
StringRedisTemplate stringRedisTemplate,
ExtAxHiTaskInstService extAxHiTaskInstService) {
return configuration -> {
configuration.setEnableHistoricTaskLogging(true);
configuration.setHistoryLevel(HistoryLevel.AUDIT);
@ -44,6 +46,7 @@ public class FlowableConfiguration {
configuration.setIdGenerator(new DistributedTimeBasedIdGenerator(stringRedisTemplate));
configuration.setHistoricProcessInstanceDataManager(new CustomMybatisHistoricProcessInstanceDataManager(configuration));
configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler());
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
};
}
@ -58,4 +61,5 @@ public class FlowableConfiguration {
customActivityBehaviorFactory.setHiTaskInstService(hiTaskInstService);
return customActivityBehaviorFactory;
}
}

View File

@ -0,0 +1,81 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.hutool.json.JSONUtil;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.TaskService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.api.Task;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.flowable.task.api.history.HistoricTaskInstanceQuery;
import org.flowable.task.service.impl.persistence.entity.TaskEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Objects;
import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask;
/**
* 自定义(异步)审批通过任务的命令器实现
*
* @author wangli
* @since 2024/1/4 15:50
*/
public class CustomApproveTaskAsyncCmd implements Command<Void>, Serializable {
private static final Logger log = LoggerFactory.getLogger(CustomApproveTaskAsyncCmd.class);
private final BpmnTaskAuditDTO dto;
public CustomApproveTaskAsyncCmd(BpmnTaskAuditDTO dto) {
this.dto = dto;
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoricTaskInstanceQuery taskQuery =
processEngineConfiguration.getHistoryService().createHistoricTaskInstanceQuery();
TaskService taskService = processEngineConfiguration.getTaskService();
HistoricTaskInstance historicTaskInstance = taskQuery.taskId(dto.getTaskId()).singleResult();
Task task = taskService.createTaskQuery().taskId(dto.getTaskId()).singleResult();
if (Objects.isNull(task)) {
log.info("任务不存在: {}", dto.getTaskId());
}
validTask(historicTaskInstance, (TaskEntity) task, dto.getApprover());
startAsync(processEngineConfiguration, task);
return null;
}
private void startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, Task task) {
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
JobEntity job = jobService.createJob();
// 这里的 executionId 可为 null
job.setExecutionId(task.getExecutionId());
job.setProcessInstanceId(task.getProcessInstanceId());
job.setProcessDefinitionId(task.getProcessDefinitionId());
job.setElementId(task.getTaskDefinitionKey());
job.setElementName(task.getName());
job.setJobHandlerType(AsyncApproveTaskJobHandler.TYPE);
job.setTenantId(task.getTenantId());
// 携带自定义的数据
job.setCustomValues(JSONUtil.toJsonStr(dto));
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
}
}

View File

@ -1,8 +1,8 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.task.AttachmentDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import org.flowable.common.engine.impl.identity.Authentication;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
@ -10,8 +10,6 @@ import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.api.Task;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.flowable.task.api.history.HistoricTaskInstanceQuery;
@ -34,7 +32,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.batchAddA
import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask;
/**
* 自定义审批通过任务的命令器实现
* 自定义(同步)审批通过任务的命令器实现
*
* @author wangli
* @since 2024/1/4 15:50
@ -57,17 +55,15 @@ public class CustomApproveTaskCmd implements Command<Void>, Serializable {
* 下级节点的审批,可为空
*/
private final BpmnTaskDelegateAssigner nextApprover;
private final Boolean async;
public CustomApproveTaskCmd(String taskId, String advice, List<AttachmentDTO> attachmentList,
BpmnTaskDelegateAssigner approver, BpmnTaskDelegateAssigner nextApprover, Boolean async) {
this.taskId = taskId;
this.advice = advice;
this.attachmentList = attachmentList;
this.approver = approver;
this.nextApprover = nextApprover;
this.async = async;
public CustomApproveTaskCmd(BpmnTaskAuditDTO dto) {
this.taskId = dto.getTaskId();
this.advice = dto.getAdvice();
this.attachmentList = dto.getAttachmentList();
this.approver = dto.getApprover();
this.nextApprover = dto.getNextApprover();
}
public CustomApproveTaskCmd(String taskId, String advice, String operationDesc, List<AttachmentDTO> attachmentList,
BpmnTaskDelegateAssigner approver, BpmnTaskDelegateAssigner nextApprover) {
this.taskId = taskId;
@ -76,7 +72,6 @@ public class CustomApproveTaskCmd implements Command<Void>, Serializable {
this.attachmentList = attachmentList;
this.approver = approver;
this.nextApprover = nextApprover;
this.async = false;
}
@Override
@ -115,12 +110,7 @@ public class CustomApproveTaskCmd implements Command<Void>, Serializable {
}
((TaskEntity) task).setTransientVariable(TASK_COMPLETE_OPERATION_TYPE + taskId, APPROVED.getStatus());
// TODO 这个范围需要扩大到整个 Command, 而不是在这里, 否则此前的代码逻辑会持久化数据库.
if (async) {
executeAsynchronous(commandContext, runtimeService, task);
} else {
executeSynchronous(task, taskService, runtimeService);
}
executeSynchronous(task, taskService, runtimeService);
return null;
}
@ -134,27 +124,4 @@ public class CustomApproveTaskCmd implements Command<Void>, Serializable {
}
}
private void executeAsynchronous(CommandContext commandContext, RuntimeService runtimeService, Task task) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
JobEntity job = jobService.createJob();
// 这里的 executionId 可为 null
job.setExecutionId(task.getExecutionId());
job.setProcessInstanceId(task.getProcessInstanceId());
job.setProcessDefinitionId(task.getProcessDefinitionId());
job.setElementId(task.getTaskDefinitionKey());
job.setElementName(task.getName());
job.setJobHandlerType(AsyncApproveTaskJobHandler.TYPE);
job.setTenantId(task.getTenantId());
// 携带自定义的数据
job.setCustomValues(task.getId());
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
}
}

View File

@ -0,0 +1,72 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
import cn.hutool.json.JSONUtil;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.TaskService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.api.Task;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.flowable.task.api.history.HistoricTaskInstanceQuery;
import org.flowable.task.service.impl.persistence.entity.TaskEntity;
import java.io.Serializable;
import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask;
/**
* 自定义驳回任务的命令实现
*
* @author wangli
* @since 2024/1/4 13:36
*/
public class CustomRejectionTaskAsyncCmd implements Command<Void>, Serializable {
private final BpmnTaskAuditDTO dto;
public CustomRejectionTaskAsyncCmd(BpmnTaskAuditDTO dto) {
this.dto = dto;
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
HistoricTaskInstanceQuery taskQuery =
processEngineConfiguration.getHistoryService().createHistoricTaskInstanceQuery();
HistoricTaskInstance historicTaskInstance = taskQuery.taskId(dto.getTaskId()).singleResult();
TaskService taskService = processEngineConfiguration.getTaskService();
Task task = taskService.createTaskQuery().taskId(dto.getTaskId()).singleResult();
validTask(historicTaskInstance, (TaskEntity) task, dto.getApprover());
startAsync(processEngineConfiguration, task);
return null;
}
private void startAsync(ProcessEngineConfigurationImpl processEngineConfiguration, Task task) {
JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
JobEntity job = jobService.createJob();
// 这里的 executionId 可为 null
job.setExecutionId(task.getExecutionId());
job.setProcessInstanceId(task.getProcessInstanceId());
job.setProcessDefinitionId(task.getProcessDefinitionId());
job.setElementId(task.getTaskDefinitionKey());
job.setElementName(task.getName());
job.setJobHandlerType(AsyncRejectTaskJobHandler.TYPE);
job.setTenantId(task.getTenantId());
// 携带自定义的数据
job.setCustomValues(JSONUtil.toJsonStr(dto));
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
}
}

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.task.AttachmentDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.engine.operation.DeleteProcessInstanceOperation;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
@ -48,12 +49,11 @@ public class CustomRejectionTaskCmd implements Command<Void>, Serializable {
private final BpmnTaskDelegateAssigner approver;
private final ExtAxHiTaskInstService extAxHiTaskInstService;
public CustomRejectionTaskCmd(String taskId, String advice, List<AttachmentDTO> attachmentList,
BpmnTaskDelegateAssigner approver, ExtAxHiTaskInstService extAxHiTaskInstService) {
this.taskId = taskId;
this.advice = advice;
this.attachmentList = attachmentList;
this.approver = approver;
public CustomRejectionTaskCmd(BpmnTaskAuditDTO dto, ExtAxHiTaskInstService extAxHiTaskInstService) {
this.taskId = dto.getTaskId();
this.advice = dto.getAdvice();
this.attachmentList = dto.getAttachmentList();
this.approver = dto.getApprover();
this.extAxHiTaskInstService = extAxHiTaskInstService;
}

View File

@ -1,19 +1,18 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.core.engine.cmd.CustomApproveTaskCmd;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
import java.util.Objects;
/**
* TODO
* 异步的通过任务的处理器
*
* @author wangli
* @since 2024/4/15 22:41
@ -32,14 +31,7 @@ public class AsyncApproveTaskJobHandler implements JobHandler {
log.info("AsyncApproveTaskJobHandler executing...");
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
TaskService taskService = processEngineConfiguration.getTaskService();
if (Objects.nonNull(variableScope)) {
// 正常完成流程任务审批通过
taskService.complete(job.getCustomValues(), runtimeService.getVariables(job.getExecutionId()));
} else {
// 特殊的完成单独创建的任务
taskService.complete(job.getCustomValues());
}
BpmnTaskAuditDTO dto = JSONUtil.toBean(job.getCustomValues(), BpmnTaskAuditDTO.class);
processEngineConfiguration.getCommandExecutor().execute(new CustomApproveTaskCmd(dto));
}
}

View File

@ -0,0 +1,43 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.core.engine.cmd.CustomRejectionTaskCmd;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
/**
* 异步的拒绝任务的处理器
*
* @author wangli
* @since 2024/4/16 11:11
*/
@Slf4j
public class AsyncRejectTaskJobHandler implements JobHandler {
public static final String TYPE = "async-reject-task";
private final ExtAxHiTaskInstService extAxHiTaskInstService;
public AsyncRejectTaskJobHandler(ExtAxHiTaskInstService extAxHiTaskInstService) {
this.extAxHiTaskInstService = extAxHiTaskInstService;
}
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.info("AsyncRejectTaskJobHandler executing...");
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
BpmnTaskAuditDTO dto = JSONUtil.toBean(job.getCustomValues(), BpmnTaskAuditDTO.class);
processEngineConfiguration.getCommandExecutor().execute(new CustomRejectionTaskCmd(dto, extAxHiTaskInstService));
}
}

View File

@ -21,11 +21,13 @@ import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskInstanceVO;
import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskTodoPageItemVO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.engine.cmd.CustomApproveTaskAsyncCmd;
import cn.axzo.workflow.core.engine.cmd.CustomApproveTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomCommentTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomCompleteDummyTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomCountersignUserTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomCreateDummyTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomRejectionTaskAsyncCmd;
import cn.axzo.workflow.core.engine.cmd.CustomRejectionTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomTransferUserTaskCmd;
import cn.axzo.workflow.core.engine.event.MessagePushEventBuilder;
@ -290,16 +292,22 @@ public class BpmnProcessTaskServiceImpl implements BpmnProcessTaskService {
@Transactional(rollbackFor = Exception.class)
public void approveTask(BpmnTaskAuditDTO dto) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomApproveTaskCmd(dto.getTaskId(), dto.getAdvice(), dto.getAttachmentList(),
dto.getApprover(), dto.getNextApprover(), dto.getAsync()));
if (dto.getAsync()) {
commandExecutor.execute(new CustomApproveTaskAsyncCmd(dto));
} else {
commandExecutor.execute(new CustomApproveTaskCmd(dto));
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void rejectTask(BpmnTaskAuditDTO dto) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomRejectionTaskCmd(dto.getTaskId(), dto.getAdvice(), dto.getAttachmentList(),
dto.getApprover(), extAxHiTaskInstService));
if (dto.getAsync()) {
commandExecutor.execute(new CustomRejectionTaskAsyncCmd(dto));
} else {
commandExecutor.execute(new CustomRejectionTaskCmd(dto, extAxHiTaskInstService));
}
}
@Override