diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java index 6354ef5..c81736c 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java @@ -68,7 +68,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { // 将eventCode放入header,以便快速过滤自己不关注的信息 messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString()); //设置 延迟队列参数 - int delayTimeLevel = event.getDelayTimeLevel(); + int delayTimeLevel = event.getDelayTimeLevel(); // 同步发送 if (context.getSyncSending()) { try { @@ -96,9 +96,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { } return; } - - // 异步发送 - rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() { + SendCallback asyncSendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}", @@ -116,7 +114,13 @@ public class RocketMQEventProducer extends AbstractEventProducer { .context(context).event(event).throwable(throwable).build()); } } - }); + }; + // 异步发送 + if (delayTimeLevel > 0) { + rocketMQTemplate.asyncSend(destination, messageBuilder.build(), asyncSendCallback, 5000, delayTimeLevel); + } else { + rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), asyncSendCallback); + } }; }