From 5ba59bfb1448fba05d5d4ffc46732b666b9a5d85 Mon Sep 17 00:00:00 2001 From: tianliyong Date: Wed, 2 Aug 2023 11:43:34 +0800 Subject: [PATCH] =?UTF-8?q?rocketMQ=E6=96=B0=E5=A2=9E=20BaseListener?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../axzo/framework/rocketmq/BaseListener.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/BaseListener.java diff --git a/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/BaseListener.java b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/BaseListener.java new file mode 100644 index 0000000..0182160 --- /dev/null +++ b/axzo-common-rocketmq/src/main/java/cn/axzo/framework/rocketmq/BaseListener.java @@ -0,0 +1,39 @@ +package cn.axzo.framework.rocketmq; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.Map; +import java.util.Optional; + +/** + * @Author: liyong.tian + * @Date: 2023/8/2 11:31 + * @Description: + */ +@Slf4j +public class BaseListener { + + public void onEvent(MessageExt message, EventConsumer eventConsumer) { + String topic = message.getTopic(); + String value = new String(message.getBody()); + Map headers = message.getProperties(); + if (log.isDebugEnabled()) { + log.debug("received message, topic={}, headers={}, value={}", topic, headers, value); + } + + //当前消息所在分区的lag. 而不是整个topic + Long partitionLag = Optional.ofNullable(message.getProperties().get(MessageConst.PROPERTY_MAX_OFFSET)) + .map(e -> Long.parseLong(e) - message.getQueueOffset()).orElse(0L); + + eventConsumer.onEvent(value, EventConsumer.Context.builder() + .msgId(message.getMsgId()) + .ext(ImmutableMap.of("topic", topic)) + .headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[] {}))) + .lagSupplier(() -> partitionLag) + .build()); + } +}