Merge branch 'feature/rocketMQ' into 'master'
rocketMQ新增 BaseListener See merge request universal/framework/backend/axzo-framework-commons!81
This commit is contained in:
commit
05ed46c3f2
@ -0,0 +1,39 @@
|
|||||||
|
package cn.axzo.framework.rocketmq;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.common.message.MessageConst;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: liyong.tian
|
||||||
|
* @Date: 2023/8/2 11:31
|
||||||
|
* @Description:
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class BaseListener {
|
||||||
|
|
||||||
|
public void onEvent(MessageExt message, EventConsumer eventConsumer) {
|
||||||
|
String topic = message.getTopic();
|
||||||
|
String value = new String(message.getBody());
|
||||||
|
Map<String, String> headers = message.getProperties();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("received message, topic={}, headers={}, value={}", topic, headers, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
//当前消息所在分区的lag. 而不是整个topic
|
||||||
|
Long partitionLag = Optional.ofNullable(message.getProperties().get(MessageConst.PROPERTY_MAX_OFFSET))
|
||||||
|
.map(e -> Long.parseLong(e) - message.getQueueOffset()).orElse(0L);
|
||||||
|
|
||||||
|
eventConsumer.onEvent(value, EventConsumer.Context.builder()
|
||||||
|
.msgId(message.getMsgId())
|
||||||
|
.ext(ImmutableMap.of("topic", topic))
|
||||||
|
.headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[] {})))
|
||||||
|
.lagSupplier(() -> partitionLag)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user