update(REQ-2516) - 调整本地启用应用时 MQ 加入容器集群消费的测试
This commit is contained in:
parent
95bee3c586
commit
c471d787f6
@ -82,7 +82,7 @@ public class WorkflowEngineStarterRocketMQConfiguration {
|
||||
@Component
|
||||
@Conditional(NonContainerEnvironmentCondition.class)
|
||||
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
|
||||
consumerGroup = "GID_${spring.application.name}_workflow_engine_${MY_POD_NAMESPACE:debugging}_consumer",
|
||||
consumerGroup = "GID_${spring.application.name}_workflow_engine_${GID_SEGMENT}_consumer",
|
||||
consumeMode = ConsumeMode.ORDERLY,
|
||||
nameServer = "${rocketmq.name-server}"
|
||||
)
|
||||
@ -168,7 +168,7 @@ public class WorkflowEngineStarterRocketMQConfiguration {
|
||||
@Component
|
||||
@Conditional(NonContainerEnvironmentCondition.class)
|
||||
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
|
||||
consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${MY_POD_NAMESPACE:debugging}_consumer",
|
||||
consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${GID_SEGMENT}_consumer",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
maxReconsumeTimes = 3, // TODO 发布时需调整为 7, 总共耗时在 15min 内
|
||||
replyTimeout = 10000,
|
||||
|
||||
@ -11,13 +11,16 @@ import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import static cn.axzo.workflow.starter.common.constant.StarterConstants.DEBUGGING_MQ_SUFFIX;
|
||||
import static cn.axzo.workflow.starter.common.constant.StarterConstants.K8S_POD_NAME_SPACE;
|
||||
import static cn.axzo.workflow.starter.common.constant.StarterConstants.MQ_GID_NAME_SEGMENT;
|
||||
|
||||
/**
|
||||
* 用于处理 MQ 的消费者, 在本地启动或在容器中启动时, 能自主控制是否并入统一的消费组
|
||||
* <p>
|
||||
* 可查看 {@link WorkflowEngineStarterProperties#localConsumer} 属性, 来了解本来的用途,
|
||||
* 特别需要注意的是: Starter 是结合 K8S 的 命名空间(namespace) 来处理的. 如果公司在删减/变更
|
||||
* 环境后,可能会导致 {@link NonContainerEnvironmentCondition#mappingNamespace(String)}
|
||||
* 方法异常.
|
||||
* 可查看 {@link WorkflowEngineStarterProperties#joinContainerGroup} 属性, 来了解本类的用途,
|
||||
* 特别需要注意的是: Starter 是结合 K8S 的命名空间(namespace) 来处理的. 如果公司在变更环境后,
|
||||
* 可能会导致 {@link NonContainerEnvironmentCondition#mappingNamespace(String)} 方法异常.
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024/5/30 22:19
|
||||
@ -26,32 +29,49 @@ public class NonContainerEnvironmentCondition implements Condition {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(NonContainerEnvironmentCondition.class);
|
||||
|
||||
private static final String MY_POD_NAMESPACE = "MY_POD_NAMESPACE";
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
|
||||
ConfigurableEnvironment environment = (ConfigurableEnvironment) context.getEnvironment();
|
||||
// 优先外部化配置
|
||||
Boolean localCustomer = environment.getProperty("workflow.engine.starter.local-consumer", Boolean.class);
|
||||
if (Objects.isNull(localCustomer)) {
|
||||
// 获取默认值
|
||||
localCustomer = new WorkflowEngineStarterProperties().getLocalConsumer();
|
||||
}
|
||||
log.info("workflow engine starter local consumer status: {}", localCustomer);
|
||||
String myPodNamespace = environment.getProperty(K8S_POD_NAME_SPACE);
|
||||
|
||||
String myPodNamespace = environment.getProperty(MY_POD_NAMESPACE);
|
||||
if (localCustomer && !StringUtils.hasText(myPodNamespace)) {
|
||||
environment.getSystemProperties().put(MY_POD_NAMESPACE, mappingNamespace(environment.getProperty("spring.profiles.active")));
|
||||
// 在容器环境时, 强制加入集群消费组
|
||||
if (StringUtils.hasText(myPodNamespace)) {
|
||||
environment.getSystemProperties().put(MQ_GID_NAME_SEGMENT, mappingNamespace(myPodNamespace));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
// 优先外部化配置
|
||||
Boolean joinContainerGroup = environment.getProperty("workflow.engine.starter.join-container-group", Boolean.class);
|
||||
if (Objects.isNull(joinContainerGroup)) {
|
||||
// 获取默认值
|
||||
joinContainerGroup = new WorkflowEngineStarterProperties().getJoinContainerGroup();
|
||||
}
|
||||
log.debug("workflow engine starter join-container-group status: {} ", joinContainerGroup);
|
||||
|
||||
String activeProfile = environment.getProperty("spring.profiles.active", String.class);
|
||||
environment.getSystemProperties().put(MQ_GID_NAME_SEGMENT,
|
||||
joinContainerGroup ? mappingNamespace(activeProfile) : activeProfile + DEBUGGING_MQ_SUFFIX);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 K8S 集群的 NameSpace 空间名映射为开发常用的 profile 名称
|
||||
*
|
||||
* @param namespace
|
||||
* @return
|
||||
*/
|
||||
private String mappingNamespace(String namespace) {
|
||||
if (!StringUtils.hasText(namespace)) {
|
||||
throw new IllegalArgumentException("namespace is empty");
|
||||
}
|
||||
switch (namespace) {
|
||||
case "dev":
|
||||
case "local":
|
||||
return "java-dev";
|
||||
return "local";
|
||||
case "dev":
|
||||
case "java-dev":
|
||||
return "dev";
|
||||
case "test":
|
||||
return "test";
|
||||
case "pre":
|
||||
@ -59,7 +79,8 @@ public class NonContainerEnvironmentCondition implements Condition {
|
||||
case "live":
|
||||
return "live";
|
||||
case "master":
|
||||
return "pro";
|
||||
case "pro":
|
||||
return "master";
|
||||
default:
|
||||
throw new IllegalArgumentException(String.format("namespace %s is not supported", namespace));
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user