diff --git a/workflow-engine-api/src/main/java/cn/axzo/workflow/client/config/WorkflowEngineClientAutoConfiguration.java b/workflow-engine-api/src/main/java/cn/axzo/workflow/client/config/WorkflowEngineClientAutoConfiguration.java index 84439b9dc..5fbfc2e15 100644 --- a/workflow-engine-api/src/main/java/cn/axzo/workflow/client/config/WorkflowEngineClientAutoConfiguration.java +++ b/workflow-engine-api/src/main/java/cn/axzo/workflow/client/config/WorkflowEngineClientAutoConfiguration.java @@ -2,7 +2,6 @@ package cn.axzo.workflow.client.config; import feign.RequestInterceptor; import lombok.extern.slf4j.Slf4j; -import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; @@ -43,7 +42,7 @@ public class WorkflowEngineClientAutoConfiguration { private static final Pattern POM_VERSION = Pattern.compile(".*(\\S+).*", Pattern.DOTALL); - @Bean + @Bean("serviceVersion") public String serviceVersion() { Map env = System.getenv(); if (env != null) { diff --git a/workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/bpmn/ProcessActivityApi.java b/workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/bpmn/ProcessActivityApi.java index c8e93323a..7cdfa0b6c 100644 --- a/workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/bpmn/ProcessActivityApi.java +++ b/workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/bpmn/ProcessActivityApi.java @@ -1,6 +1,9 @@ package cn.axzo.workflow.client.feign.bpmn; import cn.axzo.workflow.client.annotation.WorkflowEngineFeignClient; +import cn.axzo.workflow.common.annotation.Manageable; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO; import cn.azxo.framework.common.model.CommonResponse; @@ -55,4 +58,24 @@ public interface ProcessActivityApi { @PostMapping("/api/process/activity/assignee/set") @Operation(summary = "业务节点设置审批人,不支持重复调用设置审批人,需一次性传入所有审批人") CommonResponse setAssignee(@Validated @RequestBody BpmnActivitySetAssigneeDTO dto); + + /** + * 该功能应该利用引擎的 TimerBoundaryEvent 来实现,但为了简便,先利用引擎的任务调度来实现 + * + * @return + */ + @PostMapping("/api/process/activity/timeout/trigger") + @Manageable + @Operation(summary = "设置指定业务节点定时继续往下执行") + CommonResponse setTimeoutTrigger(@Validated @RequestBody BpmnActivityTimeoutTriggerDTO dto); + + /** + * 为指定业务节点设置定时回调 + * + * @return + */ + @Manageable + @PostMapping("/api/process/activity/timeout/callback") + @Operation(summary = "设置指定业务节点定时回调") + CommonResponse setTimeoutCallback(@Validated @RequestBody BpmnActivityTimeoutCallbackDTO dto); } diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/ProcessActivityEventEnum.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/ProcessActivityEventEnum.java index 9017664f2..2b2595fe2 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/ProcessActivityEventEnum.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/enums/ProcessActivityEventEnum.java @@ -13,6 +13,7 @@ public enum ProcessActivityEventEnum { PROCESS_ACTIVITY_START("process-activity", "process-activity-start", "流程活动节点已开始"), PROCESS_ACTIVITY_WAIT_ASSIGNEE("process-activity", "process-activity-wait-assignee", "流程活动节点等待指定审批人"), PROCESS_ACTIVITY_TAKE("process-activity", "process-activity-take", "该事件类型暂时不存在"), + PROCESS_ACTIVITY_CALLBACK("process-activity", "process-activity-callback", "业务节点定时回调"), PROCESS_ACTIVITY_END("process-activity", "process-activity-end", "流程活动节点已取消"), ; diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/activity/BpmnActivityTimeoutCallbackDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/activity/BpmnActivityTimeoutCallbackDTO.java new file mode 100644 index 000000000..93643e5f7 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/activity/BpmnActivityTimeoutCallbackDTO.java @@ -0,0 +1,41 @@ +package cn.axzo.workflow.common.model.request.bpmn.activity; + +import io.swagger.annotations.ApiModel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotBlank; +import java.util.Map; + +/** + * 为指定业务节点设置倒计时回调 + * + * @author wangli + * @since 2024-08-16 15:30 + */ +@ApiModel("为指定业务节点设置倒计时回调") +@Data +@AllArgsConstructor +@NoArgsConstructor +public class BpmnActivityTimeoutCallbackDTO { + + /** + * 业务节点的触发 ID + */ + @NotBlank(message = "触发 ID 不能为空") + private String triggerId; + + /** + * 触发往下流转的时间点, 格式:yyyy-MM-dd HH:mm:ss + *

+ * 应该为 Date 类型,但为了确保特殊情况需要通过 pod 内直接触发 + */ + @NotBlank(message = "触发时间不能为空,格式:yyyy-MM-dd HH:mm:ss") + private String endTime; + + /** + * 用于追加或修改现有的变量值 + */ + private Map variables; +} diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/activity/BpmnActivityTimeoutTriggerDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/activity/BpmnActivityTimeoutTriggerDTO.java new file mode 100644 index 000000000..9b1e4ec62 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/request/bpmn/activity/BpmnActivityTimeoutTriggerDTO.java @@ -0,0 +1,41 @@ +package cn.axzo.workflow.common.model.request.bpmn.activity; + +import io.swagger.annotations.ApiModel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotBlank; +import java.util.Map; + +/** + * 为指定业务节点设置倒计时,在啥时候继续往下流转 + * + * @author wangli + * @since 2024-08-16 15:30 + */ +@ApiModel("为指定业务节点设置倒计时") +@Data +@AllArgsConstructor +@NoArgsConstructor +public class BpmnActivityTimeoutTriggerDTO { + /** + * 业务节点的触发 ID + */ + @NotBlank(message = "触发 ID 不能为空") + private String triggerId; + + /** + * 触发往下流转的时间点, 格式:yyyy-MM-dd HH:mm:ss + *

+ * 应该为 Date 类型,但为了确保特殊情况需要通过 pod 内直接触发 + */ + @NotBlank(message = "触发时间不能为空,格式:yyyy-MM-dd HH:mm:ss") + private String endTime; + + /** + * 用于追加或修改现有的变量值(暂不支持) + */ + @Deprecated + private Map variables; +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java index bb8691467..8c5f2e53e 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/common/utils/BpmnJsonConverterUtil.java @@ -11,6 +11,7 @@ import cn.axzo.workflow.common.model.request.bpmn.BpmnNoticeConf; import cn.axzo.workflow.common.model.request.bpmn.model.BpmnModelCreateDTO; import cn.axzo.workflow.core.common.exception.WorkflowEngineException; import cn.axzo.workflow.core.converter.json.AbstractBpmnJsonConverter; +import cn.axzo.workflow.core.converter.json.BoundaryEventJsonConverter; import cn.axzo.workflow.core.converter.json.EndEventJsonConverter; import cn.axzo.workflow.core.converter.json.ExclusiveGatewayJsonConverter; import cn.axzo.workflow.core.converter.json.NotSupportConverter; @@ -27,6 +28,7 @@ import com.google.common.collect.Lists; import org.flowable.bpmn.BpmnAutoLayout; import org.flowable.bpmn.converter.BpmnXMLConverter; import org.flowable.bpmn.model.BaseElement; +import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.EndEvent; import org.flowable.bpmn.model.ExclusiveGateway; @@ -142,6 +144,7 @@ public final class BpmnJsonConverterUtil { CONVERTERS.put(UserTask.class, new UserTaskJsonConverter()); CONVERTERS.put(ServiceTask.class, new ServiceTaskJsonConverter()); CONVERTERS.put(ReceiveTask.class, new ReceiveTaskJsonConverter()); + CONVERTERS.put(BoundaryEvent.class, new BoundaryEventJsonConverter()); } /** diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java index b3276cf71..64829b970 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java @@ -5,6 +5,9 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory; import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory; import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator; import cn.axzo.workflow.core.engine.interceptor.CustomRetryInterceptor; +import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceHandler; +import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler; +import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler; import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler; import cn.axzo.workflow.core.engine.job.AsyncActivitySetAssigneeJobHandler; import cn.axzo.workflow.core.engine.job.AsyncActivityTriggerJobHandler; @@ -64,6 +67,7 @@ public class FlowableConfiguration { ObjectProvider listeners, CustomActivityBehaviorFactory customActivityBehaviorFactory, ExtAxHiTaskInstService extAxHiTaskInstService, + BpmnProcessActivityService bpmnProcessActivityService, List jobProcessors, NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties, @@ -94,6 +98,12 @@ public class FlowableConfiguration { configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService)); configuration.addCustomJobHandler(new AsyncTransferUserTaskJobHandler()); configuration.addCustomJobHandler(new AsyncTermNodeAlterJobHandler(refreshProperties)); + configuration.addCustomJobHandler(new AsyncCountersignUserTaskJobHandler(extAxHiTaskInstService)); + configuration.addCustomJobHandler(new AsyncCancelProcessInstanceHandler(extAxHiTaskInstService)); + configuration.addCustomJobHandler(new AsyncAbortProcessInstanceHandler(extAxHiTaskInstService)); + configuration.addCustomJobHandler(new AsyncBpmnProcessActivityJobHandler(bpmnProcessActivityService)); + configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService)); + configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler()); // 异步任务异常重试时间间隔 configuration.setDefaultFailedJobWaitTime(30); configuration.setAsyncFailedJobWaitTime(30); diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/converter/json/BoundaryEventJsonConverter.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/converter/json/BoundaryEventJsonConverter.java new file mode 100644 index 000000000..e854af127 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/converter/json/BoundaryEventJsonConverter.java @@ -0,0 +1,57 @@ +package cn.axzo.workflow.core.converter.json; + +import cn.axzo.workflow.common.model.request.bpmn.BpmnJsonNode; +import org.flowable.bpmn.model.BoundaryEvent; +import org.flowable.bpmn.model.EventDefinition; +import org.flowable.bpmn.model.FlowableListener; +import org.flowable.bpmn.model.Process; +import org.flowable.bpmn.model.TimerEventDefinition; +import org.flowable.engine.delegate.BaseExecutionListener; + +import java.util.ArrayList; +import java.util.List; + +import static org.flowable.bpmn.model.ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION; + +/** + * 边界事件 + * + * @author wangli + * @since 2024-08-16 11:47 + */ +public class BoundaryEventJsonConverter extends AbstractBpmnJsonConverter { + @Override + public BoundaryEvent convertJsonToElement(BpmnJsonNode node, Process process) { + BoundaryEvent boundaryEvent = new BoundaryEvent(); + boundaryEvent.setId(node.getId() + "_boundaryEvent"); + boundaryEvent.setCancelActivity(false); + boundaryEvent.setAttachedToRefId(process.getId()); + + // 设置执行监听 + setExecutionListeners(boundaryEvent); + + // 设置 TimerEventDefinition + setTimerEventDefinition(boundaryEvent); + return boundaryEvent; + } + + private void setTimerEventDefinition(BoundaryEvent boundaryEvent) { + List eventDefinitions = new ArrayList<>(); + TimerEventDefinition timerEventDefinition = new TimerEventDefinition(); + timerEventDefinition.setEndDate("${timerEndDate}"); + eventDefinitions.add(timerEventDefinition); + boundaryEvent.setEventDefinitions(eventDefinitions); + } + + private void setExecutionListeners(BoundaryEvent boundaryEvent) { + List executionListeners = new ArrayList<>(); + // 设置执行监听 + FlowableListener executionListener = new FlowableListener(); + executionListener.setEvent(BaseExecutionListener.EVENTNAME_START); + executionListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); + executionListener.setImplementation("${boundaryEventExecutionStartListener}"); + executionListeners.add(executionListener); + + boundaryEvent.setExecutionListeners(executionListeners); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/AbstractCommand.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/AbstractCommand.java index 05cbd504f..4f9495fcd 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/AbstractCommand.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/AbstractCommand.java @@ -28,6 +28,12 @@ public abstract class AbstractCommand implements Command { } } + /** + * 实现该方法,默认就捕获了 command 一些引擎内部异常 + * + * @param commandContext + * @return + */ public T executeInternal(CommandContext commandContext) { return null; } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomBusinessNodeTimeoutCallbackCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomBusinessNodeTimeoutCallbackCmd.java new file mode 100644 index 000000000..85e11405e --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomBusinessNodeTimeoutCallbackCmd.java @@ -0,0 +1,84 @@ +package cn.axzo.workflow.core.engine.cmd; + +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO; +import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler; +import com.alibaba.fastjson.JSON; +import org.flowable.bpmn.model.TimerEventDefinition; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.jobexecutor.TimerEventHandler; +import org.flowable.engine.impl.persistence.entity.ExecutionEntity; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.engine.impl.util.TimerUtil; +import org.flowable.job.service.impl.persistence.entity.TimerJobEntity; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; + +/** + * 业务节点触发倒计时回调 + * + * @author wangli + * @since 2024-08-20 16:59 + */ +public class CustomBusinessNodeTimeoutCallbackCmd extends AbstractCommand implements Serializable { + private static final long serialVersionUID = 1L; + private final String triggerId; + private final String endTime; + private final Map variables; + + public CustomBusinessNodeTimeoutCallbackCmd(String triggerId, String endTime, Map variables) { + this.triggerId = triggerId; + this.endTime = endTime; + this.variables = variables; + } + + @Override + public String paramToJsonString() { + Map params = new HashMap<>(); + params.put("triggerId", triggerId); + params.put("endTime", endTime); + return JSON.toJSONString(params); + } + + @Override + public Void execute(CommandContext commandContext) { + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(commandContext); + ExecutionEntity executionEntity = (ExecutionEntity) processEngineConfiguration.getRuntimeService().createExecutionQuery().executionId(triggerId).singleResult(); + if (executionEntity == null) { + return null; + } + + String newEndTime = parseDateTimeWithTimeZone(); + + TimerEventDefinition timerEventDefinition = new TimerEventDefinition(); + timerEventDefinition.setTimeDate(newEndTime); + + TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition, executionEntity.getCurrentFlowElement(), + true, executionEntity, AsyncActivityCallbackJobHandler.TYPE, TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(), + timerEventDefinition.getEndDate(), timerEventDefinition.getCalendarName())); + if (timerJob != null) { + timerJob.setCustomValues(JSON.toJSONString(new BpmnActivityTimeoutCallbackDTO(triggerId, endTime, variables))); + CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob); + } + return null; + } + + private String parseDateTimeWithTimeZone() { + DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime localDateTime = LocalDateTime.parse(endTime, inputFormat); + + // 转换为东八区的ZonedDateTime + ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.of("Asia/Shanghai")); + + // 格式化为包含时区偏移量的ISO 8601字符串 + DateTimeFormatter outputFormat = DateTimeFormatter.ISO_OFFSET_DATE_TIME; + return zonedDateTime.format(outputFormat); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomBusinessNodeTimeoutTriggerCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomBusinessNodeTimeoutTriggerCmd.java new file mode 100644 index 000000000..579770edc --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomBusinessNodeTimeoutTriggerCmd.java @@ -0,0 +1,84 @@ +package cn.axzo.workflow.core.engine.cmd; + +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO; +import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler; +import com.alibaba.fastjson.JSON; +import org.flowable.bpmn.model.TimerEventDefinition; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.jobexecutor.TimerEventHandler; +import org.flowable.engine.impl.persistence.entity.ExecutionEntity; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.engine.impl.util.TimerUtil; +import org.flowable.job.service.impl.persistence.entity.TimerJobEntity; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * 业务节点触发倒计时 + * + * @author wangli + * @since 2024-08-16 15:38 + */ +public class CustomBusinessNodeTimeoutTriggerCmd extends AbstractCommand implements Serializable { + private static final long serialVersionUID = 1L; + private final String triggerId; + private final String endTime; + private final Map variables; + + public CustomBusinessNodeTimeoutTriggerCmd(String triggerId, String endTime, Map variables) { + this.triggerId = triggerId; + this.endTime = endTime; + this.variables = variables; + } + + @Override + public String paramToJsonString() { + Map params = new HashMap<>(); + params.put("triggerId", triggerId); + params.put("endTime", endTime); + return JSON.toJSONString(params); + } + + @Override + public Void execute(CommandContext commandContext) { + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(commandContext); + ExecutionEntity executionEntity = (ExecutionEntity) processEngineConfiguration.getRuntimeService().createExecutionQuery().executionId(triggerId).singleResult(); + if (Objects.isNull(executionEntity)) { + return null; + } + String newEndTime = parseDateTimeWithTimeZone(); + + TimerEventDefinition timerEventDefinition = new TimerEventDefinition(); + timerEventDefinition.setTimeDate(newEndTime); + + TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition, executionEntity.getCurrentFlowElement(), + true, executionEntity, AsyncActivityLeaveJobHandler.TYPE, TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(), + timerEventDefinition.getEndDate(), timerEventDefinition.getCalendarName())); + if (timerJob != null) { + timerJob.setCustomValues(JSON.toJSONString(new BpmnActivityTimeoutTriggerDTO(triggerId, endTime, variables))); + CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob); + } + return null; + } + + private String parseDateTimeWithTimeZone() { + DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime localDateTime = LocalDateTime.parse(endTime, inputFormat); + + // 转换为东八区的ZonedDateTime + ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.of("Asia/Shanghai")); + + // 格式化为包含时区偏移量的ISO 8601字符串 + DateTimeFormatter outputFormat = DateTimeFormatter.ISO_OFFSET_DATE_TIME; + return zonedDateTime.format(outputFormat); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/BizCallbackEvent.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/BizCallbackEvent.java new file mode 100644 index 000000000..59c23ac11 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/BizCallbackEvent.java @@ -0,0 +1,27 @@ +package cn.axzo.workflow.core.engine.event; + +import org.flowable.common.engine.api.delegate.event.FlowableEvent; + +import java.util.Map; + +/** + * TODO + * + * @author wangli + * @since 2024-08-20 17:24 + */ +public interface BizCallbackEvent extends FlowableEvent { + String getActivityId(); + + String getActivityName(); + + String getProcessInstanceId(); + + String getProcessDefinitionId(); + + String getBusinessKey(); + + String getExecutionId(); + + Map getVariables(); +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/BizCallbackEventImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/BizCallbackEventImpl.java new file mode 100644 index 000000000..9b7258ccc --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/BizCallbackEventImpl.java @@ -0,0 +1,108 @@ +package cn.axzo.workflow.core.engine.event; + +import org.apache.commons.lang3.StringUtils; +import org.flowable.common.engine.api.FlowableIllegalArgumentException; +import org.flowable.common.engine.api.delegate.event.FlowableEventType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * 业务节点回调事件 + * + * @author wangli + * @since 2024-08-20 17:25 + */ +public class BizCallbackEventImpl implements BizCallbackEvent { + private final BizCallbackEventType type; + private final String activityId; + private final String activityName; + private final String processInstanceId; + private final String processDefinitionId; + private final String businessKey; + private final String executionId; + private final Map variables; + + public BizCallbackEventImpl(BizCallbackEventType type, String activityId, String activityName, String processInstanceId, String processDefinitionId, String businessKey, String executionId, Map variables) { + this.type = type; + this.activityId = activityId; + this.activityName = activityName; + this.processInstanceId = processInstanceId; + this.processDefinitionId = processDefinitionId; + this.businessKey = businessKey; + this.executionId = executionId; + this.variables = variables; + } + + /** + * @return type of event. + */ + @Override + public FlowableEventType getType() { + return BizCallbackEventType.CALLBACK; + } + + @Override + public String getActivityId() { + return activityId; + } + + @Override + public String getActivityName() { + return activityName; + } + + @Override + public String getProcessInstanceId() { + return processInstanceId; + } + + @Override + public String getProcessDefinitionId() { + return processDefinitionId; + } + + @Override + public String getBusinessKey() { + return businessKey; + } + + @Override + public String getExecutionId() { + return executionId; + } + + @Override + public Map getVariables() { + return variables; + } + + public enum BizCallbackEventType implements FlowableEventType { + CALLBACK, + ; + public static final BizCallbackEventType[] EMPTY_ARRAY = new BizCallbackEventType[]{}; + + public static BizCallbackEventType[] getTypesFromString(String string) { + List result = new ArrayList<>(); + if (string != null && !string.isEmpty()) { + String[] split = StringUtils.split(string, ","); + for (String typeName : split) { + boolean found = false; + for (BizCallbackEventType type : values()) { + if (typeName.toUpperCase().equals(type.name())) { + result.add(type); + found = true; + break; + } + } + if (!found) { + throw new FlowableIllegalArgumentException("Invalid event-type: " + typeName); + } + } + } + + return result.toArray(EMPTY_ARRAY); + } + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncActivityCallbackJobHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncActivityCallbackJobHandler.java new file mode 100644 index 000000000..1feee3875 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncActivityCallbackJobHandler.java @@ -0,0 +1,72 @@ +package cn.axzo.workflow.core.engine.job; + +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO; +import cn.axzo.workflow.core.common.exception.WorkflowEngineException; +import cn.axzo.workflow.core.engine.event.BizCallbackEventImpl; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.engine.HistoryService; +import org.flowable.engine.RuntimeService; +import org.flowable.engine.TaskService; +import org.flowable.engine.history.HistoricProcessInstance; +import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.util.CommandContextUtil; +import org.flowable.job.service.JobHandler; +import org.flowable.job.service.impl.persistence.entity.JobEntity; +import org.flowable.task.api.Task; +import org.flowable.variable.api.delegate.VariableScope; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static cn.axzo.workflow.core.common.code.BpmnInstanceRespCode.PROCESS_INSTANCE_ID_NOT_EXISTS; + +/** + * 业务节点的触发离开的任务处理器 + * + * @author wangli + * @since 2024-08-16 16:29 + */ +@Slf4j +public class AsyncActivityCallbackJobHandler extends AbstractJobHandler implements JobHandler { + public static final String TYPE = "business-node-callback"; + + @Override + public String getType() { + return TYPE; + } + + @Override + public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { + log.warn("AsyncActivityLeaveJobHandler exec start..."); + BpmnActivityTimeoutCallbackDTO dto = JSON.parseObject(job.getCustomValues(), BpmnActivityTimeoutCallbackDTO.class); + ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext); + FlowableEventDispatcher eventDispatcher = processEngineConfiguration.getEventDispatcher(); + TaskService taskService = processEngineConfiguration.getTaskService(); + Task task = taskService.createTaskQuery().executionId(dto.getTriggerId()).singleResult(); + + HistoryService historyService = processEngineConfiguration.getHistoryService(); + HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery().includeProcessVariables() + .processInstanceId(task.getProcessInstanceId()).singleResult(); + if (Objects.isNull(processInstance)) { + throw new WorkflowEngineException(PROCESS_INSTANCE_ID_NOT_EXISTS, task.getProcessInstanceId()); + } + RuntimeService runtimeService = processEngineConfiguration.getRuntimeService(); + runtimeService.setVariables(processInstance.getId(), dto.getVariables()); + + Map processVariables = new HashMap<>(processInstance.getProcessVariables()); + if (!CollectionUtils.isEmpty(dto.getVariables())) { + processVariables.putAll(dto.getVariables()); + } + eventDispatcher.dispatchEvent(new BizCallbackEventImpl(BizCallbackEventImpl.BizCallbackEventType.CALLBACK, + task.getTaskDefinitionKey(), task.getName(), + task.getProcessInstanceId(), task.getProcessDefinitionId(), + processInstance.getBusinessKey(), dto.getTriggerId(), + processVariables), + processEngineConfiguration.getEngineCfgKey()); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncActivityLeaveJobHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncActivityLeaveJobHandler.java new file mode 100644 index 000000000..399513265 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/AsyncActivityLeaveJobHandler.java @@ -0,0 +1,38 @@ +package cn.axzo.workflow.core.engine.job; + +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO; +import cn.axzo.workflow.core.service.BpmnProcessActivityService; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.job.service.JobHandler; +import org.flowable.job.service.impl.persistence.entity.JobEntity; +import org.flowable.variable.api.delegate.VariableScope; + +/** + * 业务节点的触发离开的任务处理器 + * + * @author wangli + * @since 2024-08-16 16:29 + */ +@Slf4j +public class AsyncActivityLeaveJobHandler extends AbstractJobHandler implements JobHandler { + private final BpmnProcessActivityService bpmnProcessActivityService; + public static final String TYPE = "business-node-leave-task"; + + public AsyncActivityLeaveJobHandler(BpmnProcessActivityService bpmnProcessActivityService) { + this.bpmnProcessActivityService = bpmnProcessActivityService; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { + log.warn("AsyncActivityLeaveJobHandler exec start..."); + BpmnActivityTimeoutTriggerDTO dto = JSON.parseObject(job.getCustomValues(), BpmnActivityTimeoutTriggerDTO.class); + bpmnProcessActivityService.trigger(dto.getTriggerId()); + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineActivityCallbackEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineActivityCallbackEventListener.java new file mode 100644 index 000000000..40c298feb --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineActivityCallbackEventListener.java @@ -0,0 +1,66 @@ +package cn.axzo.workflow.core.engine.listener; + +import cn.axzo.workflow.core.common.context.ActivityOperationContext; +import cn.axzo.workflow.core.engine.event.BizCallbackEvent; +import cn.axzo.workflow.core.engine.event.BizCallbackEventImpl; +import cn.axzo.workflow.core.listener.BpmnActivityEventListener; +import com.google.common.collect.ImmutableSet; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static cn.axzo.workflow.core.engine.event.BizCallbackEventImpl.BizCallbackEventType.CALLBACK; + +/** + * 业务节点回调事件 + * + * @author wangli + * @since 2024-08-20 17:47 + */ +@Slf4j +@Component +public class EngineActivityCallbackEventListener extends AbstractFlowableEventListener { + @Resource + ObjectProvider> activityListeners; + + public static final Set BIZ_CALLBACK_EVENTS = + ImmutableSet.builder() + .add(CALLBACK) + .build(); + + @Override + public void onEvent(FlowableEvent flowableEvent) { + if (flowableEvent instanceof BizCallbackEvent) { + BizCallbackEvent event = (BizCallbackEvent) flowableEvent; + BizCallbackEventImpl.BizCallbackEventType type = (BizCallbackEventImpl.BizCallbackEventType) event.getType(); + if (BIZ_CALLBACK_EVENTS.contains(type)) { + switch (type) { + case CALLBACK: + getOrderedListeners().forEach(i -> i.onCallback(event)); + break; + default: + } + } + } + } + + private List getOrderedListeners() { + ActivityOperationContext context = new ActivityOperationContext(); + List orderListeners = new ArrayList<>(); + activityListeners.ifAvailable(orderListeners::addAll); + orderListeners.forEach(i -> i.setContext(context)); + return orderListeners; + } + + @Override + public boolean isFailOnException() { + return true; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnActivityEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnActivityEventListener.java index 12bee6e81..b4b6aa6e8 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnActivityEventListener.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnActivityEventListener.java @@ -1,6 +1,7 @@ package cn.axzo.workflow.core.listener; import cn.axzo.workflow.core.common.context.OperationContext; +import cn.axzo.workflow.core.engine.event.BizCallbackEvent; import cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEvent; import org.flowable.engine.delegate.DelegateExecution; import org.springframework.core.Ordered; @@ -36,6 +37,12 @@ public interface BpmnActivityEventListener extends OperationContext, Ordered { */ default void onTake(DelegateExecution execution) {} + /** + * 节点定时回调 + */ + default void onCallback(BizCallbackEvent event) { + } + /** * 节点已取消 * diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessActivityService.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessActivityService.java index 10113ed48..c128dbd0e 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessActivityService.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessActivityService.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.core.service; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO; import org.flowable.job.service.impl.persistence.entity.JobEntity; @@ -28,4 +30,26 @@ public interface BpmnProcessActivityService { */ void setAssignee(BpmnActivitySetAssigneeDTO dto); + /** + * 给指定实例的指定节点重设审批人 + * + * @param dto + */ + void setAssigneeAsync(BpmnActivitySetAssigneeDTO dto); + + /** + * 设置指定业务接口继续往下流转的触发时间 + * + * @param dto + * @return + */ + Boolean setTimeoutTrigger(BpmnActivityTimeoutTriggerDTO dto); + + /** + * 设置指定业务节点定时回调(仅通过 MQ 事件广播,自身状态不做任何改变) + * + * @param dto + * @return + */ + Boolean setTimeOutCallback(BpmnActivityTimeoutCallbackDTO dto); } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessActivityServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessActivityServiceImpl.java index be0fb374b..7bc4d8853 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessActivityServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessActivityServiceImpl.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.core.service.impl; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO; import cn.axzo.workflow.core.common.exception.WorkflowEngineException; @@ -8,6 +10,9 @@ import cn.axzo.workflow.core.engine.cmd.CustomActivityTriggerAsyncCmd; import cn.axzo.workflow.core.engine.cmd.CustomActivityTriggerCmd; import cn.axzo.workflow.core.engine.cmd.CustomBizSpecifyAssigneeToTaskAsyncCmd; import cn.axzo.workflow.core.engine.cmd.CustomBizSpecifyAssigneeToTaskCmd; +import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutCallbackCmd; +import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutTriggerCmd; +import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler; import cn.axzo.workflow.core.service.BpmnProcessActivityService; import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; import lombok.extern.slf4j.Slf4j; @@ -69,4 +74,24 @@ public class BpmnProcessActivityServiceImpl implements BpmnProcessActivityServic } } + @Override + public Boolean setTimeoutTrigger(BpmnActivityTimeoutTriggerDTO dto) { + CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor(); + commandExecutor.execute(new CustomBusinessNodeTimeoutTriggerCmd(dto.getTriggerId(), dto.getEndTime(), + dto.getVariables())); + return true; + } + + /** + * 设置指定业务节点定时回调 + * + * @param dto + * @return + */ + @Override + public Boolean setTimeOutCallback(BpmnActivityTimeoutCallbackDTO dto) { + CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor(); + commandExecutor.execute(new CustomBusinessNodeTimeoutCallbackCmd(dto.getTriggerId(), dto.getEndTime(), dto.getVariables())); + return true; + } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxPropertyServiceImpl.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxPropertyServiceImpl.java index 3a5b50246..1b4e671d5 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxPropertyServiceImpl.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/ExtAxPropertyServiceImpl.java @@ -56,7 +56,8 @@ public class ExtAxPropertyServiceImpl implements ExtAxPropertyService { } QueryWrapper queryWrapper = new QueryWrapper() .eq("name", name); - return Optional.ofNullable(mapper.selectOne(queryWrapper)); + Optional extAxProperty = Optional.ofNullable(mapper.selectOne(queryWrapper)); + return extAxProperty; } @Override diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java index 3e2c96221..35aa887b8 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/aspectj/ErrorReportAspect.java @@ -67,6 +67,7 @@ public class ErrorReportAspect implements Ordered { */ @Around(value = "@within(org.springframework.web.bind.annotation.RestController) || @within(org.springframework.stereotype.Service)") public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable { + log.info("{}.{}", joinPoint.getSignature().getDeclaringTypeName(), joinPoint.getSignature().getName()); Signature signature = joinPoint.getSignature(); StopWatch watch = new StopWatch("running controller time"); watch.start(signature.toShortString()); diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/activity/RocketMqBpmActivityEvent_100_Listener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/activity/RocketMqBpmActivityEvent_100_Listener.java index 46dbd9045..5812c80db 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/activity/RocketMqBpmActivityEvent_100_Listener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/activity/RocketMqBpmActivityEvent_100_Listener.java @@ -8,6 +8,7 @@ import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO; import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO; import cn.axzo.workflow.core.common.context.ActivityOperationContext; import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper; +import cn.axzo.workflow.core.engine.event.BizCallbackEvent; import cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEvent; import cn.axzo.workflow.core.listener.AbstractBpmnEventListener; import cn.axzo.workflow.core.listener.BpmnActivityEventListener; @@ -39,6 +40,7 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_ACTIVITY_R import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_OWNERSHIP_PROCESS_DEFINITION_KEY; import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION; import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.APPROVED; +import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_CALLBACK; import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_END; import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_START; import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_TAKE; @@ -116,6 +118,35 @@ public class RocketMqBpmActivityEvent_100_Listener extends AbstractBpmnEventList execution.getCurrentActivityId(), execution.getProcessInstanceId()); } + @Override + public void onCallback(BizCallbackEvent event) { + log.info("RocketMqBpmActivityEventListener#onCallback...activityId: {}, processInstanceId: {}", + event.getActivityId(), event.getProcessInstanceId()); + + // 特殊的自定义业务事件, 不能使用公共的 build 方法 + ProcessActivityDTO dto = new ProcessActivityDTO(); + dto.setType(PROCESS_ACTIVITY_CALLBACK); + dto.setActivityId(event.getActivityId()); + dto.setActivityName(event.getActivityName()); + dto.setProcessInstanceId(event.getProcessInstanceId()); + dto.setProcessDefinitionId(event.getProcessDefinitionId()); + ProcessInstance processInstance = getContext().getProcessInstance(() -> + runtimeService.createProcessInstanceQuery() + .processInstanceId(event.getProcessInstanceId()) + .singleResult()); + if (Objects.nonNull(processInstance)) { + dto.setProcessDefinitionKey(processInstance.getProcessDefinitionKey()); + dto.setBusinessKey(processInstance.getBusinessKey()); + } + dto.setTriggerId(event.getExecutionId()); + dto.setVariables(event.getVariables()); + dto.setWorkflowEngineVersion(String.valueOf(event.getVariables().getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121))); + sendMessageQueue(dto, PROCESS_ACTIVITY_CALLBACK); + + log.info("RocketMqBpmActivityEventListener#onCallback...end, activityId: {}, processInstanceId: {}", + event.getActivityId(), event.getProcessInstanceId()); + } + @Override public void onEnd(DelegateExecution execution) { log.info("RocketMqMessagePushEventListener#onEnd...activityId: {}, processInstanceId: {}", diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/bpmn/BpmnProcessActivityController.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/bpmn/BpmnProcessActivityController.java index 87b927577..beb45326f 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/bpmn/BpmnProcessActivityController.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/web/bpmn/BpmnProcessActivityController.java @@ -4,6 +4,8 @@ import cn.axzo.karma.client.feign.FlowSupportApi; import cn.axzo.karma.client.model.request.PersonProfileQueryReq; import cn.axzo.karma.client.model.response.PersonProfileResp; import cn.axzo.workflow.client.feign.bpmn.ProcessActivityApi; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO; +import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; @@ -119,4 +121,28 @@ public class BpmnProcessActivityController extends BasicPopulateAvatarController bpmnProcessActivityService.setAssignee(dto); return success(true); } + + /** + * 该功能应该利用引擎的 TimerBoundaryEvent 来实现,但为了简便,先利用引擎的任务调度来实现 + * + * @param dto + * @return + */ + @PostMapping("/timeout/trigger") + @Override + public CommonResponse setTimeoutTrigger(@Validated @RequestBody BpmnActivityTimeoutTriggerDTO dto) { + return CommonResponse.success(bpmnProcessActivityService.setTimeoutTrigger(dto)); + } + + /** + * 为指定业务节点设置定时回调 + * + * @param dto + * @return + */ + @PostMapping("/timeout/callback") + @Override + public CommonResponse setTimeoutCallback(@Validated @RequestBody BpmnActivityTimeoutCallbackDTO dto) { + return CommonResponse.success(bpmnProcessActivityService.setTimeOutCallback(dto)); + } } diff --git a/workflow-engine-server/src/main/resources/bootstrap.yml b/workflow-engine-server/src/main/resources/bootstrap.yml index ada47bc8a..c4f24cc3e 100644 --- a/workflow-engine-server/src/main/resources/bootstrap.yml +++ b/workflow-engine-server/src/main/resources/bootstrap.yml @@ -18,7 +18,8 @@ logging: level: com.alibaba.nacos.client.config.impl: WARN org.flowable: INFO - +server: + shutdown: graceful --- #开发环境 spring: diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java index ec17bb5ac..de0d5e410 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java @@ -107,6 +107,7 @@ public class StarterBroadcastMQConfiguration { @Override public void onMessage(MessageExt message) { + // 处理 properties 配置进行事件过滤 for (InnerMessageQueueHandleBeforeFilter filter : filters) { if (filter.doFilter(message)) { log.info("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId()); diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java index 37c780918..a2761dc16 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java @@ -5,7 +5,6 @@ import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeD import cn.axzo.workflow.starter.WorkflowEngineStarterProperties; import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException; import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer; -import cn.azxo.framework.common.model.CommonResponse; import com.alibaba.fastjson.JSON; import feign.Client; import feign.MethodMetadata; diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java index 4600ecd24..a949514d9 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterDecoder.java @@ -1,5 +1,6 @@ package cn.axzo.workflow.starter.feign.ext; +import cn.axzo.framework.jackson.utility.JSON; import cn.axzo.workflow.common.model.response.BpmPageResult; import cn.axzo.workflow.starter.common.exception.WorkflowRpcInvokeException; import cn.azxo.framework.common.model.CommonResponse; @@ -71,6 +72,7 @@ final class WorkflowEngineStarterDecoder implements Decoder { wrappedType = ParameterizedTypeImpl.make(CommonResponse.class, new Type[]{type}, null); } Object decode = delegate.decode(response, wrappedType); + log.info("workflow engine starter response :{}", JSON.toJSONString(decode)); if (decode instanceof CommonResponse) { CommonResponse commonResponse = (CommonResponse) decode; if (response.status() == 202) { diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/MessageNotificationEventHandler.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/MessageNotificationEventHandler.java index d07200a38..092d93eda 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/MessageNotificationEventHandler.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/MessageNotificationEventHandler.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.starter.handler; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.workflow.common.model.response.mq.MessagePushDTO; import org.springframework.core.Ordered; @@ -13,6 +15,18 @@ import org.springframework.core.Ordered; */ public interface MessageNotificationEventHandler extends Ordered { + /** + * 针对当前接口的实现进行过滤, 为 true 时,往下执行下面的方法 + * + * @param dto + * @param event + * @param context + * @return + */ + default boolean accept(MessagePushDTO dto, Event event, EventConsumer.Context context) { + return true; + } + /** * 站内信推送 * diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessActivityEventHandler.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessActivityEventHandler.java index eeb3c2724..6fa3188f2 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessActivityEventHandler.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessActivityEventHandler.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.starter.handler; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO; import org.springframework.core.Ordered; @@ -18,6 +20,18 @@ import org.springframework.core.Ordered; */ public interface ProcessActivityEventHandler extends Ordered { + /** + * 针对当前接口的实现进行过滤, 为 true 时,往下执行下面的方法 + * + * @param dto + * @param event + * @param context + * @return + */ + default boolean accept(ProcessActivityDTO dto, Event event, EventConsumer.Context context) { + return true; + } + /** * 节点已启动 * @@ -34,6 +48,14 @@ public interface ProcessActivityEventHandler extends Ordered { default void onWaitAssignee(ProcessActivityDTO dto) { } + /** + * 业务节点回调 + * + * @param dto + */ + default void onCallback(ProcessActivityDTO dto) { + } + /** * 节点已完成 * diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessInstanceEventHandler.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessInstanceEventHandler.java index f427ac927..6660dcacc 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessInstanceEventHandler.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessInstanceEventHandler.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.starter.handler; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.workflow.client.feign.bpmn.ProcessInstanceApi; import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAbortDTO; import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO; @@ -15,6 +17,18 @@ import org.springframework.core.Ordered; */ public interface ProcessInstanceEventHandler extends Ordered { + /** + * 针对当前接口的实现进行过滤, 为 true 时,往下执行下面的方法 + * + * @param dto + * @param event + * @param context + * @return + */ + default boolean accept(ProcessInstanceDTO dto, Event event, EventConsumer.Context context) { + return true; + } + /** * 流程实例创建成功后回调 * diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessTaskEventHandler.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessTaskEventHandler.java index abb3d896c..9acdf7970 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessTaskEventHandler.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/ProcessTaskEventHandler.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.starter.handler; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.workflow.common.model.response.mq.ProcessTaskDTO; import org.springframework.core.Ordered; @@ -15,6 +17,18 @@ import org.springframework.core.Ordered; */ public interface ProcessTaskEventHandler extends Ordered { + /** + * 针对当前接口的实现进行过滤, 为 true 时,往下执行下面的方法 + * + * @param dto + * @param event + * @param context + * @return + */ + default boolean accept(ProcessTaskDTO dto, Event event, EventConsumer.Context context) { + return true; + } + /** * 用户任务已指派审核人 */ diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java index aa184f8ed..32bbe0aa0 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java @@ -1,11 +1,14 @@ package cn.axzo.workflow.starter.handler.filter; import cn.axzo.workflow.common.model.response.mq.MessagePushDTO; +import cn.axzo.workflow.starter.handler.MessageNotificationEventHandler; import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; /** * MessageNotificationEvent 的自定义过滤接口 *

+ * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link MessageNotificationEventHandler#filter(MessagePushDTO)} 实例的专属过滤 + *

* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE) * * @author wangli diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java index a769d28b3..cd3d56233 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java @@ -1,11 +1,14 @@ package cn.axzo.workflow.starter.handler.filter; import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO; +import cn.axzo.workflow.starter.handler.ProcessActivityEventHandler; import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; /** * ProcessActivityEvent 自定义的过滤接口 *

+ * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link ProcessActivityEventHandler#filter(ProcessActivityDTO)} 实例的专属过滤 + *

* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE) * * @author wangli diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessInstanceEventFilter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessInstanceEventFilter.java index fb04e3360..310807c08 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessInstanceEventFilter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessInstanceEventFilter.java @@ -1,11 +1,14 @@ package cn.axzo.workflow.starter.handler.filter; import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO; +import cn.axzo.workflow.starter.handler.ProcessInstanceEventHandler; import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; /** * ProcessInstanceEvent 自定义的过滤接口 *

+ * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link ProcessInstanceEventHandler#filter(ProcessInstanceDTO)} 实例的专属过滤 + *

* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE) * * @author wangli diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessTaskEventFilter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessTaskEventFilter.java index 22ac91008..3ede54f29 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessTaskEventFilter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessTaskEventFilter.java @@ -1,11 +1,14 @@ package cn.axzo.workflow.starter.handler.filter; import cn.axzo.workflow.common.model.response.mq.ProcessTaskDTO; +import cn.axzo.workflow.starter.handler.ProcessTaskEventHandler; import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; /** * ProcessTaskEvent 自定义的过滤接口 *

+ * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link ProcessTaskEventHandler#filter(ProcessTaskDTO)} 实例的专属过滤 + *

* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE) * * @author wangli diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractInnerWorkflowListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractInnerWorkflowListener.java index 0956606c0..97a3faca4 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractInnerWorkflowListener.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/AbstractInnerWorkflowListener.java @@ -54,25 +54,23 @@ public abstract class AbstractInnerWorkflowListener