From 80a6e1c0af335181809aabd630efb2b17b6c546a Mon Sep 17 00:00:00 2001 From: zuoqinbo Date: Fri, 15 Dec 2023 17:20:05 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat(REQ-1954)=20rocketmq=20=E9=A1=BA?= =?UTF-8?q?=E5=BA=8F=E6=B6=88=E6=81=AF=20=E5=BB=B6=E8=BF=9F=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/cn/axzo/framework/rocketmq/Event.java | 11 ++++++++++- .../framework/rocketmq/RocketMQEventProducer.java | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java index c56d901..ade264a 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java @@ -93,6 +93,14 @@ public class Event { @JSONField(ordinal = Integer.MAX_VALUE) private Serializable data; + + /** + * 延迟消息 + * 参数只支持1到18,分别对应如下18个level时间层级 + * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + */ + private int delayTimeLevel; + /** * 提供默认构造函数,仅用于json反序列化时使用
* 不建议直接使用默认构造函数进行event的构建。构建event,请使用Event.builder().xx(xx).build(); @@ -104,7 +112,7 @@ public class Event { @Builder public Event(String eventModule, String eventName, String operatorId, String operatorType, - String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey) { + String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey,int delayTimeLevel) { if (eventCode != null) { this.eventModule = eventCode.getModule(); this.eventName = eventCode.getName(); @@ -124,6 +132,7 @@ public class Event { this.eventTime = System.currentTimeMillis(); this.shardingKey = shardingKey; + this.delayTimeLevel = delayTimeLevel; } public String toJsonString() { diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java index 406b209..7d7c0dc 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java @@ -67,7 +67,11 @@ public class RocketMQEventProducer extends AbstractEventProducer { } // 将eventCode放入header,以便快速过滤自己不关注的信息 messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString()); - + //设置 延迟队列参数 + int delayTimeLevel = event.getDelayTimeLevel(); + if(delayTimeLevel>0){ + messageBuilder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); + } // 同步发送 if (context.getSyncSending()) { try { From f2d96e0420f64384a63ffc2b94e5c8d18972886a Mon Sep 17 00:00:00 2001 From: zuoqinbo Date: Fri, 15 Dec 2023 18:08:25 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat(REQ-1954)=20rocketmq=20=E9=A1=BA?= =?UTF-8?q?=E5=BA=8F=E6=B6=88=E6=81=AF=20=E5=BB=B6=E8=BF=9F=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework/rocketmq/RocketMQEventProducer.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java index 7d7c0dc..6354ef5 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java @@ -69,14 +69,17 @@ public class RocketMQEventProducer extends AbstractEventProducer { messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString()); //设置 延迟队列参数 int delayTimeLevel = event.getDelayTimeLevel(); - if(delayTimeLevel>0){ - messageBuilder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); - } // 同步发送 if (context.getSyncSending()) { try { // 同步发送超时时间支持在properties中设置 - SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context)); + SendResult sendResult = null; + if (delayTimeLevel > 0) { + messageBuilder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); + sendResult = rocketMQTemplate.syncSend(destination, messageBuilder.build(), 5000, delayTimeLevel); + } else { + sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context)); + } log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}", context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId()); if (sendCallback != null) { From a53f5e69b88ac3966156c811b19736b780d123ab Mon Sep 17 00:00:00 2001 From: zuoqinbo Date: Mon, 18 Dec 2023 14:46:31 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat(REQ-1954)=20rocketmq=20=E9=A1=BA?= =?UTF-8?q?=E5=BA=8F=E6=B6=88=E6=81=AF=20=E5=BB=B6=E8=BF=9F=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework/rocketmq/RocketMQEventProducer.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java index 6354ef5..c81736c 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java @@ -68,7 +68,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { // 将eventCode放入header,以便快速过滤自己不关注的信息 messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString()); //设置 延迟队列参数 - int delayTimeLevel = event.getDelayTimeLevel(); + int delayTimeLevel = event.getDelayTimeLevel(); // 同步发送 if (context.getSyncSending()) { try { @@ -96,9 +96,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { } return; } - - // 异步发送 - rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() { + SendCallback asyncSendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}", @@ -116,7 +114,13 @@ public class RocketMQEventProducer extends AbstractEventProducer { .context(context).event(event).throwable(throwable).build()); } } - }); + }; + // 异步发送 + if (delayTimeLevel > 0) { + rocketMQTemplate.asyncSend(destination, messageBuilder.build(), asyncSendCallback, 5000, delayTimeLevel); + } else { + rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), asyncSendCallback); + } }; }