feat(REQ-2616) - 新增业务节点定时回调的事件

This commit is contained in:
wangli 2024-08-20 17:52:50 +08:00
parent 4a21d256dc
commit f9573d06bf
17 changed files with 507 additions and 20 deletions

View File

@ -1,7 +1,8 @@
package cn.axzo.workflow.client.feign.bpmn;
import cn.axzo.workflow.common.annotation.Manageable;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.azxo.framework.common.model.CommonResponse;
import io.swagger.v3.oas.annotations.Operation;
@ -50,8 +51,18 @@ public interface ProcessActivityApi {
*
* @return
*/
@PostMapping("/api/process/activity/timeout")
@PostMapping("/api/process/activity/timeout/trigger")
@Manageable
@Operation(summary = "设置当前业务节点在什么时间继续往下执行")
CommonResponse<Boolean> setTimeout(@Validated @RequestBody BpmnActivityTimeoutDTO dto);
@Operation(summary = "设置指定业务节点定时继续往下执行")
CommonResponse<Boolean> setTimeoutTrigger(@Validated @RequestBody BpmnActivityTimeoutTriggerDTO dto);
/**
* 为指定业务节点设置定时回调
*
* @return
*/
@Manageable
@PostMapping("/api/process/activity/timeout/callback")
@Operation(summary = "设置指定业务节点定时回调")
CommonResponse<Boolean> setTimeoutCallback(@Validated @RequestBody BpmnActivityTimeoutCallbackDTO dto);
}

View File

