Merge remote-tracking branch 'refs/remotes/origin/master' into feature/REQ-2924

# Conflicts:
#	workflow-engine-api/src/main/java/cn/axzo/workflow/client/feign/bpmn/ProcessActivityApi.java
#	workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java
#	workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/BpmnProcessActivityService.java
#	workflow-engine-core/src/main/java/cn/axzo/workflow/core/service/impl/BpmnProcessActivityServiceImpl.java
#	workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/api/WorkflowCoreService.java
#	workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/api/WorkflowManageService.java
#	workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java
This commit is contained in:
yangqicheng 2024-09-25 17:49:37 +08:00
commit c036742415
40 changed files with 886 additions and 23 deletions

View File

@ -2,7 +2,6 @@ package cn.axzo.workflow.client.config;
import feign.RequestInterceptor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@ -43,7 +42,7 @@ public class WorkflowEngineClientAutoConfiguration {
private static final Pattern POM_VERSION = Pattern.compile("<parent>.*<version>(\\S+)</version>.*</parent>",
Pattern.DOTALL);
@Bean
@Bean("serviceVersion")
public String serviceVersion() {
Map<String, String> env = System.getenv();
if (env != null) {

View File

@ -1,6 +1,9 @@
package cn.axzo.workflow.client.feign.bpmn;
import cn.axzo.workflow.client.annotation.WorkflowEngineFeignClient;
import cn.axzo.workflow.common.annotation.Manageable;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO;
import cn.azxo.framework.common.model.CommonResponse;
@ -55,4 +58,24 @@ public interface ProcessActivityApi {
@PostMapping("/api/process/activity/assignee/set")
@Operation(summary = "业务节点设置审批人,不支持重复调用设置审批人,需一次性传入所有审批人")
CommonResponse<Boolean> setAssignee(@Validated @RequestBody BpmnActivitySetAssigneeDTO dto);
/**
* 该功能应该利用引擎的 TimerBoundaryEvent 来实现但为了简便先利用引擎的任务调度来实现
*
* @return
*/
@PostMapping("/api/process/activity/timeout/trigger")
@Manageable
@Operation(summary = "设置指定业务节点定时继续往下执行")
CommonResponse<Boolean> setTimeoutTrigger(@Validated @RequestBody BpmnActivityTimeoutTriggerDTO dto);
/**
* 为指定业务节点设置定时回调
*
* @return
*/
@Manageable
@PostMapping("/api/process/activity/timeout/callback")
@Operation(summary = "设置指定业务节点定时回调")
CommonResponse<Boolean> setTimeoutCallback(@Validated @RequestBody BpmnActivityTimeoutCallbackDTO dto);
}

View File

@ -13,6 +13,7 @@ public enum ProcessActivityEventEnum {
PROCESS_ACTIVITY_START("process-activity", "process-activity-start", "流程活动节点已开始"),
PROCESS_ACTIVITY_WAIT_ASSIGNEE("process-activity", "process-activity-wait-assignee", "流程活动节点等待指定审批人"),
PROCESS_ACTIVITY_TAKE("process-activity", "process-activity-take", "该事件类型暂时不存在"),
PROCESS_ACTIVITY_CALLBACK("process-activity", "process-activity-callback", "业务节点定时回调"),
PROCESS_ACTIVITY_END("process-activity", "process-activity-end", "流程活动节点已取消"),
;

View File

@ -0,0 +1,41 @@
package cn.axzo.workflow.common.model.request.bpmn.activity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import java.util.Map;
/**
* 为指定业务节点设置倒计时回调
*
* @author wangli
* @since 2024-08-16 15:30
*/
@ApiModel("为指定业务节点设置倒计时回调")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BpmnActivityTimeoutCallbackDTO {
/**
* 业务节点的触发 ID
*/
@NotBlank(message = "触发 ID 不能为空")
private String triggerId;
/**
* 触发往下流转的时间点 格式yyyy-MM-dd HH:mm:ss
* <p>
* 应该为 Date 类型但为了确保特殊情况需要通过 pod 内直接触发
*/
@NotBlank(message = "触发时间不能为空格式yyyy-MM-dd HH:mm:ss")
private String endTime;
/**
* 用于追加或修改现有的变量值
*/
private Map<String, Object> variables;
}

View File

@ -0,0 +1,41 @@
package cn.axzo.workflow.common.model.request.bpmn.activity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import java.util.Map;
/**
* 为指定业务节点设置倒计时在啥时候继续往下流转
*
* @author wangli
* @since 2024-08-16 15:30
*/
@ApiModel("为指定业务节点设置倒计时")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BpmnActivityTimeoutTriggerDTO {
/**
* 业务节点的触发 ID
*/
@NotBlank(message = "触发 ID 不能为空")
private String triggerId;
/**
* 触发往下流转的时间点 格式yyyy-MM-dd HH:mm:ss
* <p>
* 应该为 Date 类型但为了确保特殊情况需要通过 pod 内直接触发
*/
@NotBlank(message = "触发时间不能为空格式yyyy-MM-dd HH:mm:ss")
private String endTime;
/**
* 用于追加或修改现有的变量值(暂不支持)
*/
@Deprecated
private Map<String, Object> variables;
}

View File

@ -11,6 +11,7 @@ import cn.axzo.workflow.common.model.request.bpmn.BpmnNoticeConf;
import cn.axzo.workflow.common.model.request.bpmn.model.BpmnModelCreateDTO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.converter.json.AbstractBpmnJsonConverter;
import cn.axzo.workflow.core.converter.json.BoundaryEventJsonConverter;
import cn.axzo.workflow.core.converter.json.EndEventJsonConverter;
import cn.axzo.workflow.core.converter.json.ExclusiveGatewayJsonConverter;
import cn.axzo.workflow.core.converter.json.NotSupportConverter;
@ -27,6 +28,7 @@ import com.google.common.collect.Lists;
import org.flowable.bpmn.BpmnAutoLayout;
import org.flowable.bpmn.converter.BpmnXMLConverter;
import org.flowable.bpmn.model.BaseElement;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.ExclusiveGateway;
@ -142,6 +144,7 @@ public final class BpmnJsonConverterUtil {
CONVERTERS.put(UserTask.class, new UserTaskJsonConverter());
CONVERTERS.put(ServiceTask.class, new ServiceTaskJsonConverter());
CONVERTERS.put(ReceiveTask.class, new ReceiveTaskJsonConverter());
CONVERTERS.put(BoundaryEvent.class, new BoundaryEventJsonConverter());
}
/**

View File

@ -5,6 +5,9 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory;
import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator;
import cn.axzo.workflow.core.engine.interceptor.CustomRetryInterceptor;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivitySetAssigneeJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncActivityTriggerJobHandler;
@ -64,6 +67,7 @@ public class FlowableConfiguration {
ObjectProvider<FlowableEventListener> listeners,
CustomActivityBehaviorFactory customActivityBehaviorFactory,
ExtAxHiTaskInstService extAxHiTaskInstService,
BpmnProcessActivityService bpmnProcessActivityService,
List<JobProcessor> jobProcessors,
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties,
@ -94,6 +98,12 @@ public class FlowableConfiguration {
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncTransferUserTaskJobHandler());
configuration.addCustomJobHandler(new AsyncTermNodeAlterJobHandler(refreshProperties));
configuration.addCustomJobHandler(new AsyncCountersignUserTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncCancelProcessInstanceHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncAbortProcessInstanceHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncBpmnProcessActivityJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityLeaveJobHandler(bpmnProcessActivityService));
configuration.addCustomJobHandler(new AsyncActivityCallbackJobHandler());
// 异步任务异常重试时间间隔
configuration.setDefaultFailedJobWaitTime(30);
configuration.setAsyncFailedJobWaitTime(30);

View File

@ -0,0 +1,57 @@
package cn.axzo.workflow.core.converter.json;
import cn.axzo.workflow.common.model.request.bpmn.BpmnJsonNode;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.EventDefinition;
import org.flowable.bpmn.model.FlowableListener;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.engine.delegate.BaseExecutionListener;
import java.util.ArrayList;
import java.util.List;
import static org.flowable.bpmn.model.ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION;
/**
* 边界事件
*
* @author wangli
* @since 2024-08-16 11:47
*/
public class BoundaryEventJsonConverter extends AbstractBpmnJsonConverter<BoundaryEvent> {
@Override
public BoundaryEvent convertJsonToElement(BpmnJsonNode node, Process process) {
BoundaryEvent boundaryEvent = new BoundaryEvent();
boundaryEvent.setId(node.getId() + "_boundaryEvent");
boundaryEvent.setCancelActivity(false);
boundaryEvent.setAttachedToRefId(process.getId());
// 设置执行监听
setExecutionListeners(boundaryEvent);
// 设置 TimerEventDefinition
setTimerEventDefinition(boundaryEvent);
return boundaryEvent;
}
private void setTimerEventDefinition(BoundaryEvent boundaryEvent) {
List<EventDefinition> eventDefinitions = new ArrayList<>();
TimerEventDefinition timerEventDefinition = new TimerEventDefinition();
timerEventDefinition.setEndDate("${timerEndDate}");
eventDefinitions.add(timerEventDefinition);
boundaryEvent.setEventDefinitions(eventDefinitions);
}
private void setExecutionListeners(BoundaryEvent boundaryEvent) {
List<FlowableListener> executionListeners = new ArrayList<>();
// 设置执行监听
FlowableListener executionListener = new FlowableListener();
executionListener.setEvent(BaseExecutionListener.EVENTNAME_START);
executionListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
executionListener.setImplementation("${boundaryEventExecutionStartListener}");
executionListeners.add(executionListener);
boundaryEvent.setExecutionListeners(executionListeners);
}
}

View File

@ -28,6 +28,12 @@ public abstract class AbstractCommand<T> implements Command<T> {
}
}
/**
* 实现该方法默认就捕获了 command 一些引擎内部异常
*
* @param commandContext
* @return
*/
public T executeInternal(CommandContext commandContext) {
return null;
}

View File

@ -0,0 +1,84 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.core.engine.job.AsyncActivityCallbackJobHandler;
import com.alibaba.fastjson.JSON;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.jobexecutor.TimerEventHandler;
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.TimerUtil;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
/**
* 业务节点触发倒计时回调
*
* @author wangli
* @since 2024-08-20 16:59
*/
public class CustomBusinessNodeTimeoutCallbackCmd extends AbstractCommand<Void> implements Serializable {
private static final long serialVersionUID = 1L;
private final String triggerId;
private final String endTime;
private final Map<String, Object> variables;
public CustomBusinessNodeTimeoutCallbackCmd(String triggerId, String endTime, Map<String, Object> variables) {
this.triggerId = triggerId;
this.endTime = endTime;
this.variables = variables;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("triggerId", triggerId);
params.put("endTime", endTime);
return JSON.toJSONString(params);
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
ExecutionEntity executionEntity = (ExecutionEntity) processEngineConfiguration.getRuntimeService().createExecutionQuery().executionId(triggerId).singleResult();
if (executionEntity == null) {
return null;
}
String newEndTime = parseDateTimeWithTimeZone();
TimerEventDefinition timerEventDefinition = new TimerEventDefinition();
timerEventDefinition.setTimeDate(newEndTime);
TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition, executionEntity.getCurrentFlowElement(),
true, executionEntity, AsyncActivityCallbackJobHandler.TYPE, TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(),
timerEventDefinition.getEndDate(), timerEventDefinition.getCalendarName()));
if (timerJob != null) {
timerJob.setCustomValues(JSON.toJSONString(new BpmnActivityTimeoutCallbackDTO(triggerId, endTime, variables)));
CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob);
}
return null;
}
private String parseDateTimeWithTimeZone() {
DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.parse(endTime, inputFormat);
// 转换为东八区的ZonedDateTime
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.of("Asia/Shanghai"));
// 格式化为包含时区偏移量的ISO 8601字符串
DateTimeFormatter outputFormat = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
return zonedDateTime.format(outputFormat);
}
}

View File

@ -0,0 +1,84 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.core.engine.job.AsyncActivityLeaveJobHandler;
import com.alibaba.fastjson.JSON;
import org.flowable.bpmn.model.TimerEventDefinition;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.jobexecutor.TimerEventHandler;
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.TimerUtil;
import org.flowable.job.service.impl.persistence.entity.TimerJobEntity;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* 业务节点触发倒计时
*
* @author wangli
* @since 2024-08-16 15:38
*/
public class CustomBusinessNodeTimeoutTriggerCmd extends AbstractCommand<Void> implements Serializable {
private static final long serialVersionUID = 1L;
private final String triggerId;
private final String endTime;
private final Map<String, Object> variables;
public CustomBusinessNodeTimeoutTriggerCmd(String triggerId, String endTime, Map<String, Object> variables) {
this.triggerId = triggerId;
this.endTime = endTime;
this.variables = variables;
}
@Override
public String paramToJsonString() {
Map<String, Object> params = new HashMap<>();
params.put("triggerId", triggerId);
params.put("endTime", endTime);
return JSON.toJSONString(params);
}
@Override
public Void execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(commandContext);
ExecutionEntity executionEntity = (ExecutionEntity) processEngineConfiguration.getRuntimeService().createExecutionQuery().executionId(triggerId).singleResult();
if (Objects.isNull(executionEntity)) {
return null;
}
String newEndTime = parseDateTimeWithTimeZone();
TimerEventDefinition timerEventDefinition = new TimerEventDefinition();
timerEventDefinition.setTimeDate(newEndTime);
TimerJobEntity timerJob = TimerUtil.createTimerEntityForTimerEventDefinition(timerEventDefinition, executionEntity.getCurrentFlowElement(),
true, executionEntity, AsyncActivityLeaveJobHandler.TYPE, TimerEventHandler.createConfiguration(executionEntity.getCurrentActivityId(),
timerEventDefinition.getEndDate(), timerEventDefinition.getCalendarName()));
if (timerJob != null) {
timerJob.setCustomValues(JSON.toJSONString(new BpmnActivityTimeoutTriggerDTO(triggerId, endTime, variables)));
CommandContextUtil.getTimerJobService().scheduleTimerJob(timerJob);
}
return null;
}
private String parseDateTimeWithTimeZone() {
DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.parse(endTime, inputFormat);
// 转换为东八区的ZonedDateTime
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.of("Asia/Shanghai"));
// 格式化为包含时区偏移量的ISO 8601字符串
DateTimeFormatter outputFormat = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
return zonedDateTime.format(outputFormat);
}
}

