rocketMQ新增 BaseListener

This commit is contained in:
tianliyong 2023-08-02 11:43:34 +08:00
parent 1c9d371c00
commit 5ba59bfb14

View File

@ -0,0 +1,39 @@
package cn.axzo.framework.rocketmq;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Map;
import java.util.Optional;
/**
* @Author: liyong.tian
* @Date: 2023/8/2 11:31
* @Description:
*/
@Slf4j
public class BaseListener {
public void onEvent(MessageExt message, EventConsumer eventConsumer) {
String topic = message.getTopic();
String value = new String(message.getBody());
Map<String, String> headers = message.getProperties();
if (log.isDebugEnabled()) {
log.debug("received message, topic={}, headers={}, value={}", topic, headers, value);
}
//当前消息所在分区的lag. 而不是整个topic
Long partitionLag = Optional.ofNullable(message.getProperties().get(MessageConst.PROPERTY_MAX_OFFSET))
.map(e -> Long.parseLong(e) - message.getQueueOffset()).orElse(0L);
eventConsumer.onEvent(value, EventConsumer.Context.builder()
.msgId(message.getMsgId())
.ext(ImmutableMap.of("topic", topic))
.headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[] {})))
.lagSupplier(() -> partitionLag)
.build());
}
}