feat:(feature/REQ-3342) 解决消费消息时出现异常,导致消息进入重试队列,消费重试队列,如果出现异常就打日志,因为再次重试也基本是异常

This commit is contained in:
李龙 2025-01-22 17:30:50 +08:00
parent f16704bba5
commit c1c75930d6

View File

@ -78,12 +78,31 @@ public class RocketMQEventConfiguration {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
}
@Profile("!unittest")
@Slf4j
@Component
@RocketMQMessageListener(topic = "%RETRY%GID_topic_thrones_${spring.profiles.active}",
consumerGroup = "GID_topic_thrones_${spring.application.name}_${spring.profiles.active}",
messageModel = MessageModel.BROADCASTING,
nameServer = "${rocketmq.name-server}"
)
public static class RetryThronesListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
try {
super.onEvent(message, eventConsumer);
} catch (Exception ex) {
log.warn("consumer thrones message error,", ex);
log.warn("retry consumer thrones message error,", ex);
}
}
}
@ -101,12 +120,31 @@ public class RocketMQEventConfiguration {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
}
@Profile("!unittest")
@Slf4j
@Component
@RocketMQMessageListener(topic = "%RETRY%GID_topic_tyr_${spring.profiles.active}",
consumerGroup = "GID_topic_tyr_${spring.application.name}_${spring.profiles.active}",
messageModel = MessageModel.BROADCASTING,
nameServer = "${rocketmq.name-server}"
)
public static class RetryTyrListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
try {
super.onEvent(message, eventConsumer);
} catch (Exception ex) {
log.warn("consumer tyr message error,", ex);
log.warn("retry consumer tyr message error,", ex);
}
}
}
@ -134,6 +172,29 @@ public class RocketMQEventConfiguration {
}
}
@Profile("!unittest")
@Slf4j
@Component
@RocketMQMessageListener(topic = "%RETRY%GID_topic_apisix_plat_${spring.profiles.active}",
consumerGroup = "GID_topic_apisix_plat_${spring.application.name}_${spring.profiles.active}",
messageModel = MessageModel.BROADCASTING,
nameServer = "${rocketmq.name-server}"
)
public static class RetryApiSixPlatListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
try {
super.onEvent(message, eventConsumer);
} catch (Exception ex) {
log.warn("retry consumer apisixPlat message error,", ex);
}
}
}
@Bean
EventHandlerRepository eventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {