update(REQ-2516) - 优化测试过程中发现的一些问题

This commit is contained in:
wangli 2024-06-17 21:01:16 +08:00
parent a21b4eec10
commit 872999d51d
8 changed files with 27 additions and 13 deletions

View File

@ -56,8 +56,8 @@ public class BroadcastListenerProperties {
/** /**
* 失败处理策略 * 失败处理策略
* <pre> * <pre>
* 1FAIL_OVER, 当前listener执行出错在经历重试后不抛出异常忽略继续往下执行(默认策略) * 1FAIL_OVER, 当前listener执行出错在经历重试后抛出异常并将消息加入死信队列然后继续往下执行(默认策略)
* 2FAIL_FAST, 快速失败出错直接抛出异常listener不再往下执行 * 2FAIL_FAST, 快速失败不管内部是什么异常类型都将吞掉异常正确结束不会增加死信队列计数
* <s>3FAIL_BACK, 失败自动恢复在后台记录失败的消息并按照一定的策略后期再进行重试</s>目前暂不支持 * <s>3FAIL_BACK, 失败自动恢复在后台记录失败的消息并按照一定的策略后期再进行重试</s>目前暂不支持
* </pre> * </pre>
*/ */

View File

@ -6,6 +6,7 @@ import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandlerRepository; import cn.axzo.framework.rocketmq.EventHandlerRepository;
import cn.axzo.workflow.starter.common.condition.NonContainerEnvironmentCondition; import cn.axzo.workflow.starter.common.condition.NonContainerEnvironmentCondition;
import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException;
import cn.axzo.workflow.starter.handler.filter.global.BroadcastMessageQueueFilter; import cn.axzo.workflow.starter.handler.filter.global.BroadcastMessageQueueFilter;
import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerWorkflowListener; import cn.axzo.workflow.starter.mq.broadcast.consumer.InnerWorkflowListener;
import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowEngineBroadcastEventListener; import cn.axzo.workflow.starter.mq.broadcast.consumer.WorkflowEngineBroadcastEventListener;
@ -60,6 +61,8 @@ public class StarterBroadcastMQConfiguration {
// 不管 cn.axzo.workflow.starter.BroadcastListenerProperties.failHandleType 设置是哪种策略这里最终都会打印 WARN 级别日志框架底层会对所有异常的消息打印 ERROR 日志 // 不管 cn.axzo.workflow.starter.BroadcastListenerProperties.failHandleType 设置是哪种策略这里最终都会打印 WARN 级别日志框架底层会对所有异常的消息打印 ERROR 日志
// 如果不想打印需要扩展内部逻辑请主动注册自己扩展的 Bean且名称为 StarterBroadcastMQConfiguration.BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME // 如果不想打印需要扩展内部逻辑请主动注册自己扩展的 Bean且名称为 StarterBroadcastMQConfiguration.BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME
log.warn("Workflow Engine Starter MQ, an exception occurred during processing {}", logText, ex); log.warn("Workflow Engine Starter MQ, an exception occurred during processing {}", logText, ex);
// 让其进入死信队列
throw new WorkflowEngineStarterException(ex);
}); });
} }

View File

