update(REQ-2516) - 完善 MQ 死信队列的监控

This commit is contained in:
wangli 2024-06-14 16:32:42 +08:00
parent 2659a95d53
commit 476ea55322
2 changed files with 51 additions and 44 deletions

View File

@ -1,6 +1,8 @@
package cn.axzo.workflow.starter.mq.monitor;
import lombok.SneakyThrows;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -8,6 +10,7 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
@ -15,6 +18,8 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT;
import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.BROADCAST_CONSUMER_GROUP;
import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.RPC_RETRY_CONSUMER_GROUP;
/**
* Monitors Producer and Consumer information for the Topics that the Starter cares about
@ -25,6 +30,7 @@ import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_N
public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterDefaultMQMonitor.class);
public static final String DLQ_PREFIX = "%DLQ%";
/**
* May be null
*/
@ -32,9 +38,6 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
private final Environment environment;
private final ThreadPoolTaskScheduler taskScheduler;
private final AtomicBoolean running = new AtomicBoolean(false);
private ConsumeStats broadcastConsumeStats;
private ConsumeStats rpcConsumeStats;
private final Map<String, Long> messageQueue = new HashMap<>();
public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider, Environment environment) {
this.defaultMQAdminExt = mqAdminExtObjectProvider.getIfAvailable();
@ -49,51 +52,49 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
return taskScheduler;
}
/**
 * Runs one monitoring pass over the dead-letter queues (DLQ) of the Starter's consumer groups.
 * <p>
 * Reads the group-name segment ({@code MQ_GID_NAME_SEGMENT}) and {@code spring.application.name}
 * from the environment, formats the broadcast and RPC-retry consumer group names from them,
 * and passes each name to the matching DLQ check.
 * <p>
 * Both DLQ checks declare {@code throws Exception}; {@code @SneakyThrows} lets this method
 * call them without declaring a throws clause of its own.
 * <p>
 * NOTE(review): presumably invoked periodically by the task scheduler - confirm against the
 * scheduling code, which is not visible here.
 */
@SneakyThrows
public void mqWatch() {
// Trace each run with the current timestamp (debug level only).
log.debug("time is :{}", System.currentTimeMillis());
String segment = environment.getProperty(MQ_GID_NAME_SEGMENT);
String applicationName = environment.getProperty("spring.application.name");
// Periodic comparison
// compareBroadcast(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment));
// compareRPC(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment));
compareBroadcastDLQ(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment));
compareRPCDLQ(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment));
}
private void compareRPC(String consumerGroupName) throws Exception {
private void compareRPCDLQ(String consumerGroupName) throws Exception {
log.info("current rpc consumer group is :{}", consumerGroupName);
ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroupName);
if (Objects.isNull(broadcastConsumeStats)) {
broadcastConsumeStats = consumeStats;
messageQueue.putAll(calcCurrentTimes(consumerGroupName, consumeStats));
return;
}
Map<String, Long> temp = calcCurrentTimes(consumerGroupName, consumeStats);
}
private static Map<String, Long> calcCurrentTimes(String consumerGroupName, ConsumeStats consumeStats) {
Map<String, Long> temp = new HashMap<>();
String rpcDeadLetterConsumerName = "%RETRY%" + consumerGroupName;
consumeStats.getOffsetTable().forEach((k, v) -> {
long l = v.getBrokerOffset() - v.getConsumerOffset();
if (Objects.equals(k.getTopic(), rpcDeadLetterConsumerName)) {
// 死信队列
temp.put(rpcDeadLetterConsumerName, l);
} else {
// 普通队列
temp.computeIfPresent(consumerGroupName, ((k1, v1) -> v1 + l));
try {
TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName);
if (Objects.isNull(table) || CollectionUtils.isEmpty(table.getOffsetTable())) {
return;
}
});
return temp;
table.getOffsetTable().forEach((k, v) -> {
if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) {
long dlqCount = v.getMaxOffset() - v.getMinOffset();
log.error("found rpc MQ DLQ, count: {}", dlqCount);
}
});
} catch (Exception e) {
log.warn("monitor RPC DLQ error: {}", e.getMessage(), e);
}
}
private void compareBroadcast(String consumerGroupName) throws Exception {
private void compareBroadcastDLQ(String consumerGroupName) throws Exception {
log.info("current broadcast consumer group is :{}", consumerGroupName);
ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroupName);
if (Objects.isNull(rpcConsumeStats)) {
rpcConsumeStats = consumeStats;
messageQueue.putAll(calcCurrentTimes(consumerGroupName, consumeStats));
return;
try {
TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName);
if (Objects.isNull(table) || CollectionUtils.isEmpty(table.getOffsetTable())) {
return;
}
table.getOffsetTable().forEach((k, v) -> {
if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) {
long dlqCount = v.getMaxOffset() - v.getMinOffset();
log.error("found Broadcast MQ DLQ, count: {}", dlqCount);
}
});
} catch (Exception e) {
log.warn("monitor Broadcast DLQ error: {}", e.getMessage(), e);
}
Map<String, Long> temp = calcCurrentTimes(consumerGroupName, consumeStats);
}
@Override
@ -109,8 +110,6 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
if (running.compareAndSet(true, false)) {
log.info("destroying workflow engine mq monitor");
taskScheduler.shutdown();
broadcastConsumeStats = null;
rpcConsumeStats = null;
}
}

View File

@ -3,6 +3,8 @@ package cn.axzo.workflow.starter.mq.monitor.console;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.azxo.framework.common.model.CommonResponse;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
@ -16,6 +18,7 @@ import java.util.Map;
import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_EVENT;
import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT;
import static cn.axzo.workflow.starter.mq.monitor.WorkflowEngineStarterDefaultMQMonitor.DLQ_PREFIX;
/**
* Workflow Engine Starter MessageQueue Monitor Controller
@ -26,6 +29,7 @@ import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_N
@RestController
@RequestMapping("/web/we/s")
public class WorkflowEngineStarterMQMonitorController {
private static final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterMQMonitorController.class);
@Resource
private WorkflowEngineStarterProperties starterProperties;
@Resource
@ -46,15 +50,19 @@ public class WorkflowEngineStarterMQMonitorController {
mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> {
String segment = environment.getProperty(MQ_GID_NAME_SEGMENT);
try {
result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment)));
result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment)));
String broadcastConsumer = String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment);
String rpcConsumer = String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment);
result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(broadcastConsumer));
result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(rpcConsumer));
result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo());
result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic));
// result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo());
// result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic));
result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic));
result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic));
// result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic));
result.put("广播-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + broadcastConsumer));
result.put("RPC-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + rpcConsumer));
} catch (Exception e) {
e.printStackTrace();
log.warn("monitor controller error: {}", e.getMessage(), e);
}
});
return CommonResponse.success(result);