Merge remote-tracking branch 'origin/feature/starter_add_doc_event' into release-20251022
This commit is contained in:
commit
8d554d8b48
@ -0,0 +1,49 @@
|
||||
package cn.axzo.workflow.common.model.response.mq;
|
||||
|
||||
import cn.axzo.workflow.common.enums.DocChangeEventEnum;
|
||||
import cn.axzo.workflow.common.model.dto.BizDocDTO;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 用于 MQ 传输数据
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2025-09-29 11:03
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class DocChangeDTO implements Serializable {
|
||||
|
||||
/**
|
||||
* 业务 ID
|
||||
*/
|
||||
private String key;
|
||||
|
||||
/**
|
||||
* 工作台 ID
|
||||
*/
|
||||
private Long workspaceId;
|
||||
|
||||
/**
|
||||
* 修改后的文档
|
||||
*/
|
||||
private List<BizDocDTO> newSettings;
|
||||
|
||||
/**
|
||||
* 修改前的文档
|
||||
*/
|
||||
private List<BizDocDTO> oldSettings;
|
||||
|
||||
/**
|
||||
* 模型配置 MQ 事件的数据类型
|
||||
*/
|
||||
private DocChangeEventEnum type;
|
||||
}
|
||||
@ -6,6 +6,7 @@ import java.util.List;
|
||||
|
||||
/**
|
||||
* 模型关联的文档变更事件对象
|
||||
* 该事件,目前仅用于考勤业务(王庆)
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2025-04-07 16:51
|
||||
|
||||
@ -3,10 +3,9 @@ package cn.axzo.workflow.server.controller.listener.doc;
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.workflow.common.enums.DocChangeEventEnum;
|
||||
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
|
||||
import cn.axzo.workflow.core.conf.CustomEventManager;
|
||||
import cn.axzo.workflow.core.engine.event.DocChangeEvent;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.serializer.SerializerFeature;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
@ -48,7 +47,13 @@ public class DocChangeListener {
|
||||
.eventCode(DocChangeEventEnum.DOC_CHANGE.getEventCode())
|
||||
.targetId(event.getKey())
|
||||
.targetType(event.getKey())
|
||||
.data(JSON.toJSONString(event, SerializerFeature.WriteMapNullValue))
|
||||
.data(DocChangeDTO.builder()
|
||||
.key(event.getKey())
|
||||
.workspaceId(event.getWorkspaceId())
|
||||
.newSettings(event.getNewSettings())
|
||||
.oldSettings(event.getOldSettings())
|
||||
.type(DocChangeEventEnum.DOC_CHANGE)
|
||||
.build())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package cn.axzo.workflow.starter;
|
||||
import cn.axzo.workflow.starter.api.WorkflowCoreService;
|
||||
import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
|
||||
import cn.axzo.workflow.starter.common.exception.WorkflowUnsupportedException;
|
||||
import cn.axzo.workflow.starter.handler.DocEventHandler;
|
||||
import cn.axzo.workflow.starter.handler.MessageNotificationEventHandler;
|
||||
import cn.axzo.workflow.starter.handler.ProcessActivityEventHandler;
|
||||
import cn.axzo.workflow.starter.handler.ProcessInstanceEventHandler;
|
||||
@ -13,6 +14,7 @@ import cn.axzo.workflow.starter.handler.execute.interceptor.ExecutorInvoker;
|
||||
import cn.axzo.workflow.starter.handler.execute.interceptor.FailFastInterceptor;
|
||||
import cn.axzo.workflow.starter.handler.execute.interceptor.FailOverInterceptor;
|
||||
import cn.axzo.workflow.starter.handler.execute.interceptor.LogInterceptor;
|
||||
import cn.axzo.workflow.starter.handler.filter.DocEventFilter;
|
||||
import cn.axzo.workflow.starter.handler.filter.MessageNotificationEventFilter;
|
||||
import cn.axzo.workflow.starter.handler.filter.ProcessActivityEventFilter;
|
||||
import cn.axzo.workflow.starter.handler.filter.ProcessInstanceEventFilter;
|
||||
@ -20,6 +22,7 @@ import cn.axzo.workflow.starter.handler.filter.ProcessTaskEventFilter;
|
||||
import cn.axzo.workflow.starter.handler.monitor.BroadcastDLQReporter;
|
||||
import cn.axzo.workflow.starter.handler.monitor.RpcDLQReporter;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerActivityEventListener;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerDocEventListener;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerInstanceEventListener;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerNotificationEventListener;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerTaskEventListener;
|
||||
@ -103,6 +106,13 @@ public class WorkflowEngineStarterAutoConfiguration {
|
||||
return new InnerNotificationEventListener(executor, handlerProvider, filterProvider);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public InnerDocEventListener innerDocEventListener(ListenerExecutor executor,
|
||||
ObjectProvider<List<DocEventHandler>> handlerProvider,
|
||||
ObjectProvider<List<DocEventFilter>> filterProvider) {
|
||||
return new InnerDocEventListener(executor, handlerProvider, filterProvider);
|
||||
}
|
||||
|
||||
private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties) {
|
||||
BroadcastListenerProperties listenerRetry = starterProperties.getBroadcast();
|
||||
FailHandleTypeEnum failHandleType = listenerRetry.getFailHandleType();
|
||||
|
||||
@ -0,0 +1,35 @@
|
||||
package cn.axzo.workflow.starter.handler;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
|
||||
import org.springframework.core.Ordered;
|
||||
|
||||
/**
|
||||
* 流程模板关联的文档配置变更事件
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2025-09-29 11:00
|
||||
*/
|
||||
public interface DocEventHandler extends Ordered {
|
||||
/**
|
||||
* 针对当前接口的实现进行过滤, 为 true 时,往下执行下面的方法
|
||||
*
|
||||
* @param dto
|
||||
* @param event
|
||||
* @param context
|
||||
* @return
|
||||
*/
|
||||
default boolean accept(DocChangeDTO dto, Event event, EventConsumer.Context context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 文档配置变更
|
||||
*
|
||||
* @param dto
|
||||
*/
|
||||
default void changed(DocChangeDTO dto) {
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,21 @@
|
||||
package cn.axzo.workflow.starter.handler.filter;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
|
||||
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
|
||||
|
||||
/**
|
||||
* DocChangeEvent 的自定义过滤接口
|
||||
*
|
||||
* <p>
|
||||
* 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link cn.axzo.workflow.starter.handler.DocEventHandler#accept(ProcessActivityDTO, Event, EventConsumer.Context)} 实例的专属过滤
|
||||
* <p>
|
||||
* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE)
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2025-09-29 11:09
|
||||
*/
|
||||
public interface DocEventFilter extends BasicMessageQueueFilter<DocChangeDTO> {
|
||||
}
|
||||
@ -1,5 +1,7 @@
|
||||
package cn.axzo.workflow.starter.handler.filter;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
|
||||
import cn.axzo.workflow.starter.handler.MessageNotificationEventHandler;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
|
||||
@ -7,7 +9,7 @@ import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
|
||||
/**
|
||||
* MessageNotificationEvent 的自定义过滤接口
|
||||
* <p>
|
||||
* 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link MessageNotificationEventHandler#filter(MessagePushDTO)} 实例的专属过滤
|
||||
* 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link MessageNotificationEventHandler#accept(MessagePushDTO, Event, EventConsumer.Context)} 实例的专属过滤
|
||||
* <p>
|
||||
* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE)
|
||||
*
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
package cn.axzo.workflow.starter.handler.filter;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
|
||||
import cn.axzo.workflow.starter.handler.ProcessActivityEventHandler;
|
||||
import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
|
||||
@ -7,7 +9,7 @@ import cn.axzo.workflow.starter.mq.broadcast.filter.BasicMessageQueueFilter;
|
||||
/**
|
||||
* ProcessActivityEvent 自定义的过滤接口
|
||||
* <p>
|
||||
* 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link ProcessActivityEventHandler#filter(ProcessActivityDTO)} 实例的专属过滤
|
||||
* 该接口的实现,是会全局过滤,比如当应用需要监听多个流程模型时,全局过滤会不精确,所以我们还提供了针对 {@link ProcessActivityEventHandler#accept(ProcessActivityDTO, Event, EventConsumer.Context)} 实例的专属过滤
|
||||
* <p>
|
||||
* 注意:Order 的顺序,遵循值越小越优先。(取值范围:Integer. MIN_VALUE - Integer. MAX_VALUE)
|
||||
*
|
||||
|
||||
@ -0,0 +1,66 @@
|
||||
package cn.axzo.workflow.starter.mq.broadcast.consumer;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.workflow.common.enums.DocChangeEventEnum;
|
||||
import cn.axzo.workflow.common.model.response.mq.DocChangeDTO;
|
||||
import cn.axzo.workflow.starter.handler.DocEventHandler;
|
||||
import cn.axzo.workflow.starter.handler.execute.ListenerExecutor;
|
||||
import cn.axzo.workflow.starter.handler.filter.DocEventFilter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author wangli
|
||||
* @since 2025-09-29 10:57
|
||||
*/
|
||||
public class InnerDocEventListener extends AbstractInnerWorkflowListener<DocEventHandler, DocEventFilter, DocChangeDTO> {
|
||||
private final Logger log = LoggerFactory.getLogger(InnerDocEventListener.class);
|
||||
|
||||
public final static Supplier<List<Event.EventCode>> SUPPORTED_EVENT_CODES_SUPPLIER = () ->
|
||||
Arrays.stream(DocChangeEventEnum.values()).map(DocChangeEventEnum::getEventCode)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
public InnerDocEventListener(ListenerExecutor listenerExecutor,
|
||||
ObjectProvider<List<DocEventHandler>> handlerProvider,
|
||||
ObjectProvider<List<DocEventFilter>> filterProvider) {
|
||||
super(listenerExecutor, handlerProvider, filterProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DocChangeDTO convert(Event event) {
|
||||
return event.normalizedData(DocChangeDTO.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onEvent(DocChangeDTO dto, Event event, EventConsumer.Context context) {
|
||||
log.debug("【{}】new message begin processing, messageId: {}",
|
||||
this.getClass().getSimpleName(), context.getMsgId());
|
||||
DocChangeEventEnum type = dto.getType();
|
||||
for (DocEventHandler activityListener : businessListeners) {
|
||||
Consumer<DocChangeDTO> consumer = null;
|
||||
switch (type) {
|
||||
case DOC_CHANGE:
|
||||
consumer = activityListener::changed;
|
||||
break;
|
||||
default:
|
||||
log.warn("unknown process activity event type: {}", type);
|
||||
}
|
||||
if (activityListener.accept(dto, event, context)) {
|
||||
listenerExecutor.execute(consumer, context, dto);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Event.EventCode> getSupportEventCodes() {
|
||||
return SUPPORTED_EVENT_CODES_SUPPLIER.get();
|
||||
}
|
||||
}
|
||||
@ -26,7 +26,9 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
|
||||
|
||||
private final List<InnerWorkflowListener> workflowListeners;
|
||||
|
||||
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, WorkflowEngineStarterProperties workflowEngineStarterProperties, List<InnerWorkflowListener> workflowListeners) {
|
||||
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer,
|
||||
WorkflowEngineStarterProperties workflowEngineStarterProperties,
|
||||
List<InnerWorkflowListener> workflowListeners) {
|
||||
this.eventConsumer = eventConsumer;
|
||||
this.starterProperties = workflowEngineStarterProperties;
|
||||
this.workflowListeners = workflowListeners;
|
||||
@ -64,6 +66,7 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
|
||||
eventConsumer.registerHandlers(InnerInstanceEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
|
||||
eventConsumer.registerHandlers(InnerNotificationEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
|
||||
eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
|
||||
eventConsumer.registerHandlers(InnerDocEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user