update(REQ-2324) - MQ 事件中增加消息归属应用的属性

This commit is contained in:
wangli 2024-06-04 18:35:15 +08:00
parent 3678665b9a
commit 1f3fa2fb18
2 changed files with 9 additions and 0 deletions

View File

@ -151,4 +151,8 @@ public interface BpmnConstants {
* 批量操作配置默认值 * 批量操作配置默认值
*/ */
Boolean SUPPORT_BATCH_OPERATION_DEFAULT_VALUE = false; Boolean SUPPORT_BATCH_OPERATION_DEFAULT_VALUE = false;
/**
* 用于 MQ Header 记录当前事件的归属应用
*/
String MQ_OWNERSHIP_APP = "MQ_OWNERSHIP_APPLICATION";
} }

View File

@ -17,6 +17,8 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_OWNERSHIP_APP;
/** /**
* 默认的 RocketMQ 事件生产者的装饰器 * 默认的 RocketMQ 事件生产者的装饰器
* *
@ -28,6 +30,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer {
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendBeforeCallback; private BiConsumer<Event, Context<RocketMQMessageMeta>> sendBeforeCallback;
private BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler; private BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler;
private final String applicationName;
public CustomRocketMQEventProducer(RocketMQTemplate rocketMQTemplate, String defaultModule, public CustomRocketMQEventProducer(RocketMQTemplate rocketMQTemplate, String defaultModule,
String appName, Context<RocketMQMessageMeta> defaultContext, String appName, Context<RocketMQMessageMeta> defaultContext,
@ -35,6 +38,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer {
BiConsumer<Event, Context<RocketMQMessageMeta>> sendAfterCallback, BiConsumer<Event, Context<RocketMQMessageMeta>> sendAfterCallback,
BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler) { BiConsumer<Event, Context<RocketMQMessageMeta>> rollbackHandler) {
super(rocketMQTemplate, defaultModule, appName, defaultContext, sendAfterCallback); super(rocketMQTemplate, defaultModule, appName, defaultContext, sendAfterCallback);
this.applicationName = appName;
this.sendBeforeCallback = sendBeforeCallback; this.sendBeforeCallback = sendBeforeCallback;
this.rollbackHandler = rollbackHandler; this.rollbackHandler = rollbackHandler;
} }
@ -64,6 +68,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer {
newHeaders.put(TraceUtils.TRACE_ID, TraceUtils.getOrCreateTraceId()); newHeaders.put(TraceUtils.TRACE_ID, TraceUtils.getOrCreateTraceId());
newHeaders.put(TraceUtils.CTX_LOG_ID, TraceUtils.getOrCreateTraceId()); newHeaders.put(TraceUtils.CTX_LOG_ID, TraceUtils.getOrCreateTraceId());
newHeaders.put(TraceUtils.TRACE_ID_IN_MDC, TraceUtils.getOrCreateTraceId()); newHeaders.put(TraceUtils.TRACE_ID_IN_MDC, TraceUtils.getOrCreateTraceId());
newHeaders.put(MQ_OWNERSHIP_APP, applicationName);
final Context copiedContext = context.toBuilder().headers(newHeaders).build(); final Context copiedContext = context.toBuilder().headers(newHeaders).build();
Runnable runnable = () -> { Runnable runnable = () -> {