View File

@ -0,0 +1,27 @@
package cn.axzo.workflow.core.engine.event;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import java.util.Map;
/**
* TODO
*
* @author wangli
* @since 2024-08-20 17:24
*/
public interface BizCallbackEvent extends FlowableEvent {
String getActivityId();
String getActivityName();
String getProcessInstanceId();
String getProcessDefinitionId();
String getBusinessKey();
String getExecutionId();
Map<String, Object> getVariables();
}

View File

@ -0,0 +1,108 @@
package cn.axzo.workflow.core.engine.event;
import org.apache.commons.lang3.StringUtils;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 业务节点回调事件
*
* @author wangli
* @since 2024-08-20 17:25
*/
public class BizCallbackEventImpl implements BizCallbackEvent {
private final BizCallbackEventType type;
private final String activityId;
private final String activityName;
private final String processInstanceId;
private final String processDefinitionId;
private final String businessKey;
private final String executionId;
private final Map<String, Object> variables;
public BizCallbackEventImpl(BizCallbackEventType type, String activityId, String activityName, String processInstanceId, String processDefinitionId, String businessKey, String executionId, Map<String, Object> variables) {
this.type = type;
this.activityId = activityId;
this.activityName = activityName;
this.processInstanceId = processInstanceId;
this.processDefinitionId = processDefinitionId;
this.businessKey = businessKey;
this.executionId = executionId;
this.variables = variables;
}
/**
* @return type of event.
*/
@Override
public FlowableEventType getType() {
return BizCallbackEventType.CALLBACK;
}
@Override
public String getActivityId() {
return activityId;
}
@Override
public String getActivityName() {
return activityName;
}
@Override
public String getProcessInstanceId() {
return processInstanceId;
}
@Override
public String getProcessDefinitionId() {
return processDefinitionId;
}
@Override
public String getBusinessKey() {
return businessKey;
}
@Override
public String getExecutionId() {
return executionId;
}
@Override
public Map<String, Object> getVariables() {
return variables;
}
public enum BizCallbackEventType implements FlowableEventType {
CALLBACK,
;
public static final BizCallbackEventType[] EMPTY_ARRAY = new BizCallbackEventType[]{};
public static BizCallbackEventType[] getTypesFromString(String string) {
List<BizCallbackEventType> result = new ArrayList<>();
if (string != null && !string.isEmpty()) {
String[] split = StringUtils.split(string, ",");
for (String typeName : split) {
boolean found = false;
for (BizCallbackEventType type : values()) {
if (typeName.toUpperCase().equals(type.name())) {
result.add(type);
found = true;
break;
}
}
if (!found) {
throw new FlowableIllegalArgumentException("Invalid event-type: " + typeName);
}
}
}
return result.toArray(EMPTY_ARRAY);
}
}
}

View File

@ -0,0 +1,72 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.engine.event.BizCallbackEventImpl;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.api.Task;
import org.flowable.variable.api.delegate.VariableScope;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static cn.axzo.workflow.core.common.code.BpmnInstanceRespCode.PROCESS_INSTANCE_ID_NOT_EXISTS;
/**
* 业务节点的触发离开的任务处理器
*
* @author wangli
* @since 2024-08-16 16:29
*/
@Slf4j
public class AsyncActivityCallbackJobHandler extends AbstractJobHandler implements JobHandler {
public static final String TYPE = "business-node-callback";
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.warn("AsyncActivityLeaveJobHandler exec start...");
BpmnActivityTimeoutCallbackDTO dto = JSON.parseObject(job.getCustomValues(), BpmnActivityTimeoutCallbackDTO.class);
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
FlowableEventDispatcher eventDispatcher = processEngineConfiguration.getEventDispatcher();
TaskService taskService = processEngineConfiguration.getTaskService();
Task task = taskService.createTaskQuery().executionId(dto.getTriggerId()).singleResult();
HistoryService historyService = processEngineConfiguration.getHistoryService();
HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery().includeProcessVariables()
.processInstanceId(task.getProcessInstanceId()).singleResult();
if (Objects.isNull(processInstance)) {
throw new WorkflowEngineException(PROCESS_INSTANCE_ID_NOT_EXISTS, task.getProcessInstanceId());
}
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
runtimeService.setVariables(processInstance.getId(), dto.getVariables());
Map<String, Object> processVariables = new HashMap<>(processInstance.getProcessVariables());
if (!CollectionUtils.isEmpty(dto.getVariables())) {
processVariables.putAll(dto.getVariables());
}
eventDispatcher.dispatchEvent(new BizCallbackEventImpl(BizCallbackEventImpl.BizCallbackEventType.CALLBACK,
task.getTaskDefinitionKey(), task.getName(),
task.getProcessInstanceId(), task.getProcessDefinitionId(),
processInstance.getBusinessKey(), dto.getTriggerId(),
processVariables),
processEngineConfiguration.getEngineCfgKey());
}
}

View File

@ -0,0 +1,38 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.core.service.BpmnProcessActivityService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
/**
* 业务节点的触发离开的任务处理器
*
* @author wangli
* @since 2024-08-16 16:29
*/
@Slf4j
public class AsyncActivityLeaveJobHandler extends AbstractJobHandler implements JobHandler {
private final BpmnProcessActivityService bpmnProcessActivityService;
public static final String TYPE = "business-node-leave-task";
public AsyncActivityLeaveJobHandler(BpmnProcessActivityService bpmnProcessActivityService) {
this.bpmnProcessActivityService = bpmnProcessActivityService;
}
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.warn("AsyncActivityLeaveJobHandler exec start...");
BpmnActivityTimeoutTriggerDTO dto = JSON.parseObject(job.getCustomValues(), BpmnActivityTimeoutTriggerDTO.class);
bpmnProcessActivityService.trigger(dto.getTriggerId());
}
}

View File

@ -0,0 +1,66 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.workflow.core.common.context.ActivityOperationContext;
import cn.axzo.workflow.core.engine.event.BizCallbackEvent;
import cn.axzo.workflow.core.engine.event.BizCallbackEventImpl;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import com.google.common.collect.ImmutableSet;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static cn.axzo.workflow.core.engine.event.BizCallbackEventImpl.BizCallbackEventType.CALLBACK;
/**
* 业务节点回调事件
*
* @author wangli
* @since 2024-08-20 17:47
*/
@Slf4j
@Component
public class EngineActivityCallbackEventListener extends AbstractFlowableEventListener {
@Resource
ObjectProvider<List<BpmnActivityEventListener>> activityListeners;
public static final Set<BizCallbackEventImpl.BizCallbackEventType> BIZ_CALLBACK_EVENTS =
ImmutableSet.<BizCallbackEventImpl.BizCallbackEventType>builder()
.add(CALLBACK)
.build();
@Override
public void onEvent(FlowableEvent flowableEvent) {
if (flowableEvent instanceof BizCallbackEvent) {
BizCallbackEvent event = (BizCallbackEvent) flowableEvent;
BizCallbackEventImpl.BizCallbackEventType type = (BizCallbackEventImpl.BizCallbackEventType) event.getType();
if (BIZ_CALLBACK_EVENTS.contains(type)) {
switch (type) {
case CALLBACK:
getOrderedListeners().forEach(i -> i.onCallback(event));
break;
default:
}
}
}
}
private List<BpmnActivityEventListener> getOrderedListeners() {
ActivityOperationContext context = new ActivityOperationContext();
List<BpmnActivityEventListener> orderListeners = new ArrayList<>();
activityListeners.ifAvailable(orderListeners::addAll);
orderListeners.forEach(i -> i.setContext(context));
return orderListeners;
}
@Override
public boolean isFailOnException() {
return true;
}
}

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.core.listener;
import cn.axzo.workflow.core.common.context.OperationContext;
import cn.axzo.workflow.core.engine.event.BizCallbackEvent;
import cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEvent;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.core.Ordered;
@ -36,6 +37,12 @@ public interface BpmnActivityEventListener extends OperationContext, Ordered {
*/
default void onTake(DelegateExecution execution) {}
/**
* 节点定时回调
*/
default void onCallback(BizCallbackEvent event) {
}
/**
* 节点已取消
*

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.core.service;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
@ -28,4 +30,26 @@ public interface BpmnProcessActivityService {
*/
void setAssignee(BpmnActivitySetAssigneeDTO dto);
/**
* 给指定实例的指定节点重设审批人
*
* @param dto
*/
void setAssigneeAsync(BpmnActivitySetAssigneeDTO dto);
/**
* 设置指定业务接口继续往下流转的触发时间
*
* @param dto
* @return
*/
Boolean setTimeoutTrigger(BpmnActivityTimeoutTriggerDTO dto);
/**
* 设置指定业务节点定时回调(仅通过 MQ 事件广播,自身状态不做任何改变)
*
* @param dto
* @return
*/
Boolean setTimeOutCallback(BpmnActivityTimeoutCallbackDTO dto);
}

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.core.service.impl;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
@ -8,6 +10,9 @@ import cn.axzo.workflow.core.engine.cmd.CustomActivityTriggerAsyncCmd;
import cn.axzo.workflow.core.engine.cmd.CustomActivityTriggerCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBizSpecifyAssigneeToTaskAsyncCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBizSpecifyAssigneeToTaskCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutCallbackCmd;
import cn.axzo.workflow.core.engine.cmd.CustomBusinessNodeTimeoutTriggerCmd;
import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler;
import cn.axzo.workflow.core.service.BpmnProcessActivityService;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import lombok.extern.slf4j.Slf4j;
@ -69,4 +74,24 @@ public class BpmnProcessActivityServiceImpl implements BpmnProcessActivityServic
}
}
@Override
public Boolean setTimeoutTrigger(BpmnActivityTimeoutTriggerDTO dto) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomBusinessNodeTimeoutTriggerCmd(dto.getTriggerId(), dto.getEndTime(),
dto.getVariables()));
return true;
}
/**
* 设置指定业务节点定时回调
*
* @param dto
* @return
*/
@Override
public Boolean setTimeOutCallback(BpmnActivityTimeoutCallbackDTO dto) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new CustomBusinessNodeTimeoutCallbackCmd(dto.getTriggerId(), dto.getEndTime(), dto.getVariables()));
return true;
}
}

View File

@ -56,7 +56,8 @@ public class ExtAxPropertyServiceImpl implements ExtAxPropertyService {
}
QueryWrapper<ExtAxProperty> queryWrapper = new QueryWrapper<ExtAxProperty>()
.eq("name", name);
return Optional.ofNullable(mapper.selectOne(queryWrapper));
Optional<ExtAxProperty> extAxProperty = Optional.ofNullable(mapper.selectOne(queryWrapper));
return extAxProperty;
}
@Override

View File

