feat(REQ-2616) - 新增业务接口定时触发流转的功能

This commit is contained in:
wangli 2024-08-16 18:36:12 +08:00
parent 344ebd149e
commit 4a21d256dc
11 changed files with 246 additions and 2 deletions

View File

@ -1,10 +1,10 @@
package cn.axzo.workflow.client.feign.bpmn;
import cn.axzo.workflow.client.config.CommonFeignConfiguration;
import cn.axzo.workflow.common.annotation.Manageable;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.azxo.framework.common.model.CommonResponse;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@ -44,4 +44,14 @@ public interface ProcessActivityApi {
@PostMapping("/api/process/activity/assignee/set")
@Operation(summary = "业务节点设置审批人,不支持重复调用设置审批人,需一次性传入所有审批人")
CommonResponse<Boolean> setAssignee(@Validated @RequestBody BpmnActivitySetAssigneeDTO dto);
/**
* 该功能应该利用引擎的 TimerBoundaryEvent 来实现但为了简便先利用引擎的任务调度来实现
*
* @return
*/
@PostMapping("/api/process/activity/timeout")
@Manageable
@Operation(summary = "设置当前业务节点在什么时间继续往下执行")
CommonResponse<Boolean> setTimeout(@Validated @RequestBody BpmnActivityTimeoutDTO dto);
}

View File

@ -0,0 +1,30 @@
package cn.axzo.workflow.common.model.request.bpmn.activity;
import io.swagger.annotations.ApiModel;
import lombok.Data;
import javax.validation.constraints.NotBlank;
/**
* 为指定业务节点设置倒计时在啥时候继续往下流转
*
* @author wangli
* @since 2024-08-16 15:30
*/
@ApiModel("为指定业务节点设置倒计时")
@Data
public class BpmnActivityTimeoutDTO {
/**
* 业务节点的触发 ID
*/
@NotBlank(message = "触发 ID 不能为空")
private String triggerId;
/**
* 触发往下流转的时间点 格式yyyy-MM-dd HH:mm:ss
* <p>
* 应该为 Date 类型但为了确保特殊情况需要通过 pod 内直接触发
*/
@NotBlank(message = "触发时间不能为空格式yyyy-MM-ddTHH:mm:ssZ")
private String endTime;
}

View File

@ -11,6 +11,7 @@ import cn.axzo.workflow.common.model.request.bpmn.BpmnNoticeConf;
import cn.axzo.workflow.common.model.request.bpmn.model.BpmnModelCreateDTO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.converter.json.AbstractBpmnJsonConverter;
import cn.axzo.workflow.core.converter.json.BoundaryEventJsonConverter;
import cn.axzo.workflow.core.converter.json.EndEventJsonConverter;
import cn.axzo.workflow.core.converter.json.ExclusiveGatewayJsonConverter;
import cn.axzo.workflow.core.converter.json.NotSupportConverter;
@ -27,6 +28,7 @@ import com.google.common.collect.Lists;
import org.flowable.bpmn.BpmnAutoLayout;
import org.flowable.bpmn.converter.BpmnXMLConverter;
import org.flowable.bpmn.model.BaseElement;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.ExclusiveGateway;
@ -142,6 +144,7 @@ public final class BpmnJsonConverterUtil {
CONVERTERS.put(UserTask.class, new UserTaskJsonConverter());
CONVERTERS.put(ServiceTask.class, new ServiceTaskJsonConverter());
CONVERTERS.put(ReceiveTask.class, new ReceiveTaskJsonConverter());
CONVERTERS.put(BoundaryEvent.class, new BoundaryEventJsonConverter());
}
/**

View File

@ -6,6 +6,7 @@ import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory;
import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator;
import cn.axzo.workflow.core.engine.interceptor.CustomRetryInterceptor;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncCancelProcessInstanceHandler;
@ -89,6 +90,7 @@ public class FlowableConfiguration {
configuration.addCustomJobHandler(new AsyncCancelProcessInstanceHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncAbortProcessInstanceHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncBpmnProcessActivityJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler());
// 异步任务异常重试时间间隔
configuration.setDefaultFailedJobWaitTime(30);
configuration.setAsyncFailedJobWaitTime(30);

View File

@ -0,0 +1,57 @@
package cn.axzo.workflow.core.converter.json;
import cn.axzo.workflow.common.model.request.bpmn.BpmnJsonNode;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.EventDefinition;
import org.flowable.bpmn.model.FlowableListener;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.engine.delegate.BaseExecutionListener;
import java.util.ArrayList;
import java.util.List;
import static org.flowable.bpmn.model.ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION;
/**
* 边界事件
*
* @author wangli
* @since 2024-08-16 11:47
*/
public class BoundaryEventJsonConverter extends AbstractBpmnJsonConverter<BoundaryEvent> {
@Override
public BoundaryEvent convertJsonToElement(BpmnJsonNode node, Process process) {
BoundaryEvent boundaryEvent = new BoundaryEvent();
boundaryEvent.setId(node.getId() + "_boundaryEvent");
boundaryEvent.setCancelActivity(false);
boundaryEvent.setAttachedToRefId(process.getId());
// 设置执行监听
setExecutionListeners(boundaryEvent);
// 设置 TimerEventDefinition
setTimerEventDefinition(boundaryEvent);
return boundaryEvent;
}
private void setTimerEventDefinition(BoundaryEvent boundaryEvent) {
List<EventDefinition> eventDefinitions = new ArrayList<>();
TimerEventDefinition timerEventDefinition = new TimerEventDefinition();
timerEventDefinition.setEndDate("${timerEndDate}");
eventDefinitions.add(timerEventDefinition);
boundaryEvent.setEventDefinitions(eventDefinitions);
}
private void setExecutionListeners(BoundaryEvent boundaryEvent) {
List<FlowableListener> executionListeners = new ArrayList<>();
// 设置执行监听
FlowableListener executionListener = new FlowableListener();
executionListener.setEvent(BaseExecutionListener.EVENTNAME_START);
executionListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
executionListener.setImplementation("${boundaryEventExecutionStartListener}");
executionListeners.add(executionListener);
boundaryEvent.setExecutionListeners(executionListeners);
}
}

