update - 调整工作流的 Activity 事件整体的实现逻辑,由于多实例是多人审批,会意外的发送多次 Activity_start 及其他的Activity 事件,不符合预期

This commit is contained in:
wangli 2023-12-21 17:34:37 +08:00
parent 899978f843
commit ec9986a6c9
11 changed files with 180 additions and 127 deletions

View File

@ -1,8 +1,15 @@
package cn.axzo.workflow.core.converter.json;
import cn.axzo.workflow.common.model.request.bpmn.BpmnJsonNode;
import org.flowable.bpmn.model.FlowableListener;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.ReceiveTask;
import org.flowable.engine.delegate.BaseExecutionListener;
import java.util.ArrayList;
import java.util.List;
import static org.flowable.bpmn.model.ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION;
/**
* 接收任务节点
@ -19,6 +26,32 @@ public class ReceiveTaskJsonConverter extends AbstractBpmnJsonConverter<ReceiveT
receiveTask.setId(node.getId());
receiveTask.setName(node.getName());
setExecutionListeners(receiveTask);
return receiveTask;
}
private static void setExecutionListeners(ReceiveTask receiveTask) {
List<FlowableListener> executionListeners = new ArrayList<>();
// 设置执行监听
FlowableListener executionListener = new FlowableListener();
executionListener.setEvent(BaseExecutionListener.EVENTNAME_START);
executionListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
executionListener.setImplementation("${engineAssigneeExecutionStartListener}");
executionListeners.add(executionListener);
FlowableListener activityStartListener = new FlowableListener();
activityStartListener.setEvent(BaseExecutionListener.EVENTNAME_START);
activityStartListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
activityStartListener.setImplementation("${engineActivityStartEventListener}");
executionListeners.add(activityStartListener);
FlowableListener activityEndListener = new FlowableListener();
activityEndListener.setEvent(BaseExecutionListener.EVENTNAME_END);
activityEndListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
activityEndListener.setImplementation("${engineActivityEndEventListener}");
executionListeners.add(activityEndListener);
receiveTask.setExecutionListeners(executionListeners);
}
}

View File

@ -1,8 +1,15 @@
package cn.axzo.workflow.core.converter.json;
import cn.axzo.workflow.common.model.request.bpmn.BpmnJsonNode;
import org.flowable.bpmn.model.FlowableListener;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.engine.delegate.BaseExecutionListener;
import java.util.ArrayList;
import java.util.List;
import static org.flowable.bpmn.model.ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION;
/**
* 服务任务节点
@ -16,6 +23,33 @@ public class ServiceTaskJsonConverter extends AbstractBpmnJsonConverter<ServiceT
ServiceTask serviceTask = new ServiceTask();
serviceTask.setId(node.getId());
serviceTask.setName(node.getName());
setExecutionListeners(serviceTask);
return serviceTask;
}
private static void setExecutionListeners(ServiceTask serviceTask) {
List<FlowableListener> executionListeners = new ArrayList<>();
// 设置执行监听
FlowableListener executionListener = new FlowableListener();
executionListener.setEvent(BaseExecutionListener.EVENTNAME_START);
executionListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
executionListener.setImplementation("${engineAssigneeExecutionStartListener}");
executionListeners.add(executionListener);
FlowableListener activityStartListener = new FlowableListener();
activityStartListener.setEvent(BaseExecutionListener.EVENTNAME_START);
activityStartListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
activityStartListener.setImplementation("${engineActivityStartEventListener}");
executionListeners.add(activityStartListener);
FlowableListener activityEndListener = new FlowableListener();
activityEndListener.setEvent(BaseExecutionListener.EVENTNAME_END);
activityEndListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
activityEndListener.setImplementation("${engineActivityEndEventListener}");
executionListeners.add(activityEndListener);
serviceTask.setExecutionListeners(executionListeners);
}
}

View File

@ -241,9 +241,21 @@ public class UserTaskJsonConverter extends AbstractBpmnJsonConverter<UserTask> {
FlowableListener executionListener = new FlowableListener();
executionListener.setEvent(BaseExecutionListener.EVENTNAME_START);
executionListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
executionListener.setImplementation("${engineExecutionStartListener}");
executionListener.setImplementation("${engineAssigneeExecutionStartListener}");
executionListeners.add(executionListener);
FlowableListener activityStartListener = new FlowableListener();
activityStartListener.setEvent(BaseExecutionListener.EVENTNAME_START);
activityStartListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
activityStartListener.setImplementation("${engineActivityStartEventListener}");
executionListeners.add(activityStartListener);
FlowableListener activityEndListener = new FlowableListener();
activityEndListener.setEvent(BaseExecutionListener.EVENTNAME_END);
activityEndListener.setImplementationType(IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
activityEndListener.setImplementation("${engineActivityEndEventListener}");
executionListeners.add(activityEndListener);
userTask.setExecutionListeners(executionListeners);
}

View File

@ -1,7 +1,7 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.engine.listener.EngineExecutionStartListener;
import cn.axzo.workflow.core.engine.listener.EngineAssigneeExecutionStartListener;
import org.flowable.bpmn.model.UserTask;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
@ -25,13 +25,13 @@ import static cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper.getApprove
public class CustomForecastUserTaskAssigneeCmd implements Command<List<BpmnTaskDelegateAssigner>>, Serializable {
private final String processInstanceId;
private final UserTask userTask;
private final EngineExecutionStartListener engineExecutionStartListener;
private final EngineAssigneeExecutionStartListener engineAssigneeExecutionStartListener;
public CustomForecastUserTaskAssigneeCmd(String processInstanceId, UserTask userTask,
EngineExecutionStartListener engineExecutionStartListener) {
EngineAssigneeExecutionStartListener engineAssigneeExecutionStartListener) {
this.processInstanceId = processInstanceId;
this.userTask = userTask;
this.engineExecutionStartListener = engineExecutionStartListener;
this.engineAssigneeExecutionStartListener = engineAssigneeExecutionStartListener;
}
@Override
@ -42,7 +42,7 @@ public class CustomForecastUserTaskAssigneeCmd implements Command<List<BpmnTaskD
processEngineConfiguration.getRuntimeService().createExecutionQuery().processInstanceId(processInstanceId).list();
List<BpmnTaskDelegateAssigner> forecastAssigners = new ArrayList<>();
getApproverSpecify(userTask).ifPresent(specify -> {
forecastAssigners.addAll(engineExecutionStartListener.approverSelect(specify.getType(), userTask,
forecastAssigners.addAll(engineAssigneeExecutionStartListener.approverSelect(specify.getType(), userTask,
(DelegateExecution) list.get(0), false));
});
return forecastAssigners;

View File

@ -0,0 +1,41 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ExecutionListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* 活动节点结束事件监听器
*
* @author wangli
* @since 2023/12/21 16:55
*/
@Component
@Slf4j
public class EngineActivityEndEventListener implements ExecutionListener {
@Resource
ObjectProvider<List<BpmnActivityEventListener>> activityListeners;
@Override
public void notify(DelegateExecution execution) {
String currentActivityId = execution.getCurrentActivityId();
}
private List<BpmnActivityEventListener> getOrderedListeners() {
List<BpmnActivityEventListener> orderListeners = new ArrayList<>();
activityListeners.ifAvailable(orderListeners::addAll);
if (log.isDebugEnabled()) {
log.debug("Order Lists: {}", JSON.toJSONString(orderListeners));
}
return orderListeners;
}
}

