update(REQ-2516) - 重新调整 MQ 集群与本地的消费差异

This commit is contained in:
wangli 2024-06-05 20:41:11 +08:00
parent be09801a4d
commit 46e0bced80
6 changed files with 51 additions and 42 deletions

View File

@ -36,7 +36,7 @@ import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import static cn.axzo.workflow.starter.WorkflowEngineStarterRPCInvokeMQConfiguration.DEFAULT_EVENT;
import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_EVENT;
/**
* 配置监听流程引擎服务广播消息的 RocketMQ 相关配置
@ -45,8 +45,8 @@ import static cn.axzo.workflow.starter.WorkflowEngineStarterRPCInvokeMQConfigura
* @since 2024/6/5 17:39
*/
@Configuration(proxyBeanMethods = false)
public class WorkflowEngineStarterBroadcastMQConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterBroadcastMQConfiguration.class);
public class StarterBroadcastMQConfiguration {
private final Logger log = LoggerFactory.getLogger(StarterBroadcastMQConfiguration.class);
@Value("${spring.application.name}")
private String applicationName;

View File

@ -46,8 +46,8 @@ import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID;
* @since 2024/5/30 14:05
*/
@Configuration(proxyBeanMethods = false)
public class WorkflowEngineStarterRPCInvokeMQConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterRPCInvokeMQConfiguration.class);
public class StarterRPCInvokeMQConfiguration {
private final Logger log = LoggerFactory.getLogger(StarterRPCInvokeMQConfiguration.class);
public static final String DEFAULT_MODULE = "workflowEngine";
public static final String DEFAULT_EVENT = "topic_workflow_engine_";

View File

@ -22,6 +22,7 @@ import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerInstanceEventListener
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerNotificationEventListener;
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerTaskEventListener;
import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowListener;
import cn.axzo.workflow.starter.mq.monitor.DefaultMQMonitor;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
@ -48,7 +49,7 @@ import java.util.List;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WorkflowEngineStarterProperties.class)
@EnableFeignClients(clients = {WorkflowCoreService.class})
@Import({WorkflowEngineStarterBroadcastMQConfiguration.class, WorkflowEngineStarterRPCInvokeMQConfiguration.class})
@Import({StarterBroadcastMQConfiguration.class, StarterRPCInvokeMQConfiguration.class})
public class WorkflowEngineStarterAutoConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterAutoConfiguration.class);
@ -129,4 +130,9 @@ public class WorkflowEngineStarterAutoConfiguration {
}
return defaultMQAdminExt;
}
@Bean
public DefaultMQMonitor defaultMQMonitor() {
return new DefaultMQMonitor();
}
}

View File

