消息队列rockeMQ封装

This commit is contained in:
tianliyong 2023-07-27 11:46:56 +08:00
parent 8482475330
commit f3c78c77a8
14 changed files with 1480 additions and 0 deletions

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>axzo-framework-commons</artifactId>
<groupId>cn.axzo.framework</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>cn.axzo.framework.rocketmq</groupId>
<artifactId>axzo-common-rocketmq</artifactId>
<name>Axzo Common RocketMQ</name>
<dependencies>
<dependency>
<groupId>cn.axzo.framework</groupId>
<artifactId>axzo-common-domain</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,138 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.rocketmq.utils.TraceUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
/**
* 默认EventProducer抽象里事件的组装和事务后发送的逻辑提供一个Sender具体服务具体提供
*/
@Slf4j
public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta> {
private final AfterCommitExecutorImpl afterCommitExecutor;
/**
* @return 返回发送消息的发送器
*/
public abstract BiConsumer<Event, Context<Meta>> getSender();
private Context defaultContext;
public AbstractEventProducer(Context defaultContext) {
this.afterCommitExecutor = new AfterCommitExecutorImpl();
this.defaultContext = defaultContext;
}
@Override
public void send(@NonNull Event event, @NonNull Context context) {
// XXX不要在send的时候修改event的值有副作用
// 例如当将同一个event发送到不同的topic的时候buildSchemaHash会用不同的topic赋值两次导致一些异常case
Event copiedEvent = Event.builder().build();
BeanUtils.copyProperties(event, copiedEvent);
if (Strings.isNullOrEmpty(copiedEvent.getTargetId())) {
log.warn("targetId of event is black, best practice of targetId is present, event = {}", event.toJsonString());
}
if (copiedEvent.getData() == null) {
log.warn("data of event is empty, best practice of data must present, event = {}", event.toJsonString());
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventModule()), "eventModule不能为空");
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventName()), "eventName不能为空");
// 复制一份 context并加入链路跟踪信息 traceId
HashMap newHeaders = Maps.newHashMap(Optional.ofNullable(context.getHeaders()).orElse(ImmutableMap.of()));
newHeaders.put(TraceUtils.TRACE_ID, TraceUtils.getOrCreateTraceId());
final Context copiedContext = context.toBuilder().headers(newHeaders).build();
Runnable runnable = () -> {
try {
getSender().accept(copiedEvent, copiedContext);
} catch (Exception e) {
log.error("====MQ PRODUCER ====, context={}, message = {}", copiedContext, copiedEvent.toPrettyJsonString(), e);
throw e;
}
};
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
afterCommitExecutor.execute(() -> runnable.run());
}
}
@Override
public void send(@NonNull Event event) {
send(event, defaultContext);
}
/**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
RUNNABLES.remove();
}
}
}

View File

@ -0,0 +1,100 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.domain.ServiceException;
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* 批量处理事件处理器
* 按照 handler 维度来路由感兴趣的 events
*/
@Slf4j
public class BatchEventConsumer implements EventConsumer {
final protected ListMultimap<EventHandler, Event.EventCode> handlers = ArrayListMultimap.create();
private final boolean logEnabled;
private BiConsumer<Exception, String> exceptionHandler;
public BatchEventConsumer(BiConsumer<Exception, String> exceptionHandler, boolean logEnabled) {
this.exceptionHandler = exceptionHandler;
this.logEnabled = logEnabled;
}
@Override
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) {
handlers.put(eventHandler, eventCode);
return null;
}
@Override
public EventHandlerRepository registerHandlers(List<Event.EventCode> list, EventHandler eventHandler) {
handlers.putAll(eventHandler, list);
return null;
}
@Override
public boolean onEvent(String s, Context context) {
throw new UnsupportedOperationException();
}
public boolean onEvents(List<BatchEvent> batchEvents) {
// 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false
if (batchEvents.isEmpty()) {
return false;
}
log.info("====MQ CONSUMER BATCH2==== events={}", logEnabled ? FastjsonUtils.toJsonPettyLogString(batchEvents) : batchEvents.size());
List<Event> events = batchEvents.stream().map(EventConsumer.BatchEvent::toEvent).collect(Collectors.toList());
Stopwatch stopwatch = Stopwatch.createUnstarted();
handlers.asMap().forEach((handler, codes) -> {
try {
stopwatch.start();
ImmutableSet<Event.EventCode> handleCodes = ImmutableSet.copyOf(codes);
// 过滤感兴趣的数据给 handler
List<Event> filteredEvents = events.stream().filter(e -> handleCodes.contains(e.getEventCode())).collect(Collectors.toList());
if (filteredEvents.isEmpty()) {
return;
}
handler.onEvents(filteredEvents, null);
String clazzName = handler.getClass().getCanonicalName();
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("====MQ CONSUMER BATCH2====, handledSize={}, handled by {}, cost {} millis", filteredEvents.size(), clazzName, elapsed);
if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
String msg = String.format("take too long %d millis for %s to handle %s",
elapsed, clazzName, filteredEvents);
handleException(new ServiceException("程序处理超时"), msg);
}
} catch (ServiceException ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents);
log.warn("====MQ CONSUMER BATCH====, handler={}, handle event warning, event = {}", handler.getClass().getCanonicalName(),
payloads, ex);
handleException(ex, payloads);
} catch (Exception ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents);
log.error("====MQ CONSUMER BATCH====, handler={}, handle event error, event = {}", handler.getClass().getCanonicalName(),
payloads, ex);
handleException(ex, payloads);
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
return true;
}
private void handleException(Exception ex, String msg) {
if (exceptionHandler != null) {
exceptionHandler.accept(ex, msg);
}
}
}

View File

@ -0,0 +1,118 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* 消费消息用法如下
* <pre>
* // EVENT_TOPIC_KEY = "topic"
* &#064;Bean
* EventConsumer eventConsumer(AppRuntime appRuntime, EventHandlerRepository eventHandlerRepository, ApiStatsClient apiStatsClient) {
* Consumer<EventConsumer.EventWrapper> callback = (eventWrapper) -> {
* if (eventWrapper.isHandled()) {
* // 只收集被App真正消费的消息.
* String topic = (String)eventWrapper.getExt().get(EVENT_TOPIC_KEY);
* apiStatsClient.reportConsumedEvent(eventWrapper.getEvent(), topic);
* }
* };
* return new DefaultEventConsumer(appRuntime.getAppName(), eventHandlerRepository, callback);
* }
* </pre>
*/
@Slf4j
public class DefaultEventConsumer implements EventConsumer {
private final EventHandlerRepository handlerRepository;
private final String appName;
private final Consumer<EventWrapper> consumeCallback;
public DefaultEventConsumer(String appName, EventHandlerRepository handlerRepository, Consumer<EventWrapper> consumeCallback) {
this.handlerRepository = handlerRepository;
this.appName = appName;
this.consumeCallback = consumeCallback;
}
@Override
public boolean onEvent(String message, Context context) {
Event event;
//如果header中没有eventCode. 则尝试从body里面获取
if (context.getEventCode() == null) {
try {
JSONObject evenJSON = JSONObject.parseObject(message);
context.setEventCode(new Event.EventCode(evenJSON.getString("eventModule"), evenJSON.getString("eventName")));
} catch (Exception ex) {
//ignore. 忽略message不是JSON的情况
}
}
// 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false
if (!handlerRepository.canHandle(context.getEventCode())) {
// ignore due to no registering handler
return false;
}
try {
event = JSONObject.parseObject(message, Event.class);
} catch (Exception e) {
log.error("====MQ CONSUMER {} ===={}, msgId = {}, parse event error, event = {}",
context.getTraceId(), appName, context.getMsgId(), message, e);
return false;
}
boolean logEnabled = context.getLogEnabled() && (context.getLogFilter() == null || context.getLogFilter().apply(event));
if (logEnabled) {
log.info("====MQ CONSUMER {} ===={}, msgId = {}, message = {}",
context.getTraceId(), appName, context.getMsgId(), message);
}
boolean handled = handlerRepository.process(event, context);
if (consumeCallback != null) {
consumeCallback.accept(EventWrapper.builder()
.event(event)
.consumer(this)
.isHandled(handled)
.ext(context.getExt())
.context(context)
.build());
}
return handled;
}
@Override
public boolean onEvents(List<BatchEvent> events) {
// 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false
log.info("====MQ CONSUMER BATCH===={} , message = {}, events={}", appName, FastjsonUtils.toJsonPettyLogString(events));
Map<Event.EventCode, List<Event>> eventGroup = events.stream().filter(e -> handlerRepository.canHandle(e.getContext().getEventCode()))
.map(BatchEvent::toEvent).collect(Collectors.groupingBy(Event::getEventCode));
if (eventGroup.isEmpty()) {
return false;
}
for (final Map.Entry<Event.EventCode, List<Event>> entry : eventGroup.entrySet()) {
handlerRepository.batch(entry.getValue(),
Context.builder()
.eventCode(entry.getKey())
.maxAllowElapsedMillis(TimeUnit.MINUTES.toMillis(1L))
.build());
}
return true;
}
@Override
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) {
return handlerRepository.registerHandler(eventCode, eventHandler);
}
@Override
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler) {
return handlerRepository.registerHandlers(eventCodes, eventHandler);
}
}

View File

