Merge branch 'feature/REQ-1217' into 'master'

Feature/req 1217

See merge request universal/framework/backend/axzo-framework-commons!91
This commit is contained in:
金海洋 2023-09-13 11:31:20 +00:00
commit 6f9e697c97
3 changed files with 201 additions and 0 deletions

View File

@ -0,0 +1,95 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.rocketmq.utils.UUIDBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* @Author: zhanglei
* @Package: cn.axzo.apollo.workspace.webapi.config
* @Name: RocketMQProduceEventConfiguration
* @Date: 2023/9/6 17:02
* @Desc: 后期可根据需求自由扩展该 template
* @Version: 1.0
*/
@Slf4j
public class EventProduceTemplate {
private RockerEventProduceFactory rockerProduceFactory;
private String defaultModule = "BUSINESS";
private String defaultName = "BUSINESS";
public EventProduceTemplate(RockerEventProduceFactory rockerProduceFactory){
this.rockerProduceFactory = rockerProduceFactory;
}
public EventProduceTemplate(RockerEventProduceFactory rockerProduceFactory, String serverModule, String serverName){
this.rockerProduceFactory = rockerProduceFactory;
this.defaultModule = serverModule;
this.defaultName = serverName;
}
public void send(String topic, Object message){
send(topic,defaultModule,defaultName, message, null, null, null);
}
/**
* desc: 发送分区消息消息按shardingKey做分区可以保证同一个分区类的消息FIFO适合消息体量大有顺序消费的场景
* @Param: [topic, module, name, message, targetId, targetType]
` * topic 消息主题
*/
public void sendShardingMsg(String topic, Object message, String shardingKey){
sendShardingMsg(topic, message, shardingKey, null, null);
}
/**
* desc: 发送分区消息消息按shardingKey做分区可以保证同一个分区类的消息FIFO适合消息体量大有顺序消费的场景
* @Param: [topic, module, name, message, targetId, targetType]
` * topic 消息主题
* targetId 事件目标对象ID 例如人员注册targetId可以设置为user.id此字段慎传若shardingKey为空此时会拿取targetId做为shardingKey,将导致分区不可控
* targetType 事件目标类型 例如user-create 可用于过滤数据
*/
public void sendShardingMsg(String topic, Object message, String shardingKey, String targetId, String targetType){
send(topic,defaultModule,defaultName, message, shardingKey, targetId, targetType);
}
/**
* @Param: [topic, module, name, message, targetId, targetType]
` * topic 消息主题 ,必填
* module 业务模块
* name 业务名称
* targetId 事件目标对象ID 例如人员注册targetId可以设置为userId 此字段慎传若shardingKey为空此时会拿取targetId做为shardingKey,将导致分区不可控
* targetType 事件目标类型 例如user-create, user-delete可用于过滤数据
* message 消息内容 必填
*/
public void send(String topic, String module, String name, Object message, String shardingKey, String targetId, String targetType){
//检查参数
checkParam(topic,message);
Event.EventCode eventCode = new Event.EventCode(module, name);
EventProducer.Context content = EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder().topic(topic).build()).build();
if (StringUtils.isEmpty(targetType)) {
targetType = topic;
}
//生产消息
rockerProduceFactory.getEventProducer(topic).send(Event.builder().shardingKey(shardingKey)
.targetId(targetId)
.targetType(targetType).eventCode(eventCode)
.eventModule(module)
.eventName(name)
.operatorId("operatorId").data(message.toString()).build(), content);
}
private void checkParam(String topic, Object message){
Objects.requireNonNull(topic, "topic not null");
Objects.requireNonNull(message, "message not null");
}
}

View File

@ -0,0 +1,79 @@
package cn.axzo.framework.rocketmq;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: zhanglei
* @Package: cn.axzo.apollo.workspace.server.config.rocket
* @Name: RockerProduceFactory
* @Date: 2023/9/7 10:58
* @Desc:
* @Version: 1.0
*/
@Data
@Slf4j
public class RockerEventProduceFactory {
private RocketConfigProperties rocketMQProperties;
private Map<String, EventProducer> eventProducerMap = new HashMap<>(16);
private RocketMQTemplate rocketMQTemplate;
private String defaultModule = "BUSINESS";
private String defaultName = "BUSINESS";
public RockerEventProduceFactory(RocketConfigProperties rocketMQProperties, RocketMQTemplate rocketMQTemplate) {
this.rocketMQProperties = rocketMQProperties;
this.rocketMQTemplate = rocketMQTemplate;
this.initEventProducer();
}
public RockerEventProduceFactory(RocketConfigProperties rocketMQProperties, RocketMQTemplate rocketMQTemplate, String serverModule, String serverName) {
this.rocketMQProperties = rocketMQProperties;
this.rocketMQTemplate = rocketMQTemplate;
this.defaultModule = serverModule;
this.defaultName = serverName;
this.initEventProducer();
}
/**
* 此方法用于初始化rocket produce 消费者一个 topic分别对应一个实例此方式有点浪费资源后期可以全局一个实例动态注入topic;
* 受此框架影响暂按此方法处理
*/
private void initEventProducer() {
//把生产者全部初始化到容器中去
if (rocketMQProperties.getProduces() != null) {
for (String key : rocketMQProperties.getProduces().keySet()) {
eventProducerMap.put(rocketMQProperties.getProduces().get(key), eventProducer(rocketMQTemplate, rocketMQProperties.getProduces().get(key)));
}
}
//后期可看需求把消费者也初始化出来
}
private EventProducer eventProducer(RocketMQTemplate rocketMQTemplate, String topic) {
return new RocketMQEventProducer(rocketMQTemplate, defaultModule, defaultName,
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder().meta(RocketMQEventProducer.RocketMQMessageMeta.builder().topic(topic).build()).build(),
(event, context) -> {
log.info("rocker send message success - {}", topic);
}
);
}
public EventProducer getEventProducer(String topic) {
if (StringUtils.isEmpty(topic)) {
throw new NullPointerException("topic is marked non-null but is null");
}
if (eventProducerMap.size() < 1 || !eventProducerMap.containsKey(topic)) {
throw new NullPointerException("eventProducer instance not found , please confirm param");
}
return eventProducerMap.get(topic);
}
}

View File

@ -0,0 +1,27 @@
package cn.axzo.framework.rocketmq;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* @Author: zhanglei
* @Package: cn.axzo.apollo.workspace.webapi.config
* @Name: RocketMQProduceEventConfiguration
* @Date: 2023/9/6 17:02
* @Desc:
* @Version: 1.0
*/
@Configuration
@Slf4j
@ConfigurationProperties(prefix = "rocketmq.topics")
@Data
public class RocketConfigProperties {
private Map<String, String> produces;
private Map<String, String> consumes;
}