From 2659a95d53eb15627e9ae051c2813a411771c63b Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Fri, 14 Jun 2024 15:33:20 +0800 Subject: [PATCH] =?UTF-8?q?update(REQ-2516)=20-=20=E4=BF=AE=E5=A4=8Dmq=20?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=E5=8A=9F=E8=83=BD=E5=90=AF=E5=8A=A8=E8=BF=87?= =?UTF-8?q?=E7=A8=8B=E6=9C=AA=E6=AD=A3=E7=A1=AE=E8=AF=BB=E5=8F=96=E5=88=B0?= =?UTF-8?q?nameServer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...orkflowEngineStarterAutoConfiguration.java | 11 ++++--- ...WorkflowEngineStarterDefaultMQMonitor.java | 9 ++++-- ...kflowEngineStarterMQMonitorController.java | 30 ++++++++++--------- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java index 4b6982e5c..feecbf47a 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -39,6 +40,7 @@ import org.springframework.core.env.Environment; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * Workflow Engine Auto Configuration @@ -112,8 +114,9 @@ public class WorkflowEngineStarterAutoConfiguration { } @Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt") - public DefaultMQAdminExt defaultMQAdminExt() { - String namesrvAddress = System.getProperty("rocketmq.name-server"); + @ConditionalOnProperty(prefix = "rocketmq", value = "name-server") + public DefaultMQAdminExt defaultMQAdminExt(Environment environment) { + String namesrvAddress = environment.getProperty("rocketmq.name-server"); if (StringUtils.isBlank(namesrvAddress)) { log.error("Build DefaultMQAdminExt error, namesrv is null"); throw new RuntimeException("Build DefaultMQAdminExt error, namesrv is null", null); @@ -135,8 +138,8 @@ public class WorkflowEngineStarterAutoConfiguration { } @Bean - public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(DefaultMQAdminExt defaultMQAdminExt, + public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(ObjectProvider mqAdminExtObjectProvider, Environment environment) { - return new WorkflowEngineStarterDefaultMQMonitor(defaultMQAdminExt, environment); + return new WorkflowEngineStarterDefaultMQMonitor(mqAdminExtObjectProvider, environment); } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java index d45561213..8982a7c47 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java @@ -1,10 +1,10 @@ package cn.axzo.workflow.starter.mq.monitor; -import lombok.SneakyThrows; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.SmartLifecycle; import org.springframework.core.env.Environment; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -25,6 +25,9 @@ import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_N public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { private static final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterDefaultMQMonitor.class); + /** + * 可能为 null + */ private final DefaultMQAdminExt defaultMQAdminExt; private final Environment environment; private final ThreadPoolTaskScheduler taskScheduler; @@ -33,8 +36,8 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { private ConsumeStats rpcConsumeStats; private final Map messageQueue = new HashMap<>(); - public WorkflowEngineStarterDefaultMQMonitor(DefaultMQAdminExt defaultMQAdminExt, Environment environment) { - this.defaultMQAdminExt = defaultMQAdminExt; + public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider mqAdminExtObjectProvider, Environment environment) { + this.defaultMQAdminExt = mqAdminExtObjectProvider.getIfAvailable(); this.environment = environment; this.taskScheduler = init(); } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java index b1367687b..be3ac08e6 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java @@ -2,8 +2,8 @@ package cn.axzo.workflow.starter.mq.monitor.console; import cn.axzo.workflow.starter.WorkflowEngineStarterProperties; import cn.azxo.framework.common.model.CommonResponse; -import lombok.SneakyThrows; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.web.bind.annotation.GetMapping; @@ -29,7 +29,7 @@ public class WorkflowEngineStarterMQMonitorController { @Resource private WorkflowEngineStarterProperties starterProperties; @Resource - private DefaultMQAdminExt defaultMQAdminExt; + private ObjectProvider mqAdminExtObjectProvider; @Resource private Environment environment; @Value("${spring.application.name}") @@ -40,21 +40,23 @@ public class WorkflowEngineStarterMQMonitorController { public static String RPC_RETRY_CONSUMER_GROUP = "GID_%s_workflow_engine_starter_%s_consumer"; @GetMapping("/m") - public CommonResponse> monitor() throws Exception { + public CommonResponse> monitor() { String topic = DEFAULT_EVENT + activeProfile; Map result = new HashMap<>(); + mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> { + String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); + try { + result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment))); + result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment))); - String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); - result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment))); - result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment))); - - result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo()); - result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic)); - result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic)); - result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic)); - - - + result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo()); + result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic)); + result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic)); + result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic)); + } catch (Exception e) { + e.printStackTrace(); + } + }); return CommonResponse.success(result); } }