feat: add

This commit is contained in:
zengxiaobo 2024-04-24 21:34:33 +08:00
parent 1c3302717b
commit fa4c571f6e
3 changed files with 110 additions and 239 deletions

View File

@ -1,238 +0,0 @@
package cn.axzo.foundation.event.support;
import cn.axzo.foundation.event.support.consumer.EventConsumer;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* 事件处理器仓库
* 可以在多个地方使用.
*/
@Slf4j
public class EventHandlerRepository {
final protected ListMultimap<Event.EventCode, EventHandlerHolder> handlers = ArrayListMultimap.create();
private final BiConsumer<Exception, String> exceptionHandler;
private AntPathMatcher antPathMatcher;
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler) {
this(exceptionHandler, false);
}
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler, boolean supportPattern) {
this.exceptionHandler = exceptionHandler;
if (supportPattern) {
antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR);
antPathMatcher.setCachePatterns(true);
}
}
public EventHandlerRepository() {
this(null);
}
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) {
Objects.requireNonNull(eventCode);
Objects.requireNonNull(eventHandler);
handlers.put(eventCode, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build());
return this;
}
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name) {
Objects.requireNonNull(eventHandler);
eventCodes.forEach(e -> handlers.put(e, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build()));
return this;
}
public boolean canHandle(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.containsKey(eventCode);
}
return handlers.keySet().stream()
.anyMatch(key -> antPathMatcher.match(key.toString(), eventCode.toString()));
}
/**
* 处理事件
*
* @param event 事件
* @return 如果有注册的handler处理事件返回true, 否则false
*/
public boolean process(Event event, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
Event.EventCode eventCode = event.getEventCode();
List<EventHandlerHolder> eventHandlers = getEventHandlers(eventCode);
Map<String, Long> handleCosts = Maps.newHashMap();
PayloadDifferentiator differentiator = null;
if (!CollectionUtils.isEmpty(context.getHeaders())
&& context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) {
byte[] diffMetaHeader = context.getHeaders().get(DiffablePayload.DIFF_META_HEADER);
List<DiffablePayload.DiffMeta> diffMetas = JSON.parseObject(
new String(diffMetaHeader, Charsets.UTF_8), new TypeReference<List<DiffablePayload.DiffMeta>>() {
});
differentiator = DiffablePayload.DiffMeta.toDifferentiator(diffMetas);
}
if (differentiator == null) {
// 如果header中没有meta从注册的handler中获取differentiator
differentiator = handlers.keys().stream()
.filter(e -> e.equals(eventCode))
.map(Event.EventCode::getDifferentiator)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
String payloadDiffLog = Optional.ofNullable(differentiator)
.map(d -> eventCode.toBuilder().differentiator(d).build())
.flatMap(e -> e.diffPayload((JSONObject) JSON.toJSON(event.getData())))
.map(diff -> "payloadDiff = " + StringUtils.left(diff, 1024))
.orElse(StringUtils.EMPTY);
String eventLogText = event.toPrettyJsonString();
eventHandlers.forEach(handler -> {
try {
stopwatch.start();
handler.getEventHandler().onEvent(event, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// 为了避免太多日志输出只有处理时间超过
if (elapsed > context.getLogElapsedThreshold()) {
handleCosts.put(handler.getName(), elapsed);
}
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s",
context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg);
}
} catch (BusinessException ex) {
log.warn("==MQ CONSUMER {}==, handle biz failed, handler = {}, {}\nevent = {}",
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex);
handleException(ex, event.toPrettyJsonString());
} catch (Exception ex) {
log.error("==MQ CONSUMER {}==, handle failed, handler = {}, {}\nevent = {}",
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex);
handleException(ex, event.toPrettyJsonString());
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
if (context.getLogEnabled(event)) {
log.info("==MQ CONSUMER {}==, timeCosts = {}, {}\n{}",
context.getSummary(), JSON.toJSONString(handleCosts), payloadDiffLog, eventLogText);
}
return !eventHandlers.isEmpty();
}
/**
* 批量处理多条事件
*
* @param events
* @param context
* @return
*/
public boolean batch(List<Event> events, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
List<EventHandlerHolder> eventHandlers = getEventHandlers(context.getEventCode());
eventHandlers.stream().forEach(handler -> {
try {
stopwatch.start();
String clazzName = handler.getClass().getCanonicalName();
log.info("====MQ CONSUMER BATCH====, start handling by {}", clazzName);
handler.getEventHandler().onEvents(events, context);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed);
if (elapsed > context.getMaxAllowElapsedMillis()) {
String msg = String.format("take too long %d millis for %s to handle %s",
elapsed, clazzName, events);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg);
}
} catch (BusinessException ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex);
handleException(ex, payloads);
} catch (Exception ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex);
handleException(ex, payloads);
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
return !eventHandlers.isEmpty();
}
public boolean isHandled(Event event) {
Preconditions.checkArgument(event != null);
Preconditions.checkArgument(event.getEventCode() != null);
return canHandle(event.getEventCode());
}
private void handleException(Exception ex, String msg) {
if (exceptionHandler != null) {
exceptionHandler.accept(ex, msg);
}
}
private List<EventHandlerHolder> getEventHandlers(Event.EventCode eventCode) {
if (!isSupportPattern()) {
return handlers.get(eventCode);
}
// 支持pattern的时候返回所有匹配的Handlers
return handlers.keySet().stream()
.filter(key -> antPathMatcher.match(key.toString(), eventCode.toString()))
.flatMap(key -> handlers.get(key).stream())
.collect(Collectors.toList());
}
private boolean isSupportPattern() {
return antPathMatcher != null;
}
@Data
private static class EventHandlerHolder {
String name;
EventHandler eventHandler;
@Builder
public EventHandlerHolder(String name, EventHandler eventHandler) {
this.name = Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName());
this.eventHandler = eventHandler;
}
}
}

View File

@ -0,0 +1,103 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.util.FastjsonUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* 批量处理事件处理器
* 按照 handler 维度来路由感兴趣的 events
*/
@Slf4j
public class BatchEventConsumer implements EventConsumer {
final protected ListMultimap<EventHandler, Event.EventCode> handlers = ArrayListMultimap.create();
private final boolean logEnabled;
private BiConsumer<Exception, String> exceptionHandler;
public BatchEventConsumer(BiConsumer<Exception, String> exceptionHandler, boolean logEnabled) {
this.exceptionHandler = exceptionHandler;
this.logEnabled = logEnabled;
}
@Override
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) {
handlers.put(eventHandler, eventCode);
return null;
}
@Override
public EventHandlerRepository registerHandlers(List<Event.EventCode> list, EventHandler eventHandler, String name) {
handlers.putAll(eventHandler, list);
return null;
}
@Override
public boolean onEvent(String s, Context context) {
throw new UnsupportedOperationException();
}
public boolean onEvents(List<BatchEvent> batchEvents) {
// 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false
if (batchEvents.isEmpty()) {
return false;
}
log.info("====MQ CONSUMER BATCH2==== events={}", logEnabled ? FastjsonUtils.toJsonPettyLogString(batchEvents) : batchEvents.size());
List<Event> events = batchEvents.stream().map(EventConsumer.BatchEvent::toEvent).collect(Collectors.toList());
Stopwatch stopwatch = Stopwatch.createUnstarted();
handlers.asMap().forEach((handler, codes) -> {
try {
stopwatch.start();
ImmutableSet<Event.EventCode> handleCodes = ImmutableSet.copyOf(codes);
// 过滤感兴趣的数据给 handler
List<Event> filteredEvents = events.stream().filter(e -> handleCodes.contains(e.getEventCode())).collect(Collectors.toList());
if (filteredEvents.isEmpty()) {
return;
}
handler.onEvents(filteredEvents, null);
String clazzName = handler.getClass().getCanonicalName();
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("====MQ CONSUMER BATCH2====, handledSize={}, handled by {}, cost {} millis", filteredEvents.size(), clazzName, elapsed);
if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
String msg = String.format("take too long %d millis for %s to handle %s",
elapsed, clazzName, filteredEvents);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg);
}
} catch (BusinessException ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents);
log.warn("====MQ CONSUMER BATCH====, handler={}, handle event warning, event = {}", handler.getClass().getCanonicalName(),
payloads, ex);
handleException(ex, payloads);
} catch (Exception ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents);
log.error("====MQ CONSUMER BATCH====, handler={}, handle event error, event = {}", handler.getClass().getCanonicalName(),
payloads, ex);
handleException(ex, payloads);
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
}
});
return true;
}
private void handleException(Exception ex, String msg) {
if (exceptionHandler != null) {
exceptionHandler.accept(ex, msg);
}
}
}

View File

@ -2,7 +2,6 @@ package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.event.support.EventHandlerRepository;
import cn.axzo.foundation.event.support.producer.EventProducer;
import cn.axzo.foundation.util.TraceUtils;
import cn.axzo.foundation.util.UUIDBuilder;
@ -32,6 +31,10 @@ public interface EventConsumer {
*/
EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name);
default EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) {
return registerHandler(eventCode, eventHandler, null);
}
/**
* 为多个eventCodes注册一个eventHandler
*
@ -42,6 +45,9 @@ public interface EventConsumer {
*/
EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name);
default EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler) {
return registerHandlers(eventCodes, eventHandler, null);
}
/**
* /**