update(REQ-2516) - 新增 MQ 监控相关设置

This commit is contained in:
wangli 2024-06-06 11:12:05 +08:00
parent 6bf6c7f56d
commit ca48245c43
6 changed files with 59 additions and 18 deletions

View File

@ -2,7 +2,6 @@ package cn.axzo.workflow.client.feign.bpmn;
import cn.axzo.workflow.client.config.CommonFeignConfiguration;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.generate.annotition.Management;
import cn.azxo.framework.common.model.CommonResponse;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.cloud.openfeign.FeignClient;
@ -25,6 +24,8 @@ public interface ProcessActivityApi {
/**
* 业务节点唤醒
* <p>
* TODO 接口需要合并但需要考虑客户端与服务端不同版本间如何兼容
*/
@GetMapping("/api/process/activity/old/trigger")
CommonResponse<Boolean> trigger(@NotBlank(message = "触发 ID 不能为空") @RequestParam String triggerId);

View File

@ -67,7 +67,7 @@ public class BpmnProcessActivityController implements ProcessActivityApi {
@Override
@RepeatSubmit
public CommonResponse<Boolean> trigger(@NotBlank(message = "触发 ID 不能为空") @RequestParam String triggerId) {
return trigger(triggerId, false);
return trigger(triggerId, true);
}
/**

View File

@ -69,7 +69,7 @@ public class StarterRPCInvokeMQConfiguration {
public EventProducer workflowEngineStarterEventProducer(RocketMQTemplate rocketMQTemplate) {
return new RpcInvokeEventProducer(rocketMQTemplate,
DEFAULT_MODULE,
applicationName + "Starter",
applicationName + "_WE_Starter",
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(DEFAULT_EVENT + activeProfile)

View File

@ -22,7 +22,8 @@ import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerInstanceEventListener
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerNotificationEventListener;
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerTaskEventListener;
import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowListener;
import cn.axzo.workflow.starter.mq.monitor.DefaultMQMonitor;
import cn.axzo.workflow.starter.mq.monitor.WorkflowEngineStarterDefaultMQMonitor;
import cn.axzo.workflow.starter.mq.monitor.console.WorkflowEngineStarterMQMonitorController;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
@ -132,7 +133,12 @@ public class WorkflowEngineStarterAutoConfiguration {
}
@Bean
public DefaultMQMonitor defaultMQMonitor() {
return new DefaultMQMonitor();
public WorkflowEngineStarterMQMonitorController workflowEngineStarterMQMonitorController() {
return new WorkflowEngineStarterMQMonitorController();
}
@Bean
public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(DefaultMQAdminExt defaultMQAdminExt) {
return new WorkflowEngineStarterDefaultMQMonitor(defaultMQAdminExt);
}
}

View File

@ -0,0 +1,18 @@
package cn.axzo.workflow.starter.mq.monitor;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
/**
* 用于监控 Starter 关注的 Topic 相关的 Producer Consumer 信息
*
* @author wangli
* @since 2024/6/6 10:04
*/
public class WorkflowEngineStarterDefaultMQMonitor {
private final DefaultMQAdminExt defaultMQAdminExt;
public WorkflowEngineStarterDefaultMQMonitor(DefaultMQAdminExt defaultMQAdminExt) {
this.defaultMQAdminExt = defaultMQAdminExt;
}
}

View File

@ -1,8 +1,10 @@
package cn.axzo.workflow.starter.mq.monitor;
package cn.axzo.workflow.starter.mq.monitor.console;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.azxo.framework.common.model.CommonResponse;
import lombok.SneakyThrows;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@ -11,29 +13,43 @@ import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_EVENT;
/**
* TODO
* Workflow Engine Starter MessageQueue Monitor Controller
*
* @author wangli
* @since 2024/6/5 17:49
* @since 2024/6/6 10:04
*/
@RestController
@RequestMapping("/web/workflow/engine/starter/monitor")
public class DefaultMQMonitor {
@RequestMapping("/web/we/starter")
public class WorkflowEngineStarterMQMonitorController {
@Resource
private WorkflowEngineStarterProperties starterProperties;
@Resource
private DefaultMQAdminExt defaultMQAdminExt;
@Value("${spring.profiles.active}")
private String activeProfile;
@SneakyThrows
@GetMapping("/test")
public CommonResponse<Map> monitor() {
@GetMapping("/monitor")
public CommonResponse<Map<String, Object>> monitor() {
String topic = DEFAULT_EVENT + activeProfile;
String broadcastConsumerGroup = "";
Map<String, Object> result = new HashMap<>();
result.put("examineTopicStats", defaultMQAdminExt.examineTopicStats("topic_workflow_engine_dev"));
result.put("BrokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo());
result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList(topic));
result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo(topic));
result.put("TopicStats", defaultMQAdminExt.examineTopicStats(topic));
System.getProperty("");
if (starterProperties.getJoinContainerGroup()) {
}
result.put("gid_senna", defaultMQAdminExt.examineConsumeStats("GID_senna_workflow_engine_debugging_consumer"));
result.put("gid_senna_starter", defaultMQAdminExt.examineConsumeStats("GID_senna_workflow_engine_starter_debugging_consumer"));
result.put("brokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo());
result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList("topic_workflow_engine_dev"));
result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo("topic_workflow_engine_dev"));
defaultMQAdminExt.examineTopicConfig(System.getProperty("rocketmq.name-server"), "topic_workflow_engine_dev");
return CommonResponse.success(result);
}