feat: add

This commit is contained in:
zengxiaobo 2024-04-24 21:32:10 +08:00
parent 3bc130f417
commit 1c3302717b
21 changed files with 2077 additions and 1 deletions

2
.gitignore vendored
View File

@ -46,3 +46,5 @@ build/
/.flattened-pom.xml
/unittest-support-lib/.flattened-pom.xml
/web-support-lib/.flattened-pom.xml
/event-support-lib/.flattened-pom.xml
/gateway-support-lib/.flattened-pom.xml

View File

@ -36,6 +36,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -9,7 +9,8 @@ import lombok.Getter;
public enum ResultCode implements IResultCode {
RUNTIME_EXCEPTION("001", "系统异常,请重试", 500),
INVALID_PARAMS("002", "请求参数格式错误", 400),
NETWORK_FAILURE("X108", "内部网络错误", 500);
NETWORK_FAILURE("003", "内部网络错误", 500),
PROCESS_TIMEOUT("100", "内部处理超时", 500);
final private String code;

View File

@ -0,0 +1,38 @@
package cn.axzo.foundation.util;
import com.google.common.base.Strings;
import lombok.experimental.UtilityClass;
import org.slf4j.MDC;
@UtilityClass
public class TraceUtils {
public static final String TRACE_ID = "axzo-trace-id";
/**
* 多设置一个key = TraceId, value为traceId的变量到MDC. 以兼容目前的logback-spring.xml的配置
*/
public static final String TRACE_ID_IN_MDC = "TraceId";
public String getOrCreateTraceId() {
String res = MDC.get(TRACE_ID);
if (Strings.isNullOrEmpty(res)) {
res = UUIDBuilder.generateShortUuid();
MDC.put(TRACE_ID, res);
MDC.put(TRACE_ID_IN_MDC, res);
}
return res;
}
public String getTraceId() {
return MDC.get(TRACE_ID);
}
public void putTraceId(String traceId) {
MDC.put(TRACE_ID, traceId);
MDC.put(TRACE_ID_IN_MDC, traceId);
}
public void removeTraceId() {
MDC.remove(TRACE_ID);
MDC.remove(TRACE_ID_IN_MDC);
}
}

35
event-support-lib/pom.xml Normal file
View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.axzo.foundation</groupId>
<artifactId>axzo-lib-box</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>event-support-lib</artifactId>
<dependencies>
<dependency>
<groupId>cn.axzo.foundation</groupId>
<artifactId>common-lib</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,16 @@
package cn.axzo.foundation.event.support;
import com.google.common.collect.ImmutableList;
import java.util.List;
public abstract class AbstractDiffablePayload<T> implements DiffablePayload {
protected abstract T getOldValue();
protected abstract T getNewValue();
@Override
public List<DiffMeta> listDiffMetas() {
return ImmutableList.of(DiffMeta.DEFAULT);
}
}

View File

@ -0,0 +1,55 @@
package cn.axzo.foundation.event.support;
import cn.axzo.foundation.event.utils.PayloadUtils;
import com.google.common.collect.ImmutableSet;
import lombok.*;
import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 支持打印事件payload的diff日志
* 实现该接口的payload, 在发送或收到事件时将根据{@link #listDiffMetas}返回的meta信息打印diff日志
*/
public interface DiffablePayload extends Serializable {
/**
* 获取payload对比需要的meta信息. 提供payload中新老数据字段的字段名, 打印时需要忽略的字段等
*
* @return
*/
List<DiffMeta> listDiffMetas();
String DIFF_META_HEADER = "diff-metas";
@Data
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
class DiffMeta {
@NonNull
private String oldValueKey;
@NonNull
private String newValueKey;
private Set<String> ignoreKeys;
private Set<String> ignoreKeyPatterns;
public static DiffMeta DEFAULT = DiffMeta.builder()
.oldValueKey("oldValue")
.newValueKey("newValue")
.ignoreKeys(ImmutableSet.of("modifyTime"))
.build();
public PayloadDifferentiator toDifferentiator() {
return PayloadUtils.buildDifferentiator(oldValueKey, newValueKey, ignoreKeys, ignoreKeyPatterns);
}
public static PayloadDifferentiator toDifferentiator(List<DiffMeta> diffMetas) {
return PayloadUtils.buildDifferentiator(diffMetas.stream()
.collect(Collectors.toMap(m -> String.format("[%s->%s]", m.getOldValueKey(), m.getNewValueKey()),
DiffMeta::toDifferentiator)));
}
}
}

View File

@ -0,0 +1,202 @@
package cn.axzo.foundation.event.support;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.*;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Optional;
/**
* 统一系统事件对象.
* 消息格式如下
* {
* "event_scene":"teaching",
* "event_module":"im",
* "operator_id":student_id,
* "operator_type":"student",
* "target_id":conversation_id,
* "target_type":"conversation",
* "event_name":"message_sent",
* "event_time":"2019-07-22 16:10:10.666",
* "data":{"message_id":"xxx","tags":"xxx"} //消息ID,消息标签
* "event_id": "0101_xxx" //消息id
* "data_schema_hash": "xxxx" //数据的schema hash
* }
*/
@Data
@AllArgsConstructor
public class Event implements Serializable {
/**
* 唯一标识.唯一区分一条事件.可以做幂等处理. 默认是app_id + uuid
*/
private String eventId;
private String eventVersion;
private String eventModule;
private String eventName;
private String operatorId = "system";
private String operatorType = "system";
private String targetId;
private String targetType;
private Long eventTime;
private String shardingKey;
/**
* 序列化时将该字段放在最后. 避免遮挡其他关键字段
*/
@JSONField(ordinal = Integer.MAX_VALUE)
private Serializable data;
private EventCode eventCode;
/**
* 提供默认构造函数仅用于json反序列化时使用<br>
* <b>不建议直接使用默认构造函数进行event的构建构建event请使用Event.builder().xx(xx).build();</b>
* XXX: 此处不使用@NoArgsConstrutor是因为其于@Builder.Default会产生冲突;
*/
public Event() {
}
@Builder
public Event(String operatorId, String operatorType,
String targetId, String targetType, Serializable data, EventCode eventCode, String shardingKey) {
if (eventCode != null) {
this.eventModule = eventCode.getModule();
this.eventName = eventCode.getName();
this.eventCode = eventCode;
}
// 如果eventCode和单个的eventNameeventSceneeventModule都传入了优先去后面单个传入的参数
Optional.ofNullable(eventModule).ifPresent(module -> this.eventModule = module);
Optional.ofNullable(eventName).ifPresent(name -> this.eventName = name);
this.operatorId = Optional.ofNullable(operatorId).orElse("system");
this.operatorType = Optional.ofNullable(operatorType).orElse("system");
this.data = Optional.ofNullable(data).orElseGet(HashMap::new);
this.targetId = targetId;
this.targetType = targetType;
this.eventTime = System.currentTimeMillis();
this.shardingKey = shardingKey;
}
public String toJsonString() {
if (FastjsonUtils.getFastjsonVersion() == FastjsonUtils.FastjsonVersion.V1) {
return JSONObject.toJSONString(this, FastjsonUtils.LOCAL_DATE_TIME_SERIALIZE_CONFIG, FastjsonUtils.SERIALIZER_FEATURES);
} else {
return JSONObject.toJSONString(this, FastjsonUtils.SERIALIZER_FEATURES);
}
}
/**
* json 的序列化为人眼识别做特殊处理
* 1. key 排序
* 2. 过滤空的 value
* 3. 格式化日期
*
* @return
*/
public String toPrettyJsonString() {
return FastjsonUtils.toJsonPettyLogString(this);
}
@Override
public String toString() {
return toPrettyJsonString();
}
/**
* 该方法返回一个字符串该字符串可用于唯一标注一种类型的消息<br>
* 对于kafka可以将该值用于消息的key<br>
* 对于rabbit可以将该值用于消息的routingKey<br>
* 以方便消费方快速获取消息类型
*
* @return
*/
@JSONField(serialize = false)
public EventCode getEventCode() {
if (eventCode != null) {
return eventCode;
}
return new EventCode(eventModule, eventName);
}
/**
* 发送前对必填参数做校验
*
* @throws BusinessException
*/
public void check() throws BusinessException {
Preconditions.checkArgument(!Strings.isNullOrEmpty(eventModule), "发送消息 -> 事件模块不能为空");
Preconditions.checkArgument(!Strings.isNullOrEmpty(eventName), "发送消息 -> 事件名字不能为空");
}
/**
* 将Object类型的data转为指定类型对象
*
* @param tClass
* @param <T>
* @return
*/
public <T> T normalizedData(Class<T> tClass) {
return JSON.parseObject(JSON.toJSONString(data), tClass);
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
@EqualsAndHashCode(exclude = {"differentiator"})
public static class EventCode {
public static final String SEPARATOR = ":";
@NonNull
private String module;
@NonNull
private String name;
@JSONField(serialize = false)
private PayloadDifferentiator differentiator;
@Override
public String toString() {
return module + SEPARATOR + name;
}
public EventCode(String module, String name) {
this(module, name, null);
}
public static EventCode from(String text) {
String[] segments = StringUtils.split(text, SEPARATOR);
Preconditions.checkArgument(segments.length == 2);
return new EventCode(segments[0], segments[1]);
}
public Optional<String> diffPayload(JSONObject payload) {
try {
return Optional.ofNullable(differentiator).map(e -> e.buildDiffText(payload));
} catch (Exception e) {
// 做一个容错处理, 防止 diff 异常导致日志发送失败.
return Optional.of("构建 diff 失败. " + e.getMessage());
}
}
}
}

View File

@ -0,0 +1,19 @@
package cn.axzo.foundation.event.support;
import cn.axzo.foundation.event.support.consumer.EventConsumer;
import java.util.List;
public interface EventHandler {
void onEvent(Event event, EventConsumer.Context context);
/**
* 批量处理事件
* @param events
* @param context
*/
default void onEvents(List<Event> events, EventConsumer.Context context){
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,238 @@
package cn.axzo.foundation.event.support;
import cn.axzo.foundation.event.support.consumer.EventConsumer;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* 事件处理器仓库
* 可以在多个地方使用.
*/
@Slf4j
public class EventHandlerRepository {
final protected ListMultimap<Event.EventCode, EventHandlerHolder> handlers = ArrayListMultimap.create();
private final BiConsumer<Exception, String> exceptionHandler;
private AntPathMatcher antPathMatcher;
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler) {
this(exceptionHandler, false);
}
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler, boolean supportPattern) {
this.exceptionHandler = exceptionHandler;
if (supportPattern) {
antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR);
antPathMatcher.setCachePatterns(true);
}
}
public EventHandlerRepository() {
this(null);
}
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) {
Objects.requireNonNull(eventCode);
Objects.requireNonNull(eventHandler);
handlers.put(eventCode, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build());
return this;
}
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name) {
Objects.requireNonNull(eventHandler);
eventCodes.forEach(e -> handlers.put(e, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build()));
return this;
}
public boolean canHandle(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.containsKey(eventCode);
}
return handlers.keySet().stream()
.anyMatch(key -> antPathMatcher.match(key.toString(), eventCode.toString()));
}
/**
* 处理事件
*
* @param event 事件
* @return 如果有注册的handler处理事件返回true, 否则false
*/
public boolean process(Event event, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
Event.EventCode eventCode = event.getEventCode();
List<EventHandlerHolder> eventHandlers = getEventHandlers(eventCode);
Map<String, Long> handleCosts = Maps.newHashMap();
PayloadDifferentiator differentiator = null;
if (!CollectionUtils.isEmpty(context.getHeaders())
&& context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) {
byte[] diffMetaHeader = context.getHeaders().get(DiffablePayload.DIFF_META_HEADER);
List<DiffablePayload.DiffMeta> diffMetas = JSON.parseObject(
new String(diffMetaHeader, Charsets.UTF_8), new TypeReference<List<DiffablePayload.DiffMeta>>() {
});
differentiator = DiffablePayload.DiffMeta.toDifferentiator(diffMetas);
}
if (differentiator == null) {
// 如果header中没有meta从注册的handler中获取differentiator
differentiator = handlers.keys().stream()
.filter(e -> e.equals(eventCode))
.map(Event.EventCode::getDifferentiator)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
String payloadDiffLog = Optional.ofNullable(differentiator)
.map(d -> eventCode.toBuilder().differentiator(d).build())
.flatMap(e -> e.diffPayload((JSONObject) JSON.toJSON(event.getData())))
.map(diff -> "payloadDiff = " + StringUtils.left(diff, 1024))
.orElse(StringUtils.EMPTY);
String eventLogText = event.toPrettyJsonString();
eventHandlers.forEach(handler -> {
try {
stopwatch.start();
handler.getEventHandler().onEvent(event, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// 为了避免太多日志输出只有处理时间超过
if (elapsed > context.getLogElapsedThreshold()) {
handleCosts.put(handler.getName(), elapsed);
}
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s",
context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg);
}
} catch (BusinessException ex) {
log.warn("==MQ CONSUMER {}==, handle biz failed, handler = {}, {}\nevent = {}",
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex);
handleException(ex, event.toPrettyJsonString());
} catch (Exception ex) {
log.error("==MQ CONSUMER {}==, handle failed, handler = {}, {}\nevent = {}",
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex);
handleException(ex, event.toPrettyJsonString());
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
if (context.getLogEnabled(event)) {
log.info("==MQ CONSUMER {}==, timeCosts = {}, {}\n{}",
context.getSummary(), JSON.toJSONString(handleCosts), payloadDiffLog, eventLogText);
}
return !eventHandlers.isEmpty();
}
/**
* 批量处理多条事件
*
* @param events
* @param context
* @return
*/
public boolean batch(List<Event> events, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
List<EventHandlerHolder> eventHandlers = getEventHandlers(context.getEventCode());
eventHandlers.stream().forEach(handler -> {
try {
stopwatch.start();
String clazzName = handler.getClass().getCanonicalName();
log.info("====MQ CONSUMER BATCH====, start handling by {}", clazzName);
handler.getEventHandler().onEvents(events, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed);
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("take too long %d millis for %s to handle %s",
elapsed, clazzName, events);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg);
}
} catch (BusinessException ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex);
handleException(ex, payloads);
} catch (Exception ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex);
handleException(ex, payloads);
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
return !eventHandlers.isEmpty();
}
public boolean isHandled(Event event) {
Preconditions.checkArgument(event != null);
Preconditions.checkArgument(event.getEventCode() != null);
return canHandle(event.getEventCode());
}
private void handleException(Exception ex, String msg) {
if (exceptionHandler != null) {
exceptionHandler.accept(ex, msg);
}
}
private List<EventHandlerHolder> getEventHandlers(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.get(eventCode);
}
// 支持pattern的时候返回所有匹配的Handlers
return handlers.keySet().stream()
.filter(key -> antPathMatcher.match(key.toString(), eventCode.toString()))
.flatMap(key -> handlers.get(key).stream())
.collect(Collectors.toList());
}
private boolean isSupportPattern() {
return antPathMatcher != null;
}
@Data
private static class EventHandlerHolder {
String name;
EventHandler eventHandler;
@Builder
public EventHandlerHolder(String name, EventHandler eventHandler) {
this.name = Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName());
this.eventHandler = eventHandler;
}
}
}

View File

@ -0,0 +1,22 @@
package cn.axzo.foundation.event.support;
import com.alibaba.fastjson.JSONObject;
import lombok.NonNull;
/**
* 接口返回一个 event payload 中变化的部分文本, 作为事件日志的日志输出
* 通常在 event payload 中会定义 oldValue, newValue, 通过对比 oldValue newValue的值我们可以知道
* event payload 发生了什么变化.
* 这是一个可选接口. 如果event 没有实现, 不会影响日志输出.
*/
@FunctionalInterface
public interface PayloadDifferentiator {
/**
* 构建 payload diff 文本方便日志输出
* 建议使用 FastjsonUtils.diffJson() 产生 diff 数据
* @param payload
* @return diff 文本.
* @see com.fiture.bfs.utils.FastjsonUtils.diffJson()
*/
String buildDiffText(@NonNull JSONObject payload);
}

View File

@ -0,0 +1,124 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* 消费消息用法如下
* <pre>
* // EVENT_TOPIC_KEY = "topic"
* &#064;Bean
* EventConsumer eventConsumer(AppRuntime appRuntime, EventHandlerRepository eventHandlerRepository, ApiStatsClient apiStatsClient) {
* Consumer<EventConsumer.EventWrapper> callback = (eventWrapper) -> {
* if (eventWrapper.isHandled()) {
* // 只收集被App真正消费的消息.
* String topic = (String)eventWrapper.getExt().get(EVENT_TOPIC_KEY);
* apiStatsClient.reportConsumedEvent(eventWrapper.getEvent(), topic);
* }
* };
* return new DefaultEventConsumer(appRuntime.getAppName(), eventHandlerRepository, callback);
* }
* </pre>
*/
@Slf4j
public class DefaultEventConsumer implements EventConsumer {
private final EventHandlerRepository handlerRepository;
private final String appName;
private final Consumer<EventWrapper> consumeCallback;
public DefaultEventConsumer(String appName, EventHandlerRepository handlerRepository, Consumer<EventWrapper> consumeCallback) {
this.handlerRepository = handlerRepository;
this.appName = appName;
this.consumeCallback = consumeCallback;
}
@Override
public boolean onEvent(String message, Context context) {
Event event;
//如果header中没有eventCode. 则尝试从body里面获取
if (context.getEventCode() == null) {
try {
JSONObject evenJSON = JSONObject.parseObject(message);
context.setEventCode(new Event.EventCode(evenJSON.getString("eventModule"), evenJSON.getString("eventName")));
} catch (Exception ex) {
//ignore. 忽略message不是JSON的情况
}
}
// 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false
if (!handlerRepository.canHandle(context.getEventCode())) {
// ignore due to no registering handler
return false;
}
try {
event = JSONObject.parseObject(message, Event.class);
} catch (Exception e) {
log.error("==MQ CONSUMER {}==, parse event error, event = {}",
context.getSummary(), message, e);
return false;
}
Boolean handled = false;
try {
handled = handlerRepository.process(event, context);
} catch (Exception ex) {
//如果是继承自runtime. 直接抛出. 否则包装为runtime
Throwables.throwIfUnchecked(ex);
throw new RuntimeException("process event fail", ex);
}
if (consumeCallback != null) {
consumeCallback.accept(EventWrapper.builder()
.event(event)
.consumer(this)
.isHandled(handled)
.ext(context.getExt())
.context(context)
.build());
}
return handled;
}
@Override
public boolean onEvents(List<BatchEvent> events) {
// 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false
log.info("====MQ CONSUMER BATCH===={}, events={}", appName, FastjsonUtils.toJsonPettyLogString(events));
Map<Event.EventCode, List<Event>> eventGroup = events.stream().filter(e -> handlerRepository.canHandle(e.getContext().getEventCode()))
.map(BatchEvent::toEvent).collect(Collectors.groupingBy(Event::getEventCode));
if (eventGroup.isEmpty()) {
return false;
}
for (final Map.Entry<Event.EventCode, List<Event>> entry : eventGroup.entrySet()) {
handlerRepository.batch(entry.getValue(),
Context.builder()
.eventCode(entry.getKey())
.maxAllowElapsedMillis(TimeUnit.MINUTES.toMillis(1L))
.build());
}
return true;
}
@Override
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) {
return handlerRepository.registerHandler(eventCode, eventHandler, name);
}
@Override
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name) {
return handlerRepository.registerHandlers(eventCodes, eventHandler, name);
}
}

View File

@ -0,0 +1,181 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.event.support.EventHandlerRepository;
import cn.axzo.foundation.event.support.producer.EventProducer;
import cn.axzo.foundation.util.TraceUtils;
import cn.axzo.foundation.util.UUIDBuilder;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import lombok.*;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public interface EventConsumer {
/**
* 注册EventHandler
*
* @param eventCode event类型的唯一标示
* @param eventHandler eventHandler
* @param name handler别名
* @return EventHandlerRepository
*/
EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name);
/**
* 为多个eventCodes注册一个eventHandler
*
* @param eventCodes list of eventCode
* @param eventHandler eventHandler
* @param name handler别名
* @return EventHandlerRepository
*/
EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name);
/**
* /**
* message的处理方法
*
* @param message message
* @return 如果事件被handler处理返回true, 否则返回false
* @Param ext 扩展信息.
*/
boolean onEvent(String message, Context context);
/**
* /**
* message的批量处理方法
*
* @param batchEvents message
* @return 如果事件被handler处理返回true, 否则返回false
* @Param ext 扩展信息.
*/
default boolean onEvents(List<BatchEvent> batchEvents) {
throw new UnsupportedOperationException();
}
@Builder
@Getter
class EventWrapper {
private Event event;
private EventConsumer consumer;
private boolean isHandled;
private Map<String, Object> ext;
private Context context;
}
@Getter
@NoArgsConstructor
class Context {
@Setter
private Event.EventCode eventCode;
private String traceId;
private String msgId;
/**
* 只elapse 超过多少毫秒才进行日志记录默认 3ms
*/
private Long logElapsedThreshold;
private Map<String, byte[]> headers;
private Map<String, Object> ext;
private Long maxAllowElapsedMillis;
@Getter(AccessLevel.NONE)
private Boolean logEnabled;
@Getter(AccessLevel.NONE)
private transient Predicate<Event> logFilter;
/**
* 返回消息消费者offset与全部消息offset之间的差.
*/
private transient Supplier<Long> lagSupplier;
@Builder
public Context(Event.EventCode eventCode, String msgId, Long logElapsedThreshold, Map<String, byte[]> headers, Map<String, Object> ext,
Long maxAllowElapsedMillis, Boolean logEnabled, Predicate<Event> logFilter, Supplier<Long> lagSupplier) {
this.eventCode = eventCode;
this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L);
this.headers = headers;
this.ext = ext;
this.msgId = Strings.nullToEmpty(msgId);
this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L);
this.logEnabled = logEnabled;
this.logFilter = logFilter;
this.lagSupplier = lagSupplier;
if (!CollectionUtils.isEmpty(headers) && headers.containsKey(TraceUtils.TRACE_ID)) {
this.traceId = new String(headers.get(TraceUtils.TRACE_ID), Charsets.UTF_8);
}
if (Strings.isNullOrEmpty(this.traceId)) {
this.traceId = UUIDBuilder.generateShortUuid();
}
TraceUtils.putTraceId(this.traceId);
}
public Event.EventCode getEventCode() {
if (eventCode == null) {
eventCode = headers.containsKey("eventCode")
? Event.EventCode.from(new String(headers.get("eventCode"), Charsets.UTF_8))
: null;
}
return eventCode;
}
public String getTraceId() {
if (Strings.isNullOrEmpty(traceId)) {
// 兼容老的实现
traceId = UUIDBuilder.generateShortUuid();
TraceUtils.putTraceId(traceId);
}
return traceId;
}
public boolean getLogEnabled(Event event) {
// consumer显式声明了日志开关则直接使用
if (logEnabled != null) {
return logEnabled && (logFilter == null || logFilter.apply(event));
}
// consumer未显式声明日志开关时根据event的header中的disableLog=true来决定是否关闭日志
boolean disableLog = BooleanUtils.toBoolean(new String(Optional.ofNullable(headers).orElse(Collections.emptyMap())
.getOrDefault(EventProducer.Context.DISABLE_LOG_HEADER, "".getBytes())));
if (disableLog) {
return false;
}
return logFilter == null || logFilter.apply(event);
}
public String getSummary() {
return String.format("%s %s %s", getEventCode(), getMsgId(), getTraceId());
}
}
@Builder
@Getter
class BatchEvent {
private String message;
private Context context;
public Event.EventCode getEventCode() {
return context.getEventCode();
}
public Event toEvent() {
try {
return JSONObject.parseObject(message, Event.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,242 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.DiffablePayload;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.event.support.PayloadDifferentiator;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* 事件处理器仓库
* 可以在多个地方使用.
*/
@Slf4j
public class EventHandlerRepository {
final protected ListMultimap<Event.EventCode, EventHandlerHolder> handlers = ArrayListMultimap.create();
private final BiConsumer<Exception, String> exceptionHandler;
private AntPathMatcher antPathMatcher;
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler) {
this(exceptionHandler, false);
}
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler, boolean supportPattern) {
this.exceptionHandler = exceptionHandler;
if (supportPattern) {
antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR);
antPathMatcher.setCachePatterns(true);
}
}
public EventHandlerRepository() {
this(null);
}
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) {
Objects.requireNonNull(eventCode);
Objects.requireNonNull(eventHandler);
handlers.put(eventCode, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build());
return this;
}
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name) {
Objects.requireNonNull(eventHandler);
eventCodes.forEach(e -> handlers.put(e, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build()));
return this;
}
public boolean canHandle(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.containsKey(eventCode);
}
return handlers.keySet().stream()
.anyMatch(key -> antPathMatcher.match(key.toString(), eventCode.toString()));
}
/**
* 处理事件
*
* @param event 事件
* @return 如果有注册的handler处理事件返回true, 否则false
*/
public boolean process(Event event, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
Event.EventCode eventCode = event.getEventCode();
List<EventHandlerHolder> eventHandlers = getEventHandlers(eventCode);
Map<String, Long> handleCosts = Maps.newHashMap();
PayloadDifferentiator differentiator = null;
if (!CollectionUtils.isEmpty(context.getHeaders())
&& context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) {
byte[] diffMetaHeader = context.getHeaders().get(DiffablePayload.DIFF_META_HEADER);
List<DiffablePayload.DiffMeta> diffMetas = JSON.parseObject(
new String(diffMetaHeader, Charsets.UTF_8), new TypeReference<List>() {
});
differentiator = DiffablePayload.DiffMeta.toDifferentiator(diffMetas);
}
if (differentiator == null) {
// 如果header中没有meta从注册的handler中获取differentiator
differentiator = handlers.keys().stream()
.filter(e -> e.equals(eventCode))
.map(Event.EventCode::getDifferentiator)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
String payloadDiffLog = Optional.ofNullable(differentiator)
.map(d -> eventCode.toBuilder().differentiator(d).build())
.flatMap(e -> e.diffPayload((JSONObject) JSON.toJSON(event.getData())))
.map(diff -> "payloadDiff = " + StringUtils.left(diff, 1024))
.orElse(StringUtils.EMPTY);
String eventLogText = event.toPrettyJsonString();
eventHandlers.forEach(handler -> {
try {
stopwatch.start();
handler.getEventHandler().onEvent(event, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// 为了避免太多日志输出只有处理时间超过
if (elapsed > context.getLogElapsedThreshold()) {
handleCosts.put(handler.getName(), elapsed);
}
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s",
context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg);
}
} catch (BusinessException ex) {
log.warn("==MQ CONSUMER {}==, handle biz failed, handler = {}, {}\nevent = {}",
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex);
handleException(ex, event.toPrettyJsonString());
} catch (Exception ex) {
log.error("==MQ CONSUMER {}==, handle failed, handler = {}, {}\nevent = {}",
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex);
handleException(ex, event.toPrettyJsonString());
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
if (context.getLogEnabled(event)) {
log.info("==MQ CONSUMER {}==, timeCosts = {}, {}\n{}",
context.getSummary(), JSON.toJSONString(handleCosts), payloadDiffLog, eventLogText);
}
return !eventHandlers.isEmpty();
}
/**
* 批量处理多条事件
*
* @param events
* @param context
* @return
*/
public boolean batch(List<Event> events, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
List<EventHandlerHolder> eventHandlers = getEventHandlers(context.getEventCode());
eventHandlers.stream().forEach(handler -> {
try {
stopwatch.start();
String clazzName = handler.getClass().getCanonicalName();
log.info("====MQ CONSUMER BATCH====, start handling by {}", clazzName);
handler.getEventHandler().onEvents(events, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed);
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("take too long %d millis for %s to handle %s",
elapsed, clazzName, events);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg);
}
} catch (BusinessException ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex);
handleException(ex, payloads);
} catch (Exception ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex);
handleException(ex, payloads);
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
return !eventHandlers.isEmpty();
}
public boolean isHandled(Event event) {
Preconditions.checkArgument(event != null);
Preconditions.checkArgument(event.getEventCode() != null);
return canHandle(event.getEventCode());
}
private void handleException(Exception ex, String msg) {
if (exceptionHandler != null) {
exceptionHandler.accept(ex, msg);
}
}
private List<EventHandlerHolder> getEventHandlers(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.get(eventCode);
}
// 支持pattern的时候返回所有匹配的Handlers
return handlers.keySet().stream()
.filter(key -> antPathMatcher.match(key.toString(), eventCode.toString()))
.flatMap(key -> handlers.get(key).stream())
.collect(Collectors.toList());
}
private boolean isSupportPattern() {
return antPathMatcher != null;
}
@Data
private static class EventHandlerHolder {
String name;
EventHandler eventHandler;
@Builder
public EventHandlerHolder(String name, EventHandler eventHandler) {
this.name = Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName());
this.eventHandler = eventHandler;
}
}
}