@ -14,13 +14,13 @@ import java.util.Objects;
import static cn.axzo.workflow.starter.common.constant.StarterConstants.DEBUGGING_MQ_SUFFIX;
import static cn.axzo.workflow.starter.common.constant.StarterConstants.K8S_POD_NAME_SPACE;
import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT;
import static cn.axzo.workflow.starter.common.constant.StarterConstants.NACOS_PROFILES_ACTIVE;
/**
* 用于处理 MQ 的消费者, 在本地启动或在容器中启动时, 能自主控制是否并入统一的消费组
* <p>
* 可查看 {@link WorkflowEngineStarterProperties#joinContainerGroup} 属性, 来了解本类的用途,
* 特别需要注意的是: Starter 是结合 K8S 的命名空间(namespace) 来处理的. 如果公司在变更环境后,
* 可能会导致 {@link NonContainerEnvironmentCondition#mappingNamespace(String)} 方法异常.
* 特别需要注意的是: Starter 是结合 K8S 的命名空间(namespace) 来处理的.
*
* @author wangli
* @since 2024/5/30 22:19
@ -33,14 +33,16 @@ public class NonContainerEnvironmentCondition implements Condition {
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
ConfigurableEnvironment environment = (ConfigurableEnvironment) context.getEnvironment();
String myPodNamespace = environment.getProperty(K8S_POD_NAME_SPACE);
String activeProfile = environment.getProperty(NACOS_PROFILES_ACTIVE);
if (!StringUtils.hasText(activeProfile)) {
activeProfile = environment.getProperty("spring.profiles.active", String.class);
}
// 在容器环境时, 强制加入集群消费组
if (StringUtils.hasText(myPodNamespace)) {
environment.getSystemProperties().put(MQ_GID_NAME_SEGMENT, mappingNamespace(myPodNamespace));
environment.getSystemProperties().put(MQ_GID_NAME_SEGMENT, activeProfile);
return true;
}
// 优先外部化配置
Boolean joinContainerGroup = environment.getProperty("workflow.engine.starter.join-container-group", Boolean.class);
if (Objects.isNull(joinContainerGroup)) {
@ -49,40 +51,10 @@ public class NonContainerEnvironmentCondition implements Condition {
}
log.debug("workflow engine starter join-container-group status: {} ", joinContainerGroup);
String activeProfile = environment.getProperty("spring.profiles.active", String.class);
environment.getSystemProperties().put(MQ_GID_NAME_SEGMENT,
joinContainerGroup ? mappingNamespace(activeProfile) : activeProfile + DEBUGGING_MQ_SUFFIX);
joinContainerGroup ? activeProfile : activeProfile + DEBUGGING_MQ_SUFFIX);
return true;
}
/**
* K8S 集群的 NameSpace 空间名映射为开发常用的 profile 名称
*
* @param namespace
* @return
*/
private String mappingNamespace(String namespace) {
if (!StringUtils.hasText(namespace)) {
throw new IllegalArgumentException("namespace is empty");
}
switch (namespace) {
case "local":
return "local";
case "dev":
case "java-dev":
return "dev";
case "test":
return "test";
case "pre":
return "pre";
case "live":
return "live";
case "master":
case "pro":
return "master";
default:
throw new IllegalArgumentException(String.format("namespace %s is not supported", namespace));
}
}
}

View File

@ -10,5 +10,6 @@ public interface StarterConstants {
String STARTER_INVOKE_MODE = "WORKFLOW-ENGINE-STARTER-INVOKE-MODE";
String DEBUGGING_MQ_SUFFIX = "_debugging";
String K8S_POD_NAME_SPACE = "MY_POD_NAMESPACE";
String NACOS_PROFILES_ACTIVE = "NACOS_PROFILES_ACTIVE";
String MQ_GID_NAME_SEGMENT = "GID_SEGMENT";
}

View File

@ -1,10 +1,40 @@
package cn.axzo.workflow.starter.mq.monitor;
import cn.azxo.framework.common.model.CommonResponse;
import lombok.SneakyThrows;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* TODO
*
* @author wangli
* @since 2024/6/5 17:49
*/
@RestController
@RequestMapping("/web/workflow/engine/starter/monitor")
public class DefaultMQMonitor {
@Resource
private DefaultMQAdminExt defaultMQAdminExt;
@SneakyThrows
@GetMapping("/test")
public CommonResponse<Map> monitor() {
Map<String, Object> result = new HashMap<>();
result.put("examineTopicStats", defaultMQAdminExt.examineTopicStats("topic_workflow_engine_dev"));
result.put("gid_senna", defaultMQAdminExt.examineConsumeStats("GID_senna_workflow_engine_debugging_consumer"));
result.put("gid_senna_starter", defaultMQAdminExt.examineConsumeStats("GID_senna_workflow_engine_starter_debugging_consumer"));
result.put("brokerClusterInfo", defaultMQAdminExt.examineBrokerClusterInfo());
result.put("TopicClusterList", defaultMQAdminExt.getTopicClusterList("topic_workflow_engine_dev"));
result.put("TopicRouteInfo", defaultMQAdminExt.examineTopicRouteInfo("topic_workflow_engine_dev"));
defaultMQAdminExt.examineTopicConfig(System.getProperty("rocketmq.name-server"), "topic_workflow_engine_dev");
return CommonResponse.success(result);
}
}