feat:更改EventProduceTemplate发送消息方式,放出shardingkey相关字段
This commit is contained in:
parent
a5fb759e5d
commit
714238d561
@ -2,9 +2,12 @@ package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.utils.UUIDBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @Author: zhanglei
|
||||
* @Package: cn.axzo.apollo.workspace.webapi.config
|
||||
@ -33,17 +36,58 @@ public class EventProduceTemplate {
|
||||
}
|
||||
|
||||
public void send(String topic, Object message){
|
||||
send(topic,defaultModule,defaultName, message);
|
||||
send(topic,defaultModule,defaultName, message, null, null, null);
|
||||
}
|
||||
|
||||
public void send(String topic, String module, String name, Object message){
|
||||
/**
|
||||
* desc: 发送分区消息,消息按shardingKey做分区,可以保证同一个分区类的消息FIFO,适合消息体量大,有顺序消费的场景
|
||||
* @Param: [topic, module, name, message, targetId, targetType]
|
||||
` * topic 消息主题
|
||||
*/
|
||||
public void sendShardingMsg(String topic, Object message, String shardingKey){
|
||||
sendShardingMsg(topic, message, shardingKey, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* desc: 发送分区消息,消息按shardingKey做分区,可以保证同一个分区类的消息FIFO,适合消息体量大,有顺序消费的场景
|
||||
* @Param: [topic, module, name, message, targetId, targetType]
|
||||
` * topic 消息主题
|
||||
* targetId 事件目标对象ID 例如:人员注册,targetId可以设置为user.id,此字段慎传,若shardingKey为空,此时会拿取targetId做为shardingKey,将导致分区不可控
|
||||
* targetType 事件目标类型 例如:user-create ,可用于过滤数据
|
||||
*/
|
||||
public void sendShardingMsg(String topic, Object message, String shardingKey, String targetId, String targetType){
|
||||
send(topic,defaultModule,defaultName, message, shardingKey, targetId, targetType);
|
||||
}
|
||||
|
||||
/**
|
||||
* @Param: [topic, module, name, message, targetId, targetType]
|
||||
` * topic 消息主题 ,必填
|
||||
* module 业务模块
|
||||
* name 业务名称
|
||||
* targetId 事件目标对象ID 例如:人员注册,targetId可以设置为userId ,此字段慎传,若shardingKey为空,此时会拿取targetId做为shardingKey,将导致分区不可控
|
||||
* targetType 事件目标类型 例如:user-create, user-delete,可用于过滤数据
|
||||
* message 消息内容 ,必填
|
||||
*/
|
||||
public void send(String topic, String module, String name, Object message, String shardingKey, String targetId, String targetType){
|
||||
//检查参数
|
||||
checkParam(topic,message);
|
||||
Event.EventCode eventCode = new Event.EventCode(module, name);
|
||||
EventProducer.Context content = EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
|
||||
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder().topic(topic).build()).build();
|
||||
|
||||
if (StringUtils.isEmpty(targetType)) {
|
||||
targetType = topic;
|
||||
}
|
||||
//生产消息
|
||||
rockerProduceFactory.getEventProducer(topic).send(Event.builder().shardingKey("shardingKey")
|
||||
.targetId(UUIDBuilder.generateLongUuid())
|
||||
.targetType(topic).eventCode(eventCode)
|
||||
rockerProduceFactory.getEventProducer(topic).send(Event.builder().shardingKey(shardingKey)
|
||||
.targetId(targetId)
|
||||
.targetType(targetType).eventCode(eventCode)
|
||||
.operatorId("operatorId").data(message.toString()).build(), content);
|
||||
}
|
||||
|
||||
|
||||
private void checkParam(String topic, Object message){
|
||||
Objects.requireNonNull(topic, "topic not null");
|
||||
Objects.requireNonNull(message, "message not null");
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user