feat: 默认消费

This commit is contained in:
zengxiaobo 2024-05-25 16:56:34 +08:00
parent 1496b79b70
commit 5df04cde38
3 changed files with 80 additions and 35 deletions

View File

@ -0,0 +1,45 @@
package cn.axzo.foundation.event.support.consumer;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import java.util.Map;
import java.util.Optional;
@Slf4j
public class DefaultRocketMQListener implements RocketMQListener<MessageExt> {
EventConsumer eventConsumer;
@Builder
public DefaultRocketMQListener(EventConsumer eventConsumer) {
Preconditions.checkNotNull(eventConsumer);
this.eventConsumer = eventConsumer;
}
@Override
public void onMessage(MessageExt message) {
String topic = message.getTopic();
String value = new String(message.getBody());
Map<String, String> headers = message.getProperties();
if (log.isDebugEnabled()) {
log.debug("received message, topic={}, headers={}, value={}", topic, headers, value);
}
//当前消息所在分区的lag. 而不是整个topic
Long partitionLag = Optional.ofNullable(message.getProperties().get(MessageConst.PROPERTY_MAX_OFFSET))
.map(e -> Long.parseLong(e) - message.getQueueOffset()).orElse(0L);
eventConsumer.onEvent(value, EventConsumer.Context.builder()
.topic(topic)
.msgId(message.getMsgId())
.headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[]{})))
.lagSupplier(() -> partitionLag)
.build());
}
}

View File

@ -50,7 +50,8 @@ public class RetryableEventConsumer implements EventConsumer {
}
Integer currentRetryCount = getRetryCount(context.getConsumeContext());
Optional<Long> nextRetryMillis = retryEvent.getBackoffPolicy().getNextRetryMillis(currentRetryCount);
Optional<Long> nextRetryMillis = Optional.ofNullable(retryEvent.getBackoffPolicy())
.orElse(DEFAULT_BACKOFFPOLICY).getNextRetryMillis(currentRetryCount);
if (!nextRetryMillis.isPresent()) {
return;
}
@ -59,6 +60,7 @@ public class RetryableEventConsumer implements EventConsumer {
.event(context.getEvent())
.retryKey(context.getHandlerKey())
.nextTriggerTime(nextTriggerTime)
.nextDelayMillis(nextRetryMillis.get())
.currentRetryCount(currentRetryCount)
.build());
};
@ -76,8 +78,8 @@ public class RetryableEventConsumer implements EventConsumer {
}
private Integer getRetryCount(EventConsumer.Context context) {
if (context.getHeaders().containsKey(EventHeaders.TRIGGER_HANDLER_KEYS)) {
return Integer.parseInt(new String(context.getHeaders().get(EventHeaders.TRIGGER_HANDLER_KEYS), Charsets.UTF_8));
if (context.getHeaders().containsKey(EventHeaders.RETRY_COUNT)) {
return Integer.parseInt(new String(context.getHeaders().get(EventHeaders.RETRY_COUNT), Charsets.UTF_8));
}
//默认从0开始
return 0;
@ -137,7 +139,7 @@ public class RetryableEventConsumer implements EventConsumer {
this.eventCode = eventCode;
this.eventHandler = eventHandler;
this.excludeExceptions = Optional.ofNullable(excludeExceptions).orElse(DEFAULT_EXCLUDE_EXCEPTIONS);
this.backoffPolicy = Optional.ofNullable(backoffPolicy).orElse(DEFAULT_BACKOFFPOLICY);
this.backoffPolicy = backoffPolicy;
}
}

View File

@ -5,47 +5,20 @@ import cn.axzo.foundation.event.support.producer.EventProducer;
import cn.axzo.foundation.event.support.producer.RocketMQEventProducer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableRangeMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import lombok.Builder;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
public class RocketRetryableEventConsumer extends RetryableEventConsumer {
/**
* rocketMq延迟级别
* key = 延迟级别,
* https://rocketmq.apache.org/zh/docs/4.x/producer/04message3
*/
private static final RangeMap<Long, Integer> LEVEL_MAP = ImmutableRangeMap.<Long, Integer>builder()
.put(Range.openClosed(0L, TimeUnit.SECONDS.toMillis(1)), 1)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toMillis(5)), 2)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(5), TimeUnit.SECONDS.toMillis(10)), 3)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(10), TimeUnit.SECONDS.toMillis(30)), 4)
.put(Range.openClosed(TimeUnit.SECONDS.toMillis(30), TimeUnit.MINUTES.toMillis(1)), 5)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(1), TimeUnit.MINUTES.toMillis(2)), 6)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(2), TimeUnit.MINUTES.toMillis(3)), 7)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(3), TimeUnit.MINUTES.toMillis(4)), 8)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(4), TimeUnit.MINUTES.toMillis(5)), 9)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(5), TimeUnit.MINUTES.toMillis(6)), 10)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(6), TimeUnit.MINUTES.toMillis(7)), 11)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(7), TimeUnit.MINUTES.toMillis(8)), 12)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(8), TimeUnit.MINUTES.toMillis(9)), 13)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(9), TimeUnit.MINUTES.toMillis(10)), 14)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(10), TimeUnit.MINUTES.toMillis(20)), 15)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(20), TimeUnit.MINUTES.toMillis(30)), 16)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(30), TimeUnit.MINUTES.toMillis(60)), 17)
.put(Range.openClosed(TimeUnit.MINUTES.toMillis(60), Long.MAX_VALUE), 18)
.build();
private static final BackoffPolicy DEFAULT_ROCKET_BACKOFF_POLICY = RocketRetryLevelBackofPolicy.builder().build();
@Builder
public RocketRetryableEventConsumer(EventProducer eventProducer, EventConsumer eventConsumer, String retryTopic) {
super(eventConsumer, retryContext -> {
/** 将延迟毫秒转换为rocketMq的延迟级别 */
Integer delayLevel = LEVEL_MAP.get(retryContext.getNextDelayMillis());
/** 从1分钟开始起跳 https://rocketmq.apache.org/zh/docs/4.x/producer/04message3 */
int delayLevel = retryContext.getCurrentRetryCount() + 5;
eventProducer.send(retryContext.getEvent(), EventProducer.Context.builder()
.headers(ImmutableMap.of(EventHeaders.TRIGGER_HANDLER_KEYS, retryContext.getRetryKey(),
@ -59,4 +32,29 @@ public class RocketRetryableEventConsumer extends RetryableEventConsumer {
Preconditions.checkNotNull(retryTopic);
Preconditions.checkNotNull(eventProducer);
}
@Override
public Boolean registerHandler(RetryEvent retryEvent) {
retryEvent.setBackoffPolicy(Optional.ofNullable(retryEvent.getBackoffPolicy()).orElse(DEFAULT_ROCKET_BACKOFF_POLICY));
return super.registerHandler(retryEvent);
}
public static class RocketRetryLevelBackofPolicy implements BackoffPolicy {
Integer maxRetryCount;
@Builder
public RocketRetryLevelBackofPolicy(Integer maxRetryCount) {
this.maxRetryCount = Optional.ofNullable(maxRetryCount).orElse(20);
}
@Override
public Optional<Long> getNextRetryMillis(int retryCount) {
if (retryCount >= maxRetryCount) {
return Optional.empty();
}
//这了随便返回一个, 只要没超过最大次数都执行下一次
return Optional.of(Long.MAX_VALUE);
}
}
}