@ -67,6 +67,7 @@ public class ErrorReportAspect implements Ordered {
*/
@Around(value = "@within(org.springframework.web.bind.annotation.RestController) || @within(org.springframework.stereotype.Service)")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("{}.{}", joinPoint.getSignature().getDeclaringTypeName(), joinPoint.getSignature().getName());
Signature signature = joinPoint.getSignature();
StopWatch watch = new StopWatch("running controller time");
watch.start(signature.toShortString());

View File

@ -8,6 +8,7 @@ import cn.axzo.workflow.common.model.request.bpmn.task.ExtHiTaskSearchDTO;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import cn.axzo.workflow.core.common.context.ActivityOperationContext;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.engine.event.BizCallbackEvent;
import cn.axzo.workflow.core.engine.event.BizSpecifyAssigneeEvent;
import cn.axzo.workflow.core.listener.AbstractBpmnEventListener;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
@ -39,6 +40,7 @@ import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_ACTIVITY_R
import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_OWNERSHIP_PROCESS_DEFINITION_KEY;
import static cn.axzo.workflow.common.constant.BpmnConstants.WORKFLOW_ENGINE_VERSION;
import static cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum.APPROVED;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_CALLBACK;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_END;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_START;
import static cn.axzo.workflow.common.enums.ProcessActivityEventEnum.PROCESS_ACTIVITY_TAKE;
@ -116,6 +118,35 @@ public class RocketMqBpmActivityEvent_100_Listener extends AbstractBpmnEventList
execution.getCurrentActivityId(), execution.getProcessInstanceId());
}
@Override
public void onCallback(BizCallbackEvent event) {
log.info("RocketMqBpmActivityEventListener#onCallback...activityId: {}, processInstanceId: {}",
event.getActivityId(), event.getProcessInstanceId());
// 特殊的自定义业务事件, 不能使用公共的 build 方法
ProcessActivityDTO dto = new ProcessActivityDTO();
dto.setType(PROCESS_ACTIVITY_CALLBACK);
dto.setActivityId(event.getActivityId());
dto.setActivityName(event.getActivityName());
dto.setProcessInstanceId(event.getProcessInstanceId());
dto.setProcessDefinitionId(event.getProcessDefinitionId());
ProcessInstance processInstance = getContext().getProcessInstance(() ->
runtimeService.createProcessInstanceQuery()
.processInstanceId(event.getProcessInstanceId())
.singleResult());
if (Objects.nonNull(processInstance)) {
dto.setProcessDefinitionKey(processInstance.getProcessDefinitionKey());
dto.setBusinessKey(processInstance.getBusinessKey());
}
dto.setTriggerId(event.getExecutionId());
dto.setVariables(event.getVariables());
dto.setWorkflowEngineVersion(String.valueOf(event.getVariables().getOrDefault(WORKFLOW_ENGINE_VERSION, FLOW_SERVER_VERSION_121)));
sendMessageQueue(dto, PROCESS_ACTIVITY_CALLBACK);
log.info("RocketMqBpmActivityEventListener#onCallback...end, activityId: {}, processInstanceId: {}",
event.getActivityId(), event.getProcessInstanceId());
}
@Override
public void onEnd(DelegateExecution execution) {
log.info("RocketMqMessagePushEventListener#onEnd...activityId: {}, processInstanceId: {}",

View File

@ -4,6 +4,8 @@ import cn.axzo.karma.client.feign.FlowSupportApi;
import cn.axzo.karma.client.model.request.PersonProfileQueryReq;
import cn.axzo.karma.client.model.response.PersonProfileResp;
import cn.axzo.workflow.client.feign.bpmn.ProcessActivityApi;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
@ -119,4 +121,28 @@ public class BpmnProcessActivityController extends BasicPopulateAvatarController
bpmnProcessActivityService.setAssignee(dto);
return success(true);
}
/**
* 该功能应该利用引擎的 TimerBoundaryEvent 来实现但为了简便先利用引擎的任务调度来实现
*
* @param dto
* @return
*/
@PostMapping("/timeout/trigger")
@Override
public CommonResponse<Boolean> setTimeoutTrigger(@Validated @RequestBody BpmnActivityTimeoutTriggerDTO dto) {
return CommonResponse.success(bpmnProcessActivityService.setTimeoutTrigger(dto));
}
/**
* 为指定业务节点设置定时回调
*
* @param dto
* @return
*/
@PostMapping("/timeout/callback")
@Override
public CommonResponse<Boolean> setTimeoutCallback(@Validated @RequestBody BpmnActivityTimeoutCallbackDTO dto) {
return CommonResponse.success(bpmnProcessActivityService.setTimeOutCallback(dto));
}
}

View File

@ -18,7 +18,8 @@ logging:
level:
com.alibaba.nacos.client.config.impl: WARN
org.flowable: INFO
server:
shutdown: graceful
---
#开发环境
spring:

View File

@ -107,6 +107,7 @@ public class StarterBroadcastMQConfiguration {
@Override
public void onMessage(MessageExt message) {
// 处理 properties 配置进行事件过滤
for (InnerMessageQueueHandleBeforeFilter filter : filters) {
if (filter.doFilter(message)) {
log.info("【{}】message has been filtered, messageId: {}", this.getClass().getSimpleName(), message.getMsgId());

View File

@ -5,7 +5,6 @@ import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeD
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
import feign.Client;
import feign.MethodMetadata;

View File

@ -1,5 +1,6 @@
package cn.axzo.workflow.starter.feign.ext;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.starter.common.exception.WorkflowRpcInvokeException;
import cn.azxo.framework.common.model.CommonResponse;
@ -71,6 +72,7 @@ final class WorkflowEngineStarterDecoder implements Decoder {
wrappedType = ParameterizedTypeImpl.make(CommonResponse.class, new Type[]{type}, null);
}
Object decode = delegate.decode(response, wrappedType);
log.info("workflow engine starter response :{}", JSON.toJSONString(decode));
if (decode instanceof CommonResponse) {
CommonResponse<?> commonResponse = (CommonResponse<?>) decode;
if (response.status() == 202) {

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.starter.handler;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import org.springframework.core.Ordered;
@ -13,6 +15,18 @@ import org.springframework.core.Ordered;
*/
public interface MessageNotificationEventHandler extends Ordered {
/**
* 针对当前接口的实现进行过滤, true 往下执行下面的方法
*
* @param dto
* @param event
* @param context
* @return
*/
default boolean accept(MessagePushDTO dto, Event event, EventConsumer.Context context) {
return true;
}
/**
* 站内信推送
*

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.starter.handler;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import org.springframework.core.Ordered;
@ -18,6 +20,18 @@ import org.springframework.core.Ordered;
*/
public interface ProcessActivityEventHandler extends Ordered {
/**
* 针对当前接口的实现进行过滤, true 往下执行下面的方法
*
* @param dto
* @param event
* @param context
* @return
*/
default boolean accept(ProcessActivityDTO dto, Event event, EventConsumer.Context context) {
return true;
}
/**
* 节点已启动
*
@ -34,6 +48,14 @@ public interface ProcessActivityEventHandler extends Ordered {
default void onWaitAssignee(ProcessActivityDTO dto) {
}
/**
* 业务节点回调
*
* @param dto
*/
default void onCallback(ProcessActivityDTO dto) {
}
/**
* 节点已完成
*

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.starter.handler;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.client.feign.bpmn.ProcessInstanceApi;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAbortDTO;
import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO;
@ -15,6 +17,18 @@ import org.springframework.core.Ordered;
*/
public interface ProcessInstanceEventHandler extends Ordered {
/**
* 针对当前接口的实现进行过滤, true 往下执行下面的方法
*
* @param dto
* @param event
* @param context
* @return
*/
default boolean accept(ProcessInstanceDTO dto, Event event, EventConsumer.Context context) {
return true;
}
/**
* 流程实例创建成功后回调
*

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.starter.handler;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.model.response.mq.ProcessTaskDTO;
import org.springframework.core.Ordered;
@ -15,6 +17,18 @@ import org.springframework.core.Ordered;
*/
public interface ProcessTaskEventHandler extends Ordered {
/**
* 针对当前接口的实现进行过滤, true 往下执行下面的方法
*
* @param dto
* @param event
* @param context
* @return
*/
default boolean accept(ProcessTaskDTO dto, Event event, EventConsumer.Context context) {
return true;
}
/**
* 用户任务已指派审核人
*/

View File

@ -1,11 +1,14 @@
package cn.axzo.workflow.starter.handler.filter;
import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import cn.axzo.workflow.starter.handler.MessageNotificationEventHandler;
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
/**
* MessageNotificationEvent 的自定义过滤接口
* <p>
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link MessageNotificationEventHandler#filter(MessagePushDTO)} 实例的专属过滤
* <p>
* 注意Order 的顺序遵循值越小越优先取值范围Integer. MIN_VALUE - Integer. MAX_VALUE
*
* @author wangli

View File

@ -1,11 +1,14 @@
package cn.axzo.workflow.starter.handler.filter;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import cn.axzo.workflow.starter.handler.ProcessActivityEventHandler;
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
/**
* ProcessActivityEvent 自定义的过滤接口
* <p>
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link ProcessActivityEventHandler#filter(ProcessActivityDTO)} 实例的专属过滤
* <p>
* 注意Order 的顺序遵循值越小越优先取值范围Integer. MIN_VALUE - Integer. MAX_VALUE
*
* @author wangli

View File

@ -1,11 +1,14 @@
package cn.axzo.workflow.starter.handler.filter;
import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO;
import cn.axzo.workflow.starter.handler.ProcessInstanceEventHandler;
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
/**
* ProcessInstanceEvent 自定义的过滤接口
* <p>
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link ProcessInstanceEventHandler#filter(ProcessInstanceDTO)} 实例的专属过滤
* <p>
* 注意Order 的顺序遵循值越小越优先取值范围Integer. MIN_VALUE - Integer. MAX_VALUE
*
* @author wangli

View File

@ -1,11 +1,14 @@
package cn.axzo.workflow.starter.handler.filter;
import cn.axzo.workflow.common.model.response.mq.ProcessTaskDTO;
import cn.axzo.workflow.starter.handler.ProcessTaskEventHandler;
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
/**
* ProcessTaskEvent 自定义的过滤接口
* <p>
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link ProcessTaskEventHandler#filter(ProcessTaskDTO)} 实例的专属过滤
* <p>
* 注意Order 的顺序遵循值越小越优先取值范围Integer. MIN_VALUE - Integer. MAX_VALUE
*
* @author wangli

View File

@ -54,25 +54,23 @@ public abstract class AbstractInnerWorkflowListener<H extends Ordered, F extends
@Override
public void handleEvent(Event event, EventConsumer.Context context) {
if (CollectionUtils.isEmpty(businessListeners)) {
return;
}
Target convert = convert(event);
if (!CollectionUtils.isEmpty(businessListeners)) {
for (F filter : businessFilters) {
if (filter.doFilter(event, context, convert)) {
log.info("【{}】filtered message, messageId: {}", filter.getClass().getSimpleName(), context.getMsgId());
return;
}
}
}
onEvent(convert, context);
onEvent(convert, event, context);
}
protected abstract Target convert(Event event);
protected abstract void onEvent(Target data, EventConsumer.Context context);
protected abstract void onEvent(Target data, Event event, EventConsumer.Context context);
@Override
public boolean dispatch(Event event) {

View File

@ -40,7 +40,7 @@ public class InnerActivityEventListener extends AbstractInnerWorkflowListener<Pr
}
@Override
public void onEvent(ProcessActivityDTO dto, EventConsumer.Context context) {
public void onEvent(ProcessActivityDTO dto, Event event, EventConsumer.Context context) {
log.debug("【{}】new message begin processing, messageId: {}",
this.getClass().getSimpleName(), context.getMsgId());
ProcessActivityEventEnum type = dto.getType();
@ -53,6 +53,9 @@ public class InnerActivityEventListener extends AbstractInnerWorkflowListener<Pr
case PROCESS_ACTIVITY_WAIT_ASSIGNEE:
consumer = activityListener::onWaitAssignee;
break;
case PROCESS_ACTIVITY_CALLBACK:
consumer = activityListener::onCallback;
break;
case PROCESS_ACTIVITY_TAKE:
consumer = activityListener::onTake;
break;
@ -62,9 +65,11 @@ public class InnerActivityEventListener extends AbstractInnerWorkflowListener<Pr
default:
log.warn("unknown process activity event type: {}", type);
}
if (activityListener.accept(dto, event, context)) {
listenerExecutor.execute(consumer, context, dto);
}
}
}
@Override
protected List<Event.EventCode> getSupportEventCodes() {

View File

@ -40,7 +40,7 @@ public class InnerInstanceEventListener extends AbstractInnerWorkflowListener<Pr
}
@Override
public void onEvent(ProcessInstanceDTO dto, EventConsumer.Context context) {
public void onEvent(ProcessInstanceDTO dto, Event event, EventConsumer.Context context) {
log.debug("【{}】new message begin processing, messageId: {}",
this.getClass().getSimpleName(), context.getMsgId());
ProcessInstanceEventEnum type = dto.getType();
@ -68,9 +68,11 @@ public class InnerInstanceEventListener extends AbstractInnerWorkflowListener<Pr
default:
log.warn("unknown process activity event type: {}", type);
}
if (instanceListener.accept(dto, event, context)) {
listenerExecutor.execute(consumer, context, dto);
}
}
}
@Override
protected List<Event.EventCode> getSupportEventCodes() {

View File

@ -38,7 +38,7 @@ public class InnerNotificationEventListener extends AbstractInnerWorkflowListene
}
@Override
public void onEvent(MessagePushDTO dto, EventConsumer.Context context) {
public void onEvent(MessagePushDTO dto, Event event, EventConsumer.Context context) {
log.debug("【{}】new message begin processing, messageId: {}",
this.getClass().getSimpleName(), context.getMsgId());
ProcessMessagePushEventEnum type = dto.getType();
@ -69,9 +69,11 @@ public class InnerNotificationEventListener extends AbstractInnerWorkflowListene
default:
log.warn("unknown message event type: {}", type);
}
if (noticeListener.accept(dto, event, context)) {
listenerExecutor.execute(consumer, context, dto);
}
}
}
@Override
protected List<Event.EventCode> getSupportEventCodes() {

View File

@ -38,7 +38,7 @@ public class InnerTaskEventListener extends AbstractInnerWorkflowListener<Proces
}
@Override
public void onEvent(ProcessTaskDTO dto, EventConsumer.Context context) {
public void onEvent(ProcessTaskDTO dto, Event event, EventConsumer.Context context) {
log.debug("【{}】new message begin processing, messageId: {}",
this.getClass().getSimpleName(), context.getMsgId());
ProcessTaskEventEnum type = dto.getType();
@ -60,9 +60,11 @@ public class InnerTaskEventListener extends AbstractInnerWorkflowListener<Proces
default:
log.warn("unknown task event type: {}", type);
}
if (taskListener.accept(dto, event, context)) {
listenerExecutor.execute(consumer, context, dto);
}
}
}
@Override
protected List<Event.EventCode> getSupportEventCodes() {