feat(REQ-1954) rocketmq 顺序消息 延迟队列

This commit is contained in:
zuoqinbo 2023-12-18 14:46:31 +08:00
parent f2d96e0420
commit a53f5e69b8

View File

@ -68,7 +68,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
// 将eventCode放入header以便快速过滤自己不关注的信息
messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString());
//设置 延迟队列参数
int delayTimeLevel = event.getDelayTimeLevel();
int delayTimeLevel = event.getDelayTimeLevel();
// 同步发送
if (context.getSyncSending()) {
try {
@ -96,9 +96,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
}
return;
}
// 异步发送
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() {
SendCallback asyncSendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}",
@ -116,7 +114,13 @@ public class RocketMQEventProducer extends AbstractEventProducer {
.context(context).event(event).throwable(throwable).build());
}
}
});
};
// 异步发送
if (delayTimeLevel > 0) {
rocketMQTemplate.asyncSend(destination, messageBuilder.build(), asyncSendCallback, 5000, delayTimeLevel);
} else {
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), asyncSendCallback);
}
};
}