@ -13,6 +13,7 @@ public enum ProcessActivityEventEnum {
PROCESS_ACTIVITY_START("process-activity", "process-activity-start", "流程活动节点已开始"),
PROCESS_ACTIVITY_WAIT_ASSIGNEE("process-activity", "process-activity-wait-assignee", "流程活动节点等待指定审批人"),
PROCESS_ACTIVITY_TAKE("process-activity", "process-activity-take", "该事件类型暂时不存在"),
PROCESS_ACTIVITY_CALLBACK("process-activity", "process-activity-callback", "业务节点定时回调"),
PROCESS_ACTIVITY_END("process-activity", "process-activity-end", "流程活动节点已取消"),
;

View File

@ -0,0 +1,35 @@
package cn.axzo.workflow.common.model.request.bpmn.activity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
/**
* 为指定业务节点设置倒计时回调
*
* @author wangli
* @since 2024-08-16 15:30
*/
@ApiModel("为指定业务节点设置倒计时回调")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BpmnActivityTimeoutCallbackDTO {
/**
* 业务节点的触发 ID
*/
@NotBlank(message = "触发 ID 不能为空")
private String triggerId;
/**
* 触发往下流转的时间点 格式yyyy-MM-dd HH:mm:ss
* <p>
* 应该为 Date 类型但为了确保特殊情况需要通过 pod 内直接触发
*/
@NotBlank(message = "触发时间不能为空格式yyyy-MM-dd HH:mm:ss")
private String endTime;
}

View File

@ -1,7 +1,9 @@
package cn.axzo.workflow.common.model.request.bpmn.activity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
@ -13,7 +15,9 @@ import javax.validation.constraints.NotBlank;
*/
@ApiModel("为指定业务节点设置倒计时")
@Data
public class BpmnActivityTimeoutDTO {
@AllArgsConstructor
@NoArgsConstructor
public class BpmnActivityTimeoutTriggerDTO {
/**
* 业务节点的触发 ID
*/
@ -25,6 +29,6 @@ public class BpmnActivityTimeoutDTO {
* <p>
* 应该为 Date 类型但为了确保特殊情况需要通过 pod 内直接触发
*/
@NotBlank(message = "触发时间不能为空格式yyyy-MM-ddTHH:mm:ssZ")
@NotBlank(message = "触发时间不能为空格式yyyy-MM-dd HH:mm:ss")
private String endTime;
}

View File

@ -6,6 +6,7 @@ import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory;
import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator;
import cn.axzo.workflow.core.engine.interceptor.CustomRetryInterceptor;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler;
@ -90,7 +91,8 @@ public class FlowableConfiguration {
configuration.addCustomJobHandler(new AsyncCancelProcessInstanceHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncAbortProcessInstanceHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncBpmnProcessActivityJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler());
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler());
// 异步任务异常重试时间间隔
configuration.setDefaultFailedJobWaitTime(30);
configuration.setAsyncFailedJobWaitTime(30);

View File

@ -0,0 +1,79 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import com.alibaba.fastjson.JSON;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.jobexecutor.TimerEventHandler;
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.TimerUtil;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
/**
* 业务节点触发倒计时回调
*
* @author wangli
* @since 2024-08-20 16:59
*/
public class CustomBusinessNodeTimeoutCallbackCmd extends AbstractCommand<Void> implements Serializable {
private static final long serialVersionUID = 1L;
private final String triggerId;
private final String endTime;
public CustomBusinessNodeTimeoutCallbackCmd(String triggerId, String endTime) {
this.triggerId = triggerId;
this.endTime = endTime;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("triggerId", triggerId);
params.put("endTime", endTime);
return JSON.toJSONString(params);
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
ExecutionEntity executionEntity = (ExecutionEntity) processEngineConfiguration.getRuntimeService().createExecutionQuery().executionId(triggerId).singleResult();
String newEndTime = parseDateTimeWithTimeZone();
TimerEventDefinition timerEventDefinition = new TimerEventDefinition();
timerEventDefinition.setTimeDate(newEndTime);
TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition, executionEntity.getCurrentFlowElement(),
true, executionEntity, AsyncActivityLeaveJobHandler.TYPE, TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(),
timerEventDefinition.getEndDate(), timerEventDefinition.getCalendarName()));
if (timerJob != null) {
timerJob.setCustomValues(JSON.toJSONString(new BpmnActivityTimeoutCallbackDTO(triggerId, endTime)));
CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob);
}
return null;
}
private String parseDateTimeWithTimeZone() {
DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.parse(endTime, inputFormat);
// 转换为东八区的ZonedDateTime
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.of("Asia/Shanghai"));
// 格式化为包含时区偏移量的ISO 8601字符串
DateTimeFormatter outputFormat = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
return zonedDateTime.format(outputFormat);
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import com.alibaba.fastjson.JSON;
import org.flowable.bpmn.model.TimerEventDefinition;
@ -25,12 +26,12 @@ import java.util.Map;
* @author wangli
* @since 2024-08-16 15:38
*/
public class CustomBusinessNodeTimeoutCmd extends AbstractCommand<Void> implements Serializable {
public class CustomBusinessNodeTimeoutTriggerCmd extends AbstractCommand<Void> implements Serializable {
private static final long serialVersionUID = 1L;
private final String triggerId;
private final String endTime;
public CustomBusinessNodeTimeoutCmd(String triggerId, String endTime) {
public CustomBusinessNodeTimeoutTriggerCmd(String triggerId, String endTime) {
this.triggerId = triggerId;
this.endTime = endTime;
}
@ -58,6 +59,7 @@ public class CustomBusinessNodeTimeoutCmd extends AbstractCommand<Void> implemen
true, executionEntity, AsyncActivityLeaveJobHandler.TYPE, TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(),
timerEventDefinition.getEndDate(), timerEventDefinition.getCalendarName()));
if (timerJob != null) {
timerJob.setCustomValues(JSON.toJSONString(new BpmnActivityTimeoutTriggerDTO(triggerId, endTime)));
CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob);
}
return null;

View File

@ -0,0 +1,27 @@
package cn.axzo.workflow.core.engine.event;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import java.util.Map;
/**
* TODO
*
* @author wangli
* @since 2024-08-20 17:24
*/
public interface BizCallbackEvent extends FlowableEvent {
String getActivityId();
String getActivityName();
String getProcessInstanceId();
String getProcessDefinitionId();
String getBusinessKey();
String getExecutionId();
Map<String, Object> getVariables();
}

View File

@ -0,0 +1,108 @@
package cn.axzo.workflow.core.engine.event;
import org.apache.commons.lang3.StringUtils;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 业务节点回调事件
*
* @author wangli
* @since 2024-08-20 17:25
*/
public class BizCallbackEventImpl implements BizCallbackEvent {
private final BizCallbackEventType type;
private final String activityId;
private final String activityName;
private final String processInstanceId;
private final String processDefinitionId;
private final String businessKey;
private final String executionId;
private final Map<String, Object> variables;
public BizCallbackEventImpl(BizCallbackEventType type, String activityId, String activityName, String processInstanceId, String processDefinitionId, String businessKey, String executionId, Map<String, Object> variables) {
this.type = type;
this.activityId = activityId;
this.activityName = activityName;
this.processInstanceId = processInstanceId;
this.processDefinitionId = processDefinitionId;
this.businessKey = businessKey;
this.executionId = executionId;
this.variables = variables;
}
/**
* @return type of event.
*/
@Override
public FlowableEventType getType() {
return BizCallbackEventType.CALLBACK;
}
@Override
public String getActivityId() {
return activityId;
}
@Override
public String getActivityName() {
return activityName;
}
@Override
public String getProcessInstanceId() {
return processInstanceId;
}
@Override
public String getProcessDefinitionId() {
return processDefinitionId;
}
@Override
public String getBusinessKey() {
return businessKey;
}
@Override
public String getExecutionId() {
return executionId;
}
@Override
public Map<String, Object> getVariables() {
return variables;
}
public enum BizCallbackEventType implements FlowableEventType {
CALLBACK,
;
public static final BizCallbackEventType[] EMPTY_ARRAY = new BizCallbackEventType[]{};
public static BizCallbackEventType[] getTypesFromString(String string) {
List<BizCallbackEventType> result = new ArrayList<>();
if (string != null && !string.isEmpty()) {
String[] split = StringUtils.split(string, ",");
for (String typeName : split) {
boolean found = false;
for (BizCallbackEventType type : values()) {
if (typeName.toUpperCase().equals(type.name())) {
result.add(type);
found = true;
break;
}
}
if (!found) {
throw new FlowableIllegalArgumentException("Invalid event-type: " + typeName);
}
}
}
return result.toArray(EMPTY_ARRAY);
}
}
}

View File

@ -0,0 +1,63 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.engine.event.BizCallbackEventImpl;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.TaskService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.api.Task;
import org.flowable.variable.api.delegate.VariableScope;
import java.util.Objects;
import static cn.axzo.workflow.core.common.code.BpmnInstanceRespCode.PROCESS_INSTANCE_ID_NOT_EXISTS;
import static cn.axzo.workflow.core.engine.cmd.CustomBizSpecifyAssigneeToTaskCmd.getOperateTask;
/**
* 业务节点的触发离开的任务处理器
*
* @author wangli
* @since 2024-08-16 16:29
*/
@Slf4j
public class AsyncActivityCallbackJobHandler extends AbstractJobHandler implements JobHandler {
public static final String TYPE = "business-node-callback";
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.warn("AsyncActivityLeaveJobHandler exec start...");
BpmnActivityTimeoutCallbackDTO dto = JSON.parseObject(job.getCustomValues(), BpmnActivityTimeoutCallbackDTO.class);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
FlowableEventDispatcher eventDispatcher = processEngineConfiguration.getEventDispatcher();
TaskService taskService = processEngineConfiguration.getTaskService();
Task task = getOperateTask(taskService, dto.getTriggerId());
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery().includeProcessVariables()
.processInstanceId(task.getProcessInstanceId()).singleResult();
if (Objects.isNull(processInstance)) {
throw new WorkflowEngineException(PROCESS_INSTANCE_ID_NOT_EXISTS, task.getProcessInstanceId());
}
eventDispatcher.dispatchEvent(new BizCallbackEventImpl(BizCallbackEventImpl.BizCallbackEventType.CALLBACK,
task.getTaskDefinitionKey(), task.getName(),
task.getProcessInstanceId(), task.getProcessDefinitionId(),
processInstance.getBusinessKey(), dto.getTriggerId(),
processInstance.getProcessVariables()),
processEngineConfiguration.getEngineCfgKey());
}
}

View File

@ -1,5 +1,8 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.core.service.BpmnProcessActivityService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler;
@ -14,8 +17,13 @@ import org.flowable.variable.api.delegate.VariableScope;
*/
@Slf4j
public class AsyncActivityLeaveJobHandler extends AbstractJobHandler implements JobHandler {
private final BpmnProcessActivityService bpmnProcessActivityService;
public static final String TYPE = "business-node-leave-task";
public AsyncActivityLeaveJobHandler(BpmnProcessActivityService bpmnProcessActivityService) {
this.bpmnProcessActivityService = bpmnProcessActivityService;
}
@Override
public String getType() {
return TYPE;
@ -24,6 +32,7 @@ public class AsyncActivityLeaveJobHandler extends AbstractJobHandler implements
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.warn("AsyncActivityLeaveJobHandler exec start...");
BpmnActivityTimeoutTriggerDTO dto = JSON.parseObject(job.getCustomValues(), BpmnActivityTimeoutTriggerDTO.class);
bpmnProcessActivityService.trigger(dto.getTriggerId());
}
}

View File

@ -0,0 +1,73 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.workflow.core.common.context.ActivityOperationContext;
import cn.axzo.workflow.core.engine.event.BizCallbackEvent;
import cn.axzo.workflow.core.engine.event.BizCallbackEventImpl;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import com.google.common.collect.ImmutableSet;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static cn.axzo.workflow.core.engine.event.BizCallbackEventImpl.BizCallbackEventType.CALLBACK;
import static cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEventType.ADD_ASSIGNEE;
import static cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEventType.REMOVE_ASSIGNEE;
import static cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEventType.UPDATE_ASSIGNEE;
/**
* TODO
*
* @author wangli
* @since 2024-08-20 17:47
*/
@Slf4j
@Component
public class EngineActivityCallbackEventListener extends AbstractFlowableEventListener {
@Resource
ObjectProvider<List<BpmnActivityEventListener>> activityListeners;
public static final Set<BizCallbackEventImpl.BizCallbackEventType> BIZ_CALLBACK_EVENTS =
ImmutableSet.<BizCallbackEventImpl.BizCallbackEventType>builder()
.add(CALLBACK)
.build();
@Override
public void onEvent(FlowableEvent flowableEvent) {
if (flowableEvent instanceof BizCallbackEvent) {
BizCallbackEvent event = (BizCallbackEvent) flowableEvent;
BizCallbackEventImpl.BizCallbackEventType type = (BizCallbackEventImpl.BizCallbackEventType) event.getType();
if (BIZ_CALLBACK_EVENTS.contains(type)) {
switch (type) {
case ADD_ASSIGNEE:
getOrderedListeners().forEach(i -> i.onCallback(event));
break;
case UPDATE_ASSIGNEE:
break;
case REMOVE_ASSIGNEE:
break;
default:
}
}
}
}
private List<BpmnActivityEventListener> getOrderedListeners() {
ActivityOperationContext context = new ActivityOperationContext();
List<BpmnActivityEventListener> orderListeners = new ArrayList<>();
activityListeners.ifAvailable(orderListeners::addAll);
orderListeners.forEach(i -> i.setContext(context));
return orderListeners;
}
@Override
public boolean isFailOnException() {
return true;
}
}

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.core.listener;
import cn.axzo.workflow.core.common.context.OperationContext;
import cn.axzo.workflow.core.engine.event.BizCallbackEvent;
import cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEvent;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.core.Ordered;
@ -36,6 +37,12 @@ public interface BpmnActivityEventListener extends OperationContext, Ordered {
*/
default void onTake(DelegateExecution execution) {}
/**
* 节点定时回调
*/
default void onCallback(BizCallbackEvent event) {
}
/**
* 节点已取消
*

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.core.service;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
@ -53,5 +54,13 @@ public interface BpmnProcessActivityService {
* @param dto
* @return
*/
Boolean setTimeout(BpmnActivityTimeoutDTO dto);
Boolean setTimeoutTrigger(BpmnActivityTimeoutTriggerDTO dto);
/**
* 设置指定业务节点定时回调
*
* @param dto
* @return
*/
Boolean setTimeOutCallback(BpmnActivityTimeoutCallbackDTO dto);
}

