update(REQ-2516) - 修复mq 监控功能启动过程未正确读取到nameServer

This commit is contained in:
wangli 2024-06-14 15:33:20 +08:00
parent c9542394c1
commit 2659a95d53
3 changed files with 29 additions and 21 deletions

View File

@ -31,6 +31,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -39,6 +40,7 @@ import org.springframework.core.env.Environment;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
/** /**
* Workflow Engine Auto Configuration * Workflow Engine Auto Configuration
@ -112,8 +114,9 @@ public class WorkflowEngineStarterAutoConfiguration {
} }
@Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt") @Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt")
public DefaultMQAdminExt defaultMQAdminExt() { @ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
String namesrvAddress = System.getProperty("rocketmq.name-server"); public DefaultMQAdminExt defaultMQAdminExt(Environment environment) {
String namesrvAddress = environment.getProperty("rocketmq.name-server");
if (StringUtils.isBlank(namesrvAddress)) { if (StringUtils.isBlank(namesrvAddress)) {
log.error("Build DefaultMQAdminExt error, namesrv is null"); log.error("Build DefaultMQAdminExt error, namesrv is null");
throw new RuntimeException("Build DefaultMQAdminExt error, namesrv is null", null); throw new RuntimeException("Build DefaultMQAdminExt error, namesrv is null", null);
@ -135,8 +138,8 @@ public class WorkflowEngineStarterAutoConfiguration {
} }
@Bean @Bean
public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(DefaultMQAdminExt defaultMQAdminExt, public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider,
Environment environment) { Environment environment) {
return new WorkflowEngineStarterDefaultMQMonitor(defaultMQAdminExt, environment); return new WorkflowEngineStarterDefaultMQMonitor(mqAdminExtObjectProvider, environment);
} }
} }

View File

@ -1,10 +1,10 @@
package cn.axzo.workflow.starter.mq.monitor; package cn.axzo.workflow.starter.mq.monitor;
import lombok.SneakyThrows;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@ -25,6 +25,9 @@ import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_N
public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterDefaultMQMonitor.class); private static final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterDefaultMQMonitor.class);
/**
* 可能为 null
*/
private final DefaultMQAdminExt defaultMQAdminExt; private final DefaultMQAdminExt defaultMQAdminExt;
private final Environment environment; private final Environment environment;
private final ThreadPoolTaskScheduler taskScheduler; private final ThreadPoolTaskScheduler taskScheduler;
@ -33,8 +36,8 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
private ConsumeStats rpcConsumeStats; private ConsumeStats rpcConsumeStats;
private final Map<String, Long> messageQueue = new HashMap<>(); private final Map<String, Long> messageQueue = new HashMap<>();
public WorkflowEngineStarterDefaultMQMonitor(DefaultMQAdminExt defaultMQAdminExt, Environment environment) { public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider, Environment environment) {
this.defaultMQAdminExt = defaultMQAdminExt; this.defaultMQAdminExt = mqAdminExtObjectProvider.getIfAvailable();
this.environment = environment; this.environment = environment;
this.taskScheduler = init(); this.taskScheduler = init();
} }

View File

@ -2,8 +2,8 @@ package cn.axzo.workflow.starter.mq.monitor.console;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties; import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.azxo.framework.common.model.CommonResponse; import cn.azxo.framework.common.model.CommonResponse;
import lombok.SneakyThrows;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
@ -29,7 +29,7 @@ public class WorkflowEngineStarterMQMonitorController {
@Resource @Resource
private WorkflowEngineStarterProperties starterProperties; private WorkflowEngineStarterProperties starterProperties;
@Resource @Resource
private DefaultMQAdminExt defaultMQAdminExt; private ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider;
@Resource @Resource
private Environment environment; private Environment environment;
@Value("${spring.application.name}") @Value("${spring.application.name}")
@ -40,11 +40,12 @@ public class WorkflowEngineStarterMQMonitorController {
public static String RPC_RETRY_CONSUMER_GROUP = "GID_%s_workflow_engine_starter_%s_consumer"; public static String RPC_RETRY_CONSUMER_GROUP = "GID_%s_workflow_engine_starter_%s_consumer";
@GetMapping("/m") @GetMapping("/m")
public CommonResponse<Map<String, Object>> monitor() throws Exception { public CommonResponse<Map<String, Object>> monitor() {
String topic = DEFAULT_EVENT + activeProfile; String topic = DEFAULT_EVENT + activeProfile;
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> {
String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); String segment = environment.getProperty(MQ_GID_NAME_SEGMENT);
try {
result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment))); result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment)));
result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment))); result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment)));
@ -52,9 +53,10 @@ public class WorkflowEngineStarterMQMonitorController {
result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic)); result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic));
result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic)); result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic));
result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic)); result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic));
} catch (Exception e) {
e.printStackTrace();
}
});
return CommonResponse.success(result); return CommonResponse.success(result);
} }
} }