View File

@ -28,6 +28,12 @@ public abstract class AbstractCommand<T> implements Command<T> {
}
}
/**
* 实现该方法默认就捕获了 command 一些引擎内部异常
*
* @param commandContext
* @return
*/
public T executeInternal(CommandContext commandContext) {
return null;
}

View File

@ -0,0 +1,77 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import com.alibaba.fastjson.JSON;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.jobexecutor.TimerEventHandler;
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.TimerUtil;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
/**
* 业务节点触发倒计时
*
* @author wangli
* @since 2024-08-16 15:38
*/
public class CustomBusinessNodeTimeoutCmd extends AbstractCommand<Void> implements Serializable {
private static final long serialVersionUID = 1L;
private final String triggerId;
private final String endTime;
public CustomBusinessNodeTimeoutCmd(String triggerId, String endTime) {
this.triggerId = triggerId;
this.endTime = endTime;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("triggerId", triggerId);
params.put("endTime", endTime);
return JSON.toJSONString(params);
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
ExecutionEntity executionEntity = (ExecutionEntity) processEngineConfiguration.getRuntimeService().createExecutionQuery().executionId(triggerId).singleResult();
String newEndTime = parseDateTimeWithTimeZone();
TimerEventDefinition timerEventDefinition = new TimerEventDefinition();
timerEventDefinition.setTimeDate(newEndTime);
TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition, executionEntity.getCurrentFlowElement(),
true, executionEntity, AsyncActivityLeaveJobHandler.TYPE, TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(),
timerEventDefinition.getEndDate(), timerEventDefinition.getCalendarName()));
if (timerJob != null) {
CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob);
}
return null;
}
private String parseDateTimeWithTimeZone() {
DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.parse(endTime, inputFormat);
// 转换为东八区的ZonedDateTime
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.of("Asia/Shanghai"));
// 格式化为包含时区偏移量的ISO 8601字符串
DateTimeFormatter outputFormat = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
return zonedDateTime.format(outputFormat);
}
}

View File

@ -0,0 +1,29 @@
package cn.axzo.workflow.core.engine.job;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
/**
* 业务节点的触发离开的任务处理器
*
* @author wangli
* @since 2024-08-16 16:29
*/
@Slf4j
public class AsyncActivityLeaveJobHandler extends AbstractJobHandler implements JobHandler {
public static final String TYPE = "business-node-leave-task";
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.warn("AsyncActivityLeaveJobHandler exec start...");
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.workflow.core.service;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
@ -45,4 +46,12 @@ public interface BpmnProcessActivityService {
* @param dto
*/
void setAssigneeAsync(BpmnActivitySetAssigneeDTO dto);
/**
* 设置指定业务接口继续往下流转的触发时间
*
* @param dto
* @return
*/
Boolean setTimeout(BpmnActivityTimeoutDTO dto);
}

View File

@ -1,9 +1,11 @@
package cn.axzo.workflow.core.service.impl;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.engine.cmd.CustomAbortProcessInstanceCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBizSpecifyAssigneeToTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutCmd;
import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler;
import cn.axzo.workflow.core.service.BpmnProcessActivityService;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
@ -130,4 +132,11 @@ public class BpmnProcessActivityServiceImpl implements BpmnProcessActivityServic
});
}
@Override
public Boolean setTimeout(BpmnActivityTimeoutDTO dto) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomBusinessNodeTimeoutCmd(dto.getTriggerId(), dto.getEndTime()));
return true;
}
}

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.server.controller.web.bpmn;
import cn.axzo.workflow.client.feign.bpmn.ProcessActivityApi;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
@ -109,4 +110,15 @@ public class BpmnProcessActivityController implements ProcessActivityApi {
}
return CommonResponse.success(true);
}
/**
* 该功能应该利用引擎的 TimerBoundaryEvent 来实现但为了简便先利用引擎的任务调度来实现
*
* @param dto
* @return
*/
@Override
public CommonResponse<Boolean> setTimeout(BpmnActivityTimeoutDTO dto) {
return CommonResponse.success(bpmnProcessActivityService.setTimeout(dto));
}
}