diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java index 7f55753b3..3741d12e1 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java @@ -69,8 +69,10 @@ public class StarterBroadcastMQConfiguration { Consumer callback = eventWrapper -> { if (eventWrapper.isHandled()) { // 只收集被App真正消费的消息. - Event event = eventWrapper.getEvent(); - log.info("WorkflowEngine Broadcast MQ, handled event: {}", event.toPrettyJsonString()); + if (log.isDebugEnabled()) { + Event event = eventWrapper.getEvent(); + log.debug("WorkflowEngine Broadcast MQ, handled event: {}", event.toPrettyJsonString()); + } } }; return new DefaultEventConsumer(applicationName, eventHandlerRepository, callback); diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java index e62efe7dd..9c9b0ab0f 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterProperties.java @@ -33,6 +33,16 @@ public class WorkflowEngineStarterProperties { */ private Boolean joinContainerGroup = false; + /** + *

该参数只针对容器环境生效

+ * 配合 joinContainerGroup 使用,且只在 joinContainerGroup = false 时生效 + *
+     * 在本地有多台开发机同时启动时,又会组成新的集群消费,也会导致消息异常消费,
+     * 所以该参数就是为了创建完全唯一的消费者,避免本地开发机组成集群。
+     * 
+ */ + private String specialId; + /** * WorkflowCoreService 类中所有方法未标记{@link InvokeMode}注解的方法调用时, 默认采用的模式 * @@ -68,6 +78,14 @@ public class WorkflowEngineStarterProperties { this.joinContainerGroup = joinContainerGroup; } + public String getSpecialId() { + return specialId; + } + + public void setSpecialId(String specialId) { + this.specialId = specialId; + } + public RpcInvokeModeEnum getInvokeMode() { return invokeMode; } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java index 495c585b0..c942f6870 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/condition/NonContainerEnvironmentCondition.java @@ -32,6 +32,7 @@ public class NonContainerEnvironmentCondition implements Condition { @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { ConfigurableEnvironment environment = (ConfigurableEnvironment) context.getEnvironment(); + // 依赖 K8S 设置的环境变量信息,如果变量的 key 发生变化,会导致此处的功能可能出现异常 String myPodNamespace = environment.getProperty(K8S_POD_NAME_SPACE); String activeProfile = environment.getProperty(NACOS_PROFILES_ACTIVE); if (!StringUtils.hasText(activeProfile)) { @@ -49,10 +50,14 @@ public class NonContainerEnvironmentCondition implements Condition { // 获取默认值 joinContainerGroup = new WorkflowEngineStarterProperties().getJoinContainerGroup(); } - log.debug("workflow engine starter join-container-group status: {} ", joinContainerGroup); + if (log.isDebugEnabled()) { + log.debug("workflow engine starter join-container-group status: {} ", joinContainerGroup); + } + String specialId = environment.getProperty("workflow.engine.starter.special-id", String.class); environment.getSystemProperties().put(MQ_GID_NAME_SEGMENT, - joinContainerGroup ? activeProfile : activeProfile + DEBUGGING_MQ_SUFFIX); + joinContainerGroup ? activeProfile : + activeProfile + (StringUtils.hasText(specialId) ? "_" + specialId : "") + DEBUGGING_MQ_SUFFIX); return true; } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java index 7228c5fd6..665e7bf84 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java @@ -16,13 +16,19 @@ public final class LogInterceptor extends AbstractListenerInterceptor { public void execute(ListenerExecutor executor, Consumer consumer, EventConsumer.Context context, T t) { StopWatch stopWatch = new StopWatch(); stopWatch.start(); - log.debug("--- starting handle mq ---- "); - log.info("messageId: {}, eventCode: {}, messageBody: {}", context.getMsgId(), context.getEventCode().toString(), JSON.toJSONString(t)); + if (log.isDebugEnabled()) { + log.debug("--- starting handle mq ---- "); + } + if (log.isInfoEnabled()) { + log.info("messageId: {}, eventCode: {}, messageBody: {}", context.getMsgId(), context.getEventCode().toString(), JSON.toJSONString(t)); + } try { getNext().execute(executor, consumer, context, t); } finally { stopWatch.stop(); - log.debug("--- messageId: {} finished ,timeCost:{} ms ---", executor.getClass().getSimpleName(), stopWatch.getTotalTimeMillis()); + if (log.isDebugEnabled()) { + log.debug("--- messageId: {} finished ,timeCost:{} ms ---", executor.getClass().getSimpleName(), stopWatch.getTotalTimeMillis()); + } } } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java index 334998221..d3c09b78b 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/broadcast/consumer/WorkflowEngineBroadcastEventListener.java @@ -35,7 +35,7 @@ public class WorkflowEngineBroadcastEventListener implements EventHandler, Initi @Override public void onEvent(Event event, EventConsumer.Context context) { if (Objects.isNull(event) || Objects.isNull(event.getEventCode())) { - log.warn("illegal event code: {}", event); + log.warn("illegal event code: {}", JSON.toJSONString(event)); return; } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java index a73719d3c..0b847c0d0 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/consumer/WorkflowEngineStarterRetryEventListener.java @@ -128,9 +128,13 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In // 事件处理 RPC 请求, 强制使用同步模式 ThreadUtil.set(SYNC); Object[] args = convertToActualArgs(mapping.getMethod(), dto.getParameters()); - log.debug("event rpc request args: {}", JSON.toJSONString(args)); + if (log.isDebugEnabled()) { + log.debug("event rpc request args: {}", JSON.toJSONString(args)); + } Object invoke = mapping.getMethod().invoke(mapping.getInterfaceObject(), args); - log.debug("Event Invoke Result: {}", JSON.toJSONString(invoke)); + if (log.isDebugEnabled()) { + log.debug("Event Invoke Result: {}", JSON.toJSONString(invoke)); + } } catch (Throwable e) { // 能抛出异常目前只有两种情况, 一个是网络异常, 另一个是对端服务内部异常 Throwable cause = getRealCause(e); diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/producer/RpcInvokeEventProducer.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/producer/RpcInvokeEventProducer.java index b61406bef..de7f2fc80 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/producer/RpcInvokeEventProducer.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/retry/producer/RpcInvokeEventProducer.java @@ -49,7 +49,9 @@ public class RpcInvokeEventProducer extends RocketMQEventProducer { @Override public void send(@NonNull Event event, @NonNull Context context) { - log.info("发送事件内容: {}", event.toPrettyJsonString()); + if (log.isDebugEnabled()) { + log.debug("发送事件内容: {}", event.toPrettyJsonString()); + } if (sendBeforeCallback != null) { sendBeforeCallback.accept(event, context); } @@ -101,8 +103,10 @@ public class RpcInvokeEventProducer extends RocketMQEventProducer { getAfterCommitExecutor().executeAndRollback(runnable, rollbackRunnable); } - List runnables = ListUtils.emptyIfNull(getAfterCommitExecutor().getRunnables()); - log.info("runnables.size(): {}", runnables.size()); + if (log.isDebugEnabled()) { + List runnables = ListUtils.emptyIfNull(getAfterCommitExecutor().getRunnables()); + log.debug("runnables.size(): {}", runnables.size()); + } } @Override