Merge branch 'feature/REQ-1217'

This commit is contained in:
金海洋 2023-09-27 11:05:21 +08:00
commit c00f76dafa

View File

@ -1,11 +1,9 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.rocketmq.utils.UUIDBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.Objects;
/**
@ -25,55 +23,73 @@ public class EventProduceTemplate {
private String defaultName = "BUSINESS";
public EventProduceTemplate(RockerEventProduceFactory rockerProduceFactory){
public EventProduceTemplate(RockerEventProduceFactory rockerProduceFactory) {
this.rockerProduceFactory = rockerProduceFactory;
}
public EventProduceTemplate(RockerEventProduceFactory rockerProduceFactory, String serverModule, String serverName){
public EventProduceTemplate(RockerEventProduceFactory rockerProduceFactory, String serverModule, String serverName) {
this.rockerProduceFactory = rockerProduceFactory;
this.defaultModule = serverModule;
this.defaultName = serverName;
}
public void send(String topic, Object message){
send(topic,defaultModule,defaultName, message, null, null, null);
public void send(String topic, Serializable message, String tag) {
send(topic, defaultModule, defaultName, message, null, null, null, tag);
}
public void send(String topic, Serializable message) {
send(topic, message, null);
}
/**
* desc: 发送分区消息消息按shardingKey做分区可以保证同一个分区类的消息FIFO适合消息体量大有顺序消费的场景
*
* @Param: [topic, module, name, message, targetId, targetType]
` * topic 消息主题
* ` * topic 消息主题
*/
public void sendShardingMsg(String topic, Object message, String shardingKey){
sendShardingMsg(topic, message, shardingKey, null, null);
public void sendShardingMsg(String topic, Serializable message, String shardingKey) {
sendShardingMsg(topic, message, shardingKey, null);
}
public void sendShardingMsg(String topic, Serializable message, String shardingKey, String tag) {
sendShardingMsg(topic, message, shardingKey, null, null, tag);
}
/**
* desc: 发送分区消息消息按shardingKey做分区可以保证同一个分区类的消息FIFO适合消息体量大有顺序消费的场景
*
* @Param: [topic, module, name, message, targetId, targetType]
` * topic 消息主题
* ` * topic 消息主题
* targetId 事件目标对象ID 例如人员注册targetId可以设置为user.id此字段慎传若shardingKey为空此时会拿取targetId做为shardingKey,将导致分区不可控
* targetType 事件目标类型 例如user-create 可用于过滤数据
*/
public void sendShardingMsg(String topic, Object message, String shardingKey, String targetId, String targetType){
send(topic,defaultModule,defaultName, message, shardingKey, targetId, targetType);
public void sendShardingMsg(String topic, Serializable message, String shardingKey, String targetId, String targetType, String tag) {
send(topic, defaultModule, defaultName, message, shardingKey, targetId, targetType, tag);
}
/**
* @Param: [topic, module, name, message, targetId, targetType]
` * topic 消息主题 ,必填
* ` * topic 消息主题 ,必填
* module 业务模块
* name 业务名称
* targetId 事件目标对象ID 例如人员注册targetId可以设置为userId 此字段慎传若shardingKey为空此时会拿取targetId做为shardingKey,将导致分区不可控
* targetType 事件目标类型 例如user-create, user-delete可用于过滤数据
* message 消息内容 必填
*/
public void send(String topic, String module, String name, Object message, String shardingKey, String targetId, String targetType){
public void send(String topic, String module, String name, Serializable message, String shardingKey, String targetId, String targetType) {
send(topic, module, name, message, shardingKey, targetId, targetType, null);
}
public void send(String topic, String module, String name, Serializable message, String shardingKey, String targetId, String targetType, String tag) {
//检查参数
checkParam(topic,message);
checkParam(topic, message);
Event.EventCode eventCode = new Event.EventCode(module, name);
EventProducer.Context content = EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder().topic(topic).build()).build();
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(topic)
.tag(tag)
.build()).build();
if (StringUtils.isEmpty(targetType)) {
targetType = topic;
@ -84,11 +100,10 @@ public class EventProduceTemplate {
.targetType(targetType).eventCode(eventCode)
.eventModule(module)
.eventName(name)
.operatorId("operatorId").data(message.toString()).build(), content);
.operatorId("operatorId").data(message).build(), content);
}
private void checkParam(String topic, Object message){
private void checkParam(String topic, Object message) {
Objects.requireNonNull(topic, "topic not null");
Objects.requireNonNull(message, "message not null");
}