diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java index feecbf47a..de11113cc 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java @@ -16,6 +16,8 @@ import cn.axzo.workflow.starter.handler.filter.MessageNotificationEventFilter; import cn.axzo.workflow.starter.handler.filter.ProcessActivityEventFilter; import cn.axzo.workflow.starter.handler.filter.ProcessInstanceEventFilter; import cn.axzo.workflow.starter.handler.filter.ProcessTaskEventFilter; +import cn.axzo.workflow.starter.handler.monitor.BroadcastDLQProcessor; +import cn.axzo.workflow.starter.handler.monitor.RpcDLQProcessor; import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerActivityEventListener; import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerInstanceEventListener; import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerNotificationEventListener; @@ -114,7 +116,7 @@ public class WorkflowEngineStarterAutoConfiguration { } @Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt") - @ConditionalOnProperty(prefix = "rocketmq", value = "name-server") + @ConditionalOnProperty(prefix = "workflow.engine.starter", value = "enable-dlq-monitor", havingValue = "true") public DefaultMQAdminExt defaultMQAdminExt(Environment environment) { String namesrvAddress = environment.getProperty("rocketmq.name-server"); if (StringUtils.isBlank(namesrvAddress)) { @@ -138,8 +140,11 @@ public class WorkflowEngineStarterAutoConfiguration { } @Bean + @ConditionalOnProperty(prefix = "workflow.engine.starter", value = "enable-dlq-monitor", havingValue = "true") public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(ObjectProvider mqAdminExtObjectProvider, + ObjectProvider broadcastDLQProcessorObjectProvider, + ObjectProvider rpcDLQProcessorObjectProvider, Environment environment) { - return new WorkflowEngineStarterDefaultMQMonitor(mqAdminExtObjectProvider, environment); + return new WorkflowEngineStarterDefaultMQMonitor(mqAdminExtObjectProvider, broadcastDLQProcessorObjectProvider, rpcDLQProcessorObjectProvider, environment); } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java index 9c9b0ab0f..86da8f41d 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java @@ -62,6 +62,15 @@ public class WorkflowEngineStarterProperties { @NestedConfigurationProperty private BroadcastListenerProperties broadcast = new BroadcastListenerProperties(); + /** + * 是否开始死信队列的监控 + */ + private Boolean enableDlqMonitor = false; + /** + * 监控死信队列,周期间隔,单位:毫秒 + */ + private Long DlqMonitorIntervalInMs = 4 * 60 * 60 * 1000L; + public Boolean getManageable() { return manageable; } @@ -101,4 +110,20 @@ public class WorkflowEngineStarterProperties { public void setBroadcast(BroadcastListenerProperties broadcast) { this.broadcast = broadcast; } + + public Boolean getEnableDlqMonitor() { + return enableDlqMonitor; + } + + public void setEnableDlqMonitor(Boolean enableDlqMonitor) { + this.enableDlqMonitor = enableDlqMonitor; + } + + public Long getDlqMonitorIntervalInMs() { + return DlqMonitorIntervalInMs; + } + + public void setDlqMonitorIntervalInMs(Long dlqMonitorIntervalInMs) { + DlqMonitorIntervalInMs = dlqMonitorIntervalInMs; + } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/BroadcastDLQProcessor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/BroadcastDLQProcessor.java new file mode 100644 index 000000000..3edc50ea1 --- /dev/null +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/BroadcastDLQProcessor.java @@ -0,0 +1,24 @@ +package cn.axzo.workflow.starter.handler.monitor; + +import org.apache.rocketmq.common.admin.TopicOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 广播死信队列监控的处理器, 默认仅仅是打印异常。 + *

+ * 可以用它来发生钉钉之类的。 + *

+ * 注意:该接口目前不是为了消费死信队列中的消息的。后续必要时,再开放死信队列的消费扩展点 + * + * @author wangli + * @since 2024/6/14 17:04 + */ +public interface BroadcastDLQProcessor { + Logger log = LoggerFactory.getLogger(BroadcastDLQProcessor.class); + + default void process(TopicOffset v) { + long dlqCount = v.getMaxOffset() - v.getMinOffset(); + log.error("found Broadcast MQ DLQ, count: {}", dlqCount); + } +} diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/RpcDLQProcessor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/RpcDLQProcessor.java new file mode 100644 index 000000000..6aeaa49f7 --- /dev/null +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/RpcDLQProcessor.java @@ -0,0 +1,25 @@ +package cn.axzo.workflow.starter.handler.monitor; + +import org.apache.rocketmq.common.admin.TopicOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * RPC 重试的死信队列监控处理器, 默认仅仅是打印异常 + *

+ * 可以用它来发生钉钉之类的. + *

+ * 注意:该接口目前不是为了消费死信队列中的消息的。后续必要时,再开放死信队列的消费扩展点 + * + * @author wangli + * @since 2024/6/14 17:01 + */ + +public interface RpcDLQProcessor { + Logger log = LoggerFactory.getLogger(RpcDLQProcessor.class); + + default void process(TopicOffset v) { + long dlqCount = v.getMaxOffset() - v.getMinOffset(); + log.error("found rpc MQ DLQ, count: {}", dlqCount); + } +} diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java index 26293118c..bda4ea56c 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java @@ -1,7 +1,8 @@ package cn.axzo.workflow.starter.mq.monitor; +import cn.axzo.workflow.starter.handler.monitor.BroadcastDLQProcessor; +import cn.axzo.workflow.starter.handler.monitor.RpcDLQProcessor; import lombok.SneakyThrows; -import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; @@ -16,6 +17,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT; import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.BROADCAST_CONSUMER_GROUP; @@ -37,11 +39,20 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { private final DefaultMQAdminExt defaultMQAdminExt; private final Environment environment; private final ThreadPoolTaskScheduler taskScheduler; + private final BroadcastDLQProcessor broadcastDLQProcessor; + private final RpcDLQProcessor rpcDLQProcessor; private final AtomicBoolean running = new AtomicBoolean(false); + private final Map> dlqProcessCache = new HashMap<>(); - public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider mqAdminExtObjectProvider, Environment environment) { + public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider mqAdminExtObjectProvider, + ObjectProvider broadcastDLQProcessorObjectProvider, + ObjectProvider rpcDLQProcessorObjectProvider, Environment environment) { this.defaultMQAdminExt = mqAdminExtObjectProvider.getIfAvailable(); this.environment = environment; + this.broadcastDLQProcessor = broadcastDLQProcessorObjectProvider.getIfAvailable(() -> new BroadcastDLQProcessor() { + }); + this.rpcDLQProcessor = rpcDLQProcessorObjectProvider.getIfAvailable(() -> new RpcDLQProcessor() { + }); this.taskScheduler = init(); } @@ -57,29 +68,11 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); String applicationName = environment.getProperty("spring.application.name"); // 周期性比对 - compareBroadcastDLQ(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment)); - compareRPCDLQ(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment)); + compareDLQ(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment), "broadcast"); + compareDLQ(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment), "rpc"); } - private void compareRPCDLQ(String consumerGroupName) throws Exception { - log.info("current rpc consumer group is :{}", consumerGroupName); - try { - TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName); - if (Objects.isNull(table) || CollectionUtils.isEmpty(table.getOffsetTable())) { - return; - } - table.getOffsetTable().forEach((k, v) -> { - if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) { - long dlqCount = v.getMaxOffset() - v.getMinOffset(); - log.error("found rpc MQ DLQ, count: {}", dlqCount); - } - }); - } catch (Exception e) { - log.warn("monitor RPC DLQ error: {}", e.getMessage(), e); - } - } - - private void compareBroadcastDLQ(String consumerGroupName) throws Exception { + private void compareDLQ(String consumerGroupName, String dqlType) throws Exception { log.info("current broadcast consumer group is :{}", consumerGroupName); try { TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName); @@ -88,8 +81,11 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { } table.getOffsetTable().forEach((k, v) -> { if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) { - long dlqCount = v.getMaxOffset() - v.getMinOffset(); - log.error("found Broadcast MQ DLQ, count: {}", dlqCount); + if ("broadcast".equals(dqlType)) { + broadcastDLQProcessor.process(v); + } else if ("rpc".equals(dqlType)) { + rpcDLQProcessor.process(v); + } } }); } catch (Exception e) { diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java index 2f1ce8c71..d1689bd5d 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/console/WorkflowEngineStarterMQMonitorController.java @@ -1,6 +1,7 @@ package cn.axzo.workflow.starter.mq.monitor.console; import cn.axzo.workflow.starter.WorkflowEngineStarterProperties; +import cn.axzo.workflow.starter.mq.monitor.WorkflowEngineStarterDefaultMQMonitor; import cn.azxo.framework.common.model.CommonResponse; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; @@ -10,6 +11,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @@ -35,6 +37,8 @@ public class WorkflowEngineStarterMQMonitorController { @Resource private ObjectProvider mqAdminExtObjectProvider; @Resource + private WorkflowEngineStarterDefaultMQMonitor monitor; + @Resource private Environment environment; @Value("${spring.application.name}") private String applicationName; @@ -47,24 +51,49 @@ public class WorkflowEngineStarterMQMonitorController { public CommonResponse> monitor() { String topic = DEFAULT_EVENT + activeProfile; Map result = new HashMap<>(); - mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> { - String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); - try { - String broadcastConsumer = String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment); - String rpcConsumer = String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment); - result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(broadcastConsumer)); - result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(rpcConsumer)); + if (!starterProperties.getEnableDlqMonitor()) { + result.put("Info", "未开启·死信队列·的监控,如需,请设置 workflow.engine.starter.enableDlqMonitor = true 后再重试!"); + } else { + mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> { + String segment = environment.getProperty(MQ_GID_NAME_SEGMENT); + try { + String broadcastConsumer = String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment); + String rpcConsumer = String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment); + result.put("Engine Broadcast MQ", defaultMQAdminExt.examineConsumeStats(broadcastConsumer)); + result.put("Starter RPC MQ", defaultMQAdminExt.examineConsumeStats(rpcConsumer)); // result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo()); // result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic)); - result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic)); + result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic)); // result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic)); - result.put("广播-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + broadcastConsumer)); - result.put("RPC-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + rpcConsumer)); - } catch (Exception e) { - log.warn("monitor controller error: {}", e.getMessage(), e); - } - }); + result.put("Broadcast-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + broadcastConsumer)); + result.put("RPC-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + rpcConsumer)); + } catch (Exception e) { + log.warn("monitor controller error: {}", e.getMessage(), e); + } + }); + } return CommonResponse.success(result); } + + /** + * @param status + * @return + */ + @GetMapping("/m/set") + public CommonResponse changeMonitorState(@RequestParam("status") Boolean status) { + if (status) { + if (!monitor.isRunning()) { + monitor.start(); + } + return CommonResponse.success("已临时开始 MQ 死信队列监控!" + + "在应用重启前,将持续监控。重启后,将根据环境变量来确定是否监控。"); + } else { + if (monitor.isRunning()) { + monitor.stop(); + } + return CommonResponse.success("已临时关闭 MQ 死信队列监控!" + + "在应用重启前,将不再监控。重启后,将根据环境变量配置来确定是否监控。"); + } + } }