feat(REQ-1954) rocketmq 顺序消息 延迟队列

This commit is contained in:
zuoqinbo 2023-12-15 18:08:25 +08:00
parent 80a6e1c0af
commit f2d96e0420

View File

@ -69,14 +69,17 @@ public class RocketMQEventProducer extends AbstractEventProducer {
messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString());
//设置 延迟队列参数
int delayTimeLevel = event.getDelayTimeLevel();
if(delayTimeLevel>0){
messageBuilder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
}
// 同步发送
if (context.getSyncSending()) {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
SendResult sendResult = null;
if (delayTimeLevel > 0) {
messageBuilder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
sendResult = rocketMQTemplate.syncSend(destination, messageBuilder.build(), 5000, delayTimeLevel);
} else {
sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
}
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {