feat: 事件重试

This commit is contained in:
zengxiaobo 2024-05-24 17:28:57 +08:00
parent 627fa77325
commit 0ef2eba7bc
16 changed files with 506 additions and 196 deletions

View File

@ -1,6 +1,6 @@
package cn.axzo.foundation.event.support; package cn.axzo.foundation.event.support;
import cn.axzo.foundation.event.utils.PayloadUtils; import cn.axzo.foundation.event.support.utils.PayloadUtils;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import lombok.*; import lombok.*;

View File

@ -40,7 +40,7 @@ public class Event implements Serializable {
*/ */
private String eventId; private String eventId;
private String eventVersion; private String version;
private String eventModule; private String eventModule;

View File

@ -0,0 +1,32 @@
package cn.axzo.foundation.event.support;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Optional;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventHandlerContext {
Event.EventCode eventCode;
/** handler的唯一索引, 重试时需要 */
String handlerKey;
/** 用于打印日志 */
String name;
EventHandler eventHandler;
public String getName() {
return Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName());
}
public void check() {
Preconditions.checkNotNull(eventCode);
Preconditions.checkNotNull(eventHandler);
}
}

View File

@ -0,0 +1,19 @@
package cn.axzo.foundation.event.support;
import cn.axzo.foundation.util.TraceUtils;
public class EventHeaders {
/** eventCode */
public static final String EVENT_CODE = "eventCode";
/** 指定消费者的handlerKey. 多个逗号分割, eg: CreateUserHandler,UpdateUserHandler */
public static final String TRIGGER_HANDLER_KEYS = "triggerHandlerKeys";
/** 重试次数, 数字 */
public static final String RETRY_COUNT = "retryCount";
public static final String TRACE_ID = TraceUtils.TRACE_ID;
public static final String DISABLE_LOG_HEADER = "disableLog";
}

View File

@ -2,6 +2,7 @@ package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler; import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.event.support.EventHandlerContext;
import cn.axzo.foundation.exception.BusinessException; import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode; import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.util.FastjsonUtils; import cn.axzo.foundation.util.FastjsonUtils;
@ -32,15 +33,15 @@ public class BatchEventConsumer implements EventConsumer {
} }
@Override @Override
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { public Boolean registerHandler(EventHandlerContext context) {
handlers.put(eventHandler, eventCode); handlers.put(context.getEventHandler(), context.getEventCode());
return null; return null;
} }
@Override @Override
public EventHandlerRepository registerHandlers(List<Event.EventCode> list, EventHandler eventHandler, String name) { public Boolean registerHandlers(List<Event.EventCode> list, EventHandler eventHandler, String name) {
handlers.putAll(eventHandler, list); handlers.putAll(eventHandler, list);
return null; return true;
} }
@Override @Override

View File

@ -1,15 +1,16 @@
package cn.axzo.foundation.event.support.consumer; package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler; import cn.axzo.foundation.event.support.EventHandlerContext;
import cn.axzo.foundation.util.FastjsonUtils; import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -32,18 +33,21 @@ import java.util.stream.Collectors;
*/ */
@Slf4j @Slf4j
public class DefaultEventConsumer implements EventConsumer { public class DefaultEventConsumer implements EventConsumer {
private final EventHandlerRepository handlerRepository; private final EventHandlerRepository handlerRepository;
private final String appName;
private final Consumer<EventWrapper> consumeCallback; private final Consumer<EventWrapper> consumeCallback;
public DefaultEventConsumer(String appName, EventHandlerRepository handlerRepository, Consumer<EventWrapper> consumeCallback) { @Builder
this.handlerRepository = handlerRepository; public DefaultEventConsumer(EventHandlerRepository handlerRepository, Consumer<EventWrapper> consumeCallback) {
this.appName = appName; this.handlerRepository = Optional.ofNullable(handlerRepository).orElseGet(() -> EventHandlerRepository.builder()
.build());
this.consumeCallback = consumeCallback; this.consumeCallback = consumeCallback;
} }
@Override
public Boolean registerHandler(EventHandlerContext context) {
return handlerRepository.registerHandler(context);
}
@Override @Override
public boolean onEvent(String message, Context context) { public boolean onEvent(String message, Context context) {
Event event; Event event;
@ -71,7 +75,7 @@ public class DefaultEventConsumer implements EventConsumer {
context.getSummary(), message, e); context.getSummary(), message, e);
return false; return false;
} }
Boolean handled = false; Boolean handled;
try { try {
handled = handlerRepository.process(event, context); handled = handlerRepository.process(event, context);
} catch (Exception ex) { } catch (Exception ex) {
@ -85,7 +89,6 @@ public class DefaultEventConsumer implements EventConsumer {
.event(event) .event(event)
.consumer(this) .consumer(this)
.isHandled(handled) .isHandled(handled)
.ext(context.getExt())
.context(context) .context(context)
.build()); .build());
} }
@ -95,7 +98,7 @@ public class DefaultEventConsumer implements EventConsumer {
@Override @Override
public boolean onEvents(List<BatchEvent> events) { public boolean onEvents(List<BatchEvent> events) {
// 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false // 默认开启日志如需关闭日志在调用onEvent的时候设置logEnable为false
log.info("====MQ CONSUMER BATCH===={}, events={}", appName, FastjsonUtils.toJsonPettyLogString(events)); log.info("====MQ CONSUMER BATCH====, events={}", FastjsonUtils.toJsonPettyLogString(events));
Map<Event.EventCode, List<Event>> eventGroup = events.stream().filter(e -> handlerRepository.canHandle(e.getContext().getEventCode())) Map<Event.EventCode, List<Event>> eventGroup = events.stream().filter(e -> handlerRepository.canHandle(e.getContext().getEventCode()))
.map(BatchEvent::toEvent).collect(Collectors.groupingBy(Event::getEventCode)); .map(BatchEvent::toEvent).collect(Collectors.groupingBy(Event::getEventCode));
if (eventGroup.isEmpty()) { if (eventGroup.isEmpty()) {
@ -105,20 +108,9 @@ public class DefaultEventConsumer implements EventConsumer {
handlerRepository.batch(entry.getValue(), handlerRepository.batch(entry.getValue(),
Context.builder() Context.builder()
.eventCode(entry.getKey()) .eventCode(entry.getKey())
.maxAllowElapsedMillis(TimeUnit.MINUTES.toMillis(1L))
.build()); .build());
} }
return true; return true;
} }
@Override
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) {
return handlerRepository.registerHandler(eventCode, eventHandler, name);
}
@Override
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name) {
return handlerRepository.registerHandlers(eventCodes, eventHandler, name);
}
} }

View File

@ -2,25 +2,38 @@ package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler; import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.event.support.producer.EventProducer; import cn.axzo.foundation.event.support.EventHandlerContext;
import cn.axzo.foundation.event.support.EventHeaders;
import cn.axzo.foundation.util.TraceUtils; import cn.axzo.foundation.util.TraceUtils;
import cn.axzo.foundation.util.UUIDBuilder; import cn.axzo.foundation.util.UUIDBuilder;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Predicate; import com.google.common.base.Splitter;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import lombok.*; import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.BooleanUtils; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
public interface EventConsumer { public interface EventConsumer {
/**
* 注册EventHandler
*
* @return EventHandlerRepository
*/
Boolean registerHandler(EventHandlerContext context);
/** /**
* 注册EventHandler * 注册EventHandler
* *
@ -29,10 +42,12 @@ public interface EventConsumer {
* @param name handler别名 * @param name handler别名
* @return EventHandlerRepository * @return EventHandlerRepository
*/ */
EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name); default Boolean registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) {
return registerHandler(EventHandlerContext.builder()
default EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) { .eventCode(eventCode)
return registerHandler(eventCode, eventHandler, null); .eventHandler(eventHandler)
.name(name)
.build());
} }
/** /**
@ -43,10 +58,9 @@ public interface EventConsumer {
* @param name handler别名 * @param name handler别名
* @return EventHandlerRepository * @return EventHandlerRepository
*/ */
EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name); default Boolean registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name) {
eventCodes.forEach(e -> registerHandler(e, eventHandler, name));
default EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler) { return true;
return registerHandlers(eventCodes, eventHandler, null);
} }
/** /**
@ -77,7 +91,6 @@ public interface EventConsumer {
private Event event; private Event event;
private EventConsumer consumer; private EventConsumer consumer;
private boolean isHandled; private boolean isHandled;
private Map<String, Object> ext;
private Context context; private Context context;
} }
@ -88,39 +101,30 @@ public interface EventConsumer {
private Event.EventCode eventCode; private Event.EventCode eventCode;
private String traceId; private String traceId;
private String msgId; private String msgId;
/** private String topic;
* 只elapse 超过多少毫秒才进行日志记录默认 3ms
*/
private Long logElapsedThreshold;
private Map<String, byte[]> headers; private Map<String, byte[]> headers;
private Map<String, Object> ext;
private Long maxAllowElapsedMillis;
@Getter(AccessLevel.NONE)
private Boolean logEnabled;
@Getter(AccessLevel.NONE)
private transient Predicate<Event> logFilter;
/** /**
* 返回消息消费者offset与全部消息offset之间的差. * 返回消息消费者offset与全部消息offset之间的差.
*/ */
private transient Supplier<Long> lagSupplier; private transient Supplier<Long> lagSupplier;
@Builder @Builder
public Context(Event.EventCode eventCode, String msgId, Long logElapsedThreshold, Map<String, byte[]> headers, Map<String, Object> ext, public Context(Event.EventCode eventCode, String msgId, Map<String, byte[]> headers,
Long maxAllowElapsedMillis, Boolean logEnabled, Predicate<Event> logFilter, Supplier<Long> lagSupplier) { Supplier<Long> lagSupplier, String topic) {
this.eventCode = eventCode; this.headers = Optional.ofNullable(headers).orElse(ImmutableMap.of());
this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L); this.eventCode = Optional.ofNullable(eventCode).orElseGet(() -> getHeaders().containsKey(EventHeaders.EVENT_CODE)
this.headers = headers; ? Event.EventCode.from(new String(getHeaders().get(EventHeaders.EVENT_CODE), Charsets.UTF_8))
this.ext = ext; : null);
this.topic = Strings.nullToEmpty(topic);
this.msgId = Strings.nullToEmpty(msgId); this.msgId = Strings.nullToEmpty(msgId);
this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L);
this.logEnabled = logEnabled;
this.logFilter = logFilter;
this.lagSupplier = lagSupplier; this.lagSupplier = lagSupplier;
if (!CollectionUtils.isEmpty(headers) && headers.containsKey(TraceUtils.TRACE_ID)) { if (!CollectionUtils.isEmpty(headers) && headers.containsKey(EventHeaders.TRACE_ID)) {
this.traceId = new String(headers.get(TraceUtils.TRACE_ID), Charsets.UTF_8); this.traceId = new String(headers.get(EventHeaders.TRACE_ID), Charsets.UTF_8);
} }
if (Strings.isNullOrEmpty(this.traceId)) { if (Strings.isNullOrEmpty(this.traceId)) {
this.traceId = UUIDBuilder.generateShortUuid(); this.traceId = UUIDBuilder.generateShortUuid();
@ -128,37 +132,12 @@ public interface EventConsumer {
TraceUtils.putTraceId(this.traceId); TraceUtils.putTraceId(this.traceId);
} }
public Event.EventCode getEventCode() { public Set<String> getTriggerHandlerKeys() {
if (eventCode == null) { if (getHeaders().containsKey(EventHeaders.TRIGGER_HANDLER_KEYS)) {
eventCode = headers.containsKey("eventCode") return Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings()
? Event.EventCode.from(new String(headers.get("eventCode"), Charsets.UTF_8)) .splitToList(new String(getHeaders().get(EventHeaders.TRIGGER_HANDLER_KEYS), Charsets.UTF_8)));
: null;
} }
return eventCode; return ImmutableSet.of();
}
public String getTraceId() {
if (Strings.isNullOrEmpty(traceId)) {
// 兼容老的实现
traceId = UUIDBuilder.generateShortUuid();
TraceUtils.putTraceId(traceId);
}
return traceId;
}
public boolean getLogEnabled(Event event) {
// consumer显式声明了日志开关则直接使用
if (logEnabled != null) {
return logEnabled && (logFilter == null || logFilter.apply(event));
}
// consumer未显式声明日志开关时根据event的header中的disableLog=true来决定是否关闭日志
boolean disableLog = BooleanUtils.toBoolean(new String(Optional.ofNullable(headers).orElse(Collections.emptyMap())
.getOrDefault(EventProducer.Context.DISABLE_LOG_HEADER, "".getBytes())));
if (disableLog) {
return false;
}
return logFilter == null || logFilter.apply(event);
} }
public String getSummary() { public String getSummary() {

View File

@ -0,0 +1,33 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandlerContext;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventExceptionContext {
Throwable exception;
String msg;
Event event;
EventConsumer.Context consumerContext;
EventHandlerContext handlerContext;
public void doPrint() {
if (BusinessException.class.isAssignableFrom(exception.getClass()) &&
!ResultCode.PROCESS_TIMEOUT.getCode().equals(((BusinessException) exception).getErrorCode())) {
log.warn("MQ, handle warning {}", msg, exception);
} else {
log.error("MQ, handle error {}", msg, exception);
}
}
}

View File

@ -1,35 +1,26 @@
package cn.axzo.foundation.event.support.consumer; package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.DiffablePayload; import cn.axzo.foundation.event.support.*;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.event.support.PayloadDifferentiator;
import cn.axzo.foundation.exception.BusinessException; import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.util.FastjsonUtils; import cn.axzo.foundation.util.FastjsonUtils;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Charsets; import com.google.common.base.*;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap; import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.Builder; import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.util.AntPathMatcher; import org.springframework.util.AntPathMatcher;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -37,41 +28,53 @@ import java.util.stream.Collectors;
* 可以在多个地方使用. * 可以在多个地方使用.
*/ */
@Slf4j @Slf4j
public class EventHandlerRepository { public class EventHandlerRepository {
final protected ListMultimap<Event.EventCode, EventHandlerHolder> handlers = ArrayListMultimap.create(); final protected ListMultimap<Event.EventCode, EventHandlerContext> handlers = ArrayListMultimap.create();
private final BiConsumer<Exception, String> exceptionHandler; private final Consumer<EventExceptionContext> exceptionHandler;
private AntPathMatcher antPathMatcher; private AntPathMatcher antPathMatcher;
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler) { /**
this(exceptionHandler, false); * 只elapse 超过多少毫秒才进行日志记录默认 3ms
} */
private final Long logElapsedThreshold;
private final Long maxAllowElapsedMillis;
private final Boolean logEnabled;
private final Predicate<Event> logFilter;
public EventHandlerRepository(BiConsumer<Exception, String> exceptionHandler, boolean supportPattern) {
this.exceptionHandler = exceptionHandler;
@Builder
public EventHandlerRepository(Consumer<EventExceptionContext> exceptionHandler,
boolean supportPattern,
Boolean logEnabled,
Predicate<Event> logFilter,
Long logElapsedThreshold,
Long maxAllowElapsedMillis) {
this.exceptionHandler = Optional.ofNullable(exceptionHandler).orElse(EventExceptionContext::doPrint);
this.logEnabled = Optional.ofNullable(logEnabled).orElse(Boolean.TRUE);
this.logFilter = logFilter;
this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L);
this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L);
if (supportPattern) { if (supportPattern) {
antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR); antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR);
antPathMatcher.setCachePatterns(true); antPathMatcher.setCachePatterns(true);
} }
} }
public EventHandlerRepository() { public Boolean registerHandler(EventHandlerContext handlerHolder) {
this(null); handlerHolder.check();
}
public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { //如果传入了key则检查部重复
Objects.requireNonNull(eventCode); if (StringUtils.isNoneBlank(handlerHolder.getHandlerKey())) {
Objects.requireNonNull(eventHandler); Set<String> existsHandlerKeys = handlers.values().stream()
.map(e -> e.getHandlerKey())
.filter(StringUtils::isNoneBlank)
.collect(Collectors.toSet());
Preconditions.checkArgument(!existsHandlerKeys.contains(handlerHolder.getHandlerKey()));
}
handlers.put(eventCode, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build()); handlers.put(handlerHolder.getEventCode(), handlerHolder);
return this; return true;
}
public EventHandlerRepository registerHandlers(List<Event.EventCode> eventCodes, EventHandler eventHandler, String name) {
Objects.requireNonNull(eventHandler);
eventCodes.forEach(e -> handlers.put(e, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build()));
return this;
} }
public boolean canHandle(Event.EventCode eventCode) { public boolean canHandle(Event.EventCode eventCode) {
@ -92,9 +95,8 @@ public class EventHandlerRepository {
public boolean process(Event event, EventConsumer.Context context) { public boolean process(Event event, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
Event.EventCode eventCode = event.getEventCode(); Event.EventCode eventCode = event.getEventCode();
List<EventHandlerHolder> eventHandlers = getEventHandlers(eventCode);
Map<String, Long> handleCosts = Maps.newHashMap();
Map<String, Long> handleCosts = Maps.newHashMap();
PayloadDifferentiator differentiator = null; PayloadDifferentiator differentiator = null;
if (!CollectionUtils.isEmpty(context.getHeaders()) if (!CollectionUtils.isEmpty(context.getHeaders())
&& context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) { && context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) {
@ -122,6 +124,7 @@ public class EventHandlerRepository {
String eventLogText = event.toPrettyJsonString(); String eventLogText = event.toPrettyJsonString();
List<EventHandlerContext> eventHandlers = getEventHandlers(eventCode, context);
eventHandlers.forEach(handler -> { eventHandlers.forEach(handler -> {
try { try {
stopwatch.start(); stopwatch.start();
@ -129,29 +132,29 @@ public class EventHandlerRepository {
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
// 为了避免太多日志输出只有处理时间超过 // 为了避免太多日志输出只有处理时间超过
if (elapsed > context.getLogElapsedThreshold()) { if (elapsed > logElapsedThreshold) {
handleCosts.put(handler.getName(), elapsed); handleCosts.put(handler.getName(), elapsed);
} }
if (elapsed > context.getMaxAllowElapsedMillis()) { if (elapsed > maxAllowElapsedMillis) {
String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s", String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s",
context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText); context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg); log.warn(msg);
} }
} catch (BusinessException ex) {
log.warn("==MQ CONSUMER {}==, handle biz failed, handler = {}, {}\nevent = {}",
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex);
handleException(ex, event.toPrettyJsonString());
} catch (Exception ex) { } catch (Exception ex) {
log.error("==MQ CONSUMER {}==, handle failed, handler = {}, {}\nevent = {}", handleException(EventExceptionContext.builder()
context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex); .event(event)
handleException(ex, event.toPrettyJsonString()); .handlerContext(handler)
.msg(event.toPrettyJsonString())
.exception(ex)
.consumerContext(context)
.build());
} finally { } finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错 // stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset(); stopwatch.reset();
} }
}); });
if (context.getLogEnabled(event)) { if (getLogEnabled(event, context)) {
log.info("==MQ CONSUMER {}==, timeCosts = {}, {}\n{}", log.info("==MQ CONSUMER {}==, timeCosts = {}, {}\n{}",
context.getSummary(), JSON.toJSONString(handleCosts), payloadDiffLog, eventLogText); context.getSummary(), JSON.toJSONString(handleCosts), payloadDiffLog, eventLogText);
} }
@ -167,7 +170,7 @@ public class EventHandlerRepository {
*/ */
public boolean batch(List<Event> events, EventConsumer.Context context) { public boolean batch(List<Event> events, EventConsumer.Context context) {
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
List<EventHandlerHolder> eventHandlers = getEventHandlers(context.getEventCode()); List<EventHandlerContext> eventHandlers = getEventHandlers(context.getEventCode(), context);
eventHandlers.stream().forEach(handler -> { eventHandlers.stream().forEach(handler -> {
try { try {
stopwatch.start(); stopwatch.start();
@ -178,19 +181,17 @@ public class EventHandlerRepository {
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed); log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed);
if (elapsed > context.getMaxAllowElapsedMillis()) { if (elapsed > maxAllowElapsedMillis) {
String msg = String.format("take too long %d millis for %s to handle %s", String msg = String.format("take too long %d millis for %s to handle %s",
elapsed, clazzName, events); elapsed, clazzName, events);
handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg); log.warn(msg);
} }
} catch (BusinessException ex) { } catch (BusinessException ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events); String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex); log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex);
handleException(ex, payloads);
} catch (Exception ex) { } catch (Exception ex) {
String payloads = FastjsonUtils.toJsonPettyLogString(events); String payloads = FastjsonUtils.toJsonPettyLogString(events);
log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex); log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex);
handleException(ex, payloads);
} finally { } finally {
// stopwatch必须reset()否则下一次stopwatch.start()会报错 // stopwatch必须reset()否则下一次stopwatch.start()会报错
stopwatch.reset(); stopwatch.reset();
@ -206,37 +207,44 @@ public class EventHandlerRepository {
return canHandle(event.getEventCode()); return canHandle(event.getEventCode());
} }
private void handleException(Exception ex, String msg) { private void handleException(EventExceptionContext context) {
if (exceptionHandler != null) { if (exceptionHandler != null) {
exceptionHandler.accept(ex, msg); exceptionHandler.accept(context);
} }
} }
private List<EventHandlerHolder> getEventHandlers(Event.EventCode eventCode) { private List<EventHandlerContext> getEventHandlers(Event.EventCode eventCode, EventConsumer.Context context) {
List<EventHandlerContext> eventHandlerHolders;
if (!isSupportPattern()) { if (!isSupportPattern()) {
return handlers.get(eventCode); eventHandlerHolders = handlers.get(eventCode);
} else {
eventHandlerHolders = handlers.keySet().stream()
.filter(key -> antPathMatcher.match(key.toString(), eventCode.toString()))
.flatMap(key -> handlers.get(key).stream())
.collect(Collectors.toList());
} }
return eventHandlerHolders.stream()
// 支持pattern的时候返回所有匹配的Handlers .filter(p -> {
return handlers.keySet().stream() if (CollectionUtils.isEmpty(context.getTriggerHandlerKeys()) || Strings.isNullOrEmpty(p.getHandlerKey())) {
.filter(key -> antPathMatcher.match(key.toString(), eventCode.toString())) return true;
.flatMap(key -> handlers.get(key).stream()) }
return context.getTriggerHandlerKeys().contains(p.getHandlerKey());
})
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public boolean getLogEnabled(Event event, EventConsumer.Context context) {
if (handlers.containsKey(EventHeaders.DISABLE_LOG_HEADER)) {
return false;
}
// consumer显式声明了日志开关则直接使用
if (logEnabled != null) {
return logEnabled && (logFilter == null || logFilter.apply(event));
}
return logFilter == null || logFilter.apply(event);
}
private boolean isSupportPattern() { private boolean isSupportPattern() {
return antPathMatcher != null; return antPathMatcher != null;
} }
@Data
private static class EventHandlerHolder {
String name;
EventHandler eventHandler;
@Builder
public EventHandlerHolder(String name, EventHandler eventHandler) {
this.name = Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName());
this.eventHandler = eventHandler;
}
}
} }

View File

@ -0,0 +1,176 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHandler;
import cn.axzo.foundation.event.support.EventHandlerContext;
import cn.axzo.foundation.event.support.EventHeaders;
import cn.axzo.foundation.exception.BusinessException;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class RetryableEventConsumer {
EventConsumer eventConsumer;
Map<String, RetryEvent> retryContexts = Maps.newConcurrentMap();
private static final Set<Class<? extends Throwable>> DEFAULT_EXCLUDE_EXCEPTIONS = ImmutableSet.of(BusinessException.class);
private static final BackoffPolicy DEFAULT_BACKOFFPOLICY = ExponentialBackOffPolicy.builder().build();
@Builder
public RetryableEventConsumer(Consumer<RetryContext> consumer) {
this.eventConsumer = DefaultEventConsumer.builder()
.handlerRepository(EventHandlerRepository.builder()
.exceptionHandler(context -> {
context.doPrint();
if (Strings.isNullOrEmpty(context.getHandlerContext().getHandlerKey())) {
return;
}
RetryEvent retryEvent = retryContexts.get(context.getHandlerContext().getHandlerKey());
Set<Class<? extends Throwable>> excludeExceptions = retryEvent.getExcludeExceptions();
if (excludeExceptions.stream().anyMatch(e -> e.isAssignableFrom(context.getException().getClass()))) {
return;
}
Integer currentRetryCount = getRetryCount(context.getConsumerContext());
Optional<Long> nextRetryMillis = retryEvent.getBackoffPolicy().getNextRetryMillis(currentRetryCount);
if (!nextRetryMillis.isPresent()) {
return;
}
LocalDateTime nextTriggerTime = Instant.ofEpochMilli(nextRetryMillis.get()).atZone(ZoneId.systemDefault()).toLocalDateTime();
consumer.accept(RetryContext.builder()
.event(context.getEvent())
.retryKey(context.getHandlerContext().getHandlerKey())
.nextTriggerTime(nextTriggerTime)
.currentRetryCount(currentRetryCount)
.build());
})
.build())
.build();
}
public Boolean registerHandler(RetryEvent retryEvent) {
retryContexts.put(retryEvent.getRetryKey(), retryEvent);
return eventConsumer.registerHandler(EventHandlerContext.builder()
.eventCode(retryEvent.getEventCode())
.eventHandler(retryEvent.getEventHandler())
.name(retryEvent.getName())
.handlerKey(retryEvent.getRetryKey())
.build());
}
private Integer getRetryCount(EventConsumer.Context context) {
if (context.getHeaders().containsKey(EventHeaders.TRIGGER_HANDLER_KEYS)) {
return Integer.parseInt(new String(context.getHeaders().get(EventHeaders.TRIGGER_HANDLER_KEYS), Charsets.UTF_8));
}
//默认从0开始
return 0;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class RetryContext {
Event event;
String retryKey;
/** 下一次触发的时间 */
LocalDateTime nextTriggerTime;
/** 下一次延迟的毫秒数 */
Long nextDelayMillis;
/** 当前重试次数 */
Integer currentRetryCount;
public Integer getNextRetryCount() {
return currentRetryCount + 1;
}
}
@Data
public static class RetryEvent {
String retryKey;
String name;
Event.EventCode eventCode;
EventHandler eventHandler;
Set<Class<? extends Throwable>> excludeExceptions;
BackoffPolicy backoffPolicy;
@Builder
public RetryEvent(String retryKey, String name, Event.EventCode eventCode, EventHandler eventHandler, Set<Class<? extends Throwable>> excludeExceptions, BackoffPolicy backoffPolicy) {
Preconditions.checkArgument(StringUtils.isNotBlank(retryKey));
Preconditions.checkNotNull(eventCode);
Preconditions.checkNotNull(eventHandler);
this.retryKey = retryKey;
this.name = name;
this.eventCode = eventCode;
this.eventHandler = eventHandler;
this.excludeExceptions = Optional.ofNullable(excludeExceptions).orElse(DEFAULT_EXCLUDE_EXCEPTIONS);
this.backoffPolicy = Optional.ofNullable(backoffPolicy).orElse(DEFAULT_BACKOFFPOLICY);
}
}
interface BackoffPolicy {
Optional<Long> getNextRetryMillis(int retryCount);
}
/**
* delay task执行的重试策略. 每次都按照上次重试的时间间隔指数增长.
* 比如第一次1分钟, 第二次2分钟, 第三次4分钟, 第四次8分钟.
* 参考Spring retry的定义
* https://github.com/spring-projects/spring-retry/tree/master/src/main/java/org/springframework/retry/backoff
*/
@NoArgsConstructor
@Data
@Builder
static class ExponentialBackOffPolicy implements BackoffPolicy {
/**
* 指数时间. 建议2
*/
private Double multiplier;
/**
* 重试的间隔. 后续会按照这个间隔* multiplier的指数计算间隔时间
*/
private Long intervalMillis;
private Long maxIntervalMillis;
@Builder
public ExponentialBackOffPolicy(Double multiplier, Long intervalMillis, Long maxIntervalMillis) {
this.multiplier = Optional.ofNullable(multiplier).orElse(2D);
this.intervalMillis = Optional.ofNullable(intervalMillis).orElse(TimeUnit.MINUTES.toMillis(1));
this.maxIntervalMillis = Optional.ofNullable(maxIntervalMillis).orElse(TimeUnit.DAYS.toMillis(2));
Preconditions.checkArgument(this.multiplier >= 1);
Preconditions.checkArgument(this.intervalMillis >= TimeUnit.SECONDS.toMillis(10)
&& this.intervalMillis <= TimeUnit.DAYS.toMillis(1));
Preconditions.checkArgument(this.maxIntervalMillis >= TimeUnit.MINUTES.toMillis(10)
&& this.maxIntervalMillis <= TimeUnit.DAYS.toMillis(10));
}
public Optional<Long> getNextRetryMillis(int retryCount) {
Long interval = (long) (intervalMillis * Math.pow(multiplier, retryCount));
if (interval > maxIntervalMillis) {
return Optional.empty();
}
return Optional.of(interval);
}
}
}

View File

@ -0,0 +1,60 @@
package cn.axzo.foundation.event.support.consumer;
import cn.axzo.foundation.event.support.EventHeaders;
import cn.axzo.foundation.event.support.producer.EventProducer;
import cn.axzo.foundation.event.support.producer.RocketMQEventProducer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableRangeMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import java.util.concurrent.TimeUnit;
public class RocketRetryableEventConsumer extends RetryableEventConsumer {
/**
* rocketMq延迟级别
* key = 延迟级别,
* https://rocketmq.apache.org/zh/docs/4.x/producer/04message3
*/
private static final RangeMap<Long, Integer> LEVEL_MAP = ImmutableRangeMap.<Long, Integer>builder()
.put(Range.openClosed(0L, TimeUnit.SECONDS.toMillis(1)), 1)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toMillis(5)), 2)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(5), TimeUnit.SECONDS.toMillis(10)), 3)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(10), TimeUnit.SECONDS.toMillis(30)), 4)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(30), TimeUnit.MINUTES.toMillis(1)), 5)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(1), TimeUnit.MINUTES.toMillis(2)), 6)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(2), TimeUnit.MINUTES.toMillis(3)), 7)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(3), TimeUnit.MINUTES.toMillis(4)), 8)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(4), TimeUnit.MINUTES.toMillis(5)), 9)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(5), TimeUnit.MINUTES.toMillis(6)), 10)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(6), TimeUnit.MINUTES.toMillis(7)), 11)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(7), TimeUnit.MINUTES.toMillis(8)), 12)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(8), TimeUnit.MINUTES.toMillis(9)), 13)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(9), TimeUnit.MINUTES.toMillis(10)), 14)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(10), TimeUnit.MINUTES.toMillis(20)), 15)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(20), TimeUnit.MINUTES.toMillis(30)), 16)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(30), TimeUnit.MINUTES.toMillis(60)), 17)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(60), Long.MAX_VALUE), 18)
.build();
public RocketRetryableEventConsumer(EventProducer eventProducer, String retryTopic) {
super(retryContext -> {
/** 将延迟毫秒转换为rocketMq的延迟级别 */
Integer delayLevel = LEVEL_MAP.get(retryContext.getNextDelayMillis());
eventProducer.send(retryContext.getEvent(), EventProducer.Context.builder()
.headers(ImmutableMap.of(EventHeaders.TRIGGER_HANDLER_KEYS, retryContext.getRetryKey(),
EventHeaders.RETRY_COUNT, retryContext.getNextRetryCount() + ""))
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.delayLevel(delayLevel)
.topic(retryTopic)
.build())
.build());
});
Preconditions.checkNotNull(retryTopic);
Preconditions.checkNotNull(eventProducer);
}
}

View File

@ -1,6 +1,8 @@
package cn.axzo.foundation.event.support.producer; package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHeaders;
import cn.axzo.foundation.event.support.consumer.DefaultEventConsumer;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding; import com.google.common.io.BaseEncoding;
@ -47,19 +49,12 @@ public interface EventProducer<Meta> {
/** /**
* header中传递该值建议调用方关闭当前事件的log打印一般是数量较多的事件 * header中传递该值建议调用方关闭当前事件的log打印一般是数量较多的事件
*/ */
public static final String DISABLE_LOG_HEADER = "disableLog";
/** /**
* 存储发送消息需要的一些元数据, 例如rabbitMq的exchange, routingKey等 * 存储发送消息需要的一些元数据, 例如rabbitMq的exchange, routingKey等
*/ */
private Meta meta; private Meta meta;
/**
* 指定的schemahash. 如果context指定了schemahash, 就不会再计算每个event的schemaHash.
* 解决计算schemahash需要消耗资源问题.
*/
String schemaHash;
/** /**
* 发送消息需要附加的请求头 * 发送消息需要附加的请求头
*/ */
@ -108,12 +103,12 @@ public interface EventProducer<Meta> {
if (this.headers == null) { if (this.headers == null) {
this.headers = Maps.newHashMap(); this.headers = Maps.newHashMap();
} }
this.headers.put(DISABLE_LOG_HEADER, Boolean.TRUE.toString()); this.headers.put(EventHeaders.DISABLE_LOG_HEADER, Boolean.TRUE.toString());
return (Context<Meta>) this; return (Context<Meta>) this;
} }
public Boolean logEnabled() { public Boolean logEnabled() {
return headers == null || headers.isEmpty() || !BooleanUtils.toBoolean(headers.getOrDefault(DISABLE_LOG_HEADER, "")); return headers == null || headers.isEmpty() || !BooleanUtils.toBoolean(headers.getOrDefault(EventHeaders.DISABLE_LOG_HEADER, ""));
} }
} }

View File

@ -1,6 +1,7 @@
package cn.axzo.foundation.event.support.producer; package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHeaders;
import cn.axzo.foundation.util.UUIDBuilder; import cn.axzo.foundation.util.UUIDBuilder;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Strings; import com.google.common.base.Strings;
@ -79,7 +80,7 @@ public class KafkaEventProducer extends AbstractEventProducer {
.forEach(entry -> producerRecord.headers().add(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8))); .forEach(entry -> producerRecord.headers().add(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8)));
} }
// 将eventCode放入header以便快速过滤自己不关注的信息 // 将eventCode放入header以便快速过滤自己不关注的信息
producerRecord.headers().add("eventCode", event.getEventCode().toString().getBytes()); producerRecord.headers().add(EventHeaders.EVENT_CODE, event.getEventCode().toString().getBytes());
//没有添加自定义的header. 后续有要求再添加 //没有添加自定义的header. 后续有要求再添加

View File

@ -1,11 +1,11 @@
package cn.axzo.foundation.event.support.producer; package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHeaders;
import cn.axzo.foundation.event.support.consumer.EventConsumer; import cn.axzo.foundation.event.support.consumer.EventConsumer;
import cn.axzo.foundation.util.UUIDBuilder; import cn.axzo.foundation.util.UUIDBuilder;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -67,10 +67,11 @@ public class LocalEventProducer extends AbstractEventProducer {
.forEach(entry -> headers.put(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8))); .forEach(entry -> headers.put(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8)));
} }
// 将eventCode放入header以便快速过滤自己不关注的信息 // 将eventCode放入header以便快速过滤自己不关注的信息
headers.put("eventCode", event.getEventCode().toString().getBytes()); headers.put(EventHeaders.EVENT_CODE, event.getEventCode().toString().getBytes());
Runnable sender = () -> eventConsumer.onEvent(event.toJsonString(), EventConsumer.Context.builder() Runnable sender = () -> eventConsumer.onEvent(event.toJsonString(), EventConsumer.Context.builder()
.eventCode(event.getEventCode()).headers(headers).ext(ImmutableMap.of()).build()); .topic("local")
.eventCode(event.getEventCode()).headers(headers).build());
//提交到executor, 并延迟50ms. 模拟线上produce sender的时间 //提交到executor, 并延迟50ms. 模拟线上produce sender的时间
executor.schedule(sender, 50, TimeUnit.MILLISECONDS); executor.schedule(sender, 50, TimeUnit.MILLISECONDS);
}; };

View File

@ -1,6 +1,7 @@
package cn.axzo.foundation.event.support.producer; package cn.axzo.foundation.event.support.producer;
import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.Event;
import cn.axzo.foundation.event.support.EventHeaders;
import cn.axzo.foundation.util.UUIDBuilder; import cn.axzo.foundation.util.UUIDBuilder;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -18,7 +19,6 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils; import org.springframework.util.MimeTypeUtils;
@ -40,7 +40,6 @@ public class RocketMQEventProducer extends AbstractEventProducer {
String defaultModule, String defaultModule,
String appName, String appName,
Context<RocketMQMessageMeta> defaultContext, Context<RocketMQMessageMeta> defaultContext,
@Deprecated TaskExecutor taskExecutor,
BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback) { BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback) {
super(defaultContext); super(defaultContext);
this.defaultModule = defaultModule; this.defaultModule = defaultModule;
@ -80,7 +79,7 @@ public class RocketMQEventProducer extends AbstractEventProducer {
} }
// 将eventCode放入header以便快速过滤自己不关注的信息 // 将eventCode放入header以便快速过滤自己不关注的信息
Event.EventCode eventCode = event.getEventCode(); Event.EventCode eventCode = event.getEventCode();
messageBuilder.setHeaderIfAbsent("eventCode", eventCode.toString()); messageBuilder.setHeaderIfAbsent(EventHeaders.EVENT_CODE, eventCode.toString());
final String eventLogText = context.logEnabled() ? event.toPrettyJsonString() : ""; final String eventLogText = context.logEnabled() ? event.toPrettyJsonString() : "";
// 使用lamda构造diff, 解决在rocketMQTemplate.asyncSendOrderly中访问的编译问题. // 使用lamda构造diff, 解决在rocketMQTemplate.asyncSendOrderly中访问的编译问题.
@ -123,10 +122,10 @@ public class RocketMQEventProducer extends AbstractEventProducer {
if (context.getSyncSending()) { if (context.getSyncSending()) {
// 同步发送 // 同步发送
doSyncSend(destination, messageBuilder, shardingKey, callback); doSyncSend(destination, messageBuilder, shardingKey, callback, context.getMeta().getDelayLevel());
} else { } else {
// 异步发送 // 异步发送
doAsyncSend(destination, messageBuilder, shardingKey, callback); doAsyncSend(destination, messageBuilder, shardingKey, callback, context.getMeta().getDelayLevel());
} }
}; };
} }
@ -161,21 +160,27 @@ public class RocketMQEventProducer extends AbstractEventProducer {
} }
}; };
Integer delayLevel = context.getMeta().getDelayLevel();
if (context.getSyncSending()) { if (context.getSyncSending()) {
// 同步发送 // 同步发送
doSyncSend(destination, messageBuilder, shardingKey, callback); doSyncSend(destination, messageBuilder, shardingKey, callback, delayLevel);
} else { } else {
// 异步发送 // 异步发送
doAsyncSend(destination, messageBuilder, shardingKey, callback); doAsyncSend(destination, messageBuilder, shardingKey, callback, delayLevel);
} }
}; };
} }
private void doSyncSend(String destination, MessageBuilder<?> messageBuilder, private void doSyncSend(String destination, MessageBuilder<?> messageBuilder,
String shardingKey, SendCallback callback) { String shardingKey, SendCallback callback, Integer delayLevel) {
try { try {
// 同步发送超时时间支持在properties中设置 SendResult sendResult;
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), shardingKey); if (delayLevel != null) {
sendResult = rocketMQTemplate.syncSend(destination, messageBuilder.build(), 10000L, delayLevel);
} else {
// 同步发送超时时间支持在properties中设置
sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), shardingKey);
}
callback.onSuccess(sendResult); callback.onSuccess(sendResult);
} catch (Throwable e) { } catch (Throwable e) {
callback.onException(e); callback.onException(e);
@ -185,9 +190,13 @@ public class RocketMQEventProducer extends AbstractEventProducer {
} }
private void doAsyncSend(String destination, MessageBuilder<?> messageBuilder, private void doAsyncSend(String destination, MessageBuilder<?> messageBuilder,
String shardingKey, SendCallback callback) { String shardingKey, SendCallback callback, Integer delayLevel) {
// 异步发送 if (delayLevel != null) {
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), shardingKey, callback); rocketMQTemplate.asyncSend(destination, messageBuilder.build(), callback, 10000L, delayLevel);
} else {
// 异步发送
rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), shardingKey, callback);
}
} }
/** /**
@ -230,13 +239,17 @@ public class RocketMQEventProducer extends AbstractEventProducer {
*/ */
private String destination; private String destination;
/** 延迟级别 */
private Integer delayLevel;
@Builder @Builder
public RocketMQMessageMeta(String topic, String tag, String shardingKey, String destination) { public RocketMQMessageMeta(String topic, String tag, String shardingKey, String destination, Integer delayLevel) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "topic不能为空"); Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "topic不能为空");
this.topic = topic; this.topic = topic;
this.tag = tag; this.tag = tag;
this.shardingKey = shardingKey; this.shardingKey = shardingKey;
this.destination = destination; this.destination = destination;
this.delayLevel = delayLevel;
} }
private String buildDestination(String eventName) { private String buildDestination(String eventName) {

View File

@ -1,4 +1,4 @@
package cn.axzo.foundation.event.utils; package cn.axzo.foundation.event.support.utils;
import cn.axzo.foundation.event.support.PayloadDifferentiator; import cn.axzo.foundation.event.support.PayloadDifferentiator;
import cn.axzo.foundation.util.FastjsonUtils; import cn.axzo.foundation.util.FastjsonUtils;