feat: 事件重试 step 2

This commit is contained in:
zengxiaobo 2024-05-25 14:46:36 +08:00
parent 0ef2eba7bc
commit 1496b79b70
7 changed files with 91 additions and 78 deletions

View File

@ -1,5 +1,6 @@
package cn.axzo.foundation.event.support;
import cn.axzo.foundation.event.support.consumer.EventHandledWrapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
@ -8,6 +9,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Optional;
import java.util.function.Consumer;
@Data
@Builder
@ -20,6 +22,10 @@ public class EventHandlerContext {
/** 用于打印日志 */
String name;
EventHandler eventHandler;
/** 消费失败的回调 */
Consumer<EventHandledWrapper> exceptionHandler;
/** 消费成功的回调 */
Consumer<EventHandledWrapper> successHandler;
public String getName() {
return Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName());

View File

@ -11,7 +11,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@ -34,13 +33,11 @@ import java.util.stream.Collectors;
@Slf4j
public class DefaultEventConsumer implements EventConsumer {
private final EventHandlerRepository handlerRepository;
private final Consumer<EventWrapper> consumeCallback;
@Builder
public DefaultEventConsumer(EventHandlerRepository handlerRepository, Consumer<EventWrapper> consumeCallback) {
public DefaultEventConsumer(EventHandlerRepository handlerRepository) {
this.handlerRepository = Optional.ofNullable(handlerRepository).orElseGet(() -> EventHandlerRepository.builder()
.build());
this.consumeCallback = consumeCallback;
}
@Override
@ -83,15 +80,6 @@ public class DefaultEventConsumer implements EventConsumer {
Throwables.throwIfUnchecked(ex);
throw new RuntimeException("process event fail", ex);
}
if (consumeCallback != null) {
consumeCallback.accept(EventWrapper.builder()
.event(event)
.consumer(this)
.isHandled(handled)
.context(context)
.build());
}
return handled;
}

View File

@ -85,15 +85,6 @@ public interface EventConsumer {
throw new UnsupportedOperationException();
}
@Builder
@Getter
class EventWrapper {
private Event event;
private EventConsumer consumer;
private boolean isHandled;
private Context context;
}
@Getter
@NoArgsConstructor
class Context {

View File

@ -1,7 +1,6 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandlerContext;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode;
import lombok.AllArgsConstructor;
@ -15,19 +14,27 @@ import lombok.extern.slf4j.Slf4j;
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventExceptionContext {
Throwable exception;
String msg;
public class EventHandledWrapper {
Event event;
EventConsumer.Context consumerContext;
EventHandlerContext handlerContext;
EventConsumer.Context consumeContext;
public void doPrint() {
/** handler的唯一索引, 重试时需要 */
String handlerKey;
/** 用于打印日志 */
String name;
/** 如果抛出异常则有值 */
Throwable exception;
public void doPrintException() {
if (exception == null) {
return;
}
if (BusinessException.class.isAssignableFrom(exception.getClass()) &&
!ResultCode.PROCESS_TIMEOUT.getCode().equals(((BusinessException) exception).getErrorCode())) {
log.warn("MQ, handle warning {}", msg, exception);
log.warn("MQ, handle warning {}", event.toPrettyJsonString(), exception);
} else {
log.error("MQ, handle error {}", msg, exception);
log.error("MQ, handle error {}", event.toPrettyJsonString(), exception);
}
}
}

View File

@ -30,7 +30,7 @@ import java.util.stream.Collectors;
@Slf4j
public class EventHandlerRepository {
final protected ListMultimap<Event.EventCode, EventHandlerContext> handlers = ArrayListMultimap.create();
private final Consumer<EventExceptionContext> exceptionHandler;
private final Consumer<EventHandledWrapper> DEFAULT_EXCEPTION_HANDLER = EventHandledWrapper::doPrintException;
private AntPathMatcher antPathMatcher;
/**
@ -43,13 +43,11 @@ public class EventHandlerRepository {
@Builder
public EventHandlerRepository(Consumer<EventExceptionContext> exceptionHandler,
boolean supportPattern,
public EventHandlerRepository(boolean supportPattern,
Boolean logEnabled,
Predicate<Event> logFilter,
Long logElapsedThreshold,
Long maxAllowElapsedMillis) {
this.exceptionHandler = Optional.ofNullable(exceptionHandler).orElse(EventExceptionContext::doPrint);
this.logEnabled = Optional.ofNullable(logEnabled).orElse(Boolean.TRUE);
this.logFilter = logFilter;
@ -142,13 +140,7 @@ public class EventHandlerRepository {
log.warn(msg);
}
} catch (Exception ex) {
handleException(EventExceptionContext.builder()
.event(event)
.handlerContext(handler)
.msg(event.toPrettyJsonString())
.exception(ex)
.consumerContext(context)
.build());
handleException(event, context, handler, ex);
} finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset();
@ -207,9 +199,21 @@ public class EventHandlerRepository {
return canHandle(event.getEventCode());
}
private void handleException(EventExceptionContext context) {
if (exceptionHandler != null) {
exceptionHandler.accept(context);
private void handleException(Event event,
EventConsumer.Context consumerContext,
EventHandlerContext context,
Exception exception) {
EventHandledWrapper wrapper = EventHandledWrapper.builder()
.event(event)
.consumeContext(consumerContext)
.name(context.getName())
.handlerKey(context.getHandlerKey())
.exception(exception)
.build();
if (context.getExceptionHandler() != null) {
context.getExceptionHandler().accept(wrapper);
} else {
DEFAULT_EXCEPTION_HANDLER.accept(wrapper);
}
}
@ -234,7 +238,7 @@ public class EventHandlerRepository {
}
public boolean getLogEnabled(Event event, EventConsumer.Context context) {
if (handlers.containsKey(EventHeaders.DISABLE_LOG_HEADER)) {
if (context.getHeaders().containsKey(EventHeaders.DISABLE_LOG_HEADER)) {
return false;
}
// consumer显式声明了日志开关则直接使用

View File

@ -26,28 +26,29 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class RetryableEventConsumer {
public class RetryableEventConsumer implements EventConsumer {
EventConsumer eventConsumer;
Map<String, RetryEvent> retryContexts = Maps.newConcurrentMap();
private static final Set<Class<? extends Throwable>> DEFAULT_EXCLUDE_EXCEPTIONS = ImmutableSet.of(BusinessException.class);
private static final BackoffPolicy DEFAULT_BACKOFFPOLICY = ExponentialBackOffPolicy.builder().build();
@Builder
public RetryableEventConsumer(Consumer<RetryContext> consumer) {
this.eventConsumer = DefaultEventConsumer.builder()
.handlerRepository(EventHandlerRepository.builder()
.exceptionHandler(context -> {
context.doPrint();
if (Strings.isNullOrEmpty(context.getHandlerContext().getHandlerKey())) {
private final Consumer<EventHandledWrapper> exceptionHandler;
public RetryableEventConsumer(EventConsumer eventConsumer, Consumer<RetryContext> consumer) {
Preconditions.checkNotNull(eventConsumer);
this.eventConsumer = eventConsumer;
exceptionHandler = context -> {
context.doPrintException();
if (Strings.isNullOrEmpty(context.getHandlerKey())) {
return;
}
RetryEvent retryEvent = retryContexts.get(context.getHandlerContext().getHandlerKey());
RetryEvent retryEvent = retryContexts.get(context.getHandlerKey());
Set<Class<? extends Throwable>> excludeExceptions = retryEvent.getExcludeExceptions();
if (excludeExceptions.stream().anyMatch(e -> e.isAssignableFrom(context.getException().getClass()))) {
return;
}
Integer currentRetryCount = getRetryCount(context.getConsumerContext());
Integer currentRetryCount = getRetryCount(context.getConsumeContext());
Optional<Long> nextRetryMillis = retryEvent.getBackoffPolicy().getNextRetryMillis(currentRetryCount);
if (!nextRetryMillis.isPresent()) {
@ -56,13 +57,11 @@ public class RetryableEventConsumer {
LocalDateTime nextTriggerTime = Instant.ofEpochMilli(nextRetryMillis.get()).atZone(ZoneId.systemDefault()).toLocalDateTime();
consumer.accept(RetryContext.builder()
.event(context.getEvent())
.retryKey(context.getHandlerContext().getHandlerKey())
.retryKey(context.getHandlerKey())
.nextTriggerTime(nextTriggerTime)
.currentRetryCount(currentRetryCount)
.build());
})
.build())
.build();
};
}
public Boolean registerHandler(RetryEvent retryEvent) {
@ -72,6 +71,7 @@ public class RetryableEventConsumer {
.eventHandler(retryEvent.getEventHandler())
.name(retryEvent.getName())
.handlerKey(retryEvent.getRetryKey())
.exceptionHandler(exceptionHandler)
.build());
}
@ -83,6 +83,21 @@ public class RetryableEventConsumer {
return 0;
}
@Override
public Boolean registerHandler(EventHandlerContext context) {
return registerHandler(RetryEvent.builder()
.eventCode(context.getEventCode())
.eventHandler(context.getEventHandler())
.name(context.getName())
.retryKey(context.getHandlerKey())
.build());
}
@Override
public boolean onEvent(String message, Context context) {
return eventConsumer.onEvent(message, context);
}
@Data
@Builder
@NoArgsConstructor

View File

@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableRangeMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import lombok.Builder;
import java.util.concurrent.TimeUnit;
@ -40,8 +41,9 @@ public class RocketRetryableEventConsumer extends RetryableEventConsumer {
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(60), Long.MAX_VALUE), 18)
.build();
public RocketRetryableEventConsumer(EventProducer eventProducer, String retryTopic) {
super(retryContext -> {
@Builder
public RocketRetryableEventConsumer(EventProducer eventProducer, EventConsumer eventConsumer, String retryTopic) {
super(eventConsumer, retryContext -> {
/** 将延迟毫秒转换为rocketMq的延迟级别 */
Integer delayLevel = LEVEL_MAP.get(retryContext.getNextDelayMillis());