update(REQ-2324) - 新增引擎广播的事件的过滤, 目前支持根据应用名和processDefinitionKeys 过滤,支持开关

This commit is contained in:
wangli 2024-06-04 16:03:31 +08:00
parent f70ee0a1d0
commit 53b6b49641
16 changed files with 473 additions and 237 deletions

View File

@ -151,4 +151,8 @@ public interface BpmnConstants {
* 批量操作配置默认值
*/
Boolean SUPPORT_BATCH_OPERATION_DEFAULT_VALUE = false;
/**
* 用于 MQ Header 记录当前事件的归属应用
*/
String MQ_ATTRIBUTION_APPS = "MQ_ATTRIBUTION_APPLICATION";
}

View File

@ -17,6 +17,8 @@ import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_ATTRIBUTION_APPS;
/**
* 默认的 RocketMQ 事件生产者的装饰器
*
@ -28,6 +30,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer {
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendBeforeCallback;
private BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler;
private final String applicationName;
public CustomRocketMQEventProducer(RocketMQTemplate rocketMQTemplate, String defaultModule,
String appName, Context<RocketMQMessageMeta> defaultContext,
@ -35,6 +38,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer {
BiConsumer<Event, Context<RocketMQMessageMeta>> sendAfterCallback,
BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler) {
super(rocketMQTemplate, defaultModule, appName, defaultContext, sendAfterCallback);
this.applicationName = appName;
this.sendBeforeCallback = sendBeforeCallback;
this.rollbackHandler = rollbackHandler;
}
@ -64,6 +68,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer {
newHeaders.put(TraceUtils.TRACE_ID, TraceUtils.getOrCreateTraceId());
newHeaders.put(TraceUtils.CTX_LOG_ID, TraceUtils.getOrCreateTraceId());
newHeaders.put(TraceUtils.TRACE_ID_IN_MDC, TraceUtils.getOrCreateTraceId());
newHeaders.put(MQ_ATTRIBUTION_APPS, applicationName);
final Context copiedContext = context.toBuilder().headers(newHeaders).build();
Runnable runnable = () -> {
@ -89,7 +94,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer {
} else {
// 并发会导致事件时序出现问题. 所以串行执行
log.info("runnable not transaction event={}", copiedEvent.toJsonString());
getAfterCommitExecutor().executeAndRollback(() -> runnable.run(), () -> rollbackRunnable.run());
getAfterCommitExecutor().executeAndRollback(runnable, rollbackRunnable);
}
List<Runnable> runnables = getAfterCommitExecutor().getRunnables();

View File

@ -0,0 +1,117 @@
package cn.axzo.workflow.starter;
import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import java.util.HashSet;
import java.util.Set;
import static cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum.FAIL_OVER;
/**
* 监听流程引擎广播的事件时一些行为控制
* <pre>比如
* 1. 在遇到异常时的处理策略
* 2. 过滤事件
* </pre>
*
* @author wangli
* @since 2024/6/4 14:14
*/
public class BroadcastListenerConfigurationProperties {
/**
* 是否开启根据应用名过滤 MQ 事件
*/
private Boolean enableFilterApplicationName = false;
/**
* 是否开启根据业务 ID 集合过滤 MQ 事件
*/
private Boolean enableFilterDefinitionKey = true;
/**
* 过滤出 MQ 事件中包含这些业务 ID 的事件
* <p>
* 只有当 {@link BroadcastListenerConfigurationProperties#enableFilterDefinitionKey} 才生效
*/
private Set<String> filterProcessDefinitionKeys = new HashSet<>();
/**
* 失败处理策略
* <pre>
* 1FAIL_OVER, 当前listener执行出错忽略继续往下执行可配置重试相关参数不抛出异常(默认策略)
* 2FAIL_FAST, 快速失败出错直接抛出异常listener不再往下执行
* 3FAIL_BACK, 失败自动恢复在后台记录失败的消息并按照一定的策略后期再进行重试目前暂不支持
* </pre>
*/
private FailHandleTypeEnum failHandleType = FAIL_OVER;
/**
* 自动重试次数
*/
private int numOfRetries = 3;
/**
* 初始等待时间,单位毫秒
*/
private int waitTimeInMs = 50;
/**
* 重试累乘因子
*/
private int waitIncreaseFactor = 3;
public Boolean getEnableFilterApplicationName() {
return enableFilterApplicationName;
}
public void setEnableFilterApplicationName(Boolean enableFilterApplicationName) {
this.enableFilterApplicationName = enableFilterApplicationName;
}
public Boolean getEnableFilterDefinitionKey() {
return enableFilterDefinitionKey;
}
public void setEnableFilterDefinitionKey(Boolean enableFilterDefinitionKey) {
this.enableFilterDefinitionKey = enableFilterDefinitionKey;
}
public Set<String> getFilterProcessDefinitionKeys() {
return filterProcessDefinitionKeys;
}
public void setFilterProcessDefinitionKeys(Set<String> filterProcessDefinitionKeys) {
this.filterProcessDefinitionKeys = filterProcessDefinitionKeys;
}
public FailHandleTypeEnum getFailHandleType() {
return failHandleType;
}
public void setFailHandleType(FailHandleTypeEnum failHandleType) {
this.failHandleType = failHandleType;
}
public int getNumOfRetries() {
return numOfRetries;
}
public void setNumOfRetries(int numOfRetries) {
this.numOfRetries = numOfRetries;
}
public int getWaitTimeInMs() {
return waitTimeInMs;
}
public void setWaitTimeInMs(int waitTimeInMs) {
this.waitTimeInMs = waitTimeInMs;
}
public int getWaitIncreaseFactor() {
return waitIncreaseFactor;
}
public void setWaitIncreaseFactor(int waitIncreaseFactor) {
this.waitIncreaseFactor = waitIncreaseFactor;
}
}

View File

@ -82,7 +82,8 @@ public class WorkflowEngineStarterAutoConfiguration {
}
private ExecuteInterceptor getFailInterceptor(WorkflowEngineStarterProperties starterProperties) {
FailHandleTypeEnum failHandleType = starterProperties.getFailHandleType();
BroadcastListenerConfigurationProperties listenerRetry = starterProperties.getListenerRetry();
FailHandleTypeEnum failHandleType = listenerRetry.getFailHandleType();
log.info("workflow engine starter fail handle type : {}", failHandleType);
switch (failHandleType) {
case FAIL_BACK:
@ -91,11 +92,10 @@ public class WorkflowEngineStarterAutoConfiguration {
return new FailFastInterceptor();
case FAIL_OVER:
default:
return new FailOverInterceptor(starterProperties.getNumOfRetries(),
starterProperties.getWaitTimeInMs(),
starterProperties.getWaitIncreaseFactor());
return new FailOverInterceptor(listenerRetry.getNumOfRetries(),
listenerRetry.getWaitTimeInMs(),
listenerRetry.getWaitIncreaseFactor());
}
}
}

View File

@ -3,14 +3,15 @@ package cn.axzo.workflow.starter;
import cn.axzo.workflow.common.annotation.InvokeMode;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.ASYNC;
import static cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum.FAIL_OVER;
/**
* Workflow Engine Starter Properties
* <p>
* 全量参数参考META-INF/application.yaml.demo
*
* @author wangli
* @since 2024/5/21 15:24
@ -46,35 +47,10 @@ public class WorkflowEngineStarterProperties {
private RpcInvokeModeEnum invokeMode = ASYNC;
/**
* 失败处理策略
* <pre>
* 1FAIL_OVER, 当前listener执行出错忽略继续往下执行可配置重试相关参数不抛出异常(默认策略)
* 2FAIL_FAST, 快速失败出错直接抛出异常listener不再往下执行
* 3FAIL_BACK, 失败自动恢复在后台记录失败的消息并按照一定的策略后期再进行重试目前暂不支持
* </pre>
* 监听流程引擎广播的处理器异常后的重试相关策略及配置
*/
private FailHandleTypeEnum failHandleType = FAIL_OVER;
/**
* 自动重试次数
*/
private int numOfRetries = 3;
/**
* 初始等待时间,单位毫秒
*/
private int waitTimeInMs = 50;
/**
* 重试累乘因子
*/
private int waitIncreaseFactor = 3;
/**
* 该属性还不确定能否实现
*/
@Deprecated
private Boolean autoAck = false;
@NestedConfigurationProperty
private BroadcastListenerConfigurationProperties listenerRetry = new BroadcastListenerConfigurationProperties();
public Boolean getManageable() {
return manageable;
@ -100,43 +76,11 @@ public class WorkflowEngineStarterProperties {
this.invokeMode = invokeMode;
}
public Boolean getAutoAck() {
return autoAck;
public BroadcastListenerConfigurationProperties getListenerRetry() {
return listenerRetry;
}
public void setAutoAck(Boolean autoAck) {
this.autoAck = autoAck;
}
public FailHandleTypeEnum getFailHandleType() {
return failHandleType;
}
public void setFailHandleType(FailHandleTypeEnum failHandleType) {
this.failHandleType = failHandleType;
}
public int getNumOfRetries() {
return numOfRetries;
}
public void setNumOfRetries(int numOfRetries) {
this.numOfRetries = numOfRetries;
}
public int getWaitTimeInMs() {
return waitTimeInMs;
}
public void setWaitTimeInMs(int waitTimeInMs) {
this.waitTimeInMs = waitTimeInMs;
}
public int getWaitIncreaseFactor() {
return waitIncreaseFactor;
}
public void setWaitIncreaseFactor(int waitIncreaseFactor) {
this.waitIncreaseFactor = waitIncreaseFactor;
public void setListenerRetry(BroadcastListenerConfigurationProperties listenerRetry) {
this.listenerRetry = listenerRetry;
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.workflow.starter;
import cn.axzo.framework.domain.data.IdHelper;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.DefaultEventConsumer;
import cn.axzo.framework.rocketmq.Event;
@ -20,7 +21,6 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -29,13 +29,19 @@ import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID;
import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_ATTRIBUTION_APPS;
/**
* 配置 RocketMQ 事件监听器
*
@ -48,7 +54,6 @@ public class WorkflowEngineStarterRocketMQConfiguration {
private static final String DEFAULT_MODULE = "workflowEngine";
private static final String DEFAULT_EVENT = "topic_workflow_engine_";
@Value("${spring.application.name}")
private String applicationName;
@Value("${spring.profiles.active:dev}")
@ -87,19 +92,35 @@ public class WorkflowEngineStarterRocketMQConfiguration {
nameServer = "${rocketmq.name-server}"
)
public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt> {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineBroadcastConsumer.class);
@Resource(name = "eventConsumer")
private EventConsumer eventConsumer;
@Resource
private WorkflowEngineStarterProperties workflowEngineStarterProperties;
@Value("${spring.application.name}")
private String applicationName;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
if (workflowEngineStarterProperties.getListenerRetry().getEnableFilterApplicationName()) {
Map<String, String> properties = message.getProperties();
String mqAttributionApp = properties.getOrDefault(MQ_ATTRIBUTION_APPS, null);
if (StringUtils.hasText(mqAttributionApp) && Objects.equals(mqAttributionApp, applicationName)) {
super.onEvent(message, eventConsumer);
} else {
log.info("broadcast message not attribution this application, will be ignored. messageId: {}", message.getMsgId());
}
} else {
super.onEvent(message, eventConsumer);
}
}
}
@Bean
public WorkflowEngineBroadcastEventListener workflowEngineBroadcastEventListener(@Qualifier("eventConsumer") EventConsumer eventConsumer,
ObjectProvider<List<WorkflowListener>> listenerProvider) {
return new WorkflowEngineBroadcastEventListener(eventConsumer, listenerProvider);
WorkflowEngineStarterProperties workflowEngineStarterProperties,
List<WorkflowListener> listenerProvider) {
return new WorkflowEngineBroadcastEventListener(eventConsumer, workflowEngineStarterProperties, listenerProvider);
}
@ -112,8 +133,8 @@ public class WorkflowEngineStarterRocketMQConfiguration {
* @return
*/
@Bean
public EventProducer workflowEngineClientEventProducer(RocketMQTemplate rocketMQTemplate) {
return new RocketMQEventProducer(rocketMQTemplate,
public EventProducer workflowEngineStarterEventProducer(RocketMQTemplate rocketMQTemplate) {
return new RpcInvokeEventProducer(rocketMQTemplate,
DEFAULT_MODULE,
applicationName + "Starter",
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
@ -129,16 +150,51 @@ public class WorkflowEngineStarterRocketMQConfiguration {
context.getThrowable());
})
.build(),
(event, context) -> {
// 调用 Rocket 发送 API 后的回调不代表真实发送
}
getSendBeforeCallback(),
getSendAfterCallback(),
getTransactionRollbackHandler()
);
}
@Bean
public RpcInvokeEventProducer rpcInvokeEventProducer(@Qualifier("workflowEngineClientEventProducer") EventProducer workflowEngineClientEventProducer,
Environment environment) {
return new RpcInvokeEventProducer(workflowEngineClientEventProducer, environment);
/**
* 真实执行 MQ 发送前的回调
* <p>
* 将整个待发送的事件内容通过 spring 的事件分发器发送出去 现目前主要是记录 MQ 的发送记录
*
* @return
*/
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendBeforeCallback() {
return (event, context) -> {
event.setEventId(IdHelper.get32UUID());
log.info("mq_send_Before: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
};
}
/**
* 真实执行 MQ 发送后的回调
* <p>
* 将发送前的 MQ 发送记录更新 MQ 组件自己的 MessageId 字段
*
* @return
*/
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getSendAfterCallback() {
return (event, context) -> {
String messageId = context.getHeaders().get(MQ_MESSAGE_ID);
log.info("mq_send_after: {}, uniqueId: {}, messageId: {}", event.getShardingKey(), event.getEventId(), messageId);
};
}
/**
* 如果 MQ 注册的事务回滚后的回调
* <p>
* MQ 发送记录更新为删除状态意为这类数据可以不关注可以物理删除但该功能还是用逻辑删除
*
* @return
*/
private BiConsumer<Event, EventProducer.Context<RocketMQEventProducer.RocketMQMessageMeta>> getTransactionRollbackHandler() {
return (event, context) -> {
log.info("mq_transaction_rollback: {}, uniqueId: {}", event.getShardingKey(), event.getEventId());
};
}
@Bean
@ -175,12 +231,22 @@ public class WorkflowEngineStarterRocketMQConfiguration {
nameServer = "${rocketmq.name-server}"
)
public static class WorkflowEngineClientRetryConsumer extends BaseListener implements RocketMQListener<MessageExt> {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineClientRetryConsumer.class);
@Resource(name = "workflowEngineStarterEventConsumer")
private EventConsumer workflowEngineStarterEventConsumer;
@Value("${spring.application.name}")
private String applicationName;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, workflowEngineStarterEventConsumer);
Map<String, String> properties = message.getProperties();
String mqAttributionApp = properties.getOrDefault(MQ_ATTRIBUTION_APPS, null);
if (StringUtils.hasText(mqAttributionApp) && Objects.equals(mqAttributionApp, applicationName)) {
super.onEvent(message, workflowEngineStarterEventConsumer);
} else {
log.info("rpc retry message not attribution this application, will be ignored. messageId: {}", message.getMsgId());
}
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.workflow.starter.feign.ext;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeDTO;
import cn.axzo.workflow.common.util.ThreadUtil;
@ -63,14 +64,14 @@ public class ComplexInvokeClient implements Client {
private final Logger log = LoggerFactory.getLogger(ComplexInvokeClient.class);
private final WorkflowEngineStarterProperties starterProperties;
private final RpcInvokeEventProducer rpcInvokeEventProducer;
private final RpcInvokeEventProducer eventProducer;
private final Client feignClient;
public ComplexInvokeClient(WorkflowEngineStarterProperties starterProperties,
RpcInvokeEventProducer rpcInvokeEventProducer,
EventProducer eventProducer,
Client feignClient) {
this.starterProperties = starterProperties;
this.rpcInvokeEventProducer = rpcInvokeEventProducer;
this.eventProducer = (RpcInvokeEventProducer) eventProducer;
this.feignClient = feignClient;
}
@ -114,7 +115,7 @@ public class ComplexInvokeClient implements Client {
event.setParameters(args);
buildArgs(request, metadata, args);
rpcInvokeEventProducer.send(WORKFLOW_ENGINE_STARTER, event);
eventProducer.send(WORKFLOW_ENGINE_STARTER, event);
}
@SneakyThrows

View File

@ -1,9 +1,9 @@
package cn.axzo.workflow.starter.feign.ext;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.common.util.ThreadUtil;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import feign.Client;
import feign.RequestInterceptor;
import feign.Target;
@ -11,6 +11,7 @@ import feign.codec.Decoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
import org.springframework.cloud.openfeign.FeignBuilderCustomizer;
import org.springframework.cloud.openfeign.support.ResponseEntityDecoder;
@ -37,9 +38,9 @@ public class WorkflowEngineStarterFeignConfiguration {
@Bean
public Client complexInvokeClient(WorkflowEngineStarterProperties starterProperties,
RpcInvokeEventProducer rpcInvokeEventProducer,
@Qualifier("workflowEngineStarterEventProducer") EventProducer eventProducer,
Client feignClient) {
return new ComplexInvokeClient(starterProperties, rpcInvokeEventProducer, feignClient);
return new ComplexInvokeClient(starterProperties, eventProducer, feignClient);
}
@Bean

View File

@ -1,9 +1,13 @@
package cn.axzo.workflow.starter.mq.broadcast.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.core.Ordered;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.Comparator;
@ -12,23 +16,41 @@ import java.util.stream.Collectors;
public abstract class AbstractWorkflowListener<T extends Ordered> implements WorkflowListener {
private final ObjectProvider<List<T>> listenerProvider;
private static final Logger log = LoggerFactory.getLogger(AbstractWorkflowListener.class);
protected final List<T> businessListeners;
private final ListenerExecutor listenerExecutor;
public AbstractWorkflowListener(ListenerExecutor listenerExecutor, ObjectProvider<List<T>> provider) {
this.listenerExecutor = listenerExecutor;
this.listenerProvider = provider;
this.businessListeners = getOrderedBusinessListeners(provider);
}
protected final boolean emptyListener() {
if (CollectionUtils.isEmpty(businessListeners)) {
log.info("not {}'s Bean found, will be skip it. please check @Component annotation...", this.getClass().getSimpleName());
return true;
}
return false;
}
protected List<T> getCustomListeners() {
return listenerProvider.getIfAvailable(Collections::emptyList)
protected List<T> getOrderedBusinessListeners(ObjectProvider<List<T>> provider) {
return provider.getIfAvailable(Collections::emptyList)
.stream()
.sorted(Comparator.comparingInt(Ordered::getOrder))
.collect(Collectors.toList());
}
@Override
public void handleEvent(Event event, EventConsumer.Context context) {
if (emptyListener()) {
return;
}
onEvent(event, context);
}
protected abstract void onEvent(Event event, EventConsumer.Context context);
protected ListenerExecutor getListenerExecutor() {
return listenerExecutor;
}

View File

@ -6,11 +6,9 @@ import cn.axzo.workflow.common.enums.ProcessActivityEventEnum;
import cn.axzo.workflow.common.model.response.mq.ProcessActivityDTO;
import cn.axzo.workflow.starter.listener.ProcessActivityEventHandler;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.util.CollectionUtils;
import java.util.Arrays;
import java.util.List;
@ -34,31 +32,28 @@ public class InnerActivityEventListener extends AbstractWorkflowListener<Process
}
@Override
public void handleEvent(Event event, EventConsumer.Context context) {
List<ProcessActivityEventHandler> activityListeners = getCustomListeners();
if (!CollectionUtils.isEmpty(activityListeners)) {
ProcessActivityDTO activityDTO = JSON.parseObject(event.getData().toString(), ProcessActivityDTO.class);
ProcessActivityEventEnum type = activityDTO.getType();
for (ProcessActivityEventHandler activityListener : activityListeners) {
Consumer<ProcessActivityDTO> consumer = null;
switch (type) {
case PROCESS_ACTIVITY_START:
consumer = activityListener::onStart;
break;
case PROCESS_ACTIVITY_WAIT_ASSIGNEE:
consumer = activityListener::onWaitAssignee;
break;
case PROCESS_ACTIVITY_TAKE:
consumer = activityListener::onTake;
break;
case PROCESS_ACTIVITY_END:
consumer = activityListener::onEnd;
break;
default:
log.warn("unknown process activity event type: {}", type);
}
getListenerExecutor().execute(consumer, activityDTO);
public void onEvent(Event event, EventConsumer.Context context) {
ProcessActivityDTO activityDTO = event.normalizedData(ProcessActivityDTO.class);
ProcessActivityEventEnum type = activityDTO.getType();
for (ProcessActivityEventHandler activityListener : businessListeners) {
Consumer<ProcessActivityDTO> consumer = null;
switch (type) {
case PROCESS_ACTIVITY_START:
consumer = activityListener::onStart;
break;
case PROCESS_ACTIVITY_WAIT_ASSIGNEE:
consumer = activityListener::onWaitAssignee;
break;
case PROCESS_ACTIVITY_TAKE:
consumer = activityListener::onTake;
break;
case PROCESS_ACTIVITY_END:
consumer = activityListener::onEnd;
break;
default:
log.warn("unknown process activity event type: {}", type);
}
getListenerExecutor().execute(consumer, activityDTO);
}
}

View File

@ -6,11 +6,9 @@ import cn.axzo.workflow.common.enums.ProcessInstanceEventEnum;
import cn.axzo.workflow.common.model.response.mq.ProcessInstanceDTO;
import cn.axzo.workflow.starter.listener.ProcessInstanceEventHandler;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.util.CollectionUtils;
import java.util.Arrays;
import java.util.List;
@ -34,37 +32,34 @@ public class InnerInstanceEventListener extends AbstractWorkflowListener<Process
}
@Override
public void handleEvent(Event event, EventConsumer.Context context) {
List<ProcessInstanceEventHandler> instanceListeners = getCustomListeners();
if (!CollectionUtils.isEmpty(instanceListeners)) {
ProcessInstanceDTO instanceDTO = JSON.parseObject(event.getData().toString(), ProcessInstanceDTO.class);
ProcessInstanceEventEnum type = instanceDTO.getType();
for (ProcessInstanceEventHandler instanceListener : instanceListeners) {
Consumer<ProcessInstanceDTO> consumer = null;
switch (type) {
case PROCESS_INSTANCE_CREATED:
consumer = instanceListener::onCreated;
break;
case PROCESS_INSTANCE_STARTED:
consumer = instanceListener::onStarted;
break;
case PROCESS_INSTANCE_CANCELLED:
consumer = instanceListener::onCancelled;
break;
case PROCESS_INSTANCE_ABORTED:
consumer = instanceListener::onAborted;
break;
case PROCESS_INSTANCE_COMPLETED:
consumer = instanceListener::onCompleted;
break;
case PROCESS_INSTANCE_REJECTED:
consumer = instanceListener::onRejected;
break;
default:
log.warn("unknown process activity event type: {}", type);
}
getListenerExecutor().execute(consumer, instanceDTO);
public void onEvent(Event event, EventConsumer.Context context) {
ProcessInstanceDTO instanceDTO = event.normalizedData(ProcessInstanceDTO.class);
ProcessInstanceEventEnum type = instanceDTO.getType();
for (ProcessInstanceEventHandler instanceListener : businessListeners) {
Consumer<ProcessInstanceDTO> consumer = null;
switch (type) {
case PROCESS_INSTANCE_CREATED:
consumer = instanceListener::onCreated;
break;
case PROCESS_INSTANCE_STARTED:
consumer = instanceListener::onStarted;
break;
case PROCESS_INSTANCE_CANCELLED:
consumer = instanceListener::onCancelled;
break;
case PROCESS_INSTANCE_ABORTED:
consumer = instanceListener::onAborted;
break;
case PROCESS_INSTANCE_COMPLETED:
consumer = instanceListener::onCompleted;
break;
case PROCESS_INSTANCE_REJECTED:
consumer = instanceListener::onRejected;
break;
default:
log.warn("unknown process activity event type: {}", type);
}
getListenerExecutor().execute(consumer, instanceDTO);
}
}

View File

@ -2,16 +2,13 @@ package cn.axzo.workflow.starter.mq.broadcast.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.common.enums.ProcessInstanceEventEnum;
import cn.axzo.workflow.common.enums.ProcessMessagePushEventEnum;
import cn.axzo.workflow.common.model.response.mq.MessagePushDTO;
import cn.axzo.workflow.starter.listener.MessageNotificationEventHandler;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.util.CollectionUtils;
import java.util.Arrays;
import java.util.List;
@ -27,7 +24,7 @@ public class InnerNotificationEventListener extends AbstractWorkflowListener<Mes
private final static Logger log = LoggerFactory.getLogger(InnerNotificationEventListener.class);
public final static Supplier<List<Event.EventCode>> SUPPORTED_EVENT_CODES_SUPPLIER = () ->
Arrays.stream(ProcessInstanceEventEnum.values()).map(ProcessInstanceEventEnum::getEventCode)
Arrays.stream(ProcessMessagePushEventEnum.values()).map(ProcessMessagePushEventEnum::getEventCode)
.collect(Collectors.toList());
public InnerNotificationEventListener(ListenerExecutor executor, ObjectProvider<List<MessageNotificationEventHandler>> provider) {
@ -35,40 +32,37 @@ public class InnerNotificationEventListener extends AbstractWorkflowListener<Mes
}
@Override
public void handleEvent(Event event, EventConsumer.Context context) {
List<MessageNotificationEventHandler> instanceListeners = getCustomListeners();
if (!CollectionUtils.isEmpty(instanceListeners)) {
MessagePushDTO instanceDTO = JSON.parseObject(event.getData().toString(), MessagePushDTO.class);
ProcessMessagePushEventEnum type = instanceDTO.getType();
for (MessageNotificationEventHandler noticeListener : instanceListeners) {
Consumer<MessagePushDTO> consumer = null;
switch (type) {
case PROCESS_PUSH_NOTICE:
consumer = noticeListener::pushNotice;
break;
case PROCESS_PUSH_PENDING:
consumer = noticeListener::pushPending;
break;
case PROCESS_PUSH_PENDING_COMPLETE:
consumer = noticeListener::completePending;
break;
case PROCESS_PUSH_PENDING_ROLLBACK:
consumer = noticeListener::rollbackPending;
break;
case PROCESS_CARBON_COPY:
consumer = noticeListener::carbonCopy;
break;
case PROCESS_CARBON_COPY_COMPLETE:
consumer = noticeListener::carbonCopyComplete;
break;
case PROCESS_PUSH_SMS:
consumer = noticeListener::pushSms;
break;
default:
log.warn("unknown message event type: {}", type);
}
getListenerExecutor().execute(consumer, instanceDTO);
public void onEvent(Event event, EventConsumer.Context context) {
MessagePushDTO instanceDTO = event.normalizedData(MessagePushDTO.class);
ProcessMessagePushEventEnum type = instanceDTO.getType();
for (MessageNotificationEventHandler noticeListener : businessListeners) {
Consumer<MessagePushDTO> consumer = null;
switch (type) {
case PROCESS_PUSH_NOTICE:
consumer = noticeListener::pushNotice;
break;
case PROCESS_PUSH_PENDING:
consumer = noticeListener::pushPending;
break;
case PROCESS_PUSH_PENDING_COMPLETE:
consumer = noticeListener::completePending;
break;
case PROCESS_PUSH_PENDING_ROLLBACK:
consumer = noticeListener::rollbackPending;
break;
case PROCESS_CARBON_COPY:
consumer = noticeListener::carbonCopy;
break;
case PROCESS_CARBON_COPY_COMPLETE:
consumer = noticeListener::carbonCopyComplete;
break;
case PROCESS_PUSH_SMS:
consumer = noticeListener::pushSms;
break;
default:
log.warn("unknown message event type: {}", type);
}
getListenerExecutor().execute(consumer, instanceDTO);
}
}

View File

@ -6,11 +6,9 @@ import cn.axzo.workflow.common.enums.ProcessTaskEventEnum;
import cn.axzo.workflow.common.model.response.mq.ProcessTaskDTO;
import cn.axzo.workflow.starter.listener.ProcessTaskEventHandler;
import cn.axzo.workflow.starter.mq.execute.ListenerExecutor;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.util.CollectionUtils;
import java.util.Arrays;
import java.util.List;
@ -34,32 +32,30 @@ public class InnerTaskEventListener extends AbstractWorkflowListener<ProcessTask
}
@Override
public void handleEvent(Event event, EventConsumer.Context context) {
List<ProcessTaskEventHandler> taskListeners = getCustomListeners();
if (!CollectionUtils.isEmpty(taskListeners)) {
ProcessTaskDTO taskDTO = JSON.parseObject(event.getData().toString(), ProcessTaskDTO.class);
ProcessTaskEventEnum type = taskDTO.getType();
for (ProcessTaskEventHandler taskListener : taskListeners) {
Consumer<ProcessTaskDTO> consumer = null;
switch (type) {
case PROCESS_TASK_CREATED:
consumer = taskListener::onCreated;
break;
case PROCESS_TASK_COMPLETED:
consumer = taskListener::onCompleted;
break;
case PROCESS_TASK_ASSIGNED:
consumer = taskListener::onAssigned;
break;
case PROCESS_TASK_DELETED:
consumer = taskListener::onDeleted;
break;
default:
log.warn("unknown task event type: {}", type);
}
getListenerExecutor().execute(consumer, taskDTO);
public void onEvent(Event event, EventConsumer.Context context) {
ProcessTaskDTO taskDTO = event.normalizedData(ProcessTaskDTO.class);
ProcessTaskEventEnum type = taskDTO.getType();
for (ProcessTaskEventHandler taskListener : businessListeners) {
Consumer<ProcessTaskDTO> consumer = null;
switch (type) {
case PROCESS_TASK_CREATED:
consumer = taskListener::onCreated;
break;
case PROCESS_TASK_COMPLETED:
consumer = taskListener::onCompleted;
break;
case PROCESS_TASK_ASSIGNED:
consumer = taskListener::onAssigned;
break;
case PROCESS_TASK_DELETED:
consumer = taskListener::onDeleted;
break;
default:
log.warn("unknown task event type: {}", type);
}
getListenerExecutor().execute(consumer, taskDTO);
}
}
@Override

View File

@ -3,13 +3,13 @@ package cn.axzo.workflow.starter.mq.broadcast.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandler;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -22,11 +22,13 @@ import java.util.Objects;
public class WorkflowEngineBroadcastEventListener implements EventHandler, InitializingBean {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineBroadcastEventListener.class);
private final EventConsumer eventConsumer;
private final ObjectProvider<List<WorkflowListener>> workflowListenersProvider;
private final WorkflowEngineStarterProperties starterProperties;
private final List<WorkflowListener> workflowListeners;
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, ObjectProvider<List<WorkflowListener>> workflowListenersProvider) {
public WorkflowEngineBroadcastEventListener(EventConsumer eventConsumer, WorkflowEngineStarterProperties workflowEngineStarterProperties, List<WorkflowListener> workflowListeners) {
this.eventConsumer = eventConsumer;
this.workflowListenersProvider = workflowListenersProvider;
this.starterProperties = workflowEngineStarterProperties;
this.workflowListeners = workflowListeners;
}
@Override
@ -36,7 +38,13 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
return;
}
List<WorkflowListener> workflowListeners = workflowListenersProvider.getIfAvailable(Collections::emptyList);
try {
JSON.parse(JSON.toJSONString(event.getData()));
} catch (Exception e) {
log.warn("event data json format error, will be ignored, messageId: {}", context.getMsgId());
return;
}
if (CollectionUtils.isEmpty(workflowListeners)) {
log.warn("no business listeners implementation found, please check @Component annotation");
return;
@ -56,4 +64,5 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi
eventConsumer.registerHandlers(InnerNotificationEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
eventConsumer.registerHandlers(InnerTaskEventListener.SUPPORTED_EVENT_CODES_SUPPLIER.get(), this);
}
}

View File

@ -1,10 +1,26 @@
package cn.axzo.workflow.starter.mq.retry.producer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.framework.rocketmq.utils.TraceUtils;
import cn.axzo.workflow.common.enums.WorkflowEngineEventEnum;
import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeDTO;
import org.springframework.core.env.Environment;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.NonNull;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_ATTRIBUTION_APPS;
/**
* RPC 的调用动作的 MQ 事件生产者
@ -12,14 +28,85 @@ import org.springframework.core.env.Environment;
* @author wangli
* @since 2024/5/22 10:02
*/
public class RpcInvokeEventProducer {
public class RpcInvokeEventProducer extends RocketMQEventProducer {
private final Logger log = LoggerFactory.getLogger(RpcInvokeEventProducer.class);
private final EventProducer workflowEngineClientEventProducer;
private final String currentApplicationName;
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendBeforeCallback;
private BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler;
private final String applicationName;
public RpcInvokeEventProducer(EventProducer workflowEngineClientEventProducer, Environment environment) {
this.workflowEngineClientEventProducer = workflowEngineClientEventProducer;
this.currentApplicationName = environment.getProperty("spring.application.name");
public RpcInvokeEventProducer(RocketMQTemplate rocketMQTemplate, String defaultModule,
String appName, Context<RocketMQMessageMeta> defaultContext,
BiConsumer<Event, Context<RocketMQMessageMeta>> sendBeforeCallback,
BiConsumer<Event, Context<RocketMQMessageMeta>> sendAfterCallback,
BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler) {
super(rocketMQTemplate, defaultModule, appName, defaultContext, sendAfterCallback);
this.applicationName = appName;
this.sendBeforeCallback = sendBeforeCallback;
this.rollbackHandler = rollbackHandler;
}
@Override
public void send(@NonNull Event event, @NonNull Context context) {
log.info("发送事件内容:{}", event.toPrettyJsonString());
if (sendBeforeCallback != null) {
sendBeforeCallback.accept(event, context);
}
// XXX不要在send的时候修改event的值有副作用
// 例如当将同一个event发送到不同的topic的时候buildSchemaHash会用不同的topic赋值两次导致一些异常case
Event copiedEvent = Event.builder().build();
BeanUtils.copyProperties(event, copiedEvent);
if (Strings.isNullOrEmpty(copiedEvent.getTargetId())) {
log.warn("targetId of event is black, best practice of targetId is present, event = {}", event.toJsonString());
}
if (copiedEvent.getData() == null) {
log.warn("data of event is empty, best practice of data must present, event = {}", event.toJsonString());
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventModule()), "eventModule不能为空");
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventName()), "eventName不能为空");
// 复制一份 context并加入链路跟踪信息 traceId
HashMap newHeaders = Maps.newHashMap(Optional.ofNullable(context.getHeaders()).orElse(ImmutableMap.of()));
newHeaders.put(TraceUtils.TRACE_ID, TraceUtils.getOrCreateTraceId());
newHeaders.put(TraceUtils.CTX_LOG_ID, TraceUtils.getOrCreateTraceId());
newHeaders.put(TraceUtils.TRACE_ID_IN_MDC, TraceUtils.getOrCreateTraceId());
newHeaders.put(MQ_ATTRIBUTION_APPS, applicationName);
final Context copiedContext = context.toBuilder().headers(newHeaders).build();
Runnable runnable = () -> {
try {
getSender().accept(copiedEvent, copiedContext);
} catch (Exception e) {
log.error("====MQ PRODUCER ====, context={}, message = {}", copiedContext, copiedEvent.toPrettyJsonString(), e);
throw e;
}
};
Runnable rollbackRunnable = () -> {
try {
getRollbackHandler().accept(copiedEvent, copiedContext);
} catch (Exception e) {
// ignore
}
};
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
log.info("runnable is transaction event={}", copiedEvent.toJsonString());
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
log.info("runnable not transaction event={}", copiedEvent.toJsonString());
getAfterCommitExecutor().executeAndRollback(runnable, rollbackRunnable);
}
List<Runnable> runnables = getAfterCommitExecutor().getRunnables();
log.info("runnables.size(): {}", runnables.size());
}
@Override
public BiConsumer<Event, Context<RocketMQMessageMeta>> getRollbackHandler() {
return rollbackHandler;
}
/**
@ -29,9 +116,9 @@ public class RpcInvokeEventProducer {
* @param data {@link WorkflowEngineStarterRpcInvokeDTO}
*/
public void send(WorkflowEngineEventEnum eventEnum, WorkflowEngineStarterRpcInvokeDTO data) {
workflowEngineClientEventProducer.send(Event.builder()
.shardingKey(currentApplicationName)
.eventCode(eventEnum.getEventCode(currentApplicationName))
send(Event.builder()
.shardingKey(applicationName)
.eventCode(eventEnum.getEventCode(applicationName))
.targetId(data.getMethodName())
.targetType(eventEnum.getTag())
.data(data)