@ -0,0 +1,224 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.hash.Hashing;
import lombok.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cglib.beans.BeanMap;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 统一系统事件对象.
* 消息格式如下
* {
* "event_scene":"teaching",
* "event_module":"im",
* "operator_id":student_id,
* "operator_type":"student",
* "target_id":conversation_id,
* "target_type":"conversation",
* "event_name":"message_sent",
* "event_time":"2019-07-22 16:10:10.666",
* "data":{"message_id":"xxx","tags":"xxx"} //消息ID,消息标签
* "event_id": "0101_xxx" //消息id
* "data_schema_hash": "xxxx" //数据的schema hash
* }
*/
@Data
@AllArgsConstructor
public class Event {
/**
* 唯一标识.唯一区分一条事件.可以做幂等处理. 默认是app_id + uuid
*/
private String eventId;
/**
* event的schema hash. 在发送的时候填充.
* 接受方可以通过简单的检查这个标示来判断event的scheme是否发生变化.
*/
private String schemaHash;
private String eventModule;
private String eventName;
@Builder.Default
private String operatorId = "system";
@Builder.Default
private String operatorType = "system";
private String targetId;
private String targetType;
private Long eventTime;
private String shardingKey;
/**
* 序列化时将该字段放在最后. 避免遮挡其他关键字段
*/
@JSONField(ordinal = Integer.MAX_VALUE)
private Serializable data;
/**
* 提供默认构造函数仅用于json反序列化时使用<br>
* <b>不建议直接使用默认构造函数进行event的构建构建event请使用Event.builder().xx(xx).build();</b>
* XXX: 此处不使用@NoArgsConstrutor是因为其于@Builder.Default会产生冲突;
*
*/
public Event() {
}
@Builder
public Event(String eventModule, String eventName, String operatorId, String operatorType,
String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey) {
if (eventCode != null) {
this.eventModule = eventCode.getModule();
this.eventName = eventCode.getName();
}
// 如果eventCode和单个的eventNameeventSceneeventModule都传入了优先去后面单个传入的参数
Optional.ofNullable(eventModule).ifPresent(module -> this.eventModule = module);
Optional.ofNullable(eventName).ifPresent(name -> this.eventName = name);
if (!Strings.isNullOrEmpty(operatorId)) {
this.operatorId = operatorId;
}
if (!Strings.isNullOrEmpty(operatorType)) {
this.operatorType = operatorType;
}
this.data = Optional.ofNullable(data).orElseGet(HashMap::new);
this.targetId = targetId;
this.targetType = targetType;
this.eventTime = System.currentTimeMillis();
this.shardingKey = shardingKey;
}
public String toJsonString() {
return JSONObject.toJSONString(this, FastjsonUtils.LOCAL_DATE_TIME_SERIALIZE_CONFIG, FastjsonUtils.SERIALIZER_FEATURES);
}
/**
* json 的序列化为人眼识别做特殊处理
* 1. key 排序
* 2. 过滤空的 value
* 3. 格式化日期
* @return
*/
public String toPrettyJsonString() {
return FastjsonUtils.toJsonPettyLogString(this);
}
@Override
public String toString() {
return toPrettyJsonString();
}
/**
* 该方法返回一个字符串该字符串可用于唯一标注一种类型的消息<br>
* 对于kafka可以将该值用于消息的key<br>
* 对于rabbit可以将该值用于消息的routingKey<br>
* 以方便消费方快速获取消息类型
*
* @return
*/
@JSONField(serialize = false)
public EventCode getEventCode() {
return new EventCode(eventModule, eventName);
}
/**
* 发送前对必填参数做校验
*/
public void check() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(eventModule), "发送消息 -> 事件模块不能为空");
Preconditions.checkArgument(!Strings.isNullOrEmpty(eventName), "发送消息 -> 事件名字不能为空");
}
/**
* 将Object类型的data转为指定类型对象
*
* @param tClass
* @param <T>
* @return
*/
public <T> T normalizedData(Class<T> tClass) {
return JSON.parseObject(JSON.toJSONString(data), tClass);
}
/**
* 构建消息的schema hash
*
* @param key kafka的topic名称 or rabbit的exchange
* @return schema hash
*/
public String buildSchemaHash(String key) {
String schema;
// FIXME: beanmap不能处理data是集合类型对象. 这些需要特殊处理
// 建议data总是一个bean对象.
Object data = getData();
if (data == null) {
schema = "null";
} else if (data instanceof Collection
|| data instanceof Map
|| data instanceof Number
|| data.getClass().isArray()
|| data.getClass().isPrimitive()
|| data instanceof String) {
schema = data.getClass().getCanonicalName();
} else {
try {
BeanMap beanMap = BeanMap.create(data);
// FIXME 如何取嵌套的数据? 如何检查类型变化?
// 目前的实现只检查了data中第一层数据的名称和类型.
// 取payload中每个key作为schema
schema = beanMap.keySet().stream().map(e -> e + "=" + beanMap.getPropertyType((String) e).getSimpleName())
.sorted().collect(Collectors.joining(",")).toString();
} catch (Exception e) {
// ignore
schema = "error";
}
}
return Hashing.murmur3_128().hashString(key + getEventModule()
+ getEventName() + schema, Charsets.UTF_8).toString();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public static class EventCode {
public static final String SEPARATOR = ":";
@NonNull
private String module;
@NonNull
private String name;
@Override
public String toString() {
return module + SEPARATOR + name;
}
public static EventCode from(String text) {
String[] segments = StringUtils.split(text,SEPARATOR);
Preconditions.checkArgument(segments.length == 2);
return new EventCode(segments[0], segments[1]);
}
}
}

