REQ-3502: fix bugs
This commit is contained in:
parent
7428c69723
commit
0565c17a24
@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event;
|
|||||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||||
import cn.axzo.framework.rocketmq.EventHandler;
|
import cn.axzo.framework.rocketmq.EventHandler;
|
||||||
import cn.axzo.msg.center.api.mq.CardPresetButtonPressedMessage;
|
import cn.axzo.msg.center.api.mq.CardPresetButtonPressedMessage;
|
||||||
|
import cn.axzo.msg.center.mq.ConsumerIsolation;
|
||||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -23,6 +24,8 @@ public class CardPresetButtonSyncTodoHandler implements EventHandler, Initializi
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEvent(Event event, EventConsumer.Context context) {
|
public void onEvent(Event event, EventConsumer.Context context) {
|
||||||
|
if (ConsumerIsolation.getIsolation() != ConsumerIsolation.CARD_PRESET_BUTTON_CLICKED_SYNC_TODO)
|
||||||
|
return;
|
||||||
CardPresetButtonPressedMessage message = event.normalizedData(CardPresetButtonPressedMessage.class);
|
CardPresetButtonPressedMessage message = event.normalizedData(CardPresetButtonPressedMessage.class);
|
||||||
log.info("received CardPresetButtonPressedMessage: {}", message);
|
log.info("received CardPresetButtonPressedMessage: {}", message);
|
||||||
todoSyncCardService.syncCardPresetButtonPressed(message);
|
todoSyncCardService.syncCardPresetButtonPressed(message);
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event;
|
|||||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||||
import cn.axzo.framework.rocketmq.EventHandler;
|
import cn.axzo.framework.rocketmq.EventHandler;
|
||||||
import cn.axzo.msg.center.api.mq.PresetButtonPressedMessage;
|
import cn.axzo.msg.center.api.mq.PresetButtonPressedMessage;
|
||||||
|
import cn.axzo.msg.center.mq.ConsumerIsolation;
|
||||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -23,6 +24,8 @@ public class TodoPresetButtonSyncCardHandler implements EventHandler, Initializi
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEvent(Event event, EventConsumer.Context context) {
|
public void onEvent(Event event, EventConsumer.Context context) {
|
||||||
|
if (ConsumerIsolation.getIsolation() != ConsumerIsolation.TODO_PRESET_BUTTON_CLICKED_SYNC_CARD)
|
||||||
|
return;
|
||||||
PresetButtonPressedMessage message = event.normalizedData(PresetButtonPressedMessage.class);
|
PresetButtonPressedMessage message = event.normalizedData(PresetButtonPressedMessage.class);
|
||||||
log.info("received PresetButtonPressedMessage: {}", message);
|
log.info("received PresetButtonPressedMessage: {}", message);
|
||||||
todoSyncCardService.syncTodoPresetButtonPressed(message);
|
todoSyncCardService.syncTodoPresetButtonPressed(message);
|
||||||
|
|||||||
@ -7,6 +7,8 @@ public enum ConsumerIsolation {
|
|||||||
|
|
||||||
TODO_SYNC_CARD_BIZ,
|
TODO_SYNC_CARD_BIZ,
|
||||||
TODO_SYNC_CARD_FLOW,
|
TODO_SYNC_CARD_FLOW,
|
||||||
|
TODO_PRESET_BUTTON_CLICKED_SYNC_CARD,
|
||||||
|
CARD_PRESET_BUTTON_CLICKED_SYNC_TODO
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
|
|||||||
@ -125,7 +125,7 @@ public class RocketMQConfig {
|
|||||||
eventConsumer.onEvent(value, EventConsumer.Context.builder()
|
eventConsumer.onEvent(value, EventConsumer.Context.builder()
|
||||||
.msgId(message.getMsgId())
|
.msgId(message.getMsgId())
|
||||||
.ext(ImmutableMap.of("topic", topic))
|
.ext(ImmutableMap.of("topic", topic))
|
||||||
.headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[] {})))
|
.headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[]{})))
|
||||||
.lagSupplier(() -> partitionLag)
|
.lagSupplier(() -> partitionLag)
|
||||||
.maxAllowElapsedMillis(cfg.getMsgCenterMqSelfConsumeMaxExecMs())
|
.maxAllowElapsedMillis(cfg.getMsgCenterMqSelfConsumeMaxExecMs())
|
||||||
.build());
|
.build());
|
||||||
@ -154,7 +154,12 @@ public class RocketMQConfig {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(MessageExt message) {
|
public void onMessage(MessageExt message) {
|
||||||
super.onEvent(message, eventConsumer);
|
ConsumerIsolation.setIsolation(ConsumerIsolation.CARD_PRESET_BUTTON_CLICKED_SYNC_TODO);
|
||||||
|
try {
|
||||||
|
super.onEvent(message, eventConsumer);
|
||||||
|
} finally {
|
||||||
|
ConsumerIsolation.clearIsolation();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -173,7 +178,12 @@ public class RocketMQConfig {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(MessageExt message) {
|
public void onMessage(MessageExt message) {
|
||||||
super.onEvent(message, eventConsumer);
|
ConsumerIsolation.setIsolation(ConsumerIsolation.TODO_PRESET_BUTTON_CLICKED_SYNC_CARD);
|
||||||
|
try {
|
||||||
|
super.onEvent(message, eventConsumer);
|
||||||
|
} finally {
|
||||||
|
ConsumerIsolation.clearIsolation();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user