Merge branch 'feature/rocketMQ' into 'master'
消息队列rockeMQ封装 See merge request universal/framework/backend/axzo-framework-commons!73
This commit is contained in:
commit
1035ad8cac
36
axzo-common-rocketmq/pom.xml
Normal file
36
axzo-common-rocketmq/pom.xml
Normal file
@ -0,0 +1,36 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<artifactId>axzo-framework-commons</artifactId>
|
||||
<groupId>cn.axzo.framework</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>cn.axzo.framework.rocketmq</groupId>
|
||||
<artifactId>axzo-common-rocketmq</artifactId>
|
||||
<name>Axzo Common RocketMQ</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>cn.axzo.framework</groupId>
|
||||
<artifactId>axzo-common-domain</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>2.2.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-tx</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@ -0,0 +1,138 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.utils.TraceUtils;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.NonNull;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* 默认EventProducer,抽象里事件的组装和事务后发送的逻辑,提供一个Sender,具体服务具体提供
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta> {
|
||||
|
||||
private final AfterCommitExecutorImpl afterCommitExecutor;
|
||||
|
||||
/**
|
||||
* @return 返回发送消息的发送器
|
||||
*/
|
||||
public abstract BiConsumer<Event, Context<Meta>> getSender();
|
||||
|
||||
private Context defaultContext;
|
||||
|
||||
public AbstractEventProducer(Context defaultContext) {
|
||||
this.afterCommitExecutor = new AfterCommitExecutorImpl();
|
||||
this.defaultContext = defaultContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(@NonNull Event event, @NonNull Context context) {
|
||||
// XXX:不要在send的时候修改event的值,有副作用。
|
||||
// 例如:当将同一个event发送到不同的topic的时候,buildSchemaHash会用不同的topic赋值两次,导致一些异常case
|
||||
Event copiedEvent = Event.builder().build();
|
||||
BeanUtils.copyProperties(event, copiedEvent);
|
||||
if (Strings.isNullOrEmpty(copiedEvent.getTargetId())) {
|
||||
log.warn("targetId of event is black, best practice of targetId is present, event = {}", event.toJsonString());
|
||||
}
|
||||
|
||||
if (copiedEvent.getData() == null) {
|
||||
log.warn("data of event is empty, best practice of data must present, event = {}", event.toJsonString());
|
||||
}
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventModule()), "eventModule不能为空");
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventName()), "eventName不能为空");
|
||||
|
||||
// 复制一份 context,并加入链路跟踪信息 traceId
|
||||
HashMap newHeaders = Maps.newHashMap(Optional.ofNullable(context.getHeaders()).orElse(ImmutableMap.of()));
|
||||
newHeaders.put(TraceUtils.TRACE_ID, TraceUtils.getOrCreateTraceId());
|
||||
final Context copiedContext = context.toBuilder().headers(newHeaders).build();
|
||||
|
||||
Runnable runnable = () -> {
|
||||
try {
|
||||
getSender().accept(copiedEvent, copiedContext);
|
||||
} catch (Exception e) {
|
||||
log.error("====MQ PRODUCER ====, context={}, message = {}", copiedContext, copiedEvent.toPrettyJsonString(), e);
|
||||
throw e;
|
||||
}
|
||||
};
|
||||
if (copiedContext.isTransactional()) {
|
||||
// https://www.jianshu.com/p/59891ede5f90
|
||||
runnable.run();
|
||||
} else {
|
||||
// 并发会导致事件时序出现问题. 所以串行执行
|
||||
afterCommitExecutor.execute(() -> runnable.run());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(@NonNull Event event) {
|
||||
send(event, defaultContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
|
||||
* 保证在交易结束后被调用.
|
||||
*/
|
||||
private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
|
||||
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
|
||||
|
||||
public void execute(Runnable runnable) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Submitting new runnable {} to run after commit", runnable);
|
||||
}
|
||||
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
|
||||
}
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
List<Runnable> threadRunnables = RUNNABLES.get();
|
||||
if (threadRunnables == null) {
|
||||
threadRunnables = new ArrayList<>();
|
||||
RUNNABLES.set(threadRunnables);
|
||||
TransactionSynchronizationManager.registerSynchronization(this);
|
||||
}
|
||||
threadRunnables.add(runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
List<Runnable> threadRunnables = RUNNABLES.get();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
|
||||
}
|
||||
for (int i = 0; i < threadRunnables.size(); i++) {
|
||||
Runnable runnable = threadRunnables.get(i);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing runnable {}", runnable);
|
||||
}
|
||||
try {
|
||||
runnable.run();
|
||||
} catch (RuntimeException e) {
|
||||
log.error("Failed to execute runnable " + runnable, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
|
||||
}
|
||||
RUNNABLES.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,100 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.domain.ServiceException;
|
||||
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 批量处理事件处理器。
|
||||
* 按照 handler 维度来路由感兴趣的 events
|
||||
*/
|
||||
@Slf4j
|
||||
public class BatchEventConsumer implements EventConsumer {
|
||||
final protected ListMultimap<EventHandler, Event.EventCode> handlers = ArrayListMultimap.create();
|
||||
private final boolean logEnabled;
|
||||
private BiConsumer<Exception, String> exceptionHandler;
|
||||
|
||||
public BatchEventConsumer(BiConsumer<Exception, String> exceptionHandler, boolean logEnabled) {
|
||||
this.exceptionHandler = exceptionHandler;
|
||||
this.logEnabled = logEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) {
|
||||
handlers.put(eventHandler, eventCode);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandlerRepository registerHandlers(List<Event.EventCode> list, EventHandler eventHandler) {
|
||||
handlers.putAll(eventHandler, list);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onEvent(String s, Context context) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public boolean onEvents(List<BatchEvent> batchEvents) {
|
||||
// 默认开启日志,如需关闭日志,在调用onEvent的时候,设置logEnable为false
|
||||
if (batchEvents.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
log.info("====MQ CONSUMER BATCH2==== events={}", logEnabled ? FastjsonUtils.toJsonPettyLogString(batchEvents) : batchEvents.size());
|
||||
List<Event> events = batchEvents.stream().map(EventConsumer.BatchEvent::toEvent).collect(Collectors.toList());
|
||||
Stopwatch stopwatch = Stopwatch.createUnstarted();
|
||||
handlers.asMap().forEach((handler, codes) -> {
|
||||
try {
|
||||
stopwatch.start();
|
||||
ImmutableSet<Event.EventCode> handleCodes = ImmutableSet.copyOf(codes);
|
||||
// 过滤感兴趣的数据给 handler
|
||||
List<Event> filteredEvents = events.stream().filter(e -> handleCodes.contains(e.getEventCode())).collect(Collectors.toList());
|
||||
if (filteredEvents.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
handler.onEvents(filteredEvents, null);
|
||||
|
||||
String clazzName = handler.getClass().getCanonicalName();
|
||||
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
|
||||
log.info("====MQ CONSUMER BATCH2====, handledSize={}, handled by {}, cost {} millis", filteredEvents.size(), clazzName, elapsed);
|
||||
|
||||
if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
|
||||
String msg = String.format("take too long %d millis for %s to handle %s",
|
||||
elapsed, clazzName, filteredEvents);
|
||||
handleException(new ServiceException("程序处理超时"), msg);
|
||||
}
|
||||
} catch (ServiceException ex) {
|
||||
String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents);
|
||||
log.warn("====MQ CONSUMER BATCH====, handler={}, handle event warning, event = {}", handler.getClass().getCanonicalName(),
|
||||
payloads, ex);
|
||||
handleException(ex, payloads);
|
||||
} catch (Exception ex) {
|
||||
String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents);
|
||||
log.error("====MQ CONSUMER BATCH====, handler={}, handle event error, event = {}", handler.getClass().getCanonicalName(),
|
||||
payloads, ex);
|
||||
handleException(ex, payloads);
|
||||
} finally {
|
||||
// stopwatch必须reset(),否则下一次stopwatch.start()会报错
|
||||
stopwatch.reset();
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
private void handleException(Exception ex, String msg) {
|
||||
if (exceptionHandler != null) {
|
||||
exceptionHandler.accept(ex, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,118 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 消费消息,用法如下:
|
||||
* <pre>
|
||||
* // EVENT_TOPIC_KEY = "topic"
|
||||
* @Bean
|
||||
* EventConsumer eventConsumer(AppRuntime appRuntime, EventHandlerRepository eventHandlerRepository, ApiStatsClient apiStatsClient) {
|
||||
* Consumer<EventConsumer.EventWrapper> callback = (eventWrapper) -> {
|
||||
* if (eventWrapper.isHandled()) {
|
||||
* // 只收集被App真正消费的消息.
|
||||
* String topic = (String)eventWrapper.getExt().get(EVENT_TOPIC_KEY);
|
||||
* apiStatsClient.reportConsumedEvent(eventWrapper.getEvent(), topic);
|
||||
* }
|
||||
* };
|
||||
* return new DefaultEventConsumer(appRuntime.getAppName(), eventHandlerRepository, callback);
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
@Slf4j
|
||||
public class DefaultEventConsumer implements EventConsumer {
|
||||
|
||||
private final EventHandlerRepository handlerRepository;
|
||||
|
||||
private final String appName;
|
||||
private final Consumer<EventWrapper> consumeCallback;
|
||||
|
||||
public DefaultEventConsumer(String appName, EventHandlerRepository handlerRepository, Consumer<EventWrapper> consumeCallback) {
|
||||
this.handlerRepository = handlerRepository;
|
||||
this.appName = appName;
|
||||
this.consumeCallback = consumeCallback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onEvent(String message, Context context) {
|
||||
Event event;
|
||||
|
||||
//如果header中没有eventCode. 则尝试从body里面获取
|
||||
if (context.getEventCode() == null) {
|
||||
try {
|
||||
JSONObject evenJSON = JSONObject.parseObject(message);
|
||||
context.setEventCode(new Event.EventCode(evenJSON.getString("eventModule"), evenJSON.getString("eventName")));
|
||||
} catch (Exception ex) {
|
||||
//ignore. 忽略message不是JSON的情况
|
||||
}
|
||||
}
|
||||
|
||||
// 默认开启日志,如需关闭日志,在调用onEvent的时候,设置logEnable为false
|
||||
if (!handlerRepository.canHandle(context.getEventCode())) {
|
||||
// ignore due to no registering handler
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
event = JSONObject.parseObject(message, Event.class);
|
||||
} catch (Exception e) {
|
||||
log.error("====MQ CONSUMER {} ===={}, msgId = {}, parse event error, event = {}",
|
||||
context.getTraceId(), appName, context.getMsgId(), message, e);
|
||||
return false;
|
||||
}
|
||||
boolean logEnabled = context.getLogEnabled() && (context.getLogFilter() == null || context.getLogFilter().apply(event));
|
||||
if (logEnabled) {
|
||||
log.info("====MQ CONSUMER {} ===={}, msgId = {}, message = {}",
|
||||
context.getTraceId(), appName, context.getMsgId(), message);
|
||||
}
|
||||
boolean handled = handlerRepository.process(event, context);
|
||||
if (consumeCallback != null) {
|
||||
consumeCallback.accept(EventWrapper.builder()
|
||||
.event(event)
|
||||
.consumer(this)
|
||||
.isHandled(handled)
|
||||
.ext(context.getExt())
|
||||
.context(context)
|
||||
.build());
|
||||
}
|
||||
return handled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onEvents(List<BatchEvent> events) {
|
||||
// 默认开启日志,如需关闭日志,在调用onEvent的时候,设置logEnable为false
|
||||
log.info("====MQ CONSUMER BATCH===={} , message = {}, events={}", appName, FastjsonUtils.toJsonPettyLogString(events));
|
||||
Map<Event.EventCode, List<Event>> eventGroup = events.stream().filter(e -> handlerRepository.canHandle(e.getContext().getEventCode()))
|
||||
.map(BatchEvent::toEvent).collect(Collectors.groupingBy(Event::getEventCode));
|
||||
if (eventGroup.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
for (final Map.Entry<Event.EventCode, List<Event>> entry : eventGroup.entrySet()) {
|
||||
handlerRepository.batch(entry.getValue(),
|
||||
Context.builder()
|
||||
.eventCode(entry.getKey())
|
||||
.maxAllowElapsedMillis(TimeUnit.MINUTES.toMillis(1L))
|
||||
.build());
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) {
|
||||
return handlerRepository.registerHandler(eventCode, eventHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler) {
|
||||
return handlerRepository.registerHandlers(eventCodes, eventHandler);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,224 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson.annotation.JSONField;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.hash.Hashing;
|
||||
import lombok.*;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.cglib.beans.BeanMap;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 统一系统事件对象.
|
||||
* 消息格式如下
|
||||
* {
|
||||
* "event_scene":"teaching",
|
||||
* "event_module":"im",
|
||||
* "operator_id":student_id,
|
||||
* "operator_type":"student",
|
||||
* "target_id":conversation_id,
|
||||
* "target_type":"conversation",
|
||||
* "event_name":"message_sent",
|
||||
* "event_time":"2019-07-22 16:10:10.666",
|
||||
* "data":{"message_id":"xxx","tags":"xxx"} //消息ID,消息标签
|
||||
* "event_id": "0101_xxx" //消息id
|
||||
* "data_schema_hash": "xxxx" //数据的schema hash
|
||||
* }
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class Event {
|
||||
|
||||
/**
|
||||
* 唯一标识.唯一区分一条事件.可以做幂等处理. 默认是app_id + uuid
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* event的schema hash. 在发送的时候填充.
|
||||
* 接受方可以通过简单的检查这个标示来判断event的scheme是否发生变化.
|
||||
*/
|
||||
private String schemaHash;
|
||||
|
||||
private String eventModule;
|
||||
|
||||
private String eventName;
|
||||
|
||||
@Builder.Default
|
||||
private String operatorId = "system";
|
||||
|
||||
@Builder.Default
|
||||
private String operatorType = "system";
|
||||
|
||||
private String targetId;
|
||||
|
||||
private String targetType;
|
||||
|
||||
private Long eventTime;
|
||||
|
||||
private String shardingKey;
|
||||
|
||||
/**
|
||||
* 序列化时将该字段放在最后. 避免遮挡其他关键字段
|
||||
*/
|
||||
@JSONField(ordinal = Integer.MAX_VALUE)
|
||||
private Serializable data;
|
||||
|
||||
/**
|
||||
* 提供默认构造函数,仅用于json反序列化时使用<br>
|
||||
* <b>不建议直接使用默认构造函数进行event的构建。构建event,请使用Event.builder().xx(xx).build();</b>
|
||||
* XXX: 此处不使用@NoArgsConstrutor是因为其于@Builder.Default会产生冲突。;
|
||||
*
|
||||
*/
|
||||
public Event() {
|
||||
}
|
||||
|
||||
@Builder
|
||||
public Event(String eventModule, String eventName, String operatorId, String operatorType,
|
||||
String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey) {
|
||||
if (eventCode != null) {
|
||||
this.eventModule = eventCode.getModule();
|
||||
this.eventName = eventCode.getName();
|
||||
}
|
||||
// 如果eventCode和单个的eventName,eventScene,eventModule都传入了,优先去后面单个传入的参数
|
||||
Optional.ofNullable(eventModule).ifPresent(module -> this.eventModule = module);
|
||||
Optional.ofNullable(eventName).ifPresent(name -> this.eventName = name);
|
||||
if (!Strings.isNullOrEmpty(operatorId)) {
|
||||
this.operatorId = operatorId;
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(operatorType)) {
|
||||
this.operatorType = operatorType;
|
||||
}
|
||||
this.data = Optional.ofNullable(data).orElseGet(HashMap::new);
|
||||
this.targetId = targetId;
|
||||
this.targetType = targetType;
|
||||
|
||||
this.eventTime = System.currentTimeMillis();
|
||||
this.shardingKey = shardingKey;
|
||||
}
|
||||
|
||||
public String toJsonString() {
|
||||
return JSONObject.toJSONString(this, FastjsonUtils.LOCAL_DATE_TIME_SERIALIZE_CONFIG, FastjsonUtils.SERIALIZER_FEATURES);
|
||||
}
|
||||
|
||||
/**
|
||||
* json 的序列化为人眼识别做特殊处理
|
||||
* 1. key 排序
|
||||
* 2. 过滤空的 value
|
||||
* 3. 格式化日期
|
||||
* @return
|
||||
*/
|
||||
public String toPrettyJsonString() {
|
||||
return FastjsonUtils.toJsonPettyLogString(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toPrettyJsonString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 该方法返回一个字符串,该字符串可用于唯一标注一种类型的消息。<br>
|
||||
* 对于kafka,可以将该值用于消息的key,<br>
|
||||
* 对于rabbit,可以将该值用于消息的routingKey。<br>
|
||||
* 以方便消费方快速获取消息类型
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@JSONField(serialize = false)
|
||||
public EventCode getEventCode() {
|
||||
return new EventCode(eventModule, eventName);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送前,对必填参数做校验
|
||||
*/
|
||||
public void check() {
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(eventModule), "发送消息 -> 事件模块不能为空");
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(eventName), "发送消息 -> 事件名字不能为空");
|
||||
}
|
||||
|
||||
/**
|
||||
* 将Object类型的data转为指定类型对象
|
||||
*
|
||||
* @param tClass
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public <T> T normalizedData(Class<T> tClass) {
|
||||
return JSON.parseObject(JSON.toJSONString(data), tClass);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建消息的schema hash
|
||||
*
|
||||
* @param key kafka的topic名称 or rabbit的exchange
|
||||
* @return schema hash
|
||||
*/
|
||||
public String buildSchemaHash(String key) {
|
||||
String schema;
|
||||
// FIXME: beanmap不能处理data是集合类型对象. 这些需要特殊处理
|
||||
// 建议data总是一个bean对象.
|
||||
Object data = getData();
|
||||
if (data == null) {
|
||||
schema = "null";
|
||||
} else if (data instanceof Collection
|
||||
|| data instanceof Map
|
||||
|| data instanceof Number
|
||||
|| data.getClass().isArray()
|
||||
|| data.getClass().isPrimitive()
|
||||
|| data instanceof String) {
|
||||
schema = data.getClass().getCanonicalName();
|
||||
} else {
|
||||
try {
|
||||
BeanMap beanMap = BeanMap.create(data);
|
||||
// FIXME 如何取嵌套的数据? 如何检查类型变化?
|
||||
// 目前的实现只检查了data中第一层数据的名称和类型.
|
||||
// 取payload中每个key作为schema
|
||||
schema = beanMap.keySet().stream().map(e -> e + "=" + beanMap.getPropertyType((String) e).getSimpleName())
|
||||
.sorted().collect(Collectors.joining(",")).toString();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
schema = "error";
|
||||
}
|
||||
}
|
||||
return Hashing.murmur3_128().hashString(key + getEventModule()
|
||||
+ getEventName() + schema, Charsets.UTF_8).toString();
|
||||
}
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public static class EventCode {
|
||||
public static final String SEPARATOR = ":";
|
||||
|
||||
@NonNull
|
||||
private String module;
|
||||
@NonNull
|
||||
private String name;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return module + SEPARATOR + name;
|
||||
}
|
||||
|
||||
public static EventCode from(String text) {
|
||||
String[] segments = StringUtils.split(text,SEPARATOR);
|
||||
Preconditions.checkArgument(segments.length == 2);
|
||||
|
||||
return new EventCode(segments[0], segments[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,152 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.domain.data.IdHelper;
|
||||
import cn.axzo.framework.rocketmq.utils.TraceUtils;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Supplier;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* @author haiyangjin
|
||||
* @date 2023/7/23
|
||||
*/
|
||||
public interface EventConsumer {
|
||||
/**
|
||||
* 注册EventHandler
|
||||
*
|
||||
* @param eventCode event类型的唯一标示
|
||||
* @param eventHandler eventHandler
|
||||
* @return EventHandlerRepository
|
||||
*/
|
||||
EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler);
|
||||
|
||||
/**
|
||||
* 为多个eventCodes注册一个eventHandler
|
||||
*
|
||||
* @param eventCodes list of eventCode
|
||||
* @param eventHandler eventHandler
|
||||
* @return EventHandlerRepository
|
||||
*/
|
||||
EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler);
|
||||
|
||||
/**
|
||||
* message的处理方法
|
||||
*
|
||||
* @param message message
|
||||
* @return 如果事件被handler处理返回true, 否则返回false
|
||||
* @Param ext 扩展信息.
|
||||
*/
|
||||
boolean onEvent(String message, Context context);
|
||||
|
||||
/**
|
||||
* message的批量处理方法
|
||||
*
|
||||
* @param batchEvents message
|
||||
* @return 如果事件被handler处理返回true, 否则返回false
|
||||
* @Param ext 扩展信息.
|
||||
*/
|
||||
default boolean onEvents(List<BatchEvent> batchEvents) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
class EventWrapper {
|
||||
private Event event;
|
||||
private EventConsumer consumer;
|
||||
private boolean isHandled;
|
||||
private Map<String, Object> ext;
|
||||
private Context context;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
class Context {
|
||||
@Setter
|
||||
private Event.EventCode eventCode;
|
||||
private String traceId;
|
||||
private String msgId;
|
||||
/**
|
||||
* 只elapse 超过多少毫秒才进行日志记录。默认 3ms
|
||||
*/
|
||||
private Long logElapsedThreshold;
|
||||
|
||||
private Map<String, byte[]> headers;
|
||||
private Map<String, Object> ext;
|
||||
private Long maxAllowElapsedMillis;
|
||||
private Boolean logEnabled;
|
||||
private transient Predicate<Event> logFilter;
|
||||
/**
|
||||
* 返回消息消费者offset与全部消息offset之间的差.
|
||||
*/
|
||||
private transient Supplier<Long> lagSupplier;
|
||||
|
||||
@Builder
|
||||
public Context(Event.EventCode eventCode, String msgId, Long logElapsedThreshold, Map<String, byte[]> headers, Map<String, Object> ext,
|
||||
Long maxAllowElapsedMillis, Boolean logEnabled, Predicate<Event> logFilter, Supplier<Long> lagSupplier) {
|
||||
this.eventCode = eventCode;
|
||||
|
||||
this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L);
|
||||
this.headers = headers;
|
||||
this.ext = ext;
|
||||
this.msgId = Strings.nullToEmpty(msgId);
|
||||
|
||||
this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L);
|
||||
this.logEnabled = Optional.ofNullable(logEnabled).orElse(true);
|
||||
this.logFilter = logFilter;
|
||||
this.lagSupplier = lagSupplier;
|
||||
if (!CollectionUtils.isEmpty(headers) && headers.containsKey(TraceUtils.TRACE_ID)) {
|
||||
this.traceId = new String(headers.get(TraceUtils.TRACE_ID), Charsets.UTF_8);
|
||||
}
|
||||
if (Strings.isNullOrEmpty(this.traceId)) {
|
||||
traceId = IdHelper.get32UUID();
|
||||
}
|
||||
TraceUtils.putTraceId(this.traceId);
|
||||
}
|
||||
|
||||
public Event.EventCode getEventCode() {
|
||||
return Optional.ofNullable(eventCode)
|
||||
.orElseGet(() -> headers.containsKey("eventCode")
|
||||
? Event.EventCode.from(new String(headers.get("eventCode"), Charsets.UTF_8))
|
||||
: null);
|
||||
}
|
||||
|
||||
public String getTraceId() {
|
||||
if (Strings.isNullOrEmpty(traceId)) {
|
||||
traceId = IdHelper.get32UUID();
|
||||
TraceUtils.putTraceId(traceId);
|
||||
}
|
||||
return traceId;
|
||||
}
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
class BatchEvent {
|
||||
private String message;
|
||||
private Context context;
|
||||
|
||||
public Event.EventCode getEventCode() {
|
||||
return context.getEventCode();
|
||||
}
|
||||
|
||||
public Event toEvent() {
|
||||
try {
|
||||
return JSONObject.parseObject(message, Event.class);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author haiyangjin
|
||||
* @date 2023/7/23
|
||||
*/
|
||||
public interface EventHandler {
|
||||
void onEvent(Event event, EventConsumer.Context context);
|
||||
|
||||
/**
|
||||
* 批量处理事件
|
||||
* @param events
|
||||
* @param context
|
||||
*/
|
||||
default void onEvents(List<Event> events, EventConsumer.Context context){
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,179 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.domain.ServiceException;
|
||||
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.AntPathMatcher;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author haiyangjin
|
||||
* @date 2023/7/23
|
||||
*/
|
||||
@Slf4j
|
||||
public class EventHandlerRepository {
|
||||
final protected ListMultimap<Event.EventCode, EventHandler> handlers = ArrayListMultimap.create();
|
||||
private final BiConsumer<Exception, String> exceptionHandler;
|
||||
|
||||
private AntPathMatcher antPathMatcher;
|
||||
|
||||
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler) {
|
||||
this(exceptionHandler, false);
|
||||
}
|
||||
|
||||
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler, boolean supportPattern) {
|
||||
this.exceptionHandler = exceptionHandler;
|
||||
|
||||
if (supportPattern) {
|
||||
antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR);
|
||||
antPathMatcher.setCachePatterns(true);
|
||||
}
|
||||
}
|
||||
|
||||
public EventHandlerRepository() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) {
|
||||
Objects.requireNonNull(eventCode);
|
||||
Objects.requireNonNull(eventHandler);
|
||||
|
||||
handlers.put(eventCode, eventHandler);
|
||||
return this;
|
||||
}
|
||||
|
||||
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler) {
|
||||
Objects.requireNonNull(eventHandler);
|
||||
eventCodes.forEach(e -> handlers.put(e, eventHandler));
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean canHandle(Event.EventCode eventCode) {
|
||||
if (!isSupportPattern()) {
|
||||
return handlers.containsKey(eventCode);
|
||||
}
|
||||
|
||||
return handlers.keySet().stream()
|
||||
.anyMatch(key -> antPathMatcher.match(key.toString(), eventCode.toString()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理事件
|
||||
*
|
||||
* @param event 事件
|
||||
* @return 如果有注册的handler处理事件返回true, 否则false
|
||||
*/
|
||||
public boolean process(Event event, EventConsumer.Context context) {
|
||||
Stopwatch stopwatch = Stopwatch.createUnstarted();
|
||||
List<EventHandler> eventHandlers = getEventHandlers(event.getEventCode());
|
||||
eventHandlers.stream().forEach(handler -> {
|
||||
try {
|
||||
stopwatch.start();
|
||||
handler.onEvent(event, context);
|
||||
|
||||
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
|
||||
String clazzName = handler.getClass().getSimpleName();
|
||||
// 为了避免太多日志输出,只有处理时间超过
|
||||
if (elapsed > context.getLogElapsedThreshold()) {
|
||||
log.info("====MQ CONSUMER {}====, handled by {}, eventCode = {}, cost {} millis",
|
||||
context.getTraceId(), clazzName, event.getEventCode(), elapsed);
|
||||
}
|
||||
|
||||
if (elapsed > context.getMaxAllowElapsedMillis()) {
|
||||
String msg = String.format("[%s] take too long %d millis for %s to handle %s",
|
||||
context.getTraceId(), elapsed, clazzName, event.toPrettyJsonString());
|
||||
handleException(new ServiceException("程序处理超时"), msg);
|
||||
}
|
||||
} catch (ServiceException ex) {
|
||||
log.warn("====MQ CONSUMER {} ====, handle event warning, event = {}", context.getTraceId(), event.toPrettyJsonString(), ex);
|
||||
handleException(ex, event.toPrettyJsonString());
|
||||
} catch (Exception ex) {
|
||||
log.error("====MQ CONSUMER {} ====, handle event error, event = {}", context.getTraceId(), event.toPrettyJsonString(), ex);
|
||||
handleException(ex, event.toPrettyJsonString());
|
||||
} finally {
|
||||
// stopwatch必须reset(),否则下一次stopwatch.start()会报错
|
||||
stopwatch.reset();
|
||||
}
|
||||
});
|
||||
return !eventHandlers.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量处理多条事件
|
||||
*
|
||||
* @param events
|
||||
* @param context
|
||||
* @return
|
||||
*/
|
||||
public boolean batch(List<Event> events, EventConsumer.Context context) {
|
||||
Stopwatch stopwatch = Stopwatch.createUnstarted();
|
||||
List<EventHandler> eventHandlers = getEventHandlers(context.getEventCode());
|
||||
eventHandlers.stream().forEach(handler -> {
|
||||
try {
|
||||
stopwatch.start();
|
||||
String clazzName = handler.getClass().getCanonicalName();
|
||||
log.info("====MQ CONSUMER BATCH====, start handling by {}", clazzName);
|
||||
handler.onEvents(events, context);
|
||||
|
||||
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
|
||||
log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed);
|
||||
|
||||
if (elapsed > context.getMaxAllowElapsedMillis()) {
|
||||
String msg = String.format("take too long %d millis for %s to handle %s",
|
||||
elapsed, clazzName, events);
|
||||
handleException(new ServiceException("程序处理超时"), msg);
|
||||
}
|
||||
} catch (ServiceException ex) {
|
||||
String payloads = FastjsonUtils.toJsonPettyLogString(events);
|
||||
log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex);
|
||||
handleException(ex, payloads);
|
||||
} catch (Exception ex) {
|
||||
String payloads = FastjsonUtils.toJsonPettyLogString(events);
|
||||
log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex);
|
||||
handleException(ex, payloads);
|
||||
} finally {
|
||||
// stopwatch必须reset(),否则下一次stopwatch.start()会报错
|
||||
stopwatch.reset();
|
||||
}
|
||||
});
|
||||
return !eventHandlers.isEmpty();
|
||||
}
|
||||
|
||||
public boolean isHandled(Event event) {
|
||||
Preconditions.checkArgument(event != null);
|
||||
Preconditions.checkArgument(event.getEventCode() != null);
|
||||
|
||||
return canHandle(event.getEventCode());
|
||||
}
|
||||
|
||||
private void handleException(Exception ex, String msg) {
|
||||
if (exceptionHandler != null) {
|
||||
exceptionHandler.accept(ex, msg);
|
||||
}
|
||||
}
|
||||
|
||||
private List<EventHandler> getEventHandlers(Event.EventCode eventCode) {
|
||||
if (!isSupportPattern()) {
|
||||
return handlers.get(eventCode);
|
||||
}
|
||||
|
||||
// 支持pattern的时候,返回所有匹配的Handlers
|
||||
return handlers.keySet().stream()
|
||||
.filter(key -> antPathMatcher.match(key.toString(), eventCode.toString()))
|
||||
.flatMap(key -> handlers.get(key).stream())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private boolean isSupportPattern() {
|
||||
return antPathMatcher != null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,83 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* @author haiyangjin
|
||||
* @date 2023/7/23
|
||||
*/
|
||||
public interface EventProducer<Meta> {
|
||||
|
||||
/**
|
||||
* 发送event
|
||||
*
|
||||
* @param event 事件主体
|
||||
*/
|
||||
void send(Event event, Context<Meta> context);
|
||||
|
||||
/**
|
||||
* 发送event,context信息使用默认的context
|
||||
*
|
||||
* @param event
|
||||
*/
|
||||
void send(Event event);
|
||||
|
||||
@Getter
|
||||
@Builder(toBuilder = true)
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
class Context<Meta> {
|
||||
/**
|
||||
* 存储发送消息需要的一些元数据, 例如rabbitMq的exchange, routingKey等
|
||||
*/
|
||||
private Meta meta;
|
||||
|
||||
/**
|
||||
* 发送消息需要附加的请求头
|
||||
*/
|
||||
private Map<String, String> headers;
|
||||
|
||||
/**
|
||||
* 接受发送过程中的异常
|
||||
*/
|
||||
|
||||
private transient Consumer<ExceptionContext> exceptionHandler;
|
||||
|
||||
/**
|
||||
* 同步发送。默认 异步
|
||||
*/
|
||||
private Boolean syncSending;
|
||||
|
||||
public Boolean getSyncSending() {
|
||||
return Optional.ofNullable(syncSending).orElse(Boolean.TRUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否使用 transactional 的方式发送。查看
|
||||
* https://www.jianshu.com/p/59891ede5f90
|
||||
* 如果是 true,调用者需要做如下改造:
|
||||
* <ul>
|
||||
* <li>spring.kafka.producer.transaction-id-prefix</li>
|
||||
* <li>调用函数需要添加@Transactional</li>
|
||||
* </ul>
|
||||
*/
|
||||
private boolean transactional;
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
@Builder
|
||||
class ExceptionContext<Meta> {
|
||||
Context<Meta> context;
|
||||
Throwable throwable;
|
||||
Event event;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,83 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.domain.data.IdHelper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* 本地消息发送,避免直接发送到测试环境影响测试。
|
||||
* 如果配置了 EventConsumer,会将消息直接传到给 consumer 消费。
|
||||
* XXX:只是为了本地测试,不要使用在测试和线上环境
|
||||
*/
|
||||
@Slf4j
|
||||
public class LocalEventProducer extends AbstractEventProducer {
|
||||
|
||||
private final EventConsumer eventConsumer;
|
||||
|
||||
private String defaultModule;
|
||||
private String appName;
|
||||
|
||||
public LocalEventProducer(EventConsumer eventConsumer,
|
||||
String defaultModule,
|
||||
String appName,
|
||||
Context<LocalMessageMeta> defaultMeta) {
|
||||
super(defaultMeta);
|
||||
this.defaultModule = defaultModule;
|
||||
this.appName = appName;
|
||||
this.eventConsumer = eventConsumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiConsumer<Event, Context<LocalMessageMeta>> getSender() {
|
||||
return (event, context) -> {
|
||||
if (eventConsumer == null) {
|
||||
log.info("====IGNORE even since having no eventconsumer!{}", event);
|
||||
return;
|
||||
}
|
||||
if (Strings.isNullOrEmpty(event.getEventModule())) {
|
||||
event.setEventModule(defaultModule);
|
||||
}
|
||||
if (Strings.isNullOrEmpty(event.getEventId())) {
|
||||
event.setEventId(String.format("%s_%s", appName, IdHelper.get32UUID()));
|
||||
}
|
||||
|
||||
event.setSchemaHash(IdHelper.get32UUID());
|
||||
event.check();
|
||||
|
||||
Map<String, byte[]> headers = Maps.newHashMap();
|
||||
if (!CollectionUtils.isEmpty(context.getHeaders())) {
|
||||
context.getHeaders().entrySet()
|
||||
.forEach(entry -> headers.put(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8)));
|
||||
}
|
||||
// 将eventCode放入header,以便快速过滤自己不关注的信息
|
||||
headers.put("eventCode", event.getEventCode().toString().getBytes());
|
||||
new Thread(() -> {
|
||||
// 模拟真实 kafka 的消息延迟,这里做了短暂延迟。
|
||||
try {
|
||||
Thread.sleep(50L);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
eventConsumer.onEvent(event.toJsonString(), EventConsumer.Context.builder()
|
||||
.eventCode(event.getEventCode()).headers(headers).ext(ImmutableMap.of()).build());
|
||||
}).start();
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
@ToString
|
||||
public static class LocalMessageMeta {
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,160 @@
|
||||
package cn.axzo.framework.rocketmq;
|
||||
|
||||
import cn.axzo.framework.domain.data.IdHelper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@Slf4j
|
||||
public class RocketMQEventProducer extends AbstractEventProducer {
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
private String defaultModule;
|
||||
private String appName;
|
||||
/**
|
||||
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
|
||||
*/
|
||||
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback;
|
||||
|
||||
public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate,
|
||||
String defaultModule,
|
||||
String appName,
|
||||
Context<RocketMQMessageMeta> defaultContext,
|
||||
BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback) {
|
||||
super(defaultContext);
|
||||
this.defaultModule = defaultModule;
|
||||
this.appName = appName;
|
||||
this.sendCallback = sendCallback;
|
||||
this.rocketMQTemplate = rocketMQTemplate;
|
||||
// 设置默认的Destination,格式:topic:tag
|
||||
this.rocketMQTemplate.setDefaultDestination(defaultContext.getMeta().buildDestination(StringUtils.EMPTY));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiConsumer<Event, Context<RocketMQMessageMeta>> getSender() {
|
||||
return (event, context) -> {
|
||||
if (Strings.isNullOrEmpty(event.getEventModule())) {
|
||||
event.setEventModule(defaultModule);
|
||||
}
|
||||
if (Strings.isNullOrEmpty(event.getEventId())) {
|
||||
event.setEventId(String.format("%s_%s", appName, IdHelper.get32UUID()));
|
||||
}
|
||||
|
||||
event.setSchemaHash(event.buildSchemaHash(context.getMeta().getTopic()));
|
||||
event.check();
|
||||
|
||||
String destination = context.getMeta().buildDestination(event.getEventName());
|
||||
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(event.toJsonString());
|
||||
messageBuilder.copyHeaders(context.getHeaders());
|
||||
// 如果没显式指定消息的业务key,则使用targetId作为key
|
||||
if (!Strings.isNullOrEmpty(event.getTargetId())) {
|
||||
messageBuilder.setHeaderIfAbsent(MessageConst.PROPERTY_KEYS, event.getTargetId());
|
||||
}
|
||||
// 将eventCode放入header,以便快速过滤自己不关注的信息
|
||||
messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString());
|
||||
|
||||
// 同步发送
|
||||
if (context.getSyncSending()) {
|
||||
try {
|
||||
// 同步发送超时时间支持在properties中设置
|
||||
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
|
||||
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}",
|
||||
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
|
||||
if (sendCallback != null) {
|
||||
sendCallback.accept(event, context);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("send rocketMQ event ERROR-SYNC! event={} context={}", event.toPrettyJsonString(), context, e);
|
||||
if (context.getExceptionHandler() != null) {
|
||||
context.getExceptionHandler().accept(ExceptionContext.<RocketMQMessageMeta>builder()
|
||||
.context(context).event(event).throwable(e).build());
|
||||
}
|
||||
// 异常需要抛出,便于外部感知
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 异步发送
|
||||
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}",
|
||||
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
|
||||
if (sendCallback != null) {
|
||||
sendCallback.accept(event, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("send rocketMQ event ERROR-ASYNC! event={} context={}", event.toPrettyJsonString(), context, throwable);
|
||||
if (context.getExceptionHandler() != null) {
|
||||
context.getExceptionHandler().accept(ExceptionContext.<RocketMQMessageMeta>builder()
|
||||
.context(context).event(event).throwable(throwable).build());
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据shardingKey获取MessageQueueSelector
|
||||
* rocketMQTemplate暂不支持直接设置shardingKey,使用此方法一样能实现目的:https://github.com/apache/rocketmq-spring/issues/123
|
||||
*
|
||||
* @param event
|
||||
* @param context
|
||||
*/
|
||||
private String getMessageShardingKey(Event event, Context<RocketMQMessageMeta> context) {
|
||||
String shardingKey = Optional.ofNullable(event.getShardingKey()).orElse(context.getMeta().getShardingKey());
|
||||
|
||||
if (Strings.isNullOrEmpty(shardingKey)) {
|
||||
shardingKey = event.getTargetId();
|
||||
}
|
||||
if (Strings.isNullOrEmpty(shardingKey)) {
|
||||
return "";
|
||||
}
|
||||
return shardingKey;
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
@ToString
|
||||
public static class RocketMQMessageMeta {
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* 可选,发送消息时可以指定tag,以便在消费时过滤
|
||||
*/
|
||||
private String tag;
|
||||
|
||||
/**
|
||||
* 可选,相同shardingKey的消息会分配到同一个messageQueue,以此来保证有序性
|
||||
*/
|
||||
private String shardingKey;
|
||||
|
||||
@Builder
|
||||
public RocketMQMessageMeta(String topic, String tag, String shardingKey) {
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "topic不能为空");
|
||||
this.topic = topic;
|
||||
this.tag = tag;
|
||||
this.shardingKey = shardingKey;
|
||||
}
|
||||
|
||||
private String buildDestination(String eventName) {
|
||||
return String.format("%s:%s", topic, Strings.nullToEmpty(Optional.ofNullable(tag).orElse(eventName)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,136 @@
|
||||
package cn.axzo.framework.rocketmq.utils;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson.serializer.ObjectSerializer;
|
||||
import com.alibaba.fastjson.serializer.SerializeConfig;
|
||||
import com.alibaba.fastjson.serializer.SerializeFilter;
|
||||
import com.alibaba.fastjson.serializer.SerializerFeature;
|
||||
import com.alibaba.fastjson.support.config.FastJsonConfig;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class FastjsonUtils {
|
||||
public final static SerializerFeature[] SERIALIZER_FEATURES = {
|
||||
SerializerFeature.WriteMapNullValue,
|
||||
// List字段如果为null,输出为[],而非null
|
||||
SerializerFeature.WriteNullListAsEmpty,
|
||||
// 用枚举name()输出
|
||||
SerializerFeature.WriteEnumUsingName,
|
||||
// 类中的Get方法对应的Field是transient
|
||||
SerializerFeature.SkipTransientField,
|
||||
// 关闭循环引用检查
|
||||
SerializerFeature.DisableCircularReferenceDetect};
|
||||
|
||||
private final static SerializerFeature[] PRETTY_SERIALIZER_FEATURES = {
|
||||
// List字段如果为null,输出为[],而非null
|
||||
SerializerFeature.WriteNullListAsEmpty,
|
||||
// 用枚举name()输出
|
||||
SerializerFeature.WriteEnumUsingName,
|
||||
// 类中的Get方法对应的Field是transient
|
||||
SerializerFeature.SkipTransientField,
|
||||
// 关闭循环引用检查
|
||||
SerializerFeature.DisableCircularReferenceDetect,
|
||||
SerializerFeature.SortField,
|
||||
SerializerFeature.MapSortField,
|
||||
};
|
||||
|
||||
public final static FastJsonConfig PRETTY_FORMAT_CONFIG = buildPrettyConfig(true);
|
||||
|
||||
public final static FastJsonConfig PRETTY_LOG_CONFIG = buildPrettyConfig(false);
|
||||
|
||||
public final static SerializeConfig LOCAL_DATE_TIME_SERIALIZE_CONFIG = buildLocalDateTimeSerializeConfig();
|
||||
|
||||
public static SerializeConfig buildLocalDateTimeSerializeConfig() {
|
||||
SerializeConfig config = new SerializeConfig();
|
||||
buildGlobalSerializers().entrySet()
|
||||
.forEach(e -> config.put(e.getKey(), e.getValue()));
|
||||
return config;
|
||||
}
|
||||
|
||||
public static Map<Type, ObjectSerializer> buildGlobalSerializers() {
|
||||
return new HashMap<>();
|
||||
// todo
|
||||
// return Map.of(
|
||||
// LocalDateTime.class, (serializer, object, fieldName, fieldType, features) -> {
|
||||
// if (object == null) {
|
||||
// serializer.out.writeNull();
|
||||
// return;
|
||||
// }
|
||||
// long value = ((LocalDateTime) object).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
||||
// serializer.out.writeLong(value);
|
||||
// },
|
||||
// LocalDate.class, (serializer, object, fieldName, fieldType, features) -> {
|
||||
// if (object == null) {
|
||||
// serializer.out.writeNull();
|
||||
// return;
|
||||
// }
|
||||
// long value = ((LocalDate) object).atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
||||
// serializer.out.writeLong(value);
|
||||
// });
|
||||
}
|
||||
|
||||
private static FastJsonConfig buildPrettyConfig(boolean needPettyFormat) {
|
||||
FastJsonConfig fastJsonConfig = new FastJsonConfig();
|
||||
String datetimePattern = "yyyy-MM-dd HH:mm:ss:SSS";
|
||||
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(datetimePattern);
|
||||
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
||||
|
||||
fastJsonConfig.setCharset(Charsets.UTF_8);
|
||||
fastJsonConfig.setDateFormat(datetimePattern);
|
||||
|
||||
fastJsonConfig.setSerializeFilters(new SerializeFilter[]{});
|
||||
|
||||
ArrayList<SerializerFeature> serializerFeatures = Lists.newArrayList(FastjsonUtils.PRETTY_SERIALIZER_FEATURES);
|
||||
if (needPettyFormat) {
|
||||
serializerFeatures.add(SerializerFeature.PrettyFormat);
|
||||
}
|
||||
fastJsonConfig.setSerializerFeatures(serializerFeatures.stream().toArray(SerializerFeature[]::new));
|
||||
|
||||
//需要强制设置LocalDateTime, LocalDate的序列化格式. 避免被DefaultWebMvcConfig覆盖
|
||||
SerializeConfig config = new SerializeConfig();
|
||||
config.put(LocalDateTime.class, (serializer, object, fieldName, fieldType, features) -> {
|
||||
if (object == null) {
|
||||
serializer.out.writeNull();
|
||||
return;
|
||||
}
|
||||
String value = ((LocalDateTime) object).format(dateTimeFormatter);
|
||||
serializer.out.writeString(value);
|
||||
});
|
||||
|
||||
config.put(LocalDate.class, (serializer, object, fieldName, fieldType, features) -> {
|
||||
if (object == null) {
|
||||
serializer.out.writeNull();
|
||||
return;
|
||||
}
|
||||
String value = ((LocalDate) object).format(dateFormatter);
|
||||
serializer.out.writeString(value);
|
||||
});
|
||||
fastJsonConfig.setSerializeConfig(config);
|
||||
|
||||
return fastJsonConfig;
|
||||
}
|
||||
|
||||
public static String toJsonPettyLogString(Object object) {
|
||||
// XXX: 如果是枚举和日期类型,null 仍然会被输出。
|
||||
String res = JSONObject.toJSONString(object, PRETTY_LOG_CONFIG.getSerializeConfig(),
|
||||
PRETTY_LOG_CONFIG.getSerializeFilters(), PRETTY_LOG_CONFIG.getDateFormat(),
|
||||
JSON.DEFAULT_GENERATE_FEATURE, PRETTY_LOG_CONFIG.getSerializerFeatures());
|
||||
return res;
|
||||
}
|
||||
|
||||
public static String toJsonPettyFormatString(Object object) {
|
||||
// XXX: 如果是枚举和日期类型,null 任然会被输出。
|
||||
return JSONObject.toJSONString(object, PRETTY_FORMAT_CONFIG.getSerializeConfig(),
|
||||
null, PRETTY_FORMAT_CONFIG.getDateFormat(),
|
||||
JSON.DEFAULT_GENERATE_FEATURE, PRETTY_FORMAT_CONFIG.getSerializerFeatures());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,43 @@
|
||||
package cn.axzo.framework.rocketmq.utils;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import lombok.experimental.UtilityClass;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
/**
|
||||
* @author haiyangjin
|
||||
* @date 2023/7/23
|
||||
*/
|
||||
@UtilityClass
|
||||
public class TraceUtils {
|
||||
public static final String TRACE_ID = "axzo-trace-id";
|
||||
/**
|
||||
* 多设置一个key = TraceId, value为traceId的变量到MDC. 以兼容目前的logback-spring.xml的配置
|
||||
*/
|
||||
public static final String TRACE_ID_IN_MDC = "TraceId";
|
||||
|
||||
public String getOrCreateTraceId() {
|
||||
String res = MDC.get(TRACE_ID);
|
||||
if (Strings.isNullOrEmpty(res)) {
|
||||
// TODO uuid
|
||||
// res = UUIDBuilder.generateShortUuid();
|
||||
MDC.put(TRACE_ID, res);
|
||||
MDC.put(TRACE_ID_IN_MDC, res);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public String getTraceId() {
|
||||
return MDC.get(TRACE_ID);
|
||||
}
|
||||
|
||||
public void putTraceId(String traceId) {
|
||||
MDC.put(TRACE_ID, traceId);
|
||||
MDC.put(TRACE_ID_IN_MDC, traceId);
|
||||
}
|
||||
|
||||
public void removeTraceId() {
|
||||
MDC.remove(TRACE_ID);
|
||||
MDC.remove(TRACE_ID_IN_MDC);
|
||||
}
|
||||
}
|
||||
8
pom.xml
8
pom.xml
@ -35,6 +35,7 @@
|
||||
<module>axzo-common-jackson</module>
|
||||
<module>axzo-common-datas</module>
|
||||
<module>axzo-common-cache</module>
|
||||
<module>axzo-common-rocketmq</module>
|
||||
</modules>
|
||||
|
||||
<properties>
|
||||
@ -138,6 +139,13 @@
|
||||
<version>${axzo-commons.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- common rocketmq-->
|
||||
<dependency>
|
||||
<groupId>cn.axzo.framework</groupId>
|
||||
<artifactId>axzo-common-rocketmq</artifactId>
|
||||
<version>${axzo-commons.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--common jackson-->
|
||||
<dependency>
|
||||
<groupId>cn.axzo.framework.jackson</groupId>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user