feat: 优化事件消费异常

This commit is contained in:
zengxiaobo 2024-07-08 09:33:37 +08:00
parent 7d531ac876
commit 65ce48ddb8
2 changed files with 20 additions and 15 deletions

View File

@ -6,7 +6,9 @@ import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.*;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
@ -16,11 +18,11 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.CollectionUtils;
import java.util.Objects;
import java.util.Optional;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@ -29,8 +31,7 @@ import java.util.stream.Collectors;
*/
@Slf4j
public class EventHandlerRepository {
final protected ListMultimap<Event.EventCode, EventHandlerContext> handlers = ArrayListMultimap.create();
private final Consumer<EventHandledWrapper> DEFAULT_EXCEPTION_HANDLER = EventHandledWrapper::doPrintException;
protected final ListMultimap<Event.EventCode, EventHandlerContext> handlers = ArrayListMultimap.create();
private AntPathMatcher antPathMatcher;
/**
@ -41,18 +42,22 @@ public class EventHandlerRepository {
private final Boolean logEnabled;
private final Predicate<Event> logFilter;
private Consumer<EventHandledWrapper> globalExceptionHandler;
@Builder
public EventHandlerRepository(boolean supportPattern,
Boolean logEnabled,
Predicate<Event> logFilter,
Long logElapsedThreshold,
Long maxAllowElapsedMillis) {
Long maxAllowElapsedMillis,
Consumer<EventHandledWrapper> globalExceptionHandler) {
this.logEnabled = Optional.ofNullable(logEnabled).orElse(Boolean.TRUE);
this.logFilter = logFilter;
this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L);
this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L);
this.globalExceptionHandler = Optional.ofNullable(globalExceptionHandler).orElse(EventHandledWrapper::doPrintException);
if (supportPattern) {
antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR);
antPathMatcher.setCachePatterns(true);
@ -65,7 +70,7 @@ public class EventHandlerRepository {
//如果传入了key则检查部重复
if (StringUtils.isNoneBlank(handlerHolder.getHandlerKey())) {
Set<String> existsHandlerKeys = handlers.values().stream()
.map(e -> e.getHandlerKey())
.map(EventHandlerContext::getHandlerKey)
.filter(StringUtils::isNoneBlank)
.collect(Collectors.toSet());
Preconditions.checkArgument(!existsHandlerKeys.contains(handlerHolder.getHandlerKey()));
@ -100,7 +105,7 @@ public class EventHandlerRepository {
&& context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) {
byte[] diffMetaHeader = context.getHeaders().get(DiffablePayload.DIFF_META_HEADER);
List<DiffablePayload.DiffMeta> diffMetas = JSON.parseObject(
new String(diffMetaHeader, Charsets.UTF_8), new TypeReference<List<DiffablePayload.DiffMeta>>() {
new String(diffMetaHeader, StandardCharsets.UTF_8), new TypeReference<List<DiffablePayload.DiffMeta>>() {
});
differentiator = DiffablePayload.DiffMeta.toDifferentiator(diffMetas);
}
@ -135,7 +140,7 @@ public class EventHandlerRepository {
}
if (elapsed > maxAllowElapsedMillis) {
String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s",
String msg = String.format("[%s] take too long %d millis for %s to handle %s event=%s",
context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText);
log.warn(msg);
}
@ -163,7 +168,7 @@ public class EventHandlerRepository {
public boolean batch(List<Event> events, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
List<EventHandlerContext> eventHandlers = getEventHandlers(context.getEventCode(), context);
eventHandlers.stream().forEach(handler -> {
eventHandlers.forEach(handler -> {
try {
stopwatch.start();
String clazzName = handler.getClass().getCanonicalName();
@ -210,10 +215,11 @@ public class EventHandlerRepository {
.handlerKey(context.getHandlerKey())
.exception(exception)
.build();
if (globalExceptionHandler != null) {
globalExceptionHandler.accept(wrapper);
}
if (context.getExceptionHandler() != null) {
context.getExceptionHandler().accept(wrapper);
} else {
DEFAULT_EXCEPTION_HANDLER.accept(wrapper);
}
}
@ -243,9 +249,9 @@ public class EventHandlerRepository {
}
// consumer显式声明了日志开关则直接使用
if (logEnabled != null) {
return logEnabled && (logFilter == null || logFilter.apply(event));
return logEnabled && (logFilter == null || logFilter.test(event));
}
return logFilter == null || logFilter.apply(event);
return logFilter == null || logFilter.test(event);
}
private boolean isSupportPattern() {

View File

@ -39,7 +39,6 @@ public class RetryableEventConsumer implements EventConsumer {
Preconditions.checkNotNull(eventConsumer);
this.eventConsumer = eventConsumer;
exceptionHandler = context -> {
context.doPrintException();
if (Strings.isNullOrEmpty(context.getHandlerKey())) {
return;
}