diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/DiffablePayload.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/DiffablePayload.java index 17924ba..ccdb777 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/DiffablePayload.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/DiffablePayload.java @@ -1,6 +1,6 @@ package cn.axzo.foundation.event.support; -import cn.axzo.foundation.event.utils.PayloadUtils; +import cn.axzo.foundation.event.support.utils.PayloadUtils; import com.google.common.collect.ImmutableSet; import lombok.*; diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/Event.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/Event.java index 56750c6..e80c687 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/Event.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/Event.java @@ -40,7 +40,7 @@ public class Event implements Serializable { */ private String eventId; - private String eventVersion; + private String version; private String eventModule; diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerContext.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerContext.java new file mode 100644 index 0000000..587bb89 --- /dev/null +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHandlerContext.java @@ -0,0 +1,32 @@ +package cn.axzo.foundation.event.support; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Optional; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class EventHandlerContext { + Event.EventCode eventCode; + /** handler的唯一索引, 重试时需要 */ + String handlerKey; + /** 用于打印日志 */ + String name; + EventHandler eventHandler; + + public String getName() { + return Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName()); + } + + public void check() { + Preconditions.checkNotNull(eventCode); + Preconditions.checkNotNull(eventHandler); + } +} diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHeaders.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHeaders.java new file mode 100644 index 0000000..ddf6338 --- /dev/null +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/EventHeaders.java @@ -0,0 +1,19 @@ +package cn.axzo.foundation.event.support; + +import cn.axzo.foundation.util.TraceUtils; + +public class EventHeaders { + /** eventCode */ + public static final String EVENT_CODE = "eventCode"; + + /** 指定消费者的handlerKey. 多个逗号分割, eg: CreateUserHandler,UpdateUserHandler */ + public static final String TRIGGER_HANDLER_KEYS = "triggerHandlerKeys"; + + /** 重试次数, 数字 */ + public static final String RETRY_COUNT = "retryCount"; + + public static final String TRACE_ID = TraceUtils.TRACE_ID; + + public static final String DISABLE_LOG_HEADER = "disableLog"; + +} diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java index e2601dc..a4fe3b5 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/BatchEventConsumer.java @@ -2,6 +2,7 @@ package cn.axzo.foundation.event.support.consumer; import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.EventHandler; +import cn.axzo.foundation.event.support.EventHandlerContext; import cn.axzo.foundation.exception.BusinessException; import cn.axzo.foundation.result.ResultCode; import cn.axzo.foundation.util.FastjsonUtils; @@ -32,15 +33,15 @@ public class BatchEventConsumer implements EventConsumer { } @Override - public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { - handlers.put(eventHandler, eventCode); + public Boolean registerHandler(EventHandlerContext context) { + handlers.put(context.getEventHandler(), context.getEventCode()); return null; } @Override - public EventHandlerRepository registerHandlers(List list, EventHandler eventHandler, String name) { + public Boolean registerHandlers(List list, EventHandler eventHandler, String name) { handlers.putAll(eventHandler, list); - return null; + return true; } @Override diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java index c264419..874d7de 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/DefaultEventConsumer.java @@ -1,15 +1,16 @@ package cn.axzo.foundation.event.support.consumer; import cn.axzo.foundation.event.support.Event; -import cn.axzo.foundation.event.support.EventHandler; +import cn.axzo.foundation.event.support.EventHandlerContext; import cn.axzo.foundation.util.FastjsonUtils; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Throwables; +import lombok.Builder; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -32,18 +33,21 @@ import java.util.stream.Collectors; */ @Slf4j public class DefaultEventConsumer implements EventConsumer { - private final EventHandlerRepository handlerRepository; - - private final String appName; private final Consumer consumeCallback; - public DefaultEventConsumer(String appName, EventHandlerRepository handlerRepository, Consumer consumeCallback) { - this.handlerRepository = handlerRepository; - this.appName = appName; + @Builder + public DefaultEventConsumer(EventHandlerRepository handlerRepository, Consumer consumeCallback) { + this.handlerRepository = Optional.ofNullable(handlerRepository).orElseGet(() -> EventHandlerRepository.builder() + .build()); this.consumeCallback = consumeCallback; } + @Override + public Boolean registerHandler(EventHandlerContext context) { + return handlerRepository.registerHandler(context); + } + @Override public boolean onEvent(String message, Context context) { Event event; @@ -71,7 +75,7 @@ public class DefaultEventConsumer implements EventConsumer { context.getSummary(), message, e); return false; } - Boolean handled = false; + Boolean handled; try { handled = handlerRepository.process(event, context); } catch (Exception ex) { @@ -85,7 +89,6 @@ public class DefaultEventConsumer implements EventConsumer { .event(event) .consumer(this) .isHandled(handled) - .ext(context.getExt()) .context(context) .build()); } @@ -95,7 +98,7 @@ public class DefaultEventConsumer implements EventConsumer { @Override public boolean onEvents(List events) { // 默认开启日志,如需关闭日志,在调用onEvent的时候,设置logEnable为false - log.info("====MQ CONSUMER BATCH===={}, events={}", appName, FastjsonUtils.toJsonPettyLogString(events)); + log.info("====MQ CONSUMER BATCH====, events={}", FastjsonUtils.toJsonPettyLogString(events)); Map> eventGroup = events.stream().filter(e -> handlerRepository.canHandle(e.getContext().getEventCode())) .map(BatchEvent::toEvent).collect(Collectors.groupingBy(Event::getEventCode)); if (eventGroup.isEmpty()) { @@ -105,20 +108,9 @@ public class DefaultEventConsumer implements EventConsumer { handlerRepository.batch(entry.getValue(), Context.builder() .eventCode(entry.getKey()) - .maxAllowElapsedMillis(TimeUnit.MINUTES.toMillis(1L)) .build()); } return true; } - - @Override - public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { - return handlerRepository.registerHandler(eventCode, eventHandler, name); - } - - @Override - public EventHandlerRepository registerHandlers(List eventCodes, EventHandler eventHandler, String name) { - return handlerRepository.registerHandlers(eventCodes, eventHandler, name); - } } diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java index fd5c8c4..a4c6d4d 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventConsumer.java @@ -2,25 +2,38 @@ package cn.axzo.foundation.event.support.consumer; import cn.axzo.foundation.event.support.Event; import cn.axzo.foundation.event.support.EventHandler; -import cn.axzo.foundation.event.support.producer.EventProducer; +import cn.axzo.foundation.event.support.EventHandlerContext; +import cn.axzo.foundation.event.support.EventHeaders; import cn.axzo.foundation.util.TraceUtils; import cn.axzo.foundation.util.UUIDBuilder; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Charsets; -import com.google.common.base.Predicate; +import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.base.Supplier; -import lombok.*; -import org.apache.commons.lang3.BooleanUtils; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import org.springframework.util.CollectionUtils; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; public interface EventConsumer { + /** + * 注册EventHandler + * + * @return EventHandlerRepository + */ + Boolean registerHandler(EventHandlerContext context); + /** * 注册EventHandler * @@ -29,10 +42,12 @@ public interface EventConsumer { * @param name handler别名 * @return EventHandlerRepository */ - EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name); - - default EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler) { - return registerHandler(eventCode, eventHandler, null); + default Boolean registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { + return registerHandler(EventHandlerContext.builder() + .eventCode(eventCode) + .eventHandler(eventHandler) + .name(name) + .build()); } /** @@ -43,10 +58,9 @@ public interface EventConsumer { * @param name handler别名 * @return EventHandlerRepository */ - EventHandlerRepository registerHandlers(List eventCodes, EventHandler eventHandler, String name); - - default EventHandlerRepository registerHandlers(List eventCodes, EventHandler eventHandler) { - return registerHandlers(eventCodes, eventHandler, null); + default Boolean registerHandlers(List eventCodes, EventHandler eventHandler, String name) { + eventCodes.forEach(e -> registerHandler(e, eventHandler, name)); + return true; } /** @@ -77,7 +91,6 @@ public interface EventConsumer { private Event event; private EventConsumer consumer; private boolean isHandled; - private Map ext; private Context context; } @@ -88,39 +101,30 @@ public interface EventConsumer { private Event.EventCode eventCode; private String traceId; private String msgId; - /** - * 只elapse 超过多少毫秒才进行日志记录。默认 3ms - */ - private Long logElapsedThreshold; + private String topic; private Map headers; - private Map ext; - private Long maxAllowElapsedMillis; - @Getter(AccessLevel.NONE) - private Boolean logEnabled; - @Getter(AccessLevel.NONE) - private transient Predicate logFilter; + /** * 返回消息消费者offset与全部消息offset之间的差. */ private transient Supplier lagSupplier; @Builder - public Context(Event.EventCode eventCode, String msgId, Long logElapsedThreshold, Map headers, Map ext, - Long maxAllowElapsedMillis, Boolean logEnabled, Predicate logFilter, Supplier lagSupplier) { - this.eventCode = eventCode; + public Context(Event.EventCode eventCode, String msgId, Map headers, + Supplier lagSupplier, String topic) { + this.headers = Optional.ofNullable(headers).orElse(ImmutableMap.of()); - this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L); - this.headers = headers; - this.ext = ext; + this.eventCode = Optional.ofNullable(eventCode).orElseGet(() -> getHeaders().containsKey(EventHeaders.EVENT_CODE) + ? Event.EventCode.from(new String(getHeaders().get(EventHeaders.EVENT_CODE), Charsets.UTF_8)) + : null); + this.topic = Strings.nullToEmpty(topic); this.msgId = Strings.nullToEmpty(msgId); - this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L); - this.logEnabled = logEnabled; - this.logFilter = logFilter; + this.lagSupplier = lagSupplier; - if (!CollectionUtils.isEmpty(headers) && headers.containsKey(TraceUtils.TRACE_ID)) { - this.traceId = new String(headers.get(TraceUtils.TRACE_ID), Charsets.UTF_8); + if (!CollectionUtils.isEmpty(headers) && headers.containsKey(EventHeaders.TRACE_ID)) { + this.traceId = new String(headers.get(EventHeaders.TRACE_ID), Charsets.UTF_8); } if (Strings.isNullOrEmpty(this.traceId)) { this.traceId = UUIDBuilder.generateShortUuid(); @@ -128,37 +132,12 @@ public interface EventConsumer { TraceUtils.putTraceId(this.traceId); } - public Event.EventCode getEventCode() { - if (eventCode == null) { - eventCode = headers.containsKey("eventCode") - ? Event.EventCode.from(new String(headers.get("eventCode"), Charsets.UTF_8)) - : null; + public Set getTriggerHandlerKeys() { + if (getHeaders().containsKey(EventHeaders.TRIGGER_HANDLER_KEYS)) { + return Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings() + .splitToList(new String(getHeaders().get(EventHeaders.TRIGGER_HANDLER_KEYS), Charsets.UTF_8))); } - return eventCode; - } - - public String getTraceId() { - if (Strings.isNullOrEmpty(traceId)) { - // 兼容老的实现 - traceId = UUIDBuilder.generateShortUuid(); - TraceUtils.putTraceId(traceId); - } - return traceId; - } - - public boolean getLogEnabled(Event event) { - // consumer显式声明了日志开关,则直接使用 - if (logEnabled != null) { - return logEnabled && (logFilter == null || logFilter.apply(event)); - } - - // consumer未显式声明日志开关时,根据event的header中的disableLog=true来决定是否关闭日志。 - boolean disableLog = BooleanUtils.toBoolean(new String(Optional.ofNullable(headers).orElse(Collections.emptyMap()) - .getOrDefault(EventProducer.Context.DISABLE_LOG_HEADER, "".getBytes()))); - if (disableLog) { - return false; - } - return logFilter == null || logFilter.apply(event); + return ImmutableSet.of(); } public String getSummary() { diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventExceptionContext.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventExceptionContext.java new file mode 100644 index 0000000..12d0d10 --- /dev/null +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventExceptionContext.java @@ -0,0 +1,33 @@ +package cn.axzo.foundation.event.support.consumer; + +import cn.axzo.foundation.event.support.Event; +import cn.axzo.foundation.event.support.EventHandlerContext; +import cn.axzo.foundation.exception.BusinessException; +import cn.axzo.foundation.result.ResultCode; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class EventExceptionContext { + Throwable exception; + String msg; + Event event; + EventConsumer.Context consumerContext; + EventHandlerContext handlerContext; + + public void doPrint() { + if (BusinessException.class.isAssignableFrom(exception.getClass()) && + !ResultCode.PROCESS_TIMEOUT.getCode().equals(((BusinessException) exception).getErrorCode())) { + log.warn("MQ, handle warning {}", msg, exception); + } else { + log.error("MQ, handle error {}", msg, exception); + } + } +} diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java index a8bb1eb..ccfd88b 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/EventHandlerRepository.java @@ -1,35 +1,26 @@ package cn.axzo.foundation.event.support.consumer; -import cn.axzo.foundation.event.support.DiffablePayload; -import cn.axzo.foundation.event.support.Event; -import cn.axzo.foundation.event.support.EventHandler; -import cn.axzo.foundation.event.support.PayloadDifferentiator; +import cn.axzo.foundation.event.support.*; import cn.axzo.foundation.exception.BusinessException; -import cn.axzo.foundation.result.ResultCode; import cn.axzo.foundation.util.FastjsonUtils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.base.Strings; +import com.google.common.base.*; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import lombok.Builder; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.util.AntPathMatcher; import org.springframework.util.CollectionUtils; -import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -37,41 +28,53 @@ import java.util.stream.Collectors; * 可以在多个地方使用. */ @Slf4j -public class EventHandlerRepository { - final protected ListMultimap handlers = ArrayListMultimap.create(); - private final BiConsumer exceptionHandler; - +public class EventHandlerRepository { + final protected ListMultimap handlers = ArrayListMultimap.create(); + private final Consumer exceptionHandler; private AntPathMatcher antPathMatcher; - public EventHandlerRepository(BiConsumer exceptionHandler) { - this(exceptionHandler, false); - } + /** + * 只elapse 超过多少毫秒才进行日志记录。默认 3ms + */ + private final Long logElapsedThreshold; + private final Long maxAllowElapsedMillis; + private final Boolean logEnabled; + private final Predicate logFilter; - public EventHandlerRepository(BiConsumer exceptionHandler, boolean supportPattern) { - this.exceptionHandler = exceptionHandler; + @Builder + public EventHandlerRepository(Consumer exceptionHandler, + boolean supportPattern, + Boolean logEnabled, + Predicate logFilter, + Long logElapsedThreshold, + Long maxAllowElapsedMillis) { + this.exceptionHandler = Optional.ofNullable(exceptionHandler).orElse(EventExceptionContext::doPrint); + + this.logEnabled = Optional.ofNullable(logEnabled).orElse(Boolean.TRUE); + this.logFilter = logFilter; + this.logElapsedThreshold = Optional.ofNullable(logElapsedThreshold).orElse(3L); + this.maxAllowElapsedMillis = Optional.ofNullable(maxAllowElapsedMillis).orElse(10_000L); if (supportPattern) { antPathMatcher = new AntPathMatcher(Event.EventCode.SEPARATOR); antPathMatcher.setCachePatterns(true); } } - public EventHandlerRepository() { - this(null); - } + public Boolean registerHandler(EventHandlerContext handlerHolder) { + handlerHolder.check(); - public EventHandlerRepository registerHandler(Event.EventCode eventCode, EventHandler eventHandler, String name) { - Objects.requireNonNull(eventCode); - Objects.requireNonNull(eventHandler); + //如果传入了key则检查部重复 + if (StringUtils.isNoneBlank(handlerHolder.getHandlerKey())) { + Set existsHandlerKeys = handlers.values().stream() + .map(e -> e.getHandlerKey()) + .filter(StringUtils::isNoneBlank) + .collect(Collectors.toSet()); + Preconditions.checkArgument(!existsHandlerKeys.contains(handlerHolder.getHandlerKey())); + } - handlers.put(eventCode, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build()); - return this; - } - - public EventHandlerRepository registerHandlers(List eventCodes, EventHandler eventHandler, String name) { - Objects.requireNonNull(eventHandler); - eventCodes.forEach(e -> handlers.put(e, EventHandlerHolder.builder().name(name).eventHandler(eventHandler).build())); - return this; + handlers.put(handlerHolder.getEventCode(), handlerHolder); + return true; } public boolean canHandle(Event.EventCode eventCode) { @@ -92,9 +95,8 @@ public class EventHandlerRepository { public boolean process(Event event, EventConsumer.Context context) { Stopwatch stopwatch = Stopwatch.createUnstarted(); Event.EventCode eventCode = event.getEventCode(); - List eventHandlers = getEventHandlers(eventCode); - Map handleCosts = Maps.newHashMap(); + Map handleCosts = Maps.newHashMap(); PayloadDifferentiator differentiator = null; if (!CollectionUtils.isEmpty(context.getHeaders()) && context.getHeaders().containsKey(DiffablePayload.DIFF_META_HEADER)) { @@ -122,6 +124,7 @@ public class EventHandlerRepository { String eventLogText = event.toPrettyJsonString(); + List eventHandlers = getEventHandlers(eventCode, context); eventHandlers.forEach(handler -> { try { stopwatch.start(); @@ -129,29 +132,29 @@ public class EventHandlerRepository { long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); // 为了避免太多日志输出,只有处理时间超过 - if (elapsed > context.getLogElapsedThreshold()) { + if (elapsed > logElapsedThreshold) { handleCosts.put(handler.getName(), elapsed); } - if (elapsed > context.getMaxAllowElapsedMillis()) { + if (elapsed > maxAllowElapsedMillis) { String msg = String.format("[%s] take too long %d millis for %s to handle %s\nevent=%s", context.getTraceId(), elapsed, handler.getName(), payloadDiffLog, eventLogText); - handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg); + log.warn(msg); } - } catch (BusinessException ex) { - log.warn("==MQ CONSUMER {}==, handle biz failed, handler = {}, {}\nevent = {}", - context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex); - handleException(ex, event.toPrettyJsonString()); } catch (Exception ex) { - log.error("==MQ CONSUMER {}==, handle failed, handler = {}, {}\nevent = {}", - context.getSummary(), handler.getName(), payloadDiffLog, eventLogText, ex); - handleException(ex, event.toPrettyJsonString()); + handleException(EventExceptionContext.builder() + .event(event) + .handlerContext(handler) + .msg(event.toPrettyJsonString()) + .exception(ex) + .consumerContext(context) + .build()); } finally { // stopwatch必须reset(),否则下一次stopwatch.start()会报错 stopwatch.reset(); } }); - if (context.getLogEnabled(event)) { + if (getLogEnabled(event, context)) { log.info("==MQ CONSUMER {}==, timeCosts = {}, {}\n{}", context.getSummary(), JSON.toJSONString(handleCosts), payloadDiffLog, eventLogText); } @@ -167,7 +170,7 @@ public class EventHandlerRepository { */ public boolean batch(List events, EventConsumer.Context context) { Stopwatch stopwatch = Stopwatch.createUnstarted(); - List eventHandlers = getEventHandlers(context.getEventCode()); + List eventHandlers = getEventHandlers(context.getEventCode(), context); eventHandlers.stream().forEach(handler -> { try { stopwatch.start(); @@ -178,19 +181,17 @@ public class EventHandlerRepository { long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); log.info("====MQ CONSUMER BATCH====, handled by {}, cost {} millis", clazzName, elapsed); - if (elapsed > context.getMaxAllowElapsedMillis()) { + if (elapsed > maxAllowElapsedMillis) { String msg = String.format("take too long %d millis for %s to handle %s", elapsed, clazzName, events); - handleException(new BusinessException(ResultCode.PROCESS_TIMEOUT), msg); + log.warn(msg); } } catch (BusinessException ex) { String payloads = FastjsonUtils.toJsonPettyLogString(events); log.warn("====MQ CONSUMER BATCH====, handle event warning, event = {}", payloads, ex); - handleException(ex, payloads); } catch (Exception ex) { String payloads = FastjsonUtils.toJsonPettyLogString(events); log.error("====MQ CONSUMER BATCH====, handle event error, event = {}", payloads, ex); - handleException(ex, payloads); } finally { // stopwatch必须reset(),否则下一次stopwatch.start()会报错 stopwatch.reset(); @@ -206,37 +207,44 @@ public class EventHandlerRepository { return canHandle(event.getEventCode()); } - private void handleException(Exception ex, String msg) { + private void handleException(EventExceptionContext context) { if (exceptionHandler != null) { - exceptionHandler.accept(ex, msg); + exceptionHandler.accept(context); } } - private List getEventHandlers(Event.EventCode eventCode) { + private List getEventHandlers(Event.EventCode eventCode, EventConsumer.Context context) { + List eventHandlerHolders; if (!isSupportPattern()) { - return handlers.get(eventCode); + eventHandlerHolders = handlers.get(eventCode); + } else { + eventHandlerHolders = handlers.keySet().stream() + .filter(key -> antPathMatcher.match(key.toString(), eventCode.toString())) + .flatMap(key -> handlers.get(key).stream()) + .collect(Collectors.toList()); } - - // 支持pattern的时候,返回所有匹配的Handlers - return handlers.keySet().stream() - .filter(key -> antPathMatcher.match(key.toString(), eventCode.toString())) - .flatMap(key -> handlers.get(key).stream()) + return eventHandlerHolders.stream() + .filter(p -> { + if (CollectionUtils.isEmpty(context.getTriggerHandlerKeys()) || Strings.isNullOrEmpty(p.getHandlerKey())) { + return true; + } + return context.getTriggerHandlerKeys().contains(p.getHandlerKey()); + }) .collect(Collectors.toList()); } + public boolean getLogEnabled(Event event, EventConsumer.Context context) { + if (handlers.containsKey(EventHeaders.DISABLE_LOG_HEADER)) { + return false; + } + // consumer显式声明了日志开关,则直接使用 + if (logEnabled != null) { + return logEnabled && (logFilter == null || logFilter.apply(event)); + } + return logFilter == null || logFilter.apply(event); + } + private boolean isSupportPattern() { return antPathMatcher != null; } - - @Data - private static class EventHandlerHolder { - String name; - EventHandler eventHandler; - - @Builder - public EventHandlerHolder(String name, EventHandler eventHandler) { - this.name = Optional.ofNullable(Strings.emptyToNull(name)).orElse(eventHandler.getClass().getSimpleName()); - this.eventHandler = eventHandler; - } - } } diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RetryableEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RetryableEventConsumer.java new file mode 100644 index 0000000..c82d9e5 --- /dev/null +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RetryableEventConsumer.java @@ -0,0 +1,176 @@ +package cn.axzo.foundation.event.support.consumer; + +import cn.axzo.foundation.event.support.Event; +import cn.axzo.foundation.event.support.EventHandler; +import cn.axzo.foundation.event.support.EventHandlerContext; +import cn.axzo.foundation.event.support.EventHeaders; +import cn.axzo.foundation.exception.BusinessException; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + + +public class RetryableEventConsumer { + EventConsumer eventConsumer; + Map retryContexts = Maps.newConcurrentMap(); + + private static final Set> DEFAULT_EXCLUDE_EXCEPTIONS = ImmutableSet.of(BusinessException.class); + private static final BackoffPolicy DEFAULT_BACKOFFPOLICY = ExponentialBackOffPolicy.builder().build(); + + @Builder + public RetryableEventConsumer(Consumer consumer) { + this.eventConsumer = DefaultEventConsumer.builder() + .handlerRepository(EventHandlerRepository.builder() + .exceptionHandler(context -> { + context.doPrint(); + if (Strings.isNullOrEmpty(context.getHandlerContext().getHandlerKey())) { + return; + } + RetryEvent retryEvent = retryContexts.get(context.getHandlerContext().getHandlerKey()); + Set> excludeExceptions = retryEvent.getExcludeExceptions(); + if (excludeExceptions.stream().anyMatch(e -> e.isAssignableFrom(context.getException().getClass()))) { + return; + } + Integer currentRetryCount = getRetryCount(context.getConsumerContext()); + + Optional nextRetryMillis = retryEvent.getBackoffPolicy().getNextRetryMillis(currentRetryCount); + if (!nextRetryMillis.isPresent()) { + return; + } + LocalDateTime nextTriggerTime = Instant.ofEpochMilli(nextRetryMillis.get()).atZone(ZoneId.systemDefault()).toLocalDateTime(); + consumer.accept(RetryContext.builder() + .event(context.getEvent()) + .retryKey(context.getHandlerContext().getHandlerKey()) + .nextTriggerTime(nextTriggerTime) + .currentRetryCount(currentRetryCount) + .build()); + }) + .build()) + .build(); + } + + public Boolean registerHandler(RetryEvent retryEvent) { + retryContexts.put(retryEvent.getRetryKey(), retryEvent); + return eventConsumer.registerHandler(EventHandlerContext.builder() + .eventCode(retryEvent.getEventCode()) + .eventHandler(retryEvent.getEventHandler()) + .name(retryEvent.getName()) + .handlerKey(retryEvent.getRetryKey()) + .build()); + } + + private Integer getRetryCount(EventConsumer.Context context) { + if (context.getHeaders().containsKey(EventHeaders.TRIGGER_HANDLER_KEYS)) { + return Integer.parseInt(new String(context.getHeaders().get(EventHeaders.TRIGGER_HANDLER_KEYS), Charsets.UTF_8)); + } + //默认从0开始 + return 0; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class RetryContext { + Event event; + String retryKey; + /** 下一次触发的时间 */ + LocalDateTime nextTriggerTime; + /** 下一次延迟的毫秒数 */ + Long nextDelayMillis; + /** 当前重试次数 */ + Integer currentRetryCount; + + public Integer getNextRetryCount() { + return currentRetryCount + 1; + } + } + + @Data + public static class RetryEvent { + String retryKey; + String name; + Event.EventCode eventCode; + EventHandler eventHandler; + Set> excludeExceptions; + BackoffPolicy backoffPolicy; + + @Builder + public RetryEvent(String retryKey, String name, Event.EventCode eventCode, EventHandler eventHandler, Set> excludeExceptions, BackoffPolicy backoffPolicy) { + Preconditions.checkArgument(StringUtils.isNotBlank(retryKey)); + Preconditions.checkNotNull(eventCode); + Preconditions.checkNotNull(eventHandler); + + this.retryKey = retryKey; + this.name = name; + this.eventCode = eventCode; + this.eventHandler = eventHandler; + this.excludeExceptions = Optional.ofNullable(excludeExceptions).orElse(DEFAULT_EXCLUDE_EXCEPTIONS); + this.backoffPolicy = Optional.ofNullable(backoffPolicy).orElse(DEFAULT_BACKOFFPOLICY); + } + } + + interface BackoffPolicy { + Optional getNextRetryMillis(int retryCount); + } + + /** + * delay task执行的重试策略. 每次都按照上次重试的时间间隔指数增长. + * 比如第一次1分钟, 第二次2分钟, 第三次4分钟, 第四次8分钟. + * 参考Spring retry的定义 + * https://github.com/spring-projects/spring-retry/tree/master/src/main/java/org/springframework/retry/backoff + */ + + @NoArgsConstructor + @Data + @Builder + static class ExponentialBackOffPolicy implements BackoffPolicy { + /** + * 指数时间. 建议2 + */ + private Double multiplier; + /** + * 重试的间隔. 后续会按照这个间隔* multiplier的指数计算间隔时间 + */ + private Long intervalMillis; + private Long maxIntervalMillis; + + @Builder + public ExponentialBackOffPolicy(Double multiplier, Long intervalMillis, Long maxIntervalMillis) { + this.multiplier = Optional.ofNullable(multiplier).orElse(2D); + this.intervalMillis = Optional.ofNullable(intervalMillis).orElse(TimeUnit.MINUTES.toMillis(1)); + this.maxIntervalMillis = Optional.ofNullable(maxIntervalMillis).orElse(TimeUnit.DAYS.toMillis(2)); + + Preconditions.checkArgument(this.multiplier >= 1); + Preconditions.checkArgument(this.intervalMillis >= TimeUnit.SECONDS.toMillis(10) + && this.intervalMillis <= TimeUnit.DAYS.toMillis(1)); + Preconditions.checkArgument(this.maxIntervalMillis >= TimeUnit.MINUTES.toMillis(10) + && this.maxIntervalMillis <= TimeUnit.DAYS.toMillis(10)); + } + + + public Optional getNextRetryMillis(int retryCount) { + Long interval = (long) (intervalMillis * Math.pow(multiplier, retryCount)); + if (interval > maxIntervalMillis) { + return Optional.empty(); + } + return Optional.of(interval); + } + } +} diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RocketRetryableEventConsumer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RocketRetryableEventConsumer.java new file mode 100644 index 0000000..1aa3404 --- /dev/null +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/consumer/RocketRetryableEventConsumer.java @@ -0,0 +1,60 @@ +package cn.axzo.foundation.event.support.consumer; + +import cn.axzo.foundation.event.support.EventHeaders; +import cn.axzo.foundation.event.support.producer.EventProducer; +import cn.axzo.foundation.event.support.producer.RocketMQEventProducer; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableRangeMap; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; + +import java.util.concurrent.TimeUnit; + + +public class RocketRetryableEventConsumer extends RetryableEventConsumer { + + /** + * rocketMq延迟级别 + * key = 延迟级别, + * https://rocketmq.apache.org/zh/docs/4.x/producer/04message3 + */ + private static final RangeMap LEVEL_MAP = ImmutableRangeMap.builder() + .put(Range.openClosed(0L, TimeUnit.SECONDS.toMillis(1)), 1) + .put(Range.openClosed(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toMillis(5)), 2) + .put(Range.openClosed(TimeUnit.SECONDS.toMillis(5), TimeUnit.SECONDS.toMillis(10)), 3) + .put(Range.openClosed(TimeUnit.SECONDS.toMillis(10), TimeUnit.SECONDS.toMillis(30)), 4) + .put(Range.openClosed(TimeUnit.SECONDS.toMillis(30), TimeUnit.MINUTES.toMillis(1)), 5) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(1), TimeUnit.MINUTES.toMillis(2)), 6) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(2), TimeUnit.MINUTES.toMillis(3)), 7) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(3), TimeUnit.MINUTES.toMillis(4)), 8) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(4), TimeUnit.MINUTES.toMillis(5)), 9) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(5), TimeUnit.MINUTES.toMillis(6)), 10) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(6), TimeUnit.MINUTES.toMillis(7)), 11) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(7), TimeUnit.MINUTES.toMillis(8)), 12) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(8), TimeUnit.MINUTES.toMillis(9)), 13) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(9), TimeUnit.MINUTES.toMillis(10)), 14) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(10), TimeUnit.MINUTES.toMillis(20)), 15) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(20), TimeUnit.MINUTES.toMillis(30)), 16) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(30), TimeUnit.MINUTES.toMillis(60)), 17) + .put(Range.openClosed(TimeUnit.MINUTES.toMillis(60), Long.MAX_VALUE), 18) + .build(); + + public RocketRetryableEventConsumer(EventProducer eventProducer, String retryTopic) { + super(retryContext -> { + /** 将延迟毫秒转换为rocketMq的延迟级别 */ + Integer delayLevel = LEVEL_MAP.get(retryContext.getNextDelayMillis()); + + eventProducer.send(retryContext.getEvent(), EventProducer.Context.builder() + .headers(ImmutableMap.of(EventHeaders.TRIGGER_HANDLER_KEYS, retryContext.getRetryKey(), + EventHeaders.RETRY_COUNT, retryContext.getNextRetryCount() + "")) + .meta(RocketMQEventProducer.RocketMQMessageMeta.builder() + .delayLevel(delayLevel) + .topic(retryTopic) + .build()) + .build()); + }); + Preconditions.checkNotNull(retryTopic); + Preconditions.checkNotNull(eventProducer); + } +} diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/EventProducer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/EventProducer.java index fff41be..1f93d58 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/EventProducer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/EventProducer.java @@ -1,6 +1,8 @@ package cn.axzo.foundation.event.support.producer; import cn.axzo.foundation.event.support.Event; +import cn.axzo.foundation.event.support.EventHeaders; +import cn.axzo.foundation.event.support.consumer.DefaultEventConsumer; import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.io.BaseEncoding; @@ -47,19 +49,12 @@ public interface EventProducer { /** * header中传递该值,建议调用方关闭当前事件的log打印,一般是数量较多的事件 */ - public static final String DISABLE_LOG_HEADER = "disableLog"; /** * 存储发送消息需要的一些元数据, 例如rabbitMq的exchange, routingKey等 */ private Meta meta; - /** - * 指定的schemahash. 如果context指定了schemahash, 就不会再计算每个event的schemaHash. - * 解决计算schemahash需要消耗资源问题. - */ - String schemaHash; - /** * 发送消息需要附加的请求头 */ @@ -108,12 +103,12 @@ public interface EventProducer { if (this.headers == null) { this.headers = Maps.newHashMap(); } - this.headers.put(DISABLE_LOG_HEADER, Boolean.TRUE.toString()); + this.headers.put(EventHeaders.DISABLE_LOG_HEADER, Boolean.TRUE.toString()); return (Context) this; } public Boolean logEnabled() { - return headers == null || headers.isEmpty() || !BooleanUtils.toBoolean(headers.getOrDefault(DISABLE_LOG_HEADER, "")); + return headers == null || headers.isEmpty() || !BooleanUtils.toBoolean(headers.getOrDefault(EventHeaders.DISABLE_LOG_HEADER, "")); } } diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/KafkaEventProducer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/KafkaEventProducer.java index eb6695e..1c9063e 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/KafkaEventProducer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/KafkaEventProducer.java @@ -1,6 +1,7 @@ package cn.axzo.foundation.event.support.producer; import cn.axzo.foundation.event.support.Event; +import cn.axzo.foundation.event.support.EventHeaders; import cn.axzo.foundation.util.UUIDBuilder; import com.google.common.base.Charsets; import com.google.common.base.Strings; @@ -79,7 +80,7 @@ public class KafkaEventProducer extends AbstractEventProducer { .forEach(entry -> producerRecord.headers().add(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8))); } // 将eventCode放入header,以便快速过滤自己不关注的信息 - producerRecord.headers().add("eventCode", event.getEventCode().toString().getBytes()); + producerRecord.headers().add(EventHeaders.EVENT_CODE, event.getEventCode().toString().getBytes()); //没有添加自定义的header. 后续有要求再添加 diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/LocalEventProducer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/LocalEventProducer.java index 2892cec..c38b266 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/LocalEventProducer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/LocalEventProducer.java @@ -1,11 +1,11 @@ package cn.axzo.foundation.event.support.producer; import cn.axzo.foundation.event.support.Event; +import cn.axzo.foundation.event.support.EventHeaders; import cn.axzo.foundation.event.support.consumer.EventConsumer; import cn.axzo.foundation.util.UUIDBuilder; import com.google.common.base.Charsets; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import lombok.Getter; import lombok.NoArgsConstructor; @@ -67,10 +67,11 @@ public class LocalEventProducer extends AbstractEventProducer { .forEach(entry -> headers.put(entry.getKey(), entry.getValue().getBytes(Charsets.UTF_8))); } // 将eventCode放入header,以便快速过滤自己不关注的信息 - headers.put("eventCode", event.getEventCode().toString().getBytes()); + headers.put(EventHeaders.EVENT_CODE, event.getEventCode().toString().getBytes()); Runnable sender = () -> eventConsumer.onEvent(event.toJsonString(), EventConsumer.Context.builder() - .eventCode(event.getEventCode()).headers(headers).ext(ImmutableMap.of()).build()); + .topic("local") + .eventCode(event.getEventCode()).headers(headers).build()); //提交到executor, 并延迟50ms. 模拟线上produce 到 sender的时间 executor.schedule(sender, 50, TimeUnit.MILLISECONDS); }; diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/RocketMQEventProducer.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/RocketMQEventProducer.java index 43bb4a4..aa9074d 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/RocketMQEventProducer.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/producer/RocketMQEventProducer.java @@ -1,6 +1,7 @@ package cn.axzo.foundation.event.support.producer; import cn.axzo.foundation.event.support.Event; +import cn.axzo.foundation.event.support.EventHeaders; import cn.axzo.foundation.util.UUIDBuilder; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; @@ -18,7 +19,6 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.core.task.TaskExecutor; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeTypeUtils; @@ -40,7 +40,6 @@ public class RocketMQEventProducer extends AbstractEventProducer { String defaultModule, String appName, Context defaultContext, - @Deprecated TaskExecutor taskExecutor, BiConsumer> sendCallback) { super(defaultContext); this.defaultModule = defaultModule; @@ -80,7 +79,7 @@ public class RocketMQEventProducer extends AbstractEventProducer { } // 将eventCode放入header,以便快速过滤自己不关注的信息 Event.EventCode eventCode = event.getEventCode(); - messageBuilder.setHeaderIfAbsent("eventCode", eventCode.toString()); + messageBuilder.setHeaderIfAbsent(EventHeaders.EVENT_CODE, eventCode.toString()); final String eventLogText = context.logEnabled() ? event.toPrettyJsonString() : ""; // 使用lamda构造diff, 解决在rocketMQTemplate.asyncSendOrderly中访问的编译问题. @@ -123,10 +122,10 @@ public class RocketMQEventProducer extends AbstractEventProducer { if (context.getSyncSending()) { // 同步发送 - doSyncSend(destination, messageBuilder, shardingKey, callback); + doSyncSend(destination, messageBuilder, shardingKey, callback, context.getMeta().getDelayLevel()); } else { // 异步发送 - doAsyncSend(destination, messageBuilder, shardingKey, callback); + doAsyncSend(destination, messageBuilder, shardingKey, callback, context.getMeta().getDelayLevel()); } }; } @@ -161,21 +160,27 @@ public class RocketMQEventProducer extends AbstractEventProducer { } }; + Integer delayLevel = context.getMeta().getDelayLevel(); if (context.getSyncSending()) { // 同步发送 - doSyncSend(destination, messageBuilder, shardingKey, callback); + doSyncSend(destination, messageBuilder, shardingKey, callback, delayLevel); } else { // 异步发送 - doAsyncSend(destination, messageBuilder, shardingKey, callback); + doAsyncSend(destination, messageBuilder, shardingKey, callback, delayLevel); } }; } private void doSyncSend(String destination, MessageBuilder messageBuilder, - String shardingKey, SendCallback callback) { + String shardingKey, SendCallback callback, Integer delayLevel) { try { - // 同步发送超时时间支持在properties中设置 - SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), shardingKey); + SendResult sendResult; + if (delayLevel != null) { + sendResult = rocketMQTemplate.syncSend(destination, messageBuilder.build(), 10000L, delayLevel); + } else { + // 同步发送超时时间支持在properties中设置 + sendResult = rocketMQTemplate.syncSendOrderly(destination, messageBuilder.build(), shardingKey); + } callback.onSuccess(sendResult); } catch (Throwable e) { callback.onException(e); @@ -185,9 +190,13 @@ public class RocketMQEventProducer extends AbstractEventProducer { } private void doAsyncSend(String destination, MessageBuilder messageBuilder, - String shardingKey, SendCallback callback) { - // 异步发送 - rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), shardingKey, callback); + String shardingKey, SendCallback callback, Integer delayLevel) { + if (delayLevel != null) { + rocketMQTemplate.asyncSend(destination, messageBuilder.build(), callback, 10000L, delayLevel); + } else { + // 异步发送 + rocketMQTemplate.asyncSendOrderly(destination, messageBuilder.build(), shardingKey, callback); + } } /** @@ -230,13 +239,17 @@ public class RocketMQEventProducer extends AbstractEventProducer { */ private String destination; + /** 延迟级别 */ + private Integer delayLevel; + @Builder - public RocketMQMessageMeta(String topic, String tag, String shardingKey, String destination) { + public RocketMQMessageMeta(String topic, String tag, String shardingKey, String destination, Integer delayLevel) { Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "topic不能为空"); this.topic = topic; this.tag = tag; this.shardingKey = shardingKey; this.destination = destination; + this.delayLevel = delayLevel; } private String buildDestination(String eventName) { diff --git a/event-support-lib/src/main/java/cn/axzo/foundation/event/utils/PayloadUtils.java b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/utils/PayloadUtils.java similarity index 98% rename from event-support-lib/src/main/java/cn/axzo/foundation/event/utils/PayloadUtils.java rename to event-support-lib/src/main/java/cn/axzo/foundation/event/support/utils/PayloadUtils.java index 75768db..dce4259 100644 --- a/event-support-lib/src/main/java/cn/axzo/foundation/event/utils/PayloadUtils.java +++ b/event-support-lib/src/main/java/cn/axzo/foundation/event/support/utils/PayloadUtils.java @@ -1,4 +1,4 @@ -package cn.axzo.foundation.event.utils; +package cn.axzo.foundation.event.support.utils; import cn.axzo.foundation.event.support.PayloadDifferentiator; import cn.axzo.foundation.util.FastjsonUtils;