update(REQ-2516) - 增加 MQ 监控的,发现死信队列的钩子,同时,也新增可动态开关监控的管理功能
This commit is contained in:
parent
476ea55322
commit
ebef19a9d5
@ -16,6 +16,8 @@ import cn.axzo.workflow.starter.handler.filter.MessageNotificationEventFilter;
|
|||||||
import cn.axzo.workflow.starter.handler.filter.ProcessActivityEventFilter;
|
import cn.axzo.workflow.starter.handler.filter.ProcessActivityEventFilter;
|
||||||
import cn.axzo.workflow.starter.handler.filter.ProcessInstanceEventFilter;
|
import cn.axzo.workflow.starter.handler.filter.ProcessInstanceEventFilter;
|
||||||
import cn.axzo.workflow.starter.handler.filter.ProcessTaskEventFilter;
|
import cn.axzo.workflow.starter.handler.filter.ProcessTaskEventFilter;
|
||||||
|
import cn.axzo.workflow.starter.handler.monitor.BroadcastDLQProcessor;
|
||||||
|
import cn.axzo.workflow.starter.handler.monitor.RpcDLQProcessor;
|
||||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerActivityEventListener;
|
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerActivityEventListener;
|
||||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerInstanceEventListener;
|
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerInstanceEventListener;
|
||||||
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerNotificationEventListener;
|
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerNotificationEventListener;
|
||||||
@ -114,7 +116,7 @@ public class WorkflowEngineStarterAutoConfiguration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt")
|
@Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt")
|
||||||
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
|
@ConditionalOnProperty(prefix = "workflow.engine.starter", value = "enable-dlq-monitor", havingValue = "true")
|
||||||
public DefaultMQAdminExt defaultMQAdminExt(Environment environment) {
|
public DefaultMQAdminExt defaultMQAdminExt(Environment environment) {
|
||||||
String namesrvAddress = environment.getProperty("rocketmq.name-server");
|
String namesrvAddress = environment.getProperty("rocketmq.name-server");
|
||||||
if (StringUtils.isBlank(namesrvAddress)) {
|
if (StringUtils.isBlank(namesrvAddress)) {
|
||||||
@ -138,8 +140,11 @@ public class WorkflowEngineStarterAutoConfiguration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@ConditionalOnProperty(prefix = "workflow.engine.starter", value = "enable-dlq-monitor", havingValue = "true")
|
||||||
public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider,
|
public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider,
|
||||||
|
ObjectProvider<BroadcastDLQProcessor> broadcastDLQProcessorObjectProvider,
|
||||||
|
ObjectProvider<RpcDLQProcessor> rpcDLQProcessorObjectProvider,
|
||||||
Environment environment) {
|
Environment environment) {
|
||||||
return new WorkflowEngineStarterDefaultMQMonitor(mqAdminExtObjectProvider, environment);
|
return new WorkflowEngineStarterDefaultMQMonitor(mqAdminExtObjectProvider, broadcastDLQProcessorObjectProvider, rpcDLQProcessorObjectProvider, environment);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -62,6 +62,15 @@ public class WorkflowEngineStarterProperties {
|
|||||||
@NestedConfigurationProperty
|
@NestedConfigurationProperty
|
||||||
private BroadcastListenerProperties broadcast = new BroadcastListenerProperties();
|
private BroadcastListenerProperties broadcast = new BroadcastListenerProperties();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否开始死信队列的监控
|
||||||
|
*/
|
||||||
|
private Boolean enableDlqMonitor = false;
|
||||||
|
/**
|
||||||
|
* 监控死信队列,周期间隔,单位:毫秒
|
||||||
|
*/
|
||||||
|
private Long DlqMonitorIntervalInMs = 4 * 60 * 60 * 1000L;
|
||||||
|
|
||||||
public Boolean getManageable() {
|
public Boolean getManageable() {
|
||||||
return manageable;
|
return manageable;
|
||||||
}
|
}
|
||||||
@ -101,4 +110,20 @@ public class WorkflowEngineStarterProperties {
|
|||||||
public void setBroadcast(BroadcastListenerProperties broadcast) {
|
public void setBroadcast(BroadcastListenerProperties broadcast) {
|
||||||
this.broadcast = broadcast;
|
this.broadcast = broadcast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean getEnableDlqMonitor() {
|
||||||
|
return enableDlqMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableDlqMonitor(Boolean enableDlqMonitor) {
|
||||||
|
this.enableDlqMonitor = enableDlqMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getDlqMonitorIntervalInMs() {
|
||||||
|
return DlqMonitorIntervalInMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDlqMonitorIntervalInMs(Long dlqMonitorIntervalInMs) {
|
||||||
|
DlqMonitorIntervalInMs = dlqMonitorIntervalInMs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,24 @@
|
|||||||
|
package cn.axzo.workflow.starter.handler.monitor;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.common.admin.TopicOffset;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 广播死信队列监控的处理器, 默认仅仅是打印异常。
|
||||||
|
* <p>
|
||||||
|
* 可以用它来发生钉钉之类的。
|
||||||
|
* <p>
|
||||||
|
* <strong color=orange>注意:该接口目前不是为了消费死信队列中的消息的。</strong>后续必要时,再开放死信队列的消费扩展点
|
||||||
|
*
|
||||||
|
* @author wangli
|
||||||
|
* @since 2024/6/14 17:04
|
||||||
|
*/
|
||||||
|
public interface BroadcastDLQProcessor {
|
||||||
|
Logger log = LoggerFactory.getLogger(BroadcastDLQProcessor.class);
|
||||||
|
|
||||||
|
default void process(TopicOffset v) {
|
||||||
|
long dlqCount = v.getMaxOffset() - v.getMinOffset();
|
||||||
|
log.error("found Broadcast MQ DLQ, count: {}", dlqCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
package cn.axzo.workflow.starter.handler.monitor;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.common.admin.TopicOffset;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RPC 重试的死信队列监控处理器, 默认仅仅是打印异常
|
||||||
|
* <p>
|
||||||
|
* 可以用它来发生钉钉之类的.
|
||||||
|
* <p>
|
||||||
|
* <strong color=orange>注意:该接口目前不是为了消费死信队列中的消息的。</strong>后续必要时,再开放死信队列的消费扩展点
|
||||||
|
*
|
||||||
|
* @author wangli
|
||||||
|
* @since 2024/6/14 17:01
|
||||||
|
*/
|
||||||
|
|
||||||
|
public interface RpcDLQProcessor {
|
||||||
|
Logger log = LoggerFactory.getLogger(RpcDLQProcessor.class);
|
||||||
|
|
||||||
|
default void process(TopicOffset v) {
|
||||||
|
long dlqCount = v.getMaxOffset() - v.getMinOffset();
|
||||||
|
log.error("found rpc MQ DLQ, count: {}", dlqCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,7 +1,8 @@
|
|||||||
package cn.axzo.workflow.starter.mq.monitor;
|
package cn.axzo.workflow.starter.mq.monitor;
|
||||||
|
|
||||||
|
import cn.axzo.workflow.starter.handler.monitor.BroadcastDLQProcessor;
|
||||||
|
import cn.axzo.workflow.starter.handler.monitor.RpcDLQProcessor;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import org.apache.rocketmq.common.admin.ConsumeStats;
|
|
||||||
import org.apache.rocketmq.common.admin.TopicStatsTable;
|
import org.apache.rocketmq.common.admin.TopicStatsTable;
|
||||||
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
|
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -16,6 +17,7 @@ import java.util.HashMap;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT;
|
import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT;
|
||||||
import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.BROADCAST_CONSUMER_GROUP;
|
import static cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController.BROADCAST_CONSUMER_GROUP;
|
||||||
@ -37,11 +39,20 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
|
|||||||
private final DefaultMQAdminExt defaultMQAdminExt;
|
private final DefaultMQAdminExt defaultMQAdminExt;
|
||||||
private final Environment environment;
|
private final Environment environment;
|
||||||
private final ThreadPoolTaskScheduler taskScheduler;
|
private final ThreadPoolTaskScheduler taskScheduler;
|
||||||
|
private final BroadcastDLQProcessor broadcastDLQProcessor;
|
||||||
|
private final RpcDLQProcessor rpcDLQProcessor;
|
||||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
private final Map<String, Consumer<?>> dlqProcessCache = new HashMap<>();
|
||||||
|
|
||||||
public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider, Environment environment) {
|
public WorkflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider,
|
||||||
|
ObjectProvider<BroadcastDLQProcessor> broadcastDLQProcessorObjectProvider,
|
||||||
|
ObjectProvider<RpcDLQProcessor> rpcDLQProcessorObjectProvider, Environment environment) {
|
||||||
this.defaultMQAdminExt = mqAdminExtObjectProvider.getIfAvailable();
|
this.defaultMQAdminExt = mqAdminExtObjectProvider.getIfAvailable();
|
||||||
this.environment = environment;
|
this.environment = environment;
|
||||||
|
this.broadcastDLQProcessor = broadcastDLQProcessorObjectProvider.getIfAvailable(() -> new BroadcastDLQProcessor() {
|
||||||
|
});
|
||||||
|
this.rpcDLQProcessor = rpcDLQProcessorObjectProvider.getIfAvailable(() -> new RpcDLQProcessor() {
|
||||||
|
});
|
||||||
this.taskScheduler = init();
|
this.taskScheduler = init();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,29 +68,11 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
|
|||||||
String segment = environment.getProperty(MQ_GID_NAME_SEGMENT);
|
String segment = environment.getProperty(MQ_GID_NAME_SEGMENT);
|
||||||
String applicationName = environment.getProperty("spring.application.name");
|
String applicationName = environment.getProperty("spring.application.name");
|
||||||
// 周期性比对
|
// 周期性比对
|
||||||
compareBroadcastDLQ(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment));
|
compareDLQ(String.format(BROADCAST_CONSUMER_GROUP, applicationName, segment), "broadcast");
|
||||||
compareRPCDLQ(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment));
|
compareDLQ(String.format(RPC_RETRY_CONSUMER_GROUP, applicationName, segment), "rpc");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void compareRPCDLQ(String consumerGroupName) throws Exception {
|
private void compareDLQ(String consumerGroupName, String dqlType) throws Exception {
|
||||||
log.info("current rpc consumer group is :{}", consumerGroupName);
|
|
||||||
try {
|
|
||||||
TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName);
|
|
||||||
if (Objects.isNull(table) || CollectionUtils.isEmpty(table.getOffsetTable())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
table.getOffsetTable().forEach((k, v) -> {
|
|
||||||
if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) {
|
|
||||||
long dlqCount = v.getMaxOffset() - v.getMinOffset();
|
|
||||||
log.error("found rpc MQ DLQ, count: {}", dlqCount);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("monitor RPC DLQ error: {}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void compareBroadcastDLQ(String consumerGroupName) throws Exception {
|
|
||||||
log.info("current broadcast consumer group is :{}", consumerGroupName);
|
log.info("current broadcast consumer group is :{}", consumerGroupName);
|
||||||
try {
|
try {
|
||||||
TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName);
|
TopicStatsTable table = defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + consumerGroupName);
|
||||||
@ -88,8 +81,11 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
|
|||||||
}
|
}
|
||||||
table.getOffsetTable().forEach((k, v) -> {
|
table.getOffsetTable().forEach((k, v) -> {
|
||||||
if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) {
|
if (Objects.equals(DLQ_PREFIX + consumerGroupName, k.getTopic())) {
|
||||||
long dlqCount = v.getMaxOffset() - v.getMinOffset();
|
if ("broadcast".equals(dqlType)) {
|
||||||
log.error("found Broadcast MQ DLQ, count: {}", dlqCount);
|
broadcastDLQProcessor.process(v);
|
||||||
|
} else if ("rpc".equals(dqlType)) {
|
||||||
|
rpcDLQProcessor.process(v);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package cn.axzo.workflow.starter.mq.monitor.console;
|
package cn.axzo.workflow.starter.mq.monitor.console;
|
||||||
|
|
||||||
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
|
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
|
||||||
|
import cn.axzo.workflow.starter.mq.monitor.WorkflowEngineStarterDefaultMQMonitor;
|
||||||
import cn.azxo.framework.common.model.CommonResponse;
|
import cn.azxo.framework.common.model.CommonResponse;
|
||||||
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
|
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -10,6 +11,7 @@ import org.springframework.beans.factory.annotation.Value;
|
|||||||
import org.springframework.core.env.Environment;
|
import org.springframework.core.env.Environment;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestParam;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
@ -35,6 +37,8 @@ public class WorkflowEngineStarterMQMonitorController {
|
|||||||
@Resource
|
@Resource
|
||||||
private ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider;
|
private ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider;
|
||||||
@Resource
|
@Resource
|
||||||
|
private WorkflowEngineStarterDefaultMQMonitor monitor;
|
||||||
|
@Resource
|
||||||
private Environment environment;
|
private Environment environment;
|
||||||
@Value("${spring.application.name}")
|
@Value("${spring.application.name}")
|
||||||
private String applicationName;
|
private String applicationName;
|
||||||
@ -47,6 +51,9 @@ public class WorkflowEngineStarterMQMonitorController {
|
|||||||
public CommonResponse<Map<String, Object>> monitor() {
|
public CommonResponse<Map<String, Object>> monitor() {
|
||||||
String topic = DEFAULT_EVENT + activeProfile;
|
String topic = DEFAULT_EVENT + activeProfile;
|
||||||
Map<String, Object> result = new HashMap<>();
|
Map<String, Object> result = new HashMap<>();
|
||||||
|
if (!starterProperties.getEnableDlqMonitor()) {
|
||||||
|
result.put("Info", "未开启·死信队列·的监控,如需,请设置 workflow.engine.starter.enableDlqMonitor = true 后再重试!");
|
||||||
|
} else {
|
||||||
mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> {
|
mqAdminExtObjectProvider.ifAvailable(defaultMQAdminExt -> {
|
||||||
String segment = environment.getProperty(MQ_GID_NAME_SEGMENT);
|
String segment = environment.getProperty(MQ_GID_NAME_SEGMENT);
|
||||||
try {
|
try {
|
||||||
@ -59,12 +66,34 @@ public class WorkflowEngineStarterMQMonitorController {
|
|||||||
// result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic));
|
// result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic));
|
||||||
result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic));
|
result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic));
|
||||||
// result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic));
|
// result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic));
|
||||||
result.put("广播-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + broadcastConsumer));
|
result.put("Broadcast-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + broadcastConsumer));
|
||||||
result.put("RPC-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + rpcConsumer));
|
result.put("RPC-DLQ", defaultMQAdminExt.examineTopicStats(DLQ_PREFIX + rpcConsumer));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("monitor controller error: {}", e.getMessage(), e);
|
log.warn("monitor controller error: {}", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
return CommonResponse.success(result);
|
return CommonResponse.success(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param status
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@GetMapping("/m/set")
|
||||||
|
public CommonResponse<String> changeMonitorState(@RequestParam("status") Boolean status) {
|
||||||
|
if (status) {
|
||||||
|
if (!monitor.isRunning()) {
|
||||||
|
monitor.start();
|
||||||
|
}
|
||||||
|
return CommonResponse.success("已临时开始 MQ 死信队列监控!" +
|
||||||
|
"在应用重启前,将持续监控。重启后,将根据环境变量来确定是否监控。");
|
||||||
|
} else {
|
||||||
|
if (monitor.isRunning()) {
|
||||||
|
monitor.stop();
|
||||||
|
}
|
||||||
|
return CommonResponse.success("已临时关闭 MQ 死信队列监控!" +
|
||||||
|
"在应用重启前,将不再监控。重启后,将根据环境变量配置来确定是否监控。");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user