diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/BroadcastListenerProperties.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/BroadcastListenerProperties.java index d14a8ce74..679d752bd 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/BroadcastListenerProperties.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/BroadcastListenerProperties.java @@ -56,8 +56,8 @@ public class BroadcastListenerProperties { /** * 失败处理策略: *
-     * 1、FAIL_OVER, 当前listener执行出错,在经历重试后,不抛出异常,忽略继续往下执行(默认策略)
-     * 2、FAIL_FAST, 快速失败,出错直接抛出异常,listener不再往下执行
+     * 1、FAIL_OVER, 当前listener执行出错,在经历重试后,抛出异常,并将消息加入死信队列,然后继续往下执行(默认策略)
+     * 2、FAIL_FAST, 快速失败,不管内部是什么异常类型,都将吞掉异常,正确结束,不会增加死信队列计数
      * 3、FAIL_BACK, 失败自动恢复,在后台记录失败的消息,并按照一定的策略后期再进行重试,目前暂不支持
      * 
*/ diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java index 3741d12e1..9ac804265 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java @@ -6,6 +6,7 @@ import cn.axzo.framework.rocketmq.Event; import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.framework.rocketmq.EventHandlerRepository; import cn.axzo.workflow.starter.common.condition.NonContainerEnvironmentCondition; +import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException; import cn.axzo.workflow.starter.handler.filter.global.BroadcastMessageQueueFilter; import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerWorkflowListener; import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowEngineBroadcastEventListener; @@ -60,6 +61,8 @@ public class StarterBroadcastMQConfiguration { // 不管 cn.axzo.workflow.starter.BroadcastListenerProperties.failHandleType 设置是哪种策略,这里最终都会打印 WARN 级别日志,框架底层会对所有异常的消息打印 ERROR 日志 // 如果不想打印需要扩展内部逻辑,请主动注册自己扩展的 Bean,且名称为 StarterBroadcastMQConfiguration.BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME log.warn("Workflow Engine Starter MQ, an exception occurred during processing {}", logText, ex); + // 让其进入死信队列 + throw new WorkflowEngineStarterException(ex); }); } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java index 2a291e8bc..aa9854aeb 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java @@ -164,8 +164,7 @@ public class StarterRPCInvokeMQConfiguration { @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}", consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${GID_SEGMENT}_consumer", consumeMode = ConsumeMode.CONCURRENTLY, - maxReconsumeTimes = 7, // 发布时需调整为 7, 总共耗时在 15min 内 - replyTimeout = 10000, + maxReconsumeTimes = 7, // 发布时需调整为 7, 总共耗时在 15min 内, 目前容器为滚动发布,一版不会出现停机这么久 nameServer = "${rocketmq.name-server}" ) public static class WorkflowEngineStarterRetryConsumer extends BaseListener implements RocketMQListener, InitializingBean { diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowEngineStarterException.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowEngineStarterException.java index 2cf0712b7..cf4244523 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowEngineStarterException.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/common/exception/WorkflowEngineStarterException.java @@ -8,6 +8,10 @@ package cn.axzo.workflow.starter.common.exception; */ public class WorkflowEngineStarterException extends RuntimeException { + public WorkflowEngineStarterException(Throwable cause) { + super(cause); + } + public WorkflowEngineStarterException(String message) { super(message); } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailFastInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailFastInterceptor.java index bd432e078..77b46c5a0 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailFastInterceptor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailFastInterceptor.java @@ -15,11 +15,9 @@ public final class FailFastInterceptor extends AbstractListenerInterceptor { public void execute(ListenerExecutor executor, Consumer consumer, EventConsumer.Context context, T t) { try { getNext().execute(executor, consumer, context, t); - } catch (Exception e) { + } catch (Throwable e) { throw new WorkflowListenerExecutionException( - "Failed to invoke the method " -// + methodName - + ". Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); + "Failed to invoke the method. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailOverInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailOverInterceptor.java index 342640fba..0eb2659fc 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailOverInterceptor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/FailOverInterceptor.java @@ -29,8 +29,8 @@ public final class FailOverInterceptor extends AbstractListenerInterceptor { @Override public void execute(ListenerExecutor executor, Consumer consumer, EventConsumer.Context context, T t) { - // 避免组件抛出“程序处理超时”的异常信息 - context.setMaxAllowElapsedMillis(calc() + 500L); + // 避免组件抛出“程序处理超时”的异常信息, 为业务冗余 10 处理时间 + context.setMaxAllowElapsedMillis(calc() + 10_000L); long waitTime = waitTimeInMs; int failedAttempts = 0; do { @@ -42,7 +42,7 @@ public final class FailOverInterceptor extends AbstractListenerInterceptor { try { getNext().execute(executor, consumer, context, t); return; - } catch (Exception e) { + } catch (Throwable e) { if (failedAttempts == numOfRetries) { log.error("Workflow Engine Starter caught exception: {}", e.getMessage(), e); throw e; diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java index 665e7bf84..1ff75e8ad 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/handler/execute/interceptor/LogInterceptor.java @@ -1,6 +1,8 @@ package cn.axzo.workflow.starter.handler.execute.interceptor; import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException; +import cn.axzo.workflow.starter.common.exception.WorkflowListenerExecutionException; import cn.axzo.workflow.starter.handler.execute.ListenerExecutor; import com.alibaba.fastjson.JSON; import org.slf4j.Logger; @@ -24,6 +26,14 @@ public final class LogInterceptor extends AbstractListenerInterceptor { } try { getNext().execute(executor, consumer, context, t); + } catch (WorkflowListenerExecutionException e) { + // fail_fast mode , will be ignored + } catch (Throwable e) { + if (e instanceof WorkflowEngineStarterException) { + // 如果业务想使用异常来正确结束事件处理,可以抛出该移除类型 + } else { + throw e; + } } finally { stopWatch.stop(); if (log.isDebugEnabled()) { diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java index 51ac42f10..fb623dfd3 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/mq/monitor/WorkflowEngineStarterDefaultMQMonitor.java @@ -108,10 +108,10 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle { @Override public boolean isAutoStartup() { if (starterProperties.getEnableDlqMonitor() && starterProperties.getRpcMonitorStatus()) { - taskScheduler.scheduleWithFixedDelay(() -> mqWatch(BROADCAST), starterProperties.getDlqMonitorIntervalInMs()); + taskScheduler.scheduleWithFixedDelay(() -> mqWatch(RPC), starterProperties.getDlqMonitorIntervalInMs()); } if (starterProperties.getEnableDlqMonitor() && starterProperties.getBroadcast().getMonitorStatus()) { - taskScheduler.scheduleWithFixedDelay(() -> mqWatch(RPC), starterProperties.getDlqMonitorIntervalInMs()); + taskScheduler.scheduleWithFixedDelay(() -> mqWatch(BROADCAST), starterProperties.getDlqMonitorIntervalInMs()); } return false; }