From 1496b79b704af7f46b6eeadf9acefd030afeefdb Mon Sep 17 00:00:00 2001 From: zengxiaobo Date: Sat, 25 May 2024 14:46:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BA=8B=E4=BB=B6=E9=87=8D=E8=AF=95=20?= =?UTF-8?q?step=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../event/support/EventHandlerContext.java | 6 ++ .../consumer/DefaultEventConsumer.java | 14 +--- .../event/support/consumer/EventConsumer.java | 9 --- ...nContext.java => EventHandledWrapper.java} | 25 ++++--- .../consumer/EventHandlerRepository.java | 34 +++++---- .../consumer/RetryableEventConsumer.java | 75 +++++++++++-------- .../RocketRetryableEventConsumer.java | 6 +- 7 files changed, 91 insertions(+), 78 deletions(-) rename event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/{EventExceptionContext.java => EventHandledWrapper.java} (56%) diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerContext.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerContext.java index 587bb89..cd41ad3 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerContext.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerContext.java @@ -1,5 +1,6 @@ package cn.axzo.foundation.event.support; +import cn.axzo.foundation.event.support.consumer.EventHandledWrapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import lombok.AllArgsConstructor; @@ -8,6 +9,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.util.Optional; +import java.util.function.Consumer; @Data @Builder @@ -20,6 +22,10 @@ public class EventHandlerContext { /** 用于打印日志 */ String name; EventHandler eventHandler; + /** 消费失败的回调 */ + Consumer exceptionHandler; + /** 消费成功的回调 */ + Consumer successHandler; public String getName() { return Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName()); diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java index 874d7de..8c25f54 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java @@ -11,7 +11,6 @@ import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -34,13 +33,11 @@ import java.util.stream.Collectors; @Slf4j public class DefaultEventConsumer implements EventConsumer { private final EventHandlerRepository handlerRepository; - private final Consumer consumeCallback; @Builder - public DefaultEventConsumer(EventHandlerRepository handlerRepository, Consumer consumeCallback) { + public DefaultEventConsumer(EventHandlerRepository handlerRepository) { this.handlerRepository = Optional.ofNullable(handlerRepository).orElseGet(() -> EventHandlerRepository.builder() .build()); - this.consumeCallback = consumeCallback; } @Override @@ -83,15 +80,6 @@ public class DefaultEventConsumer implements EventConsumer { Throwables.throwIfUnchecked(ex); throw new RuntimeException("process event fail", ex); } - - if (consumeCallback != null) { - consumeCallback.accept(EventWrapper.builder() - .event(event) - .consumer(this) - .isHandled(handled) - .context(context) - .build()); - } return handled; } diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java index a4c6d4d..8df4a71 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java @@ -85,15 +85,6 @@ public interface EventConsumer { throw new UnsupportedOperationException(); } - @Builder - @Getter - class EventWrapper { - private Event event; - private EventConsumer consumer; - private boolean isHandled; - private Context context; - } - @Getter @NoArgsConstructor class Context { diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventExceptionContext.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandledWrapper.java similarity index 56% rename from event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventExceptionContext.java rename to event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandledWrapper.java index 12d0d10..bd1cc1f 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventExceptionContext.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandledWrapper.java @@ -1,7 +1,6 @@ package cn.axzo.foundation.event.support.consumer; import cn.axzo.foundation.event.support.Event; -import cn.axzo.foundation.event.support.EventHandlerContext; import cn.axzo.foundation.exception.BusinessException; import cn.axzo.foundation.result.ResultCode; import lombok.AllArgsConstructor; @@ -15,19 +14,27 @@ import lombok.extern.slf4j.Slf4j; @Builder @NoArgsConstructor @AllArgsConstructor -public class EventExceptionContext { - Throwable exception; - String msg; +public class EventHandledWrapper { Event event; - EventConsumer.Context consumerContext; - EventHandlerContext handlerContext; + EventConsumer.Context consumeContext; - public void doPrint() { + /** handler的唯一索引, 重试时需要 */ + String handlerKey; + /** 用于打印日志 */ + String name; + + /** 如果抛出异常则有值 */ + Throwable exception; + + public void doPrintException() { + if (exception == null) { + return; + } if (BusinessException.class.isAssignableFrom(exception.getClass()) && !ResultCode.PROCESS_TIMEOUT.getCode().equals(((BusinessException) exception).getErrorCode())) { - log.warn("MQ, handle warning {}", msg, exception); + log.warn("MQ, handle warning {}", event.toPrettyJsonString(), exception); } else { - log.error("MQ, handle error {}", msg, exception); + log.error("MQ, handle error {}", event.toPrettyJsonString(), exception); } } } diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java index ccfd88b..62a3964 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java @@ -30,7 +30,7 @@ import java.util.stream.Collectors; @Slf4j public class EventHandlerRepository { final protected ListMultimap handlers = ArrayListMultimap.create(); - private final Consumer exceptionHandler; + private final Consumer DEFAULT_EXCEPTION_HANDLER = EventHandledWrapper::doPrintException; private AntPathMatcher antPathMatcher; /** @@ -43,13 +43,11 @@ public class EventHandlerRepository { @Builder - public EventHandlerRepository(Consumer exceptionHandler, - boolean supportPattern, + public EventHandlerRepository(boolean supportPattern, Boolean logEnabled, Predicate logFilter, Long logElapsedThreshold, Long maxAllowElapsedMillis) { - this.exceptionHandler = Optional.ofNullable(exceptionHandler).orElse(EventExceptionContext::doPrint); this.logEnabled = Optional.ofNullable(logEnabled).orElse(Boolean.TRUE); this.logFilter = logFilter; @@ -142,13 +140,7 @@ public class EventHandlerRepository { log.warn(msg); } } catch (Exception ex) { - handleException(EventExceptionContext.builder() - .event(event) - .handlerContext(handler) - .msg(event.toPrettyJsonString()) - .exception(ex) - .consumerContext(context) - .build()); + handleException(event, context, handler, ex); } finally { // stopwatch必须reset(),否则下一次stopwatch.start()会报错 stopwatch.reset(); @@ -207,9 +199,21 @@ public class EventHandlerRepository { return canHandle(event.getEventCode()); } - private void handleException(EventExceptionContext context) { - if (exceptionHandler != null) { - exceptionHandler.accept(context); + private void handleException(Event event, + EventConsumer.Context consumerContext, + EventHandlerContext context, + Exception exception) { + EventHandledWrapper wrapper = EventHandledWrapper.builder() + .event(event) + .consumeContext(consumerContext) + .name(context.getName()) + .handlerKey(context.getHandlerKey()) + .exception(exception) + .build(); + if (context.getExceptionHandler() != null) { + context.getExceptionHandler().accept(wrapper); + } else { + DEFAULT_EXCEPTION_HANDLER.accept(wrapper); } } @@ -234,7 +238,7 @@ public class EventHandlerRepository { } public boolean getLogEnabled(Event event, EventConsumer.Context context) { - if (handlers.containsKey(EventHeaders.DISABLE_LOG_HEADER)) { + if (context.getHeaders().containsKey(EventHeaders.DISABLE_LOG_HEADER)) { return false; } // consumer显式声明了日志开关,则直接使用 diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RetryableEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RetryableEventConsumer.java index c82d9e5..f83f4b7 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RetryableEventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RetryableEventConsumer.java @@ -26,43 +26,42 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -public class RetryableEventConsumer { +public class RetryableEventConsumer implements EventConsumer { EventConsumer eventConsumer; Map retryContexts = Maps.newConcurrentMap(); private static final Set> DEFAULT_EXCLUDE_EXCEPTIONS = ImmutableSet.of(BusinessException.class); private static final BackoffPolicy DEFAULT_BACKOFFPOLICY = ExponentialBackOffPolicy.builder().build(); - @Builder - public RetryableEventConsumer(Consumer consumer) { - this.eventConsumer = DefaultEventConsumer.builder() - .handlerRepository(EventHandlerRepository.builder() - .exceptionHandler(context -> { - context.doPrint(); - if (Strings.isNullOrEmpty(context.getHandlerContext().getHandlerKey())) { - return; - } - RetryEvent retryEvent = retryContexts.get(context.getHandlerContext().getHandlerKey()); - Set> excludeExceptions = retryEvent.getExcludeExceptions(); - if (excludeExceptions.stream().anyMatch(e -> e.isAssignableFrom(context.getException().getClass()))) { - return; - } - Integer currentRetryCount = getRetryCount(context.getConsumerContext()); + private final Consumer exceptionHandler; - Optional nextRetryMillis = retryEvent.getBackoffPolicy().getNextRetryMillis(currentRetryCount); - if (!nextRetryMillis.isPresent()) { - return; - } - LocalDateTime nextTriggerTime = Instant.ofEpochMilli(nextRetryMillis.get()).atZone(ZoneId.systemDefault()).toLocalDateTime(); - consumer.accept(RetryContext.builder() - .event(context.getEvent()) - .retryKey(context.getHandlerContext().getHandlerKey()) - .nextTriggerTime(nextTriggerTime) - .currentRetryCount(currentRetryCount) - .build()); - }) - .build()) - .build(); + public RetryableEventConsumer(EventConsumer eventConsumer, Consumer consumer) { + Preconditions.checkNotNull(eventConsumer); + this.eventConsumer = eventConsumer; + exceptionHandler = context -> { + context.doPrintException(); + if (Strings.isNullOrEmpty(context.getHandlerKey())) { + return; + } + RetryEvent retryEvent = retryContexts.get(context.getHandlerKey()); + Set> excludeExceptions = retryEvent.getExcludeExceptions(); + if (excludeExceptions.stream().anyMatch(e -> e.isAssignableFrom(context.getException().getClass()))) { + return; + } + Integer currentRetryCount = getRetryCount(context.getConsumeContext()); + + Optional nextRetryMillis = retryEvent.getBackoffPolicy().getNextRetryMillis(currentRetryCount); + if (!nextRetryMillis.isPresent()) { + return; + } + LocalDateTime nextTriggerTime = Instant.ofEpochMilli(nextRetryMillis.get()).atZone(ZoneId.systemDefault()).toLocalDateTime(); + consumer.accept(RetryContext.builder() + .event(context.getEvent()) + .retryKey(context.getHandlerKey()) + .nextTriggerTime(nextTriggerTime) + .currentRetryCount(currentRetryCount) + .build()); + }; } public Boolean registerHandler(RetryEvent retryEvent) { @@ -72,6 +71,7 @@ public class RetryableEventConsumer { .eventHandler(retryEvent.getEventHandler()) .name(retryEvent.getName()) .handlerKey(retryEvent.getRetryKey()) + .exceptionHandler(exceptionHandler) .build()); } @@ -83,6 +83,21 @@ public class RetryableEventConsumer { return 0; } + @Override + public Boolean registerHandler(EventHandlerContext context) { + return registerHandler(RetryEvent.builder() + .eventCode(context.getEventCode()) + .eventHandler(context.getEventHandler()) + .name(context.getName()) + .retryKey(context.getHandlerKey()) + .build()); + } + + @Override + public boolean onEvent(String message, Context context) { + return eventConsumer.onEvent(message, context); + } + @Data @Builder @NoArgsConstructor diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RocketRetryableEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RocketRetryableEventConsumer.java index 1aa3404..63313cb 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RocketRetryableEventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RocketRetryableEventConsumer.java @@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableRangeMap; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; +import lombok.Builder; import java.util.concurrent.TimeUnit; @@ -40,8 +41,9 @@ public class RocketRetryableEventConsumer extends RetryableEventConsumer { .put(Range.openClosed(TimeUnit.MINUTES.toMillis(60), Long.MAX_VALUE), 18) .build(); - public RocketRetryableEventConsumer(EventProducer eventProducer, String retryTopic) { - super(retryContext -> { + @Builder + public RocketRetryableEventConsumer(EventProducer eventProducer, EventConsumer eventConsumer, String retryTopic) { + super(eventConsumer, retryContext -> { /** 将延迟毫秒转换为rocketMq的延迟级别 */ Integer delayLevel = LEVEL_MAP.get(retryContext.getNextDelayMillis());