View File

@ -0,0 +1,152 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.domain.data.IdHelper;
import cn.axzo.framework.rocketmq.utils.TraceUtils;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* @author haiyangjin
* @date 2023/7/23
*/
public interface EventConsumer {
/**
* 注册EventHandler
*
* @param eventCode event类型的唯一标示
* @param eventHandler eventHandler
* @return EventHandlerRepository
*/
EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler);
/**
* 为多个eventCodes注册一个eventHandler
*
* @param eventCodes list of eventCode
* @param eventHandler eventHandler
* @return EventHandlerRepository
*/
EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler);
/**
* message的处理方法
*
* @param message message
* @return 如果事件被handler处理返回true, 否则返回false
* @Param ext 扩展信息.
*/
boolean onEvent(String message, Context context);
/**
* message的批量处理方法
*
* @param batchEvents message
* @return 如果事件被handler处理返回true, 否则返回false
* @Param ext 扩展信息.
*/
default boolean onEvents(List<BatchEvent> batchEvents) {
throw new UnsupportedOperationException();
}
@Builder
@Getter
class EventWrapper {
private Event event;
private EventConsumer consumer;
private boolean isHandled;
private Map<String, Object> ext;
private Context context;
}
@Getter
@NoArgsConstructor
class Context {
@Setter
private Event.EventCode eventCode;
private String traceId;
private String msgId;
/**
* 只elapse 超过多少毫秒才进行日志记录默认 3ms
*/
private Long logElapsedThreshold;
private Map<String, byte[]> headers;
private Map<String, Object> ext;
private Long maxAllowElapsedMillis;
private Boolean logEnabled;
private transient Predicate<Event> logFilter;
/**
* 返回消息消费者offset与全部消息offset之间的差.
*/
private transient Supplier<Long> lagSupplier;
@Builder
public Context(Event.EventCode eventCode, String msgId, Long logElapsedThreshold, Map<String, byte[]> headers, Map<String, Object> ext,
Long maxAllowElapsedMillis, Boolean logEnabled, Predicate<Event> logFilter, Supplier<Long> lagSupplier) {
this.eventCode = eventCode;
this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L);
this.headers = headers;
this.ext = ext;
this.msgId = Strings.nullToEmpty(msgId);
this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L);
this.logEnabled = Optional.ofNullable(logEnabled).orElse(true);
this.logFilter = logFilter;
this.lagSupplier = lagSupplier;
if (!CollectionUtils.isEmpty(headers) && headers.containsKey(TraceUtils.TRACE_ID)) {
this.traceId = new String(headers.get(TraceUtils.TRACE_ID), Charsets.UTF_8);
}
if (Strings.isNullOrEmpty(this.traceId)) {
traceId = IdHelper.get32UUID();
}
TraceUtils.putTraceId(this.traceId);
}
public Event.EventCode getEventCode() {
return Optional.ofNullable(eventCode)
.orElseGet(() -> headers.containsKey("eventCode")
? Event.EventCode.from(new String(headers.get("eventCode"), Charsets.UTF_8))
: null);
}
public String getTraceId() {
if (Strings.isNullOrEmpty(traceId)) {
traceId = IdHelper.get32UUID();
TraceUtils.putTraceId(traceId);
}
return traceId;
}
}
@Builder
@Getter
class BatchEvent {
private String message;
private Context context;
public Event.EventCode getEventCode() {
return context.getEventCode();
}
public Event toEvent() {
try {
return JSONObject.parseObject(message, Event.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,20 @@
package cn.axzo.framework.rocketmq;
import java.util.List;
/**
* @author haiyangjin
* @date 2023/7/23
*/
public interface EventHandler {
void onEvent(Event event, EventConsumer.Context context);
/**
* 批量处理事件
* @param events
* @param context
*/
default void onEvents(List<Event> events, EventConsumer.Context context){
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,179 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.domain.ServiceException;
import cn.axzo.framework.rocketmq.utils.FastjsonUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.AntPathMatcher;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* @author haiyangjin
* @date 2023/7/23
*/
@Slf4j
public class EventHandlerRepository {
final protected ListMultimap<Event.EventCode, EventHandler> handlers = ArrayListMultimap.create();
private final BiConsumer<Exception, String> exceptionHandler;
private AntPathMatcher antPathMatcher;
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler) {
this(exceptionHandler, false);
}
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler, boolean supportPattern) {
this.exceptionHandler = exceptionHandler;
if (supportPattern) {
antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR);
antPathMatcher.setCachePatterns(true);
}
}
public EventHandlerRepository() {
this(null);
}
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) {
Objects.requireNonNull(eventCode);
Objects.requireNonNull(eventHandler);
handlers.put(eventCode, eventHandler);
return this;
}
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler) {
Objects.requireNonNull(eventHandler);
eventCodes.forEach(e -> handlers.put(e, eventHandler));
return this;
}
public boolean canHandle(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.containsKey(eventCode);
}
return handlers.keySet().stream()
.anyMatch(key -> antPathMatcher.match(key.toString(), eventCode.toString()));
}
/**
* 处理事件
*
* @param event 事件
* @return 如果有注册的handler处理事件返回true, 否则false
*/
public boolean process(Event event, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
List<EventHandler> eventHandlers = getEventHandlers(event.getEventCode());
eventHandlers.stream().forEach(handler -> {
try {
stopwatch.start();
handler.onEvent(event, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
String clazzName = handler.getClass().getSimpleName();
// 为了避免太多日志输出只有处理时间超过
if (elapsed > context.getLogElapsedThreshold()) {
log.info("====MQ CONSUMER {}====, handled by {}, eventCode = {}, cost {} millis",
context.getTraceId(), clazzName, event.getEventCode(), elapsed);
}
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("[%s] take too long %d millis for %s to handle %s",
context.getTraceId(), elapsed, clazzName, event.toPrettyJsonString());
handleException(new ServiceException("程序处理超时"), msg);
}
} catch (ServiceException ex) {
log.warn("====MQ CONSUMER {} ====, handle event warning, event = {}", context.getTraceId(), event.toPrettyJsonString(), ex);
handleException(ex, event.toPrettyJsonString());
} catch (Exception ex) {
log.error("====MQ CONSUMER {} ====, handle event error, event = {}", context.getTraceId(), event.toPrettyJsonString(), ex);
handleException(ex, event.toPrettyJsonString());
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
return !eventHandlers.isEmpty();
}
/**
* 批量处理多条事件
*
* @param events
* @param context
* @return
*/
public boolean batch(List<Event> events, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
List<EventHandler> eventHandlers = getEventHandlers(context.getEventCode());
eventHandlers.stream().forEach(handler -> {
try {
stopwatch.start();
String clazzName = handler.getClass().getCanonicalName();
log.info("====MQ CONSUMER BATCH====, start handling by {}", clazzName);
handler.onEvents(events, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed);
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("take too long %d millis for %s to handle %s",
elapsed, clazzName, events);
handleException(new ServiceException("程序处理超时"), msg);
}
} catch (ServiceException ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex);
handleException(ex, payloads);
} catch (Exception ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex);
handleException(ex, payloads);
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
return !eventHandlers.isEmpty();
}
public boolean isHandled(Event event) {
Preconditions.checkArgument(event != null);
Preconditions.checkArgument(event.getEventCode() != null);
return canHandle(event.getEventCode());
}
private void handleException(Exception ex, String msg) {
if (exceptionHandler != null) {
exceptionHandler.accept(ex, msg);
}
}
private List<EventHandler> getEventHandlers(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.get(eventCode);
}
// 支持pattern的时候返回所有匹配的Handlers
return handlers.keySet().stream()
.filter(key -> antPathMatcher.match(key.toString(), eventCode.toString()))
.flatMap(key -> handlers.get(key).stream())
.collect(Collectors.toList());
}
private boolean isSupportPattern() {
return antPathMatcher != null;
}
}

View File

@ -0,0 +1,83 @@
package cn.axzo.framework.rocketmq;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
/**
* @author haiyangjin
* @date 2023/7/23
*/
public interface EventProducer<Meta> {
/**
* 发送event
*
* @param event 事件主体
*/
void send(Event event, Context<Meta> context);
/**
* 发送eventcontext信息使用默认的context
*
* @param event
*/
void send(Event event);
@Getter
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
class Context<Meta> {
/**
* 存储发送消息需要的一些元数据, 例如rabbitMq的exchange, routingKey等
*/
private Meta meta;
/**
* 发送消息需要附加的请求头
*/
private Map<String, String> headers;
/**
* 接受发送过程中的异常
*/
private transient Consumer<ExceptionContext> exceptionHandler;
/**
* 同步发送默认 异步
*/
private Boolean syncSending;
public Boolean getSyncSending() {
return Optional.ofNullable(syncSending).orElse(Boolean.TRUE);
}
/**
* 是否使用 transactional 的方式发送查看
* https://www.jianshu.com/p/59891ede5f90
* 如果是 true调用者需要做如下改造
* <ul>
* <li>spring.kafka.producer.transaction-id-prefix</li>
* <li>调用函数需要添加@Transactional</li>
* </ul>
*/
private boolean transactional;
}
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Builder
class ExceptionContext<Meta> {
Context<Meta> context;
Throwable throwable;
Event event;
}
}

View File

@ -0,0 +1,83 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.domain.data.IdHelper;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* 本地消息发送避免直接发送到测试环境影响测试
* 如果配置了 EventConsumer,会将消息直接传到给 consumer 消费
* XXX只是为了本地测试不要使用在测试和线上环境
*/
@Slf4j
public class LocalEventProducer extends AbstractEventProducer {
private final EventConsumer eventConsumer;
private String defaultModule;
private String appName;
public LocalEventProducer(EventConsumer eventConsumer,
String defaultModule,
String appName,
Context<LocalMessageMeta> defaultMeta) {
super(defaultMeta);
this.defaultModule = defaultModule;
this.appName = appName;
this.eventConsumer = eventConsumer;
}
@Override
public BiConsumer<Event, Context<LocalMessageMeta>> getSender() {
return (event, context) -> {
if (eventConsumer == null) {
log.info("====IGNORE even since having no eventconsumer{}", event);
return;
}
if (Strings.isNullOrEmpty(event.getEventModule())) {
event.setEventModule(defaultModule);
}
if (Strings.isNullOrEmpty(event.getEventId())) {
event.setEventId(String.format("%s_%s", appName, IdHelper.get32UUID()));
}
event.setSchemaHash(IdHelper.get32UUID());
event.check();
Map<String, byte[]> headers = Maps.newHashMap();
if (!CollectionUtils.isEmpty(context.getHeaders())) {
context.getHeaders().entrySet()
.forEach(entry -> headers.put(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8)));
}
// 将eventCode放入header以便快速过滤自己不关注的信息
headers.put("eventCode", event.getEventCode().toString().getBytes());
new Thread(() -> {
// 模拟真实 kafka 的消息延迟这里做了短暂延迟
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
eventConsumer.onEvent(event.toJsonString(), EventConsumer.Context.builder()
.eventCode(event.getEventCode()).headers(headers).ext(ImmutableMap.of()).build());
}).start();
};
}
@NoArgsConstructor
@Getter
@ToString
public static class LocalMessageMeta {
}
}

View File

@ -0,0 +1,160 @@
package cn.axzo.framework.rocketmq;
import cn.axzo.framework.domain.data.IdHelper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import java.util.Optional;
import java.util.function.BiConsumer;
@Slf4j
public class RocketMQEventProducer extends AbstractEventProducer {
private RocketMQTemplate rocketMQTemplate;
private String defaultModule;
private String appName;
/**
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
*/
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback;
public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate,
String defaultModule,
String appName,
Context<RocketMQMessageMeta> defaultContext,
BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback) {
super(defaultContext);
this.defaultModule = defaultModule;
this.appName = appName;
this.sendCallback = sendCallback;
this.rocketMQTemplate = rocketMQTemplate;
// 设置默认的Destination格式topic:tag
this.rocketMQTemplate.setDefaultDestination(defaultContext.getMeta().buildDestination(StringUtils.EMPTY));
}
@Override
public BiConsumer<Event, Context<RocketMQMessageMeta>> getSender() {
return (event, context) -> {
if (Strings.isNullOrEmpty(event.getEventModule())) {
event.setEventModule(defaultModule);
}
if (Strings.isNullOrEmpty(event.getEventId())) {
event.setEventId(String.format("%s_%s", appName, IdHelper.get32UUID()));
}
event.setSchemaHash(event.buildSchemaHash(context.getMeta().getTopic()));
event.check();
String destination = context.getMeta().buildDestination(event.getEventName());
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(event.toJsonString());
messageBuilder.copyHeaders(context.getHeaders());
// 如果没显式指定消息的业务key则使用targetId作为key
if (!Strings.isNullOrEmpty(event.getTargetId())) {
messageBuilder.setHeaderIfAbsent(MessageConst.PROPERTY_KEYS, event.getTargetId());
}
// 将eventCode放入header以便快速过滤自己不关注的信息
messageBuilder.setHeaderIfAbsent("eventCode", event.getEventCode().toString());
// 同步发送
if (context.getSyncSending()) {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context));
log.info("====MQ PRODUCER SYNC====, context={}, message = {}, queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {
sendCallback.accept(event, context);
}
} catch (Throwable e) {
log.error("send rocketMQ event ERROR-SYNC! event={} context={}", event.toPrettyJsonString(), context, e);
if (context.getExceptionHandler() != null) {
context.getExceptionHandler().accept(ExceptionContext.<RocketMQMessageMeta>builder()
.context(context).event(event).throwable(e).build());
}
// 异常需要抛出便于外部感知
throw new RuntimeException(e);
}
return;
}
// 异步发送
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), getMessageShardingKey(event, context), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("====MQ PRODUCER ASYNC====, context={}, message = {}, queueId = {}",
context, event.toPrettyJsonString(), sendResult.getMessageQueue().getQueueId());
if (sendCallback != null) {
sendCallback.accept(event, context);
}
}
@Override
public void onException(Throwable throwable) {
log.error("send rocketMQ event ERROR-ASYNC! event={} context={}", event.toPrettyJsonString(), context, throwable);
if (context.getExceptionHandler() != null) {
context.getExceptionHandler().accept(ExceptionContext.<RocketMQMessageMeta>builder()
.context(context).event(event).throwable(throwable).build());
}
}
});
};
}
/**
* 根据shardingKey获取MessageQueueSelector
* rocketMQTemplate暂不支持直接设置shardingKey使用此方法一样能实现目的https://github.com/apache/rocketmq-spring/issues/123
*
* @param event
* @param context
*/
private String getMessageShardingKey(Event event, Context<RocketMQMessageMeta> context) {
String shardingKey = Optional.ofNullable(event.getShardingKey()).orElse(context.getMeta().getShardingKey());
if (Strings.isNullOrEmpty(shardingKey)) {
shardingKey = event.getTargetId();
}
if (Strings.isNullOrEmpty(shardingKey)) {
return "";
}
return shardingKey;
}
@NoArgsConstructor
@Getter
@ToString
public static class RocketMQMessageMeta {
private String topic;
/**
* 可选发送消息时可以指定tag以便在消费时过滤
*/
private String tag;
/**
* 可选相同shardingKey的消息会分配到同一个messageQueue以此来保证有序性
*/
private String shardingKey;
@Builder
public RocketMQMessageMeta(String topic, String tag, String shardingKey) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "topic不能为空");
this.topic = topic;
this.tag = tag;
this.shardingKey = shardingKey;
}
private String buildDestination(String eventName) {
return String.format("%s:%s", topic, Strings.nullToEmpty(Optional.ofNullable(tag).orElse(eventName)));
}
}
}

