From fa4c571f6e6741f56e9c4445bbaadae6f26acd39 Mon Sep 17 00:00:00 2001 From: zengxiaobo Date: Wed, 24 Apr 2024 21:34:33 +0800 Subject: [PATCH] feat: add --- .../event/support/EventHandlerRepository.java | 238 ------------------ .../support/consumer/BatchEventConsumer.java | 103 ++++++++ .../event/support/consumer/EventConsumer.java | 8 +- 3 files changed, 110 insertions(+), 239 deletions(-) delete mode 100644 event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerRepository.java create mode 100644 event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerRepository.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerRepository.java deleted file mode 100644 index 8f94377..0000000 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerRepository.java +++ /dev/null @@ -1,238 +0,0 @@ -package cn.axzo.foundation.event.support; - -import cn.axzo.foundation.event.support.consumer.EventConsumer; -import cn.axzo.foundation.exception.BusinessException; -import cn.axzo.foundation.result.ResultCode; -import cn.axzo.foundation.util.FastjsonUtils; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.base.Strings; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Maps; -import lombok.Builder; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.util.AntPathMatcher; -import org.springframework.util.CollectionUtils; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; - -/** - * 事件处理器仓库 - * 可以在多个地方使用. - */ -@Slf4j -public class EventHandlerRepository { - final protected ListMultimap handlers = ArrayListMultimap.create(); - private final BiConsumer exceptionHandler; - - private AntPathMatcher antPathMatcher; - - public EventHandlerRepository(BiConsumer exceptionHandler) { - this(exceptionHandler, false); - } - - public EventHandlerRepository(BiConsumer exceptionHandler, boolean supportPattern) { - this.exceptionHandler = exceptionHandler; - - if (supportPattern) { - antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR); - antPathMatcher.setCachePatterns(true); - } - } - - public EventHandlerRepository() { - this(null); - } - - public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { - Objects.requireNonNull(eventCode); - Objects.requireNonNull(eventHandler); - - handlers.put(eventCode, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build()); - return this; - } - - public EventHandlerRepository registerHandlers(List eventCodes, EventHandler eventHandler, String name) { - Objects.requireNonNull(eventHandler); - eventCodes.forEach(e -> handlers.put(e, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build())); - return this; - } - - public boolean canHandle(Event.EventCode eventCode) { - if (!isSupportPattern()) { - return handlers.containsKey(eventCode); - } - - return handlers.keySet().stream() - .anyMatch(key -> antPathMatcher.match(key.toString(), eventCode.toString())); - } - - /** - * 处理事件 - * - * @param event 事件 - * @return 如果有注册的handler处理事件返回true, 否则false - */ - public boolean process(Event event, EventConsumer.Context context) { - Stopwatch stopwatch = Stopwatch.createUnstarted(); - Event.EventCode eventCode = event.getEventCode(); - List eventHandlers = getEventHandlers(eventCode); - Map handleCosts = Maps.newHashMap(); - - PayloadDifferentiator differentiator = null; - if (!CollectionUtils.isEmpty(context.getHeaders()) - && context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) { - byte[] diffMetaHeader = context.getHeaders().get(DiffablePayload.DIFF_META_HEADER); - List diffMetas = JSON.parseObject( - new String(diffMetaHeader, Charsets.UTF_8), new TypeReference>() { - }); - differentiator = DiffablePayload.DiffMeta.toDifferentiator(diffMetas); - } - if (differentiator == null) { - // 如果header中没有meta,从注册的handler中获取differentiator - differentiator = handlers.keys().stream() - .filter(e -> e.equals(eventCode)) - .map(Event.EventCode::getDifferentiator) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); - } - - String payloadDiffLog = Optional.ofNullable(differentiator) - .map(d -> eventCode.toBuilder().differentiator(d).build()) - .flatMap(e -> e.diffPayload((JSONObject) JSON.toJSON(event.getData()))) - .map(diff -> "payloadDiff = " + StringUtils.left(diff, 1024)) - .orElse(StringUtils.EMPTY); - - String eventLogText = event.toPrettyJsonString(); - - eventHandlers.forEach(handler -> { - try { - stopwatch.start(); - handler.getEventHandler().onEvent(event, context); - long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - // 为了避免太多日志输出,只有处理时间超过 - if (elapsed > context.getLogElapsedThreshold()) { - handleCosts.put(handler.getName(), elapsed); - } - - if (elapsed > context.getMaxAllowElapsedMillis()) { - String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s", - context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText); - handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg); - } - } catch (BusinessException ex) { - log.warn("==MQ CONSUMER {}==, handle biz failed, handler = {}, {}\nevent = {}", - context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex); - handleException(ex, event.toPrettyJsonString()); - } catch (Exception ex) { - log.error("==MQ CONSUMER {}==, handle failed, handler = {}, {}\nevent = {}", - context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex); - handleException(ex, event.toPrettyJsonString()); - } finally { - // stopwatch必须reset(),否则下一次stopwatch.start()会报错 - stopwatch.reset(); - } - }); - if (context.getLogEnabled(event)) { - log.info("==MQ CONSUMER {}==, timeCosts = {}, {}\n{}", - context.getSummary(), JSON.toJSONString(handleCosts), payloadDiffLog, eventLogText); - } - return !eventHandlers.isEmpty(); - } - - /** - * 批量处理多条事件 - * - * @param events - * @param context - * @return - */ - public boolean batch(List events, EventConsumer.Context context) { - Stopwatch stopwatch = Stopwatch.createUnstarted(); - List eventHandlers = getEventHandlers(context.getEventCode()); - eventHandlers.stream().forEach(handler -> { - try { - stopwatch.start(); - String clazzName = handler.getClass().getCanonicalName(); - log.info("====MQ CONSUMER BATCH====, start handling by {}", clazzName); - handler.getEventHandler().onEvents(events, context); - - long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed); - - if (elapsed > context.getMaxAllowElapsedMillis()) { - String msg = String.format("take too long %d millis for %s to handle %s", - elapsed, clazzName, events); - handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg); - } - } catch (BusinessException ex) { - String payloads = FastjsonUtils.toJsonPettyLogString(events); - log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex); - handleException(ex, payloads); - } catch (Exception ex) { - String payloads = FastjsonUtils.toJsonPettyLogString(events); - log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex); - handleException(ex, payloads); - } finally { - // stopwatch必须reset(),否则下一次stopwatch.start()会报错 - stopwatch.reset(); - } - }); - return !eventHandlers.isEmpty(); - } - - public boolean isHandled(Event event) { - Preconditions.checkArgument(event != null); - Preconditions.checkArgument(event.getEventCode() != null); - - return canHandle(event.getEventCode()); - } - - private void handleException(Exception ex, String msg) { - if (exceptionHandler != null) { - exceptionHandler.accept(ex, msg); - } - } - - private List getEventHandlers(Event.EventCode eventCode) { - if (!isSupportPattern()) { - return handlers.get(eventCode); - } - - // 支持pattern的时候,返回所有匹配的Handlers - return handlers.keySet().stream() - .filter(key -> antPathMatcher.match(key.toString(), eventCode.toString())) - .flatMap(key -> handlers.get(key).stream()) - .collect(Collectors.toList()); - } - - private boolean isSupportPattern() { - return antPathMatcher != null; - } - - @Data - private static class EventHandlerHolder { - String name; - EventHandler eventHandler; - - @Builder - public EventHandlerHolder(String name, EventHandler eventHandler) { - this.name = Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName()); - this.eventHandler = eventHandler; - } - } -} diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java new file mode 100644 index 0000000..e2601dc --- /dev/null +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java @@ -0,0 +1,103 @@ +package cn.axzo.foundation.event.support.consumer; + +import cn.axzo.foundation.event.support.Event; +import cn.axzo.foundation.event.support.EventHandler; +import cn.axzo.foundation.exception.BusinessException; +import cn.axzo.foundation.result.ResultCode; +import cn.axzo.foundation.util.FastjsonUtils; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ListMultimap; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +/** + * 批量处理事件处理器。 + * 按照 handler 维度来路由感兴趣的 events + */ +@Slf4j +public class BatchEventConsumer implements EventConsumer { + final protected ListMultimap handlers = ArrayListMultimap.create(); + private final boolean logEnabled; + private BiConsumer exceptionHandler; + + public BatchEventConsumer(BiConsumer exceptionHandler, boolean logEnabled) { + this.exceptionHandler = exceptionHandler; + this.logEnabled = logEnabled; + } + + @Override + public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { + handlers.put(eventHandler, eventCode); + return null; + } + + @Override + public EventHandlerRepository registerHandlers(List list, EventHandler eventHandler, String name) { + handlers.putAll(eventHandler, list); + return null; + } + + @Override + public boolean onEvent(String s, Context context) { + throw new UnsupportedOperationException(); + } + + public boolean onEvents(List batchEvents) { + // 默认开启日志,如需关闭日志,在调用onEvent的时候,设置logEnable为false + if (batchEvents.isEmpty()) { + return false; + } + log.info("====MQ CONSUMER BATCH2==== events={}", logEnabled ? FastjsonUtils.toJsonPettyLogString(batchEvents) : batchEvents.size()); + List events = batchEvents.stream().map(EventConsumer.BatchEvent::toEvent).collect(Collectors.toList()); + Stopwatch stopwatch = Stopwatch.createUnstarted(); + handlers.asMap().forEach((handler, codes) -> { + try { + stopwatch.start(); + ImmutableSet handleCodes = ImmutableSet.copyOf(codes); + // 过滤感兴趣的数据给 handler + List filteredEvents = events.stream().filter(e -> handleCodes.contains(e.getEventCode())).collect(Collectors.toList()); + if (filteredEvents.isEmpty()) { + return; + } + + handler.onEvents(filteredEvents, null); + + String clazzName = handler.getClass().getCanonicalName(); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + log.info("====MQ CONSUMER BATCH2====, handledSize={}, handled by {}, cost {} millis", filteredEvents.size(), clazzName, elapsed); + + if (elapsed > TimeUnit.MINUTES.toMillis(1)) { + String msg = String.format("take too long %d millis for %s to handle %s", + elapsed, clazzName, filteredEvents); + handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg); + } + } catch (BusinessException ex) { + String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents); + log.warn("====MQ CONSUMER BATCH====, handler={}, handle event warning, event = {}", handler.getClass().getCanonicalName(), + payloads, ex); + handleException(ex, payloads); + } catch (Exception ex) { + String payloads = FastjsonUtils.toJsonPettyLogString(batchEvents); + log.error("====MQ CONSUMER BATCH====, handler={}, handle event error, event = {}", handler.getClass().getCanonicalName(), + payloads, ex); + handleException(ex, payloads); + } finally { + // stopwatch必须reset(),否则下一次stopwatch.start()会报错 + stopwatch.reset(); + } + }); + return true; + } + + private void handleException(Exception ex, String msg) { + if (exceptionHandler != null) { + exceptionHandler.accept(ex, msg); + } + } +} diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java index 87ea7cd..fd5c8c4 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java @@ -2,7 +2,6 @@ package cn.axzo.foundation.event.support.consumer; import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.EventHandler; -import cn.axzo.foundation.event.support.EventHandlerRepository; import cn.axzo.foundation.event.support.producer.EventProducer; import cn.axzo.foundation.util.TraceUtils; import cn.axzo.foundation.util.UUIDBuilder; @@ -32,6 +31,10 @@ public interface EventConsumer { */ EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name); + default EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) { + return registerHandler(eventCode, eventHandler, null); + } + /** * 为多个eventCodes注册一个eventHandler * @@ -42,6 +45,9 @@ public interface EventConsumer { */ EventHandlerRepository registerHandlers(List eventCodes, EventHandler eventHandler, String name); + default EventHandlerRepository registerHandlers(List eventCodes, EventHandler eventHandler) { + return registerHandlers(eventCodes, eventHandler, null); + } /** * /**