From 0565c17a2463c1b06a2d4269e508d08aa92262b2 Mon Sep 17 00:00:00 2001 From: yanglin Date: Fri, 17 Jan 2025 10:48:42 +0800 Subject: [PATCH] REQ-3502: fix bugs --- .../card/CardPresetButtonSyncTodoHandler.java | 3 +++ .../card/TodoPresetButtonSyncCardHandler.java | 3 +++ .../cn/axzo/msg/center/mq/ConsumerIsolation.java | 2 ++ .../cn/axzo/msg/center/mq/RocketMQConfig.java | 16 +++++++++++++--- 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/CardPresetButtonSyncTodoHandler.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/CardPresetButtonSyncTodoHandler.java index 4677ddc1..9c018efb 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/CardPresetButtonSyncTodoHandler.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/CardPresetButtonSyncTodoHandler.java @@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event; import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.framework.rocketmq.EventHandler; import cn.axzo.msg.center.api.mq.CardPresetButtonPressedMessage; +import cn.axzo.msg.center.mq.ConsumerIsolation; import cn.axzo.msg.center.service.enums.MqMessageType; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -23,6 +24,8 @@ public class CardPresetButtonSyncTodoHandler implements EventHandler, Initializi @Override public void onEvent(Event event, EventConsumer.Context context) { + if (ConsumerIsolation.getIsolation() != ConsumerIsolation.CARD_PRESET_BUTTON_CLICKED_SYNC_TODO) + return; CardPresetButtonPressedMessage message = event.normalizedData(CardPresetButtonPressedMessage.class); log.info("received CardPresetButtonPressedMessage: {}", message); todoSyncCardService.syncCardPresetButtonPressed(message); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/TodoPresetButtonSyncCardHandler.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/TodoPresetButtonSyncCardHandler.java index b022b706..335da4a2 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/TodoPresetButtonSyncCardHandler.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/card/TodoPresetButtonSyncCardHandler.java @@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event; import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.framework.rocketmq.EventHandler; import cn.axzo.msg.center.api.mq.PresetButtonPressedMessage; +import cn.axzo.msg.center.mq.ConsumerIsolation; import cn.axzo.msg.center.service.enums.MqMessageType; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -23,6 +24,8 @@ public class TodoPresetButtonSyncCardHandler implements EventHandler, Initializi @Override public void onEvent(Event event, EventConsumer.Context context) { + if (ConsumerIsolation.getIsolation() != ConsumerIsolation.TODO_PRESET_BUTTON_CLICKED_SYNC_CARD) + return; PresetButtonPressedMessage message = event.normalizedData(PresetButtonPressedMessage.class); log.info("received PresetButtonPressedMessage: {}", message); todoSyncCardService.syncTodoPresetButtonPressed(message); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/mq/ConsumerIsolation.java b/inside-notices/src/main/java/cn/axzo/msg/center/mq/ConsumerIsolation.java index 67dc3d3d..8574bac9 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/mq/ConsumerIsolation.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/mq/ConsumerIsolation.java @@ -7,6 +7,8 @@ public enum ConsumerIsolation { TODO_SYNC_CARD_BIZ, TODO_SYNC_CARD_FLOW, + TODO_PRESET_BUTTON_CLICKED_SYNC_CARD, + CARD_PRESET_BUTTON_CLICKED_SYNC_TODO ; diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/mq/RocketMQConfig.java b/inside-notices/src/main/java/cn/axzo/msg/center/mq/RocketMQConfig.java index 156ee561..c1f70e84 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/mq/RocketMQConfig.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/mq/RocketMQConfig.java @@ -125,7 +125,7 @@ public class RocketMQConfig { eventConsumer.onEvent(value, EventConsumer.Context.builder() .msgId(message.getMsgId()) .ext(ImmutableMap.of("topic", topic)) - .headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[] {}))) + .headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[]{}))) .lagSupplier(() -> partitionLag) .maxAllowElapsedMillis(cfg.getMsgCenterMqSelfConsumeMaxExecMs()) .build()); @@ -154,7 +154,12 @@ public class RocketMQConfig { @Override public void onMessage(MessageExt message) { - super.onEvent(message, eventConsumer); + ConsumerIsolation.setIsolation(ConsumerIsolation.CARD_PRESET_BUTTON_CLICKED_SYNC_TODO); + try { + super.onEvent(message, eventConsumer); + } finally { + ConsumerIsolation.clearIsolation(); + } } } @@ -173,7 +178,12 @@ public class RocketMQConfig { @Override public void onMessage(MessageExt message) { - super.onEvent(message, eventConsumer); + ConsumerIsolation.setIsolation(ConsumerIsolation.TODO_PRESET_BUTTON_CLICKED_SYNC_CARD); + try { + super.onEvent(message, eventConsumer); + } finally { + ConsumerIsolation.clearIsolation(); + } } }