View File

@ -0,0 +1,136 @@
package cn.axzo.framework.rocketmq.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.ObjectSerializer;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.alibaba.fastjson.serializer.SerializeFilter;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.lang.reflect.Type;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
public class FastjsonUtils {
public final static SerializerFeature[] SERIALIZER_FEATURES = {
SerializerFeature.WriteMapNullValue,
// List字段如果为null,输出为[],而非null
SerializerFeature.WriteNullListAsEmpty,
// 用枚举name()输出
SerializerFeature.WriteEnumUsingName,
// 类中的Get方法对应的Field是transient
SerializerFeature.SkipTransientField,
// 关闭循环引用检查
SerializerFeature.DisableCircularReferenceDetect};
private final static SerializerFeature[] PRETTY_SERIALIZER_FEATURES = {
// List字段如果为null,输出为[],而非null
SerializerFeature.WriteNullListAsEmpty,
// 用枚举name()输出
SerializerFeature.WriteEnumUsingName,
// 类中的Get方法对应的Field是transient
SerializerFeature.SkipTransientField,
// 关闭循环引用检查
SerializerFeature.DisableCircularReferenceDetect,
SerializerFeature.SortField,
SerializerFeature.MapSortField,
};
public final static FastJsonConfig PRETTY_FORMAT_CONFIG = buildPrettyConfig(true);
public final static FastJsonConfig PRETTY_LOG_CONFIG = buildPrettyConfig(false);
public final static SerializeConfig LOCAL_DATE_TIME_SERIALIZE_CONFIG = buildLocalDateTimeSerializeConfig();
public static SerializeConfig buildLocalDateTimeSerializeConfig() {
SerializeConfig config = new SerializeConfig();
buildGlobalSerializers().entrySet()
.forEach(e -> config.put(e.getKey(), e.getValue()));
return config;
}
public static Map<Type, ObjectSerializer> buildGlobalSerializers() {
return new HashMap<>();
// todo
// return Map.of(
// LocalDateTime.class, (serializer, object, fieldName, fieldType, features) -> {
// if (object == null) {
// serializer.out.writeNull();
// return;
// }
// long value = ((LocalDateTime) object).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// serializer.out.writeLong(value);
// },
// LocalDate.class, (serializer, object, fieldName, fieldType, features) -> {
// if (object == null) {
// serializer.out.writeNull();
// return;
// }
// long value = ((LocalDate) object).atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
// serializer.out.writeLong(value);
// });
}
private static FastJsonConfig buildPrettyConfig(boolean needPettyFormat) {
FastJsonConfig fastJsonConfig = new FastJsonConfig();
String datetimePattern = "yyyy-MM-dd HH:mm:ss:SSS";
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(datetimePattern);
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
fastJsonConfig.setCharset(Charsets.UTF_8);
fastJsonConfig.setDateFormat(datetimePattern);
fastJsonConfig.setSerializeFilters(new SerializeFilter[]{});
ArrayList<SerializerFeature> serializerFeatures = Lists.newArrayList(FastjsonUtils.PRETTY_SERIALIZER_FEATURES);
if (needPettyFormat) {
serializerFeatures.add(SerializerFeature.PrettyFormat);
}
fastJsonConfig.setSerializerFeatures(serializerFeatures.stream().toArray(SerializerFeature[]::new));
//需要强制设置LocalDateTime, LocalDate的序列化格式. 避免被DefaultWebMvcConfig覆盖
SerializeConfig config = new SerializeConfig();
config.put(LocalDateTime.class, (serializer, object, fieldName, fieldType, features) -> {
if (object == null) {
serializer.out.writeNull();
return;
}
String value = ((LocalDateTime) object).format(dateTimeFormatter);
serializer.out.writeString(value);
});
config.put(LocalDate.class, (serializer, object, fieldName, fieldType, features) -> {
if (object == null) {
serializer.out.writeNull();
return;
}
String value = ((LocalDate) object).format(dateFormatter);
serializer.out.writeString(value);
});
fastJsonConfig.setSerializeConfig(config);
return fastJsonConfig;
}
public static String toJsonPettyLogString(Object object) {
// XXX: 如果是枚举和日期类型null 仍然会被输出
String res = JSONObject.toJSONString(object, PRETTY_LOG_CONFIG.getSerializeConfig(),
PRETTY_LOG_CONFIG.getSerializeFilters(), PRETTY_LOG_CONFIG.getDateFormat(),
JSON.DEFAULT_GENERATE_FEATURE, PRETTY_LOG_CONFIG.getSerializerFeatures());
return res;
}
public static String toJsonPettyFormatString(Object object) {
// XXX: 如果是枚举和日期类型null 任然会被输出
return JSONObject.toJSONString(object, PRETTY_FORMAT_CONFIG.getSerializeConfig(),
null, PRETTY_FORMAT_CONFIG.getDateFormat(),
JSON.DEFAULT_GENERATE_FEATURE, PRETTY_FORMAT_CONFIG.getSerializerFeatures());
}
}

