From 1f3fa2fb18a5a86a26f905a802bc47550844978b Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Tue, 4 Jun 2024 18:35:15 +0800 Subject: [PATCH] =?UTF-8?q?update(REQ-2324)=20-=20MQ=20=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E4=B8=AD=E5=A2=9E=E5=8A=A0=E6=B6=88=E6=81=AF=E5=BD=92=E5=B1=9E?= =?UTF-8?q?=E5=BA=94=E7=94=A8=E7=9A=84=E5=B1=9E=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/cn/axzo/workflow/common/constant/BpmnConstants.java | 4 ++++ .../axzo/workflow/core/mq/CustomRocketMQEventProducer.java | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java index 86593ef77..69865ff76 100644 --- a/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java +++ b/workflow-engine-common/src/main/java/cn/axzo/workflow/common/constant/BpmnConstants.java @@ -151,4 +151,8 @@ public interface BpmnConstants { * 批量操作配置默认值 */ Boolean SUPPORT_BATCH_OPERATION_DEFAULT_VALUE = false; + /** + * 用于 MQ 的 Header, 记录当前事件的归属应用 + */ + String MQ_OWNERSHIP_APP = "MQ_OWNERSHIP_APPLICATION"; } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/mq/CustomRocketMQEventProducer.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/mq/CustomRocketMQEventProducer.java index 117ab4875..72c7e36cf 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/mq/CustomRocketMQEventProducer.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/mq/CustomRocketMQEventProducer.java @@ -17,6 +17,8 @@ import java.util.List; import java.util.Optional; import java.util.function.BiConsumer; +import static cn.axzo.workflow.common.constant.BpmnConstants.MQ_OWNERSHIP_APP; + /** * 默认的 RocketMQ 事件生产者的装饰器 * @@ -28,6 +30,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer { private BiConsumer> sendBeforeCallback; private BiConsumer> rollbackHandler; + private final String applicationName; public CustomRocketMQEventProducer(RocketMQTemplate rocketMQTemplate, String defaultModule, String appName, Context defaultContext, @@ -35,6 +38,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer { BiConsumer> sendAfterCallback, BiConsumer> rollbackHandler) { super(rocketMQTemplate, defaultModule, appName, defaultContext, sendAfterCallback); + this.applicationName = appName; this.sendBeforeCallback = sendBeforeCallback; this.rollbackHandler = rollbackHandler; } @@ -64,6 +68,7 @@ public class CustomRocketMQEventProducer extends RocketMQEventProducer { newHeaders.put(TraceUtils.TRACE_ID, TraceUtils.getOrCreateTraceId()); newHeaders.put(TraceUtils.CTX_LOG_ID, TraceUtils.getOrCreateTraceId()); newHeaders.put(TraceUtils.TRACE_ID_IN_MDC, TraceUtils.getOrCreateTraceId()); + newHeaders.put(MQ_OWNERSHIP_APP, applicationName); final Context copiedContext = context.toBuilder().headers(newHeaders).build(); Runnable runnable = () -> {