diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java index 8982a7c47..26293118c 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java @@ -1,6 +1,8 @@ package cn.axzo.workflow.starter.mq.monitor; +import lombok.SneakyThrows; import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -8,6 +10,7 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.SmartLifecycle; import org.springframework.core.env.Environment; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.util.CollectionUtils; import java.util.HashMap; import java.util.Map; @@ -15,6 +18,8 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT; +import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.BROADCAST_CONSUMER_GROUP; +import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.RPC_RETRY_CONSUMER_GROUP; /** * 用于监控 Starter 关注的 Topic 相关的 Producer 与 Consumer 信息 @@ -25,6 +30,7 @@ import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_N public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { private static final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterDefaultMQMonitor.class); + public static final String DLQ_PREFIX = "%DLQ%"; /** * 可能为 null */ @@ -32,9 +38,6 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { private final Environment environment; private final ThreadPoolTaskScheduler taskScheduler; private final AtomicBoolean running = new AtomicBoolean(false); - private ConsumeStats broadcastConsumeStats; - private ConsumeStats rpcConsumeStats; - private final Map messageQueue = new HashMap<>(); public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider mqAdminExtObjectProvider, Environment environment) { this.defaultMQAdminExt = mqAdminExtObjectProvider.getIfAvailable(); @@ -49,51 +52,49 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { return taskScheduler; } + @SneakyThrows public void mqWatch() { - log.debug("time is :{}", System.currentTimeMillis()); String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); String applicationName = environment.getProperty("spring.application.name"); // 周期性比对 -// compareBroadcast(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment)); -// compareRPC(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment)); + compareBroadcastDLQ(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment)); + compareRPCDLQ(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment)); } - private void compareRPC(String consumerGroupName) throws Exception { + private void compareRPCDLQ(String consumerGroupName) throws Exception { log.info("current rpc consumer group is :{}", consumerGroupName); - ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroupName); - if (Objects.isNull(broadcastConsumeStats)) { - broadcastConsumeStats = consumeStats; - messageQueue.putAll(calcCurrentTimes(consumerGroupName, consumeStats)); - return; - } - Map temp = calcCurrentTimes(consumerGroupName, consumeStats); - } - - private static Map calcCurrentTimes(String consumerGroupName, ConsumeStats consumeStats) { - Map temp = new HashMap<>(); - String rpcDeadLetterConsumerName = "%RETRY%" + consumerGroupName; - consumeStats.getOffsetTable().forEach((k, v) -> { - long l = v.getBrokerOffset() - v.getConsumerOffset(); - if (Objects.equals(k.getTopic(), rpcDeadLetterConsumerName)) { - // 死信队列 - temp.put(rpcDeadLetterConsumerName, l); - } else { - // 普通队列 - temp.computeIfPresent(consumerGroupName, ((k1, v1) -> v1 + l)); + try { + TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName); + if (Objects.isNull(table) || CollectionUtils.isEmpty(table.getOffsetTable())) { + return; } - }); - return temp; + table.getOffsetTable().forEach((k, v) -> { + if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) { + long dlqCount = v.getMaxOffset() - v.getMinOffset(); + log.error("found rpc MQ DLQ, count: {}", dlqCount); + } + }); + } catch (Exception e) { + log.warn("monitor RPC DLQ error: {}", e.getMessage(), e); + } } - private void compareBroadcast(String consumerGroupName) throws Exception { + private void compareBroadcastDLQ(String consumerGroupName) throws Exception { log.info("current broadcast consumer group is :{}", consumerGroupName); - ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroupName); - if (Objects.isNull(rpcConsumeStats)) { - rpcConsumeStats = consumeStats; - messageQueue.putAll(calcCurrentTimes(consumerGroupName, consumeStats)); - return; + try { + TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName); + if (Objects.isNull(table) || CollectionUtils.isEmpty(table.getOffsetTable())) { + return; + } + table.getOffsetTable().forEach((k, v) -> { + if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) { + long dlqCount = v.getMaxOffset() - v.getMinOffset(); + log.error("found Broadcast MQ DLQ, count: {}", dlqCount); + } + }); + } catch (Exception e) { + log.warn("monitor Broadcast DLQ error: {}", e.getMessage(), e); } - Map temp = calcCurrentTimes(consumerGroupName, consumeStats); } @Override @@ -109,8 +110,6 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { if (running.compareAndSet(true, false)) { log.info("destroying workflow engine mq monitor"); taskScheduler.shutdown(); - broadcastConsumeStats = null; - rpcConsumeStats = null; } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java index be3ac08e6..2f1ce8c71 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java @@ -3,6 +3,8 @@ package cn.axzo.workflow.starter.mq.monitor.console; import cn.axzo.workflow.starter.WorkflowEngineStarterProperties; import cn.azxo.framework.common.model.CommonResponse; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; @@ -16,6 +18,7 @@ import java.util.Map; import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_EVENT; import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT; +import static cn.axzo.workflow.starter.mq.monitor.WorkflowEngineStarterDefaultMQMonitor.DLQ_PREFIX; /** * Workflow Engine Starter MessageQueue Monitor Controller @@ -26,6 +29,7 @@ import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_N @RestController @RequestMapping("/web/we/s") public class WorkflowEngineStarterMQMonitorController { + private static final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterMQMonitorController.class); @Resource private WorkflowEngineStarterProperties starterProperties; @Resource @@ -46,15 +50,19 @@ public class WorkflowEngineStarterMQMonitorController { mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> { String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); try { - result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment))); - result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment))); + String broadcastConsumer = String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment); + String rpcConsumer = String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment); + result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(broadcastConsumer)); + result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(rpcConsumer)); - result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo()); - result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic)); +// result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo()); +// result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic)); result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic)); - result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic)); +// result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic)); + result.put("广播-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + broadcastConsumer)); + result.put("RPC-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + rpcConsumer)); } catch (Exception e) { - e.printStackTrace(); + log.warn("monitor controller error: {}", e.getMessage(), e); } }); return CommonResponse.success(result);