From 0aeafbc6c326ac58e53d590a150a026141b25c28 Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Wed, 19 Jun 2024 13:49:42 +0800 Subject: [PATCH] =?UTF-8?q?update(REQ-2516)=20-=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=86=85=E7=BD=AE=E9=92=89=E9=92=89=E5=91=8A=E8=AD=A6=E6=8E=A7?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../starter/WorkflowEngineStarterProperties.java | 2 ++ .../starter/handler/monitor/BroadcastDLQReporter.java | 2 +- .../starter/handler/monitor/RpcDLQReporter.java | 2 +- .../starter/mq/monitor/AlertBroadcastDLQReporter.java | 7 ++++--- .../starter/mq/monitor/AlertRcpDLQReporter.java | 7 ++++--- .../monitor/WorkflowEngineStarterDefaultMQMonitor.java | 10 ++++++++-- 6 files changed, 20 insertions(+), 10 deletions(-) diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java index 1b28ef3dc..584d75484 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java @@ -79,6 +79,8 @@ public class WorkflowEngineStarterProperties { /** * 开启后,会在主动给“工作流小分队”群中发送信息 + *
+ * 只针对容器环境中的应用生效 */ private Boolean alert = false; diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/BroadcastDLQReporter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/BroadcastDLQReporter.java index e5689a4db..9305f449c 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/BroadcastDLQReporter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/BroadcastDLQReporter.java @@ -17,7 +17,7 @@ import org.slf4j.LoggerFactory; public interface BroadcastDLQReporter { Logger log = LoggerFactory.getLogger(BroadcastDLQReporter.class); - default void process(TopicOffset v) { + default void process(TopicOffset v, String dlqName) { long dlqCount = v.getMaxOffset() - v.getMinOffset(); log.error("found Broadcast MQ DLQ, count: {}", dlqCount); } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/RpcDLQReporter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/RpcDLQReporter.java index 9ce1e7a01..ceeffc63e 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/RpcDLQReporter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/monitor/RpcDLQReporter.java @@ -18,7 +18,7 @@ import org.slf4j.LoggerFactory; public interface RpcDLQReporter { Logger log = LoggerFactory.getLogger(RpcDLQReporter.class); - default void process(TopicOffset v) { + default void process(TopicOffset v, String dlqName) { long dlqCount = v.getMaxOffset() - v.getMinOffset(); log.error("found rpc MQ DLQ, count: {}", dlqCount); } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertBroadcastDLQReporter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertBroadcastDLQReporter.java index ebd169758..97a7a24ae 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertBroadcastDLQReporter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertBroadcastDLQReporter.java @@ -30,16 +30,16 @@ public class AlertBroadcastDLQReporter implements BroadcastDLQReporter { } @Override - public void process(TopicOffset v) { + public void process(TopicOffset v, String dlqName) { log.warn("found DLQ"); long count = v.getMaxOffset() - v.getMinOffset(); if (count > 0) { - sendDingTalk(count); + sendDingTalk(count, dlqName); } } @SneakyThrows - public void sendDingTalk(Long count) { + public void sendDingTalk(Long count, String dlqName) { DingTalkClient client = new DefaultDingTalkClient(dingtalk_robot_webhook); OapiRobotSendRequest request = new OapiRobotSendRequest(); @@ -49,6 +49,7 @@ public class AlertBroadcastDLQReporter implements BroadcastDLQReporter { markdown.setText("#### [" + profile + "]环境[" + applicationName + "]发现“DLQ”数据(Rpc)\n" + "> ###### 1. 如需查看详情,[请点击这里](" + findDomain() + "m) \n" + "> ###### 2. 如需调整 MQ DLQ 的监控状态: \n" + + "> ###### 3. DLQ名称:" + dlqName + " \n" + "[关闭监控](" + findDomain() + "m/set?status=false) \n | " + "[开启监控](" + findDomain() + "m/set?status=true) \n" + "> 积累条数:" + count + " \n"); diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertRcpDLQReporter.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertRcpDLQReporter.java index 3ddf33306..3619f7f83 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertRcpDLQReporter.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/AlertRcpDLQReporter.java @@ -30,16 +30,16 @@ public class AlertRcpDLQReporter implements RpcDLQReporter { } @Override - public void process(TopicOffset v) { + public void process(TopicOffset v, String dlqName) { log.warn("found DLQ"); long count = v.getMaxOffset() - v.getMinOffset(); if (count > 0) { - sendDingTalk(count); + sendDingTalk(count, dlqName); } } @SneakyThrows - public void sendDingTalk(Long count) { + public void sendDingTalk(Long count, String dlqName) { DingTalkClient client = new DefaultDingTalkClient(dingtalk_robot_webhook); OapiRobotSendRequest request = new OapiRobotSendRequest(); @@ -49,6 +49,7 @@ public class AlertRcpDLQReporter implements RpcDLQReporter { markdown.setText("#### [" + profile + "]环境[" + applicationName + "]发现“DLQ”数据(Rpc)\n" + "> ###### 1. 如需查看详情,[请点击这里](" + findDomain() + "m) \n" + "> ###### 2. 如需调整 MQ DLQ 的监控状态: \n" + + "> ###### 3. DLQ名称:" + dlqName + " \n" + "[关闭监控](" + findDomain() + "m/set?status=false) \n | " + "[开启监控](" + findDomain() + "m/set?status=true) \n" + "> 积累条数:" + count + " \n"); diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java index 813a1c9d3..578f05d3a 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java @@ -13,10 +13,12 @@ import org.springframework.context.SmartLifecycle; import org.springframework.core.env.Environment; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import static cn.axzo.workflow.common.constant.StarterConstants.K8S_POD_NAME_SPACE; import static cn.axzo.workflow.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT; import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.BROADCAST_CONSUMER_GROUP; import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.RPC_RETRY_CONSUMER_GROUP; @@ -91,6 +93,10 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { private void compareDLQ(String consumerGroupName, String dqlType) { log.info("current broadcast consumer group is :{}", consumerGroupName); + String myPodNamespace = environment.getProperty(K8S_POD_NAME_SPACE); + if (!StringUtils.hasText(myPodNamespace)) { + return; + } try { TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName); if (Objects.isNull(table) || CollectionUtils.isEmpty(table.getOffsetTable())) { @@ -99,9 +105,9 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { table.getOffsetTable().forEach((k, v) -> { if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) { if (BROADCAST.equals(dqlType)) { - broadcastDLQProcessor.process(v); + broadcastDLQProcessor.process(v, DLQ_PREFIX + consumerGroupName); } else if (RPC.equals(dqlType)) { - rpcDLQProcessor.process(v); + rpcDLQProcessor.process(v, DLQ_PREFIX + consumerGroupName); } } });