View File

@ -0,0 +1,189 @@
package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.DiffablePayload;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.PayloadDifferentiator;
import cn.axzo.foundation.util.TraceUtils;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.messaging.MessageHeaders;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.MimeTypeUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
/**
* 默认EventProducer抽象里事件的组装和事务后发送的逻辑提供一个Sender具体服务具体提供
*/
@Slf4j
public abstract class AbstractEventProducer<Meta> implements EventProducer<Meta> {
private final AfterCommitExecutorImpl afterCommitExecutor;
/**
* @return 返回发送消息的发送器
*/
public abstract BiConsumer<Event, Context<Meta>> getSender();
/**
* @return 返回发送binary data消息的发送器
*/
protected BiConsumer<BinaryEvent, Context<Meta>> getBinaryEventSender() {
throw new UnsupportedOperationException("不支持发送binary消息");
}
private Context defaultContext;
public AbstractEventProducer(Context defaultContext) {
this.afterCommitExecutor = new AfterCommitExecutorImpl();
this.defaultContext = defaultContext;
}
@Override
public void send(@NonNull Event event, @NonNull Context context) {
// XXX不要在send的时候修改event的值有副作用
// 例如当将同一个event发送到不同的topic的时候buildSchemaHash会用不同的topic赋值两次导致一些异常case
Event copiedEvent = Event.builder().build();
BeanUtils.copyProperties(event, copiedEvent);
if (Strings.isNullOrEmpty(copiedEvent.getTargetId())) {
log.warn("targetId of event is black, best practice of targetId is present, event = {}", event.toJsonString());
}
if (copiedEvent.getData() == null) {
log.warn("data of event is empty, best practice of data must present, event = {}", event.toJsonString());
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventModule()), "eventModule不能为空");
Preconditions.checkArgument(!Strings.isNullOrEmpty(copiedEvent.getEventName()), "eventName不能为空");
// 复制一份 context并加入链路跟踪信息 traceId
HashMap newHeaders = Maps.newHashMap(Optional.ofNullable(context.getHeaders()).orElse(ImmutableMap.of()));
String traceId = TraceUtils.getOrCreateTraceId();
newHeaders.put(TraceUtils.TRACE_ID, traceId);
if (event.getData() instanceof DiffablePayload) {
List<DiffablePayload.DiffMeta> diffMetas = ((DiffablePayload) event.getData()).listDiffMetas();
PayloadDifferentiator differentiator = DiffablePayload.DiffMeta.toDifferentiator(diffMetas);
copiedEvent.setEventCode(event.getEventCode().toBuilder().differentiator(differentiator).build());
newHeaders.put(DiffablePayload.DIFF_META_HEADER, JSON.toJSONString(diffMetas));
}
final Context copiedContext = context.toBuilder().traceId(traceId).headers(newHeaders).build();
Runnable runnable = () -> {
try {
getSender().accept(copiedEvent, copiedContext);
} catch (Exception e) {
log.error("==MQ PRODUCER error {} {} ==, context={}, message = {}", event.getEventCode().toString(), traceId,
copiedContext, copiedEvent.toPrettyJsonString(), e);
throw e;
}
};
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
afterCommitExecutor.execute(runnable);
}
}
@Override
public void send(@NonNull Event event) {
send(event, defaultContext);
}
@Override
public void send(BinaryEvent binaryEvent, Context<Meta> context) {
// 复制一份 context并加入链路跟踪信息 traceId
HashMap newHeaders = Maps.newHashMap(Optional.ofNullable(context.getHeaders()).orElse(ImmutableMap.of()));
String traceId = TraceUtils.getOrCreateTraceId();
newHeaders.put(TraceUtils.TRACE_ID, traceId);
// header中增加类型octet-stream
newHeaders.put(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_OCTET_STREAM_VALUE);
final Context copiedContext = context.toBuilder().traceId(traceId).headers(newHeaders).build();
Runnable runnable = () -> {
try {
getBinaryEventSender().accept(binaryEvent, copiedContext);
} catch (Exception e) {
log.error("==MQ PRODUCER binary event error ==, context={}, binaryEvent = {}",
copiedContext, binaryEvent, e);
throw e;
}
};
if (copiedContext.isTransactional()) {
// https://www.jianshu.com/p/59891ede5f90
runnable.run();
} else {
// 并发会导致事件时序出现问题. 所以串行执行
afterCommitExecutor.execute(runnable);
}
}
/**
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
* 保证在交易结束后被调用.
*/
private static class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
public void execute(Runnable runnable) {
if (log.isDebugEnabled()) {
log.debug("Submitting new runnable {} to run after commit", runnable);
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
if (log.isDebugEnabled()) {
log.debug("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
}
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnables.add(runnable);
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
if (log.isDebugEnabled()) {
log.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
}
for (int i = 0; i < threadRunnables.size(); i++) {
Runnable runnable = threadRunnables.get(i);
if (log.isDebugEnabled()) {
log.debug("Executing runnable {}", runnable);
}
try {
runnable.run();
} catch (RuntimeException e) {
log.error("Failed to execute runnable " + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
if (log.isDebugEnabled()) {
log.debug("Transaction completed with status {}", status == TransactionSynchronization.STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
}
RUNNABLES.remove();
}
}
}

View File

@ -0,0 +1,150 @@
package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding;
import lombok.*;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
public interface EventProducer<Meta> {
/**
* 发送event
*
* @param event 事件主体
*/
void send(Event event, Context<Meta> context);
/**
* 发送eventcontext信息使用默认的context
*
* @param event
*/
void send(Event event);
/**
* 发送binary data的event
*
* @param binaryEvent
* @param context
*/
default void send(BinaryEvent binaryEvent, Context<Meta> context) {
throw new UnsupportedOperationException("不支持发送binary消息");
}
@Getter
@ToString
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
class Context<Meta> {
/**
* header中传递该值建议调用方关闭当前事件的log打印一般是数量较多的事件
*/
public static final String DISABLE_LOG_HEADER = "disableLog";
/**
* 存储发送消息需要的一些元数据, 例如rabbitMq的exchange, routingKey等
*/
private Meta meta;
/**
* 指定的schemahash. 如果context指定了schemahash, 就不会再计算每个event的schemaHash.
* 解决计算schemahash需要消耗资源问题.
*/
String schemaHash;
/**
* 发送消息需要附加的请求头
*/
private Map<String, String> headers;
/**
* 接受发送过程中的异常
*/
private transient Consumer<ExceptionContext> exceptionHandler;
/** 直接使用JSONObject发送会自动添加 Content-Type=application/json 然后payload不会进行JSON.toJSONString的操作 */
private Boolean sendAsJSONObject;
/**
* 同步发送默认 异步
*/
private Boolean syncSending;
public Boolean getSyncSending() {
return Optional.ofNullable(syncSending).orElse(Boolean.TRUE);
}
/**
* 是否使用 transactional 的方式发送查看
* https://www.jianshu.com/p/59891ede5f90
* 如果是 true调用者需要做如下改造
* <ul>
* <li>spring.kafka.producer.transaction-id-prefix</li>
* <li>调用函数需要添加@Transactional</li>
* </ul>
*/
private boolean transactional;
/**
* 跟踪消息链路 id
*/
private String traceId;
/**
* 在header中声明该event关闭log打印一般用于数量太多的event避免被log刷屏执行该方法后
* 1. producer{@link RocketMQEventProducer}中将不再打印该event的发送日志
* 2. consumer{@link DefaultEventConsumer}中默认不打印该日志除非consumer中显式声明日志开启{@link DefaultEventConsumer.Context#logEnabled}
*/
public <Meta> Context<Meta> disableLog() {
if (this.headers == null) {
this.headers = Maps.newHashMap();
}
this.headers.put(DISABLE_LOG_HEADER, Boolean.TRUE.toString());
return (Context<Meta>) this;
}
public Boolean logEnabled() {
return headers == null || headers.isEmpty() || !BooleanUtils.toBoolean(headers.getOrDefault(DISABLE_LOG_HEADER, ""));
}
}
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Builder
class ExceptionContext<Meta> {
Context<Meta> context;
Throwable throwable;
Event event;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class BinaryEvent {
private byte[] data;
private String shardingKey;
public String getShardingKey() {
if (Strings.isNullOrEmpty(shardingKey)) {
return StringUtils.EMPTY;
}
return shardingKey;
}
@Override
public String toString() {
return String.format("data=%s, shardingKey=%s", BaseEncoding.base16().encode(data), shardingKey);
}
}
}

View File

@ -0,0 +1,139 @@
package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.util.UUIDBuilder;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.CollectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* 发送消息用法如下
* <pre>
* &#064;Bean
* EventProducer eventProducer(AppRuntime appRuntime, KafkaTemplate kafkaTemplate, TaskExecutor taskExecutor, ApiStatsClient apiStatsClient) {
* BiConsumer<Event, KafkaEventProducer.KafkaMessageMeta> callback = (event, meta) -> {
* // 收集事件的元数据和发送次数做治理.
* apiStatsClient.reportProducedEvent(event, meta.getTopic());
* };
* return new KafkaEventProducer(kafkaTemplate, DEFAULT_EVENT_MODULE, DEFAULT_EVENT_SCENE, appRuntime.getAppId(),
* KafkaEventProducer.KafkaMessageMeta.build(DEFAULT_TOPIC), taskExecutor, callback);
* }
* </pre>
*/
@Slf4j
public class KafkaEventProducer extends AbstractEventProducer {
private KafkaTemplate kafkaTemplate;
private String defaultModule;
private String appName;
/**
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
*/
private BiConsumer<Event, Context<KafkaMessageMeta>> sendCallback;
public KafkaEventProducer(KafkaTemplate kafkaTemplate,
String defaultModule,
String appName,
Context<KafkaMessageMeta> defaultMeta,
BiConsumer<Event, Context<KafkaMessageMeta>> sendCallback) {
super(defaultMeta);
this.kafkaTemplate = kafkaTemplate;
this.defaultModule = defaultModule;
this.appName = appName;
this.sendCallback = sendCallback;
}
@Override
public BiConsumer<Event, Context<KafkaMessageMeta>> getSender() {
return (event, context) -> {
if (Strings.isNullOrEmpty(event.getEventModule())) {
event.setEventModule(defaultModule);
}
if (Strings.isNullOrEmpty(event.getEventId())) {
event.setEventId(String.format("%s_%s", appName, UUIDBuilder.generateLongUuid()));
}
event.check();
// 如果外部没有指定默认使用 targetId 作为分配的 key
String partitionKey = context.getMeta().getPartitionKey();
if (Strings.isNullOrEmpty(partitionKey) && !Strings.isNullOrEmpty(event.getTargetId())) {
partitionKey = event.getTargetId();
}
ProducerRecord producerRecord = new ProducerRecord(context.getMeta().getTopic(), partitionKey,
event.toJsonString());
if (!CollectionUtils.isEmpty(context.getHeaders())) {
context.getHeaders().entrySet()
.forEach(entry -> producerRecord.headers().add(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8)));
}
// 将eventCode放入header以便快速过滤自己不关注的信息
producerRecord.headers().add("eventCode", event.getEventCode().toString().getBytes());
//没有添加自定义的header. 后续有要求再添加
// https://docs.spring.io/spring-kafka/reference/html/#kafka-template
ListenableFuture future = kafkaTemplate.send(producerRecord);
if (context.getSyncSending()) {
// 使用同步方式发送消息
try {
future.get(60, TimeUnit.SECONDS);
if (context.logEnabled()) {
log.info("====MQ PRODUCER SYNC====, context={}, message = {}", context, event.toPrettyJsonString());
}
if (sendCallback != null) {
sendCallback.accept(event, context);
}
} catch (Throwable e) {
log.error("send kafka event ERROR-SYNC! event={} context={}", event.toPrettyJsonString(), context, e);
if (context.getExceptionHandler() != null) {
context.getExceptionHandler().accept(ExceptionContext.<KafkaMessageMeta>builder()
.context(context).event(event).throwable(e).build());
}
// 异常需要抛出以使kafka事务能够回滚(如果有)
throw new RuntimeException(e);
}
} else {
future.addCallback(result -> {
if (context.logEnabled()) {
log.info("====MQ PRODUCER ASYNC====, context={}, message = {}", context, event.toPrettyJsonString());
}
if (sendCallback != null) {
sendCallback.accept(event, context);
}
}, ex -> {
log.error("send kafka event ERROR-ASYNC! event={} context={}", event.toPrettyJsonString(), context, ex);
if (context.getExceptionHandler() != null) {
context.getExceptionHandler().accept(ExceptionContext.<KafkaMessageMeta>builder()
.context(context).event(event).throwable(ex).build());
}
});
}
};
}
@NoArgsConstructor
@Getter
@ToString
public static class KafkaMessageMeta {
private String topic;
private String partitionKey;
@Builder
public KafkaMessageMeta(String topic, String partitionKey) {
this.topic = topic;
this.partitionKey = partitionKey;
}
}
}

View File

@ -0,0 +1,84 @@
package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.consumer.EventConsumer;
import cn.axzo.foundation.util.UUIDBuilder;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* 本地消息发送避免直接发送到测试环境影响测试
* 如果配置了 EventConsumer,会将消息直接传到给 consumer 消费
* XXX只是为了本地测试不要使用在测试和线上环境
*/
@Slf4j
public class LocalEventProducer extends AbstractEventProducer {
private final EventConsumer eventConsumer;
private String defaultModule;
private String appName;
private final static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
public LocalEventProducer(EventConsumer eventConsumer,
String defaultModule,
String appName,
Context<LocalMessageMeta> defaultMeta,
@Deprecated TaskExecutor taskExecutor) {
super(defaultMeta);
this.defaultModule = defaultModule;
this.appName = appName;
this.eventConsumer = eventConsumer;
}
@Override
public BiConsumer<Event, Context<LocalMessageMeta>> getSender() {
return (event, context) -> {
if (eventConsumer == null) {
log.info("====IGNORE even since having no eventconsumer{}", event);
return;
}
if (Strings.isNullOrEmpty(event.getEventModule())) {
event.setEventModule(defaultModule);
}
if (Strings.isNullOrEmpty(event.getEventId())) {
event.setEventId(String.format("%s_%s", appName, UUIDBuilder.generateLongUuid()));
}
event.check();
Map<String, byte[]> headers = Maps.newHashMap();
if (!CollectionUtils.isEmpty(context.getHeaders())) {
context.getHeaders().entrySet()
.forEach(entry -> headers.put(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8)));
}
// 将eventCode放入header以便快速过滤自己不关注的信息
headers.put("eventCode", event.getEventCode().toString().getBytes());
Runnable sender = () -> eventConsumer.onEvent(event.toJsonString(), EventConsumer.Context.builder()
.eventCode(event.getEventCode()).headers(headers).ext(ImmutableMap.of()).build());
//提交到executor, 并延迟50ms. 模拟线上produce sender的时间
executor.schedule(sender, 50, TimeUnit.MILLISECONDS);
};
}
@NoArgsConstructor
@Getter
@ToString
public static class LocalMessageMeta {
}
}

View File

@ -0,0 +1,249 @@
package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.util.UUIDBuilder;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import java.util.Optional;
import java.util.function.BiConsumer;
@Slf4j
public class RocketMQEventProducer extends AbstractEventProducer {
private RocketMQTemplate rocketMQTemplate;
private String defaultModule;
private String appName;
/**
* 一个扩展点.让外部可以在发送消息后做特殊逻辑处理. 比如发送消息统计, 消息元数据监控
*/
private BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback;
public RocketMQEventProducer(RocketMQTemplate rocketMQTemplate,
String defaultModule,
String appName,
Context<RocketMQMessageMeta> defaultContext,
@Deprecated TaskExecutor taskExecutor,
BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback) {
super(defaultContext);
this.defaultModule = defaultModule;
this.appName = appName;
this.sendCallback = sendCallback;
this.rocketMQTemplate = rocketMQTemplate;
// 设置默认的Destination格式topic:tag
this.rocketMQTemplate.setDefaultDestination(defaultContext.getMeta().buildDestination(StringUtils.EMPTY));
}
@Override
public BiConsumer<Event, Context<RocketMQMessageMeta>> getSender() {
return (event, context) -> {
if (Strings.isNullOrEmpty(event.getEventModule())) {
event.setEventModule(defaultModule);
}
if (Strings.isNullOrEmpty(event.getEventId())) {
event.setEventId(String.format("%s_%s", appName, UUIDBuilder.generateLongUuid()));
}
event.check();
String destination = context.getMeta().buildDestination(event.getEventName());
String eventText = event.toJsonString();
MessageBuilder<?> messageBuilder = null;
if (BooleanUtils.isTrue(context.getSendAsJSONObject())) {
// 做一次toJSONString再转回json统一json序列化避免日期类型的序列化不一致导致的问题
messageBuilder = MessageBuilder.withPayload(JSON.parseObject(eventText, JSONObject.class));
messageBuilder.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE);
} else {
messageBuilder = MessageBuilder.withPayload(eventText);
}
messageBuilder.copyHeaders(context.getHeaders());
// 如果没显式指定消息的业务key则使用targetId作为key
if (!Strings.isNullOrEmpty(event.getTargetId())) {
messageBuilder.setHeaderIfAbsent(MessageConst.PROPERTY_KEYS, event.getTargetId());
}
// 将eventCode放入header以便快速过滤自己不关注的信息
Event.EventCode eventCode = event.getEventCode();
messageBuilder.setHeaderIfAbsent("eventCode", eventCode.toString());
final String eventLogText = context.logEnabled() ? event.toPrettyJsonString() : "";
// 使用lamda构造diff, 解决在rocketMQTemplate.asyncSendOrderly中访问的编译问题.
Supplier<String> payloadDiffBuilder = () -> {
if (context.logEnabled()) {
JSONObject eventJson = JSON.parseObject(eventText);
return eventCode.diffPayload(eventJson.getJSONObject("data"))
.map(diff -> "payloadDiff = " + StringUtils.left(diff, 1024))
.orElse(StringUtils.EMPTY);
}
return "";
};
String shardingKey = getMessageShardingKey(event, context);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (context.logEnabled()) {
log.info("==MQ PRODUCER {} {} {} {} {}== {}\n{}",
context.getSyncSending() ? "SYNC" : "ASYNC", eventCode, sendResult.getMsgId(),
context.getTraceId(), sendResult.getMessageQueue().getQueueId(),
payloadDiffBuilder.get(), eventLogText);
}
if (sendCallback != null) {
sendCallback.accept(event, context);
}
}
@Override
public void onException(Throwable throwable) {
log.error("send rocketMQ event ERROR-{}! {}\nevent={} context={}",
context.getSyncSending() ? "SYNC" : "ASYNC", payloadDiffBuilder.get(),
eventLogText, context, throwable);
if (context.getExceptionHandler() != null) {
context.getExceptionHandler().accept(ExceptionContext.<RocketMQMessageMeta>builder()
.context(context).event(event).throwable(throwable).build());
}
}
};
if (context.getSyncSending()) {
// 同步发送
doSyncSend(destination, messageBuilder, shardingKey, callback);
} else {
// 异步发送
doAsyncSend(destination, messageBuilder, shardingKey, callback);
}
};
}
@Override
public BiConsumer<BinaryEvent, Context<RocketMQMessageMeta>> getBinaryEventSender() {
return (binaryEvent, context) -> {
String destination = context.getMeta().buildDestination(StringUtils.EMPTY);
MessageBuilder<byte[]> messageBuilder = MessageBuilder.withPayload(binaryEvent.getData());
messageBuilder.copyHeaders(context.getHeaders());
String shardingKey = binaryEvent.getShardingKey();
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (context.logEnabled()) {
log.info("==MQ PRODUCER {} binary event {} {} {}== {}\n{}",
context.getSyncSending() ? "SYNC" : "ASYNC", destination, sendResult.getMsgId(),
context.getTraceId(), sendResult.getMessageQueue().getQueueId(), binaryEvent);
}
}
@Override
public void onException(Throwable throwable) {
log.error("send rocketMQ binary event ERROR-{}! {}\nbinaryEvent={} context={}",
context.getSyncSending() ? "SYNC" : "ASYNC", binaryEvent, context, throwable);
if (context.getExceptionHandler() != null) {
Event event = Event.builder().data(binaryEvent.toString()).build();
context.getExceptionHandler().accept(ExceptionContext.<RocketMQMessageMeta>builder()
.context(context).event(event).throwable(throwable).build());
}
}
};
if (context.getSyncSending()) {
// 同步发送
doSyncSend(destination, messageBuilder, shardingKey, callback);
} else {
// 异步发送
doAsyncSend(destination, messageBuilder, shardingKey, callback);
}
};
}
private void doSyncSend(String destination, MessageBuilder<?> messageBuilder,
String shardingKey, SendCallback callback) {
try {
// 同步发送超时时间支持在properties中设置
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), shardingKey);
callback.onSuccess(sendResult);
} catch (Throwable e) {
callback.onException(e);
// 异常需要抛出便于外部感知
throw new RuntimeException(e);
}
}
private void doAsyncSend(String destination, MessageBuilder<?> messageBuilder,
String shardingKey, SendCallback callback) {
// 异步发送
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), shardingKey, callback);
}
/**
* 根据shardingKey获取MessageQueueSelector
* rocketMQTemplate暂不支持直接设置shardingKey使用此方法一样能实现目的https://github.com/apache/rocketmq-spring/issues/123
*
* @param event
* @param context
*/
private String getMessageShardingKey(Event event, Context<RocketMQMessageMeta> context) {
String shardingKey = Optional.ofNullable(event.getShardingKey()).orElse(context.getMeta().getShardingKey());
if (Strings.isNullOrEmpty(shardingKey)) {
shardingKey = event.getTargetId();
}
if (Strings.isNullOrEmpty(shardingKey)) {
return "";
}
return shardingKey;
}
@NoArgsConstructor
@Getter
@ToString
public static class RocketMQMessageMeta {
private String topic;
/**
* 可选发送消息时可以指定tag以便在消费时过滤
*/
private String tag;
/**
* 可选相同shardingKey的消息会分配到同一个messageQueue以此来保证有序性
*/
private String shardingKey;
/**
* 可选如果强制指定了, 直接使用
*/
private String destination;
@Builder
public RocketMQMessageMeta(String topic, String tag, String shardingKey, String destination) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "topic不能为空");
this.topic = topic;
this.tag = tag;
this.shardingKey = shardingKey;
this.destination = destination;
}
private String buildDestination(String eventName) {
if (!Strings.isNullOrEmpty(destination)) {
return destination;
}
return String.format("%s:%s", topic, Strings.nullToEmpty(Optional.ofNullable(tag).orElse(eventName)));
}
}
}

View File

@ -0,0 +1,80 @@
package cn.axzo.foundation.event.utils;
import cn.axzo.foundation.event.support.PayloadDifferentiator;
import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MapDifference;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@UtilityClass
public class PayloadUtils {
public static PayloadDifferentiator buildDifferentiator() {
return buildDifferentiator("oldValue", "newValue", ImmutableSet.of("modifyTime"));
}
public static PayloadDifferentiator buildDifferentiator(@NonNull String oldValueKey, @NonNull String newValueKey) {
return buildDifferentiator(oldValueKey, newValueKey, null, null);
}
public static PayloadDifferentiator buildDifferentiator(@NonNull String oldValueKey, @NonNull String newValueKey,
Set<String> ignoreKeys) {
return buildDifferentiator(oldValueKey, newValueKey, ignoreKeys, null);
}
public static PayloadDifferentiator buildDifferentiator(@NonNull String oldValueKey, @NonNull String newValueKey,
Set<String> ignoreKeys, Set<String> ignoreKeyPatterns) {
return payload -> {
Preconditions.checkArgument(payload != null);
JSONObject oldValue, newValue;
try {
oldValue = payload.getJSONObject(oldValueKey);
newValue = payload.getJSONObject(newValueKey);
} catch (Exception e) {
// 如果需要diff的value不是json object构建一个
oldValue = Optional.ofNullable(payload.get(oldValueKey))
.map(v -> new JSONObject().fluentPut("value", v))
.orElse(null);
newValue = Optional.ofNullable(payload.get(newValueKey))
.map(v -> new JSONObject().fluentPut("value", v))
.orElse(null);
}
Preconditions.checkState(newValue != null);
if (oldValue == null) {
return "INSERT-ONLY";
}
MapDifference mapDifference = FastjsonUtils.diffJson(oldValue, newValue, ignoreKeys, ignoreKeyPatterns);
JSONObject res = new JSONObject();
if (!mapDifference.entriesOnlyOnLeft().isEmpty()) {
res.put("oldOnly", mapDifference.entriesOnlyOnLeft());
}
if (!mapDifference.entriesOnlyOnRight().isEmpty()) {
res.put("newOnly", mapDifference.entriesOnlyOnRight());
}
if (!mapDifference.entriesDiffering().isEmpty()) {
res.put("changed", mapDifference.entriesDiffering().toString());
}
if (res.isEmpty()) {
return "NO-DIFF";
}
return JSONObject.toJSONString(res);
};
}
public static PayloadDifferentiator buildDifferentiator(Map<String, PayloadDifferentiator> differentiators) {
return payload -> differentiators.entrySet().stream()
.map(e -> String.format("%s = %s", e.getKey(), e.getValue().buildDiffText(payload)))
.collect(Collectors.joining(";"));
}
}

View File

@ -18,6 +18,10 @@
<properties>
<axzo-bom.version>2.0.0-SNAPSHOT</axzo-bom.version>
<axzo-dependencies.version>2.0.0-SNAPSHOT</axzo-dependencies.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<modules>
@ -26,6 +30,7 @@
<module>unittest-support-lib</module>
<module>web-support-lib</module>
<module>gateway-support-lib</module>
<module>event-support-lib</module>
</modules>
<dependencies>