diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java index c56d901..ade264a 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/Event.java @@ -93,6 +93,14 @@ public class Event { @JSONField(ordinal = Integer.MAX_VALUE) private Serializable data; + + /** + * 延迟消息 + * 参数只支持1到18,分别对应如下18个level时间层级 + * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + */ + private int delayTimeLevel; + /** * 提供默认构造函数,仅用于json反序列化时使用
* 不建议直接使用默认构造函数进行event的构建。构建event,请使用Event.builder().xx(xx).build(); @@ -104,7 +112,7 @@ public class Event { @Builder public Event(String eventModule, String eventName, String operatorId, String operatorType, - String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey) { + String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey,int delayTimeLevel) { if (eventCode != null) { this.eventModule = eventCode.getModule(); this.eventName = eventCode.getName(); @@ -124,6 +132,7 @@ public class Event { this.eventTime = System.currentTimeMillis(); this.shardingKey = shardingKey; + this.delayTimeLevel = delayTimeLevel; } public String toJsonString() { diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java index ec37fee..803fd60 100644 --- a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java @@ -68,12 +68,19 @@ public class RocketMQEventProducer extends AbstractEventProducer { } // 将eventCode放入header,以便快速过滤自己不关注的信息 messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString()); - + //设置 延迟队列参数 + int delayTimeLevel = event.getDelayTimeLevel(); // 同步发送 if (context.getSyncSending()) { try { // 同步发送超时时间支持在properties中设置 - SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context)); + SendResult sendResult = null; + if (delayTimeLevel > 0) { + messageBuilder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); + sendResult = rocketMQTemplate.syncSend(destination, messageBuilder.build(), 5000, delayTimeLevel); + } else { + sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context)); + } log.info("====MQ PRODUCER SYNC====, context={}, message = {}, messageId = {} queueId = {}", context, event.toPrettyJsonString(), sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId()); if (sendCallback != null) { @@ -91,9 +98,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { } return; } - - // 异步发送 - rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() { + SendCallback asyncSendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}", @@ -111,7 +116,13 @@ public class RocketMQEventProducer extends AbstractEventProducer { .context(context).event(event).throwable(throwable).build()); } } - }); + }; + // 异步发送 + if (delayTimeLevel > 0) { + rocketMQTemplate.asyncSend(destination, messageBuilder.build(), asyncSendCallback, 5000, delayTimeLevel); + } else { + rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), asyncSendCallback); + } }; }