From bd596e8fb17df6cfc1f50b3186634250464a3afd Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Mon, 29 Sep 2025 11:35:34 +0800 Subject: [PATCH] =?UTF-8?q?feat=20-=20Starter=20=E5=A2=9E=E5=8A=A0=20DocCh?= =?UTF-8?q?ange=20=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/response/mq/DocChangeDTO.java | 49 ++++++++++++++ .../core/engine/event/DocChangeEvent.java | 1 + .../listener/doc/DocChangeListener.java | 8 ++- .../starter/handler/DocEventHandler.java | 35 ++++++++++ .../handler/filter/DocEventFilter.java | 21 ++++++ .../MessageNotificationEventFilter.java | 4 +- .../filter/ProcessActivityEventFilter.java | 4 +- .../consumer/InnerDocEventListener.java | 66 +++++++++++++++++++ .../WorkflowEngineBroadcastEventListener.java | 5 +- 9 files changed, 189 insertions(+), 4 deletions(-) create mode 100644 workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/response/mq/DocChangeDTO.java create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/DocEventHandler.java create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/DocEventFilter.java create mode 100644 workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerDocEventListener.java diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/response/mq/DocChangeDTO.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/response/mq/DocChangeDTO.java new file mode 100644 index 000000000..176671ae5 --- /dev/null +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/model/response/mq/DocChangeDTO.java @@ -0,0 +1,49 @@ +package cn.axzo.workflow.common.model.response.mq; + +import cn.axzo.workflow.common.enums.DocChangeEventEnum; +import cn.axzo.workflow.common.model.dto.BizDocDTO; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.List; + +/** + * 用于 MQ 传输数据 + * + * @author wangli + * @since 2025-09-29 11:03 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class DocChangeDTO implements Serializable { + + /** + * 业务 ID + */ + private String key; + + /** + * 工作台 ID + */ + private Long workspaceId; + + /** + * 修改后的文档 + */ + private List newSettings; + + /** + * 修改前的文档 + */ + private List oldSettings; + + /** + * 模型配置 MQ 事件的数据类型 + */ + private DocChangeEventEnum type; +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/DocChangeEvent.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/DocChangeEvent.java index af78c6866..0046b5b15 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/DocChangeEvent.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/DocChangeEvent.java @@ -6,6 +6,7 @@ import java.util.List; /** * 模型关联的文档变更事件对象 + * 该事件,目前仅用于考勤业务(王庆) * * @author wangli * @since 2025-04-07 16:51 diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/doc/DocChangeListener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/doc/DocChangeListener.java index 375fe0009..463b97c48 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/doc/DocChangeListener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/doc/DocChangeListener.java @@ -3,6 +3,7 @@ package cn.axzo.workflow.server.controller.listener.doc; import cn.axzo.framework.rocketmq.Event; import cn.axzo.framework.rocketmq.EventProducer; import cn.axzo.workflow.common.enums.DocChangeEventEnum; +import cn.axzo.workflow.common.model.response.mq.DocChangeDTO; import cn.axzo.workflow.core.conf.CustomEventManager; import cn.axzo.workflow.core.engine.event.DocChangeEvent; import com.alibaba.fastjson.JSON; @@ -48,7 +49,12 @@ public class DocChangeListener { .eventCode(DocChangeEventEnum.DOC_CHANGE.getEventCode()) .targetId(event.getKey()) .targetType(event.getKey()) - .data(JSON.toJSONString(event, SerializerFeature.WriteMapNullValue)) + .data(JSON.toJSONString(DocChangeDTO.builder() + .key(event.getKey()) + .workspaceId(event.getWorkspaceId()) + .newSettings(event.getNewSettings()) + .oldSettings(event.getOldSettings()) + .build(), SerializerFeature.WriteMapNullValue)) .build()); } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/DocEventHandler.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/DocEventHandler.java new file mode 100644 index 000000000..5cb7eb720 --- /dev/null +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/DocEventHandler.java @@ -0,0 +1,35 @@ +package cn.axzo.workflow.starter.handler; + +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.workflow.common.model.response.mq.DocChangeDTO; +import org.springframework.core.Ordered; + +/** + * 流程模板关联的文档配置变更事件 + * + * @author wangli + * @since 2025-09-29 11:00 + */ +public interface DocEventHandler extends Ordered { + /** + * 针对当前接口的实现进行过滤, 为 true 时,往下执行下面的方法 + * + * @param dto + * @param event + * @param context + * @return + */ + default boolean accept(DocChangeDTO dto, Event event, EventConsumer.Context context) { + return true; + } + + + /** + * 文档配置变更 + * + * @param dto + */ + default void changed(DocChangeDTO dto) { + } +} diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/DocEventFilter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/DocEventFilter.java new file mode 100644 index 000000000..b0bb46c8a --- /dev/null +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/DocEventFilter.java @@ -0,0 +1,21 @@ +package cn.axzo.workflow.starter.handler.filter; + +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.workflow.common.model.response.mq.DocChangeDTO; +import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO; +import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; + +/** + * DocChangeEvent 的自定义过滤接口 + * + *

+ * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link cn.axzo.workflow.starter.handler.DocEventHandler#accept(ProcessActivityDTO, Event, EventConsumer.Context)} 实例的专属过滤 + *

+ * 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE) + * + * @author wangli + * @since 2025-09-29 11:09 + */ +public interface DocEventFilter extends BasicMessageQueueFilter { +} diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java index 32bbe0aa0..c5091304f 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/MessageNotificationEventFilter.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.starter.handler.filter; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.workflow.common.model.response.mq.MessagePushDTO; import cn.axzo.workflow.starter.handler.MessageNotificationEventHandler; import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; @@ -7,7 +9,7 @@ import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; /** * MessageNotificationEvent 的自定义过滤接口 *

- * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link MessageNotificationEventHandler#filter(MessagePushDTO)} 实例的专属过滤 + * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link MessageNotificationEventHandler#accept(MessagePushDTO, Event, EventConsumer.Context)} 实例的专属过滤 *

* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE) * diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java index cd3d56233..eb082f068 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/filter/ProcessActivityEventFilter.java @@ -1,5 +1,7 @@ package cn.axzo.workflow.starter.handler.filter; +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO; import cn.axzo.workflow.starter.handler.ProcessActivityEventHandler; import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; @@ -7,7 +9,7 @@ import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter; /** * ProcessActivityEvent 自定义的过滤接口 *

- * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link ProcessActivityEventHandler#filter(ProcessActivityDTO)} 实例的专属过滤 + * 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link ProcessActivityEventHandler#accept(ProcessActivityDTO, Event, EventConsumer.Context)} 实例的专属过滤 *

* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE) * diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerDocEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerDocEventListener.java new file mode 100644 index 000000000..0e2aef7cc --- /dev/null +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/InnerDocEventListener.java @@ -0,0 +1,66 @@ +package cn.axzo.workflow.starter.mq.broadcast.consumer; + +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.workflow.common.enums.DocChangeEventEnum; +import cn.axzo.workflow.common.model.response.mq.DocChangeDTO; +import cn.axzo.workflow.starter.handler.DocEventHandler; +import cn.axzo.workflow.starter.handler.execute.ListenerExecutor; +import cn.axzo.workflow.starter.handler.filter.DocEventFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * @author wangli + * @since 2025-09-29 10:57 + */ +public class InnerDocEventListener extends AbstractInnerWorkflowListener { + private final Logger log = LoggerFactory.getLogger(InnerDocEventListener.class); + + public final static Supplier> SUPPORTED_EVENT_CODES_SUPPLIER = () -> + Arrays.stream(DocChangeEventEnum.values()).map(DocChangeEventEnum::getEventCode) + .collect(Collectors.toList()); + + public InnerDocEventListener(ListenerExecutor listenerExecutor, + ObjectProvider> handlerProvider, + ObjectProvider> filterProvider) { + super(listenerExecutor, handlerProvider, filterProvider); + } + + @Override + protected DocChangeDTO convert(Event event) { + return event.normalizedData(DocChangeDTO.class); + } + + @Override + protected void onEvent(DocChangeDTO dto, Event event, EventConsumer.Context context) { + log.debug("【{}】new message begin processing, messageId: {}", + this.getClass().getSimpleName(), context.getMsgId()); + DocChangeEventEnum type = dto.getType(); + for (DocEventHandler activityListener : businessListeners) { + Consumer consumer = null; + switch (type) { + case DOC_CHANGE: + consumer = activityListener::changed; + break; + default: + log.warn("unknown process activity event type: {}", type); + } + if (activityListener.accept(dto, event, context)) { + listenerExecutor.execute(consumer, context, dto); + } + } + } + + @Override + protected List getSupportEventCodes() { + return SUPPORTED_EVENT_CODES_SUPPLIER.get(); + } +} diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java index d3c09b78b..3127af4d2 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java @@ -26,7 +26,9 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi private final List workflowListeners; - public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, WorkflowEngineStarterProperties workflowEngineStarterProperties, List workflowListeners) { + public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, + WorkflowEngineStarterProperties workflowEngineStarterProperties, + List workflowListeners) { this.eventConsumer = eventConsumer; this.starterProperties = workflowEngineStarterProperties; this.workflowListeners = workflowListeners; @@ -64,6 +66,7 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this); eventConsumer.registerHandlers(InnerNotificationEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this); eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this); + eventConsumer.registerHandlers(InnerDocEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this); } }