feat - Starter 增加 DocChange 事件

This commit is contained in:
wangli 2025-09-29 11:35:34 +08:00
parent d4f6568de0
commit bd596e8fb1
9 changed files with 189 additions and 4 deletions

View File

@ -0,0 +1,49 @@
package cn.axzo.workflow.common.model.response.mq;
import cn.axzo.workflow.common.enums.DocChangeEventEnum;
import cn.axzo.workflow.common.model.dto.BizDocDTO;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
* 用于 MQ 传输数据
*
* @author wangli
* @since 2025-09-29 11:03
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DocChangeDTO implements Serializable {
/**
* 业务 ID
*/
private String key;
/**
* 工作台 ID
*/
private Long workspaceId;
/**
* 修改后的文档
*/
private List<BizDocDTO> newSettings;
/**
* 修改前的文档
*/
private List<BizDocDTO> oldSettings;
/**
* 模型配置 MQ 事件的数据类型
*/
private DocChangeEventEnum type;
}

View File

@ -6,6 +6,7 @@ import java.util.List;
/**
* 模型关联的文档变更事件对象
* 该事件目前仅用于考勤业务王庆
*
* @author wangli
* @since 2025-04-07 16:51

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.server.controller.listener.doc;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.workflow.common.enums.DocChangeEventEnum;
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
import cn.axzo.workflow.core.conf.CustomEventManager;
import cn.axzo.workflow.core.engine.event.DocChangeEvent;
import com.alibaba.fastjson.JSON;
@ -48,7 +49,12 @@ public class DocChangeListener {
.eventCode(DocChangeEventEnum.DOC_CHANGE.getEventCode())
.targetId(event.getKey())
.targetType(event.getKey())
.data(JSON.toJSONString(event, SerializerFeature.WriteMapNullValue))
.data(JSON.toJSONString(DocChangeDTO.builder()
.key(event.getKey())
.workspaceId(event.getWorkspaceId())
.newSettings(event.getNewSettings())
.oldSettings(event.getOldSettings())
.build(), SerializerFeature.WriteMapNullValue))
.build());
}
}

View File

@ -0,0 +1,35 @@
package cn.axzo.workflow.starter.handler;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
import org.springframework.core.Ordered;
/**
* 流程模板关联的文档配置变更事件
*
* @author wangli
* @since 2025-09-29 11:00
*/
public interface DocEventHandler extends Ordered {
/**
* 针对当前接口的实现进行过滤, true 往下执行下面的方法
*
* @param dto
* @param event
* @param context
* @return
*/
default boolean accept(DocChangeDTO dto, Event event, EventConsumer.Context context) {
return true;
}
/**
* 文档配置变更
*
* @param dto
*/
default void changed(DocChangeDTO dto) {
}
}

View File

@ -0,0 +1,21 @@
package cn.axzo.workflow.starter.handler.filter;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
/**
* DocChangeEvent 的自定义过滤接口
*
* <p>
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link cn.axzo.workflow.starter.handler.DocEventHandler#accept(ProcessActivityDTO, Event, EventConsumer.Context)} 实例的专属过滤
* <p>
* 注意Order 的顺序遵循值越小越优先取值范围Integer. MIN_VALUE - Integer. MAX_VALUE
*
* @author wangli
* @since 2025-09-29 11:09
*/
public interface DocEventFilter extends BasicMessageQueueFilter<DocChangeDTO> {
}

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.starter.handler.filter;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import cn.axzo.workflow.starter.handler.MessageNotificationEventHandler;
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
@ -7,7 +9,7 @@ import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
/**
* MessageNotificationEvent 的自定义过滤接口
* <p>
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link MessageNotificationEventHandler#filter(MessagePushDTO)} 实例的专属过滤
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link MessageNotificationEventHandler#accept(MessagePushDTO, Event, EventConsumer.Context)} 实例的专属过滤
* <p>
* 注意Order 的顺序遵循值越小越优先取值范围Integer. MIN_VALUE - Integer. MAX_VALUE
*

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.starter.handler.filter;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import cn.axzo.workflow.starter.handler.ProcessActivityEventHandler;
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
@ -7,7 +9,7 @@ import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
/**
* ProcessActivityEvent 自定义的过滤接口
* <p>
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link ProcessActivityEventHandler#filter(ProcessActivityDTO)} 实例的专属过滤
* 该接口的实现是会全局过滤比如当应用需要监听多个流程模型时全局过滤会不精确所以我们还提供了针对 {@link ProcessActivityEventHandler#accept(ProcessActivityDTO, Event, EventConsumer.Context)} 实例的专属过滤
* <p>
* 注意Order 的顺序遵循值越小越优先取值范围Integer. MIN_VALUE - Integer. MAX_VALUE
*

View File

@ -0,0 +1,66 @@
package cn.axzo.workflow.starter.mq.broadcast.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.enums.DocChangeEventEnum;
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
import cn.axzo.workflow.starter.handler.DocEventHandler;
import cn.axzo.workflow.starter.handler.execute.ListenerExecutor;
import cn.axzo.workflow.starter.handler.filter.DocEventFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* @author wangli
* @since 2025-09-29 10:57
*/
public class InnerDocEventListener extends AbstractInnerWorkflowListener<DocEventHandler, DocEventFilter, DocChangeDTO> {
private final Logger log = LoggerFactory.getLogger(InnerDocEventListener.class);
public final static Supplier<List<Event.EventCode>> SUPPORTED_EVENT_CODES_SUPPLIER = () ->
Arrays.stream(DocChangeEventEnum.values()).map(DocChangeEventEnum::getEventCode)
.collect(Collectors.toList());
public InnerDocEventListener(ListenerExecutor listenerExecutor,
ObjectProvider<List<DocEventHandler>> handlerProvider,
ObjectProvider<List<DocEventFilter>> filterProvider) {
super(listenerExecutor, handlerProvider, filterProvider);
}
@Override
protected DocChangeDTO convert(Event event) {
return event.normalizedData(DocChangeDTO.class);
}
@Override
protected void onEvent(DocChangeDTO dto, Event event, EventConsumer.Context context) {
log.debug("【{}】new message begin processing, messageId: {}",
this.getClass().getSimpleName(), context.getMsgId());
DocChangeEventEnum type = dto.getType();
for (DocEventHandler activityListener : businessListeners) {
Consumer<DocChangeDTO> consumer = null;
switch (type) {
case DOC_CHANGE:
consumer = activityListener::changed;
break;
default:
log.warn("unknown process activity event type: {}", type);
}
if (activityListener.accept(dto, event, context)) {
listenerExecutor.execute(consumer, context, dto);
}
}
}
@Override
protected List<Event.EventCode> getSupportEventCodes() {
return SUPPORTED_EVENT_CODES_SUPPLIER.get();
}
}

View File

@ -26,7 +26,9 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
private final List<InnerWorkflowListener> workflowListeners;
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, WorkflowEngineStarterProperties workflowEngineStarterProperties, List<InnerWorkflowListener> workflowListeners) {
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer,
WorkflowEngineStarterProperties workflowEngineStarterProperties,
List<InnerWorkflowListener> workflowListeners) {
this.eventConsumer = eventConsumer;
this.starterProperties = workflowEngineStarterProperties;
this.workflowListeners = workflowListeners;
@ -64,6 +66,7 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
eventConsumer.registerHandlers(InnerNotificationEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
eventConsumer.registerHandlers(InnerDocEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
}
}