Merge remote-tracking branch 'origin/feature/REQ-1954'

# Conflicts:
#	axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/RocketMQEventProducer.java
This commit is contained in:
周敏 2024-12-25 09:39:59 +08:00
commit 13f1e142a2
2 changed files with 27 additions and 7 deletions

View File

@ -93,6 +93,14 @@ public class Event {
@JSONField(ordinal = Integer.MAX_VALUE)
private Serializable data;
/**
* 延迟消息
* 参数只支持1到18,分别对应如下18个level时间层级
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
private int delayTimeLevel;
/**
* 提供默认构造函数仅用于json反序列化时使用<br>
* <b>不建议直接使用默认构造函数进行event的构建构建event请使用Event.builder().xx(xx).build();</b>
@ -104,7 +112,7 @@ public class Event {
@Builder
public Event(String eventModule, String eventName, String operatorId, String operatorType,
String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey) {
String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey,int delayTimeLevel) {
if (eventCode != null) {
this.eventModule = eventCode.getModule();
this.eventName = eventCode.getName();
@ -124,6 +132,7 @@ public class Event {
this.eventTime = System.currentTimeMillis();
this.shardingKey = shardingKey;
this.delayTimeLevel = delayTimeLevel;
}
public String toJsonString() {

View File

@ -68,12 +68,19 @@ public class RocketMQEventProducer extends AbstractEventProducer {
}
// 将eventCode放入header以便快速过滤自己不关注的信息
messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString());
//设置 延迟队列参数
int delayTimeLevel = event.getDelayTimeLevel();
// 同步发送
if (context.getSyncSending()) {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
SendResult sendResult = null;
if (delayTimeLevel > 0) {
messageBuilder.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
sendResult = rocketMQTemplate.syncSend(destination, messageBuilder.build(), 5000, delayTimeLevel);
} else {
sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
}
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, messageId = {} queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMsgId(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {
@ -91,9 +98,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
}
return;
}
// 异步发送
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() {
SendCallback asyncSendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}",
@ -111,7 +116,13 @@ public class RocketMQEventProducer extends AbstractEventProducer {
.context(context).event(event).throwable(throwable).build());
}
}
});
};
// 异步发送
if (delayTimeLevel > 0) {
rocketMQTemplate.asyncSend(destination, messageBuilder.build(), asyncSendCallback, 5000, delayTimeLevel);
} else {
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), asyncSendCallback);
}
};
}