View File

@ -1,109 +0,0 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import com.google.common.collect.ImmutableSet;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener;
import org.flowable.engine.delegate.event.FlowableActivityCancelledEvent;
import org.flowable.engine.delegate.event.FlowableActivityEvent;
import org.flowable.engine.delegate.event.FlowableMultiInstanceActivityCancelledEvent;
import org.flowable.engine.delegate.event.FlowableMultiInstanceActivityCompletedEvent;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ACTIVITY_CANCELLED;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ACTIVITY_COMPLETED;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ACTIVITY_STARTED;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.MULTI_INSTANCE_ACTIVITY_CANCELLED;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.MULTI_INSTANCE_ACTIVITY_COMPLETED;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.MULTI_INSTANCE_ACTIVITY_COMPLETED_WITH_CONDITION;
/**
* 活动节点完成事件
*
* @author wangli
* @since 2023/7/24 17:36
*/
@Slf4j
@Component
public class EngineActivityEventListener extends AbstractFlowableEngineEventListener {
@Resource
ObjectProvider<List<BpmnActivityEventListener>> activityListeners;
public static final Set<FlowableEngineEventType> PROCESS_INSTANCE_EVENTS =
ImmutableSet.<FlowableEngineEventType>builder()
.add(ACTIVITY_STARTED)
.add(ACTIVITY_COMPLETED)
.add(MULTI_INSTANCE_ACTIVITY_COMPLETED_WITH_CONDITION)
.add(MULTI_INSTANCE_ACTIVITY_COMPLETED)
.add(MULTI_INSTANCE_ACTIVITY_CANCELLED)
.add(ACTIVITY_CANCELLED)
.build();
public EngineActivityEventListener() {
super(PROCESS_INSTANCE_EVENTS);
}
@Override
protected void activityStarted(FlowableActivityEvent event) {
log.info("activityStarted: activityId: {} activityName: {}", event.getActivityId(), event.getActivityName());
getOrderedListeners().forEach(i -> i.onStarted(event));
}
/**
* 通过?
*
* @param event
*/
@Override
protected void activityCompleted(FlowableActivityEvent event) {
log.info("activityCompleted: activityId: {} activityName: {}", event.getActivityId(), event.getActivityName());
getOrderedListeners().forEach(i -> i.onCompleted(event));
}
/**
* 多实例 UserTask 条件通过
*
* @param event
*/
@Override
protected void multiInstanceActivityCompletedWithCondition(FlowableMultiInstanceActivityCompletedEvent event) {
log.info("multiInstanceActivityCompletedWithCondition: activityId: {} activityName: {}", event.getActivityId(), event.getActivityName());
getOrderedListeners().forEach(i -> i.onCompleted(event));
}
@Override
protected void multiInstanceActivityCompleted(FlowableMultiInstanceActivityCompletedEvent event) {
log.info("multiInstanceActivityCompleted: activityId: {} activityName: {}", event.getActivityId(), event.getActivityName());
getOrderedListeners().forEach(i -> i.onCompleted(event));
}
@Override
protected void activityCancelled(FlowableActivityCancelledEvent event) {
log.info("activityCancelled: activityId: {} activityName: {}", event.getActivityId(), event.getActivityName());
getOrderedListeners().forEach(i -> i.onCancelled(event));
}
@Override
protected void multiInstanceActivityCancelled(FlowableMultiInstanceActivityCancelledEvent event) {
log.info("multiInstanceActivityCancelled: activityId: {} activityName: {}", event.getActivityId(), event.getActivityName());
getOrderedListeners().forEach(i -> i.onCancelled(event));
}
private List<BpmnActivityEventListener> getOrderedListeners() {
List<BpmnActivityEventListener> orderListeners = new ArrayList<>();
activityListeners.ifAvailable(orderListeners::addAll);
if (log.isDebugEnabled()) {
log.debug("Order Lists: {}", JSON.toJSONString(orderListeners));
}
return orderListeners;
}
}

View File

@ -0,0 +1,41 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.workflow.core.listener.BpmnActivityEventListener;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ExecutionListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* 活动节点开始事件监听器
*
* @author wangli
* @since 2023/12/21 16:55
*/
@Component
@Slf4j
public class EngineActivityStartEventListener implements ExecutionListener {
@Resource
ObjectProvider<List<BpmnActivityEventListener>> activityListeners;
@Override
public void notify(DelegateExecution execution) {
}
private List<BpmnActivityEventListener> getOrderedListeners() {
List<BpmnActivityEventListener> orderListeners = new ArrayList<>();
activityListeners.ifAvailable(orderListeners::addAll);
if (log.isDebugEnabled()) {
log.debug("Order Lists: {}", JSON.toJSONString(orderListeners));
}
return orderListeners;
}
}

View File

@ -53,7 +53,7 @@ import static cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper.getProcess
@Component
@RefreshScope
@Slf4j
public class EngineExecutionStartListener implements ExecutionListener {
public class EngineAssigneeExecutionStartListener implements ExecutionListener {
@Resource
private ObjectProvider<BpmnTaskDelegate> bpmTaskDelegate;
@Resource

View File

@ -4,7 +4,7 @@ import org.flowable.common.engine.api.delegate.event.FlowableEngineEvent;
import org.springframework.core.Ordered;
/**
* 用户审批任务节点完成事件, 供外部引入 jar 包方式使用的钩子
* 用户活动节点的事件, 供外部引入 jar 包方式使用的钩子
* <p>
* BpmTaskEventListener 的区别: 一个 Activity 可以有多个 Task, Task 有多个实例; 而本接口是针对这个节点的, 就算该审批节点是多人审批, 只有一次该节点的生命周期事件
* <p>
@ -16,23 +16,23 @@ import org.springframework.core.Ordered;
public interface BpmnActivityEventListener extends Ordered {
/**
* 节点已启动
* 节点已 TODO
*
* @param event
*/
default void onStarted(FlowableEngineEvent event) {}
default void onStart(FlowableEngineEvent event) {}
/**
* 节点已完成
*
* @param event
*/
default void onCompleted(FlowableEngineEvent event) {}
default void onTake(FlowableEngineEvent event) {}
/**
* 节点已取消
*
* @param event
*/
default void onCancelled(FlowableEngineEvent event) {}
default void onEnd(FlowableEngineEvent event) {}
}

View File

@ -20,7 +20,7 @@ import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.common.utils.BpmnCollectionUtils;
import cn.axzo.workflow.core.common.utils.BpmnMetaParserHelper;
import cn.axzo.workflow.core.engine.cmd.CustomForecastUserTaskAssigneeCmd;
import cn.axzo.workflow.core.engine.listener.EngineExecutionStartListener;
import cn.axzo.workflow.core.engine.listener.EngineAssigneeExecutionStartListener;
import cn.axzo.workflow.core.service.BpmnProcessDefinitionService;
import cn.axzo.workflow.core.service.BpmnProcessInstanceService;
import cn.axzo.workflow.core.service.converter.BpmnHistoricProcessInstanceConverter;
@ -118,7 +118,7 @@ public class BpmnProcessInstanceServiceImpl implements BpmnProcessInstanceServic
@Resource
private FlowNodeForecastService forecastService;
@Resource
private EngineExecutionStartListener engineExecutionStartListener;
private EngineAssigneeExecutionStartListener engineAssigneeExecutionStartListener;
@Resource
private SpringProcessEngineConfiguration springProcessEngineConfiguration;
@ -598,7 +598,8 @@ public class BpmnProcessInstanceServiceImpl implements BpmnProcessInstanceServic
}
// 推测当前节点的审批人
List<BpmnTaskDelegateAssigner> forecastAssigners = springProcessEngineConfiguration.getCommandExecutor()
.execute(new CustomForecastUserTaskAssigneeCmd(processInstanceId, userTask, engineExecutionStartListener));
.execute(new CustomForecastUserTaskAssigneeCmd(processInstanceId, userTask,
engineAssigneeExecutionStartListener));
node.setForecastAssigners(forecastAssigners);
} else if (i instanceof ReceiveTask) {
ReceiveTask receiveTask = (ReceiveTask) i;

View File

@ -58,7 +58,7 @@ public class RocketMqBpmActivityEventListener implements BpmnActivityEventListen
private Boolean sendMQ;
@Override
public void onStarted(FlowableEngineEvent event) {
public void onStart(FlowableEngineEvent event) {
if (log.isDebugEnabled()) {
log.debug("RocketMqBpmActivityEventListener#onStarted...activityId: {}", ((FlowableActivityEvent) event).getActivityId());
}
@ -87,7 +87,7 @@ public class RocketMqBpmActivityEventListener implements BpmnActivityEventListen
}
@Override
public void onCompleted(FlowableEngineEvent event) {
public void onTake(FlowableEngineEvent event) {
if (log.isDebugEnabled()) {
log.debug("RocketMqBpmActivityEventListener#onCompleted...activityId: {}", ((FlowableActivityEvent) event).getActivityId());
}
@ -119,7 +119,7 @@ public class RocketMqBpmActivityEventListener implements BpmnActivityEventListen
}
@Override
public void onCancelled(FlowableEngineEvent event) {
public void onEnd(FlowableEngineEvent event) {
if (log.isDebugEnabled()) {
log.debug("RocketMqMessagePushEventListener#onCancelled...activityId: {}", ((FlowableActivityEvent) event).getActivityId());
}