@ -164,8 +164,7 @@ public class StarterRPCInvokeMQConfiguration {
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}", @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${GID_SEGMENT}_consumer", consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${GID_SEGMENT}_consumer",
consumeMode = ConsumeMode.CONCURRENTLY, consumeMode = ConsumeMode.CONCURRENTLY,
maxReconsumeTimes = 7, // 发布时需调整为 7, 总共耗时在 15min maxReconsumeTimes = 7, // 发布时需调整为 7, 总共耗时在 15min 目前容器为滚动发布一版不会出现停机这么久
replyTimeout = 10000,
nameServer = "${rocketmq.name-server}" nameServer = "${rocketmq.name-server}"
) )
public static class WorkflowEngineStarterRetryConsumer extends BaseListener implements RocketMQListener<MessageExt>, InitializingBean { public static class WorkflowEngineStarterRetryConsumer extends BaseListener implements RocketMQListener<MessageExt>, InitializingBean {

View File

@ -8,6 +8,10 @@ package cn.axzo.workflow.starter.common.exception;
*/ */
public class WorkflowEngineStarterException extends RuntimeException { public class WorkflowEngineStarterException extends RuntimeException {
public WorkflowEngineStarterException(Throwable cause) {
super(cause);
}
public WorkflowEngineStarterException(String message) { public WorkflowEngineStarterException(String message) {
super(message); super(message);
} }

View File

@ -15,11 +15,9 @@ public final class FailFastInterceptor extends AbstractListenerInterceptor {
public <T> void execute(ListenerExecutor executor, Consumer<T> consumer, EventConsumer.Context context, T t) { public <T> void execute(ListenerExecutor executor, Consumer<T> consumer, EventConsumer.Context context, T t) {
try { try {
getNext().execute(executor, consumer, context, t); getNext().execute(executor, consumer, context, t);
} catch (Exception e) { } catch (Throwable e) {
throw new WorkflowListenerExecutionException( throw new WorkflowListenerExecutionException(
"Failed to invoke the method " "Failed to invoke the method. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
// + methodName
+ ". Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
} }
} }

View File

@ -29,8 +29,8 @@ public final class FailOverInterceptor extends AbstractListenerInterceptor {
@Override @Override
public <T> void execute(ListenerExecutor executor, Consumer<T> consumer, EventConsumer.Context context, T t) { public <T> void execute(ListenerExecutor executor, Consumer<T> consumer, EventConsumer.Context context, T t) {
// 避免组件抛出程序处理超时的异常信息 // 避免组件抛出程序处理超时的异常信息, 为业务冗余 10 处理时间
context.setMaxAllowElapsedMillis(calc() + 500L); context.setMaxAllowElapsedMillis(calc() + 10_000L);
long waitTime = waitTimeInMs; long waitTime = waitTimeInMs;
int failedAttempts = 0; int failedAttempts = 0;
do { do {
@ -42,7 +42,7 @@ public final class FailOverInterceptor extends AbstractListenerInterceptor {
try { try {
getNext().execute(executor, consumer, context, t); getNext().execute(executor, consumer, context, t);
return; return;
} catch (Exception e) { } catch (Throwable e) {
if (failedAttempts == numOfRetries) { if (failedAttempts == numOfRetries) {
log.error("Workflow Engine Starter caught exception: {}", e.getMessage(), e); log.error("Workflow Engine Starter caught exception: {}", e.getMessage(), e);
throw e; throw e;

View File

@ -1,6 +1,8 @@
package cn.axzo.workflow.starter.handler.execute.interceptor; package cn.axzo.workflow.starter.handler.execute.interceptor;
import cn.axzo.framework.rocketmq.EventConsumer; import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException;
import cn.axzo.workflow.starter.common.exception.WorkflowListenerExecutionException;
import cn.axzo.workflow.starter.handler.execute.ListenerExecutor; import cn.axzo.workflow.starter.handler.execute.ListenerExecutor;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -24,6 +26,14 @@ public final class LogInterceptor extends AbstractListenerInterceptor {
} }
try { try {
getNext().execute(executor, consumer, context, t); getNext().execute(executor, consumer, context, t);
} catch (WorkflowListenerExecutionException e) {
// fail_fast mode , will be ignored
} catch (Throwable e) {
if (e instanceof WorkflowEngineStarterException) {
// 如果业务想使用异常来正确结束事件处理可以抛出该移除类型
} else {
throw e;
}
} finally { } finally {
stopWatch.stop(); stopWatch.stop();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {

View File

@ -108,10 +108,10 @@ public class WorkflowEngineStarterDefaultMQMonitor implements SmartLifecycle {
@Override @Override
public boolean isAutoStartup() { public boolean isAutoStartup() {
if (starterProperties.getEnableDlqMonitor() && starterProperties.getRpcMonitorStatus()) { if (starterProperties.getEnableDlqMonitor() && starterProperties.getRpcMonitorStatus()) {
taskScheduler.scheduleWithFixedDelay(() -> mqWatch(BROADCAST), starterProperties.getDlqMonitorIntervalInMs()); taskScheduler.scheduleWithFixedDelay(() -> mqWatch(RPC), starterProperties.getDlqMonitorIntervalInMs());
} }
if (starterProperties.getEnableDlqMonitor() && starterProperties.getBroadcast().getMonitorStatus()) { if (starterProperties.getEnableDlqMonitor() && starterProperties.getBroadcast().getMonitorStatus()) {
taskScheduler.scheduleWithFixedDelay(() -> mqWatch(RPC), starterProperties.getDlqMonitorIntervalInMs()); taskScheduler.scheduleWithFixedDelay(() -> mqWatch(BROADCAST), starterProperties.getDlqMonitorIntervalInMs());
} }
return false; return false;
} }