View File

@ -1,11 +1,13 @@
package cn.axzo.workflow.core.service.impl;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.engine.cmd.CustomAbortProcessInstanceCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBizSpecifyAssigneeToTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutCallbackCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutTriggerCmd;
import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler;
import cn.axzo.workflow.core.service.BpmnProcessActivityService;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
@ -133,10 +135,22 @@ public class BpmnProcessActivityServiceImpl implements BpmnProcessActivityServic
}
@Override
public Boolean setTimeout(BpmnActivityTimeoutDTO dto) {
public Boolean setTimeoutTrigger(BpmnActivityTimeoutTriggerDTO dto) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomBusinessNodeTimeoutCmd(dto.getTriggerId(), dto.getEndTime()));
commandExecutor.execute(new CustomBusinessNodeTimeoutTriggerCmd(dto.getTriggerId(), dto.getEndTime()));
return true;
}
/**
* 设置指定业务节点定时回调
*
* @param dto
* @return
*/
@Override
public Boolean setTimeOutCallback(BpmnActivityTimeoutCallbackDTO dto) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomBusinessNodeTimeoutCallbackCmd(dto.getTriggerId(), dto.getEndTime()));
return true;
}
}

View File

@ -8,6 +8,7 @@ import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import cn.axzo.workflow.core.common.context.ActivityOperationContext;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.engine.event.BizCallbackEvent;
import cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEvent;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
@ -18,7 +19,6 @@ import org.flowable.bpmn.model.Process;
import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.impl.persistence.entity.ExecutionEntityImpl;
import org.flowable.engine.runtime.ProcessInstance;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
@ -40,6 +40,7 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_ACTIVITY_R
import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_OWNERSHIP_PROCESS_DEFINITION_KEY;
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.APPROVED;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_CALLBACK;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_END;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_START;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_TAKE;
@ -117,6 +118,36 @@ public class RocketMqBpmActivityEventListener extends AbstractBpmnEventListener<
execution.getCurrentActivityId(), execution.getProcessInstanceId());
}
@Override
public void onCallback(BizCallbackEvent event) {
log.info("RocketMqBpmActivityEventListener#onCallback...activityId: {}, processInstanceId: {}",
event.getActivityId(), event.getProcessInstanceId());
// 特殊的自定义业务事件, 不能使用公共的 build 方法
ProcessActivityDTO dto = new ProcessActivityDTO();
dto.setType(PROCESS_ACTIVITY_CALLBACK);
dto.setActivityId(event.getActivityId());
dto.setActivityName(event.getActivityName());
dto.setProcessInstanceId(event.getProcessInstanceId());
dto.setProcessDefinitionId(event.getProcessDefinitionId());
ProcessInstance processInstance = getContext().getProcessInstance(() ->
runtimeService.createProcessInstanceQuery()
.processInstanceId(event.getProcessInstanceId())
.includeProcessVariables()
.singleResult());
if (Objects.nonNull(processInstance)) {
dto.setProcessDefinitionKey(processInstance.getProcessDefinitionKey());
dto.setBusinessKey(processInstance.getBusinessKey());
}
dto.setTriggerId(event.getExecutionId());
dto.setVariables(event.getVariables());
dto.setWorkflowEngineVersion(String.valueOf(event.getVariables().getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)));
sendMessageQueue(dto, PROCESS_ACTIVITY_CALLBACK);
log.info("RocketMqBpmActivityEventListener#onCallback...end, activityId: {}, processInstanceId: {}",
event.getActivityId(), event.getProcessInstanceId());
}
@Override
public void onEnd(DelegateExecution execution) {
log.info("RocketMqMessagePushEventListener#onEnd...activityId: {}, processInstanceId: {}",

View File

@ -1,7 +1,8 @@
package cn.axzo.workflow.server.controller.web.bpmn;
import cn.axzo.workflow.client.feign.bpmn.ProcessActivityApi;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
@ -118,7 +119,18 @@ public class BpmnProcessActivityController implements ProcessActivityApi {
* @return
*/
@Override
public CommonResponse<Boolean> setTimeout(BpmnActivityTimeoutDTO dto) {
return CommonResponse.success(bpmnProcessActivityService.setTimeout(dto));
public CommonResponse<Boolean> setTimeoutTrigger(BpmnActivityTimeoutTriggerDTO dto) {
return CommonResponse.success(bpmnProcessActivityService.setTimeoutTrigger(dto));
}
/**
* 为指定业务节点设置定时回调
*
* @param dto
* @return
*/
@Override
public CommonResponse<Boolean> setTimeoutCallback(BpmnActivityTimeoutCallbackDTO dto) {
return CommonResponse.success(bpmnProcessActivityService.setTimeOutCallback(dto));
}
}