From a53f5e69b88ac3966156c811b19736b780d123ab Mon Sep 17 00:00:00 2001 From: zuoqinbo Date: Mon, 18 Dec 2023 14:46:31 +0800 Subject: [PATCH] =?UTF-8?q?feat(REQ-1954)=20rocketmq=20=E9=A1=BA=E5=BA=8F?= =?UTF-8?q?=E6=B6=88=E6=81=AF=20=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework/rocketmq/RocketMQEventProducer.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java index 6354ef5..c81736c 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java @@ -68,7 +68,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { // 将eventCode放入header,以便快速过滤自己不关注的信息 messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString()); //设置 延迟队列参数 - int delayTimeLevel = event.getDelayTimeLevel(); + int delayTimeLevel = event.getDelayTimeLevel(); // 同步发送 if (context.getSyncSending()) { try { @@ -96,9 +96,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { } return; } - - // 异步发送 - rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() { + SendCallback asyncSendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}", @@ -116,7 +114,13 @@ public class RocketMQEventProducer extends AbstractEventProducer { .context(context).event(event).throwable(throwable).build()); } } - }); + }; + // 异步发送 + if (delayTimeLevel > 0) { + rocketMQTemplate.asyncSend(destination, messageBuilder.build(), asyncSendCallback, 5000, delayTimeLevel); + } else { + rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), asyncSendCallback); + } }; }