REQ-3502: 消息者隔离

This commit is contained in:
yanglin 2025-01-17 11:13:07 +08:00
parent b91f74397b
commit 7a50a808c6
5 changed files with 40 additions and 104 deletions

View File

@ -7,6 +7,8 @@ import cn.axzo.msg.center.mq.ConsumerIsolation;
import cn.axzo.msg.center.mq.IsolationMQListener;
import cn.axzo.msg.center.service.enums.MqMessageType;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;
/**
@ -14,11 +16,17 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
public class CardPresetButtonSyncTodoHandler extends IsolationMQListener {
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_card_preset_button_sync_todo_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
public class CardPresetButtonSyncTodoListener extends IsolationMQListener {
private final TodoSyncCardService todoSyncCardService;
CardPresetButtonSyncTodoHandler(TodoSyncCardService todoSyncCardService) {
CardPresetButtonSyncTodoListener(TodoSyncCardService todoSyncCardService) {
super(ConsumerIsolation.CARD_PRESET_BUTTON_CLICKED_SYNC_TODO,
MqMessageType.CARD_PRESET_BUTTON_PRESSED);
this.todoSyncCardService = todoSyncCardService;

View File

@ -7,6 +7,8 @@ import cn.axzo.msg.center.mq.ConsumerIsolation;
import cn.axzo.msg.center.mq.IsolationMQListener;
import cn.axzo.msg.center.service.enums.MqMessageType;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;
/**
@ -14,11 +16,17 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
public class TodoPresetButtonSyncCardHandler extends IsolationMQListener {
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_todo_preset_button_sync_card_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
public class TodoPresetButtonSyncCardListener extends IsolationMQListener {
private final TodoSyncCardService todoSyncCardService;
TodoPresetButtonSyncCardHandler(TodoSyncCardService todoSyncCardService) {
TodoPresetButtonSyncCardListener(TodoSyncCardService todoSyncCardService) {
super(ConsumerIsolation.TODO_PRESET_BUTTON_CLICKED_SYNC_CARD,
MqMessageType.TODO_PRESET_BUTTON_PRESSED);
this.todoSyncCardService = todoSyncCardService;

View File

@ -8,6 +8,8 @@ import cn.axzo.msg.center.mq.IsolationMQListener;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.MqMessageType;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;
/**
@ -15,11 +17,17 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
class TodoSyncCardBizHandler extends IsolationMQListener {
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_todo_sync_card_biz_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
class TodoSyncCardBizListener extends IsolationMQListener {
private final TodoSyncCardService todoSyncCardService;
TodoSyncCardBizHandler(TodoSyncCardService todoSyncCardService) {
TodoSyncCardBizListener(TodoSyncCardService todoSyncCardService) {
super(ConsumerIsolation.TODO_SYNC_CARD_BIZ, MqMessageType.TODO_STATE_UPDATE);
this.todoSyncCardService = todoSyncCardService;
}

View File

@ -8,6 +8,8 @@ import cn.axzo.msg.center.mq.IsolationMQListener;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.MqMessageType;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;
/**
@ -15,11 +17,17 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
class TodoSyncCardFlowHandler extends IsolationMQListener {
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_todo_sync_card_flow_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
class TodoSyncCardFlowListener extends IsolationMQListener {
private final TodoSyncCardService todoSyncCardService;
TodoSyncCardFlowHandler(TodoSyncCardService todoSyncCardService) {
TodoSyncCardFlowListener(TodoSyncCardService todoSyncCardService) {
super(ConsumerIsolation.TODO_SYNC_CARD_FLOW, MqMessageType.TODO_STATE_UPDATE);
this.todoSyncCardService = todoSyncCardService;
}

View File

@ -139,100 +139,4 @@ public class RocketMQConfig {
});
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_card_preset_button_sync_todo_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
public static class CardPresetButtonSyncTodoListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
ConsumerIsolation.setIsolation(ConsumerIsolation.CARD_PRESET_BUTTON_CLICKED_SYNC_TODO);
try {
super.onEvent(message, eventConsumer);
} finally {
ConsumerIsolation.clearIsolation();
}
}
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_todo_preset_button_sync_card_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
public static class TodoPresetButtonSyncCardListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
ConsumerIsolation.setIsolation(ConsumerIsolation.TODO_PRESET_BUTTON_CLICKED_SYNC_CARD);
try {
super.onEvent(message, eventConsumer);
} finally {
ConsumerIsolation.clearIsolation();
}
}
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_todo_sync_card_biz_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
public static class TodoSyncCardBizListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
ConsumerIsolation.setIsolation(ConsumerIsolation.TODO_SYNC_CARD_BIZ);
try {
super.onEvent(message, eventConsumer);
} finally {
ConsumerIsolation.clearIsolation();
}
}
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_todo_sync_card_flow_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}",
maxReconsumeTimes = 3
)
public static class TodoSyncCardFlowListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
ConsumerIsolation.setIsolation(ConsumerIsolation.TODO_SYNC_CARD_FLOW);
try {
super.onEvent(message, eventConsumer);
} finally {
ConsumerIsolation.clearIsolation();
}
}
}
}