View File

@ -0,0 +1,43 @@
package cn.axzo.framework.rocketmq.utils;
import com.google.common.base.Strings;
import lombok.experimental.UtilityClass;
import org.slf4j.MDC;
/**
* @author haiyangjin
* @date 2023/7/23
*/
@UtilityClass
public class TraceUtils {
public static final String TRACE_ID = "axzo-trace-id";
/**
* 多设置一个key = TraceId, value为traceId的变量到MDC. 以兼容目前的logback-spring.xml的配置
*/
public static final String TRACE_ID_IN_MDC = "TraceId";
public String getOrCreateTraceId() {
String res = MDC.get(TRACE_ID);
if (Strings.isNullOrEmpty(res)) {
// TODO uuid
// res = UUIDBuilder.generateShortUuid();
MDC.put(TRACE_ID, res);
MDC.put(TRACE_ID_IN_MDC, res);
}
return res;
}
public String getTraceId() {
return MDC.get(TRACE_ID);
}
public void putTraceId(String traceId) {
MDC.put(TRACE_ID, traceId);
MDC.put(TRACE_ID_IN_MDC, traceId);
}
public void removeTraceId() {
MDC.remove(TRACE_ID);
MDC.remove(TRACE_ID_IN_MDC);
}
}

View File

@ -35,6 +35,7 @@
<module>axzo-common-jackson</module>
<module>axzo-common-datas</module>
<module>axzo-common-cache</module>
<module>axzo-common-rocketmq</module>
</modules>
<properties>
@ -138,6 +139,13 @@
<version>${axzo-commons.version}</version>
</dependency>
<!-- common rocketmq-->
<dependency>
<groupId>cn.axzo.framework</groupId>
<artifactId>axzo-common-rocketmq</artifactId>
<version>${axzo-commons.version}</version>
</dependency>
<!--common jackson-->
<dependency>
<groupId>cn.axzo.framework.jackson</groupId>