From 78865751cfb2877e6e56b24b2ca7373350a391d1 Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Mon, 29 Apr 2024 18:14:45 +0800 Subject: [PATCH] =?UTF-8?q?update=20-=20=E9=87=8D=E6=96=B0=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E5=BC=82=E6=AD=A5=E4=BB=BB=E5=8A=A1=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E6=97=B6=E7=9A=84=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/conf/FlowableConfiguration.java | 5 +- .../core/engine/event/ErrorInfoEvent.java | 36 ---------- .../core/engine/event/ErrorInfoEventType.java | 48 ------------- ...syncRunnableExceptionExceptionHandler.java | 71 +++++++++++++++++++ ...syncRunnableExecutionExceptionHandler.java | 29 -------- ...ngineAsyncExecutionErrorEventListener.java | 67 ----------------- .../listener/EngineAsyncJobEventListener.java | 65 +++++++++++++++++ .../BpmnAsyncExecutionErrorEventListener.java | 15 ---- .../listener/BpmnAsyncJobEventListener.java | 30 ++++++++ .../error/ErrorReporterEventListener.java | 27 +++++-- 10 files changed, 192 insertions(+), 201 deletions(-) delete mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEvent.java delete mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEventType.java create mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handle/CustomAsyncRunnableExceptionExceptionHandler.java delete mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handler/CustomAsyncRunnableExecutionExceptionHandler.java delete mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncExecutionErrorEventListener.java create mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java delete mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncExecutionErrorEventListener.java create mode 100644 workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncJobEventListener.java diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java index d8d9a7131..e3f70de97 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java @@ -4,7 +4,7 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory; import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator; import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler; import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler; -import cn.axzo.workflow.core.engine.job.exception.handler.CustomAsyncRunnableExecutionExceptionHandler; +import cn.axzo.workflow.core.engine.job.exception.handle.CustomAsyncRunnableExceptionExceptionHandler; import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager; import cn.axzo.workflow.core.service.ExtAxHiTaskInstService; import com.google.common.collect.Lists; @@ -52,7 +52,8 @@ public class FlowableConfiguration { // 异步任务异常重试时间间隔 configuration.setDefaultFailedJobWaitTime(30); configuration.setAsyncFailedJobWaitTime(30); - configuration.setCustomAsyncRunnableExecutionExceptionHandlers(Lists.newArrayList(new CustomAsyncRunnableExecutionExceptionHandler())); + configuration.setAddDefaultExceptionHandler(false); + configuration.setCustomAsyncRunnableExecutionExceptionHandlers(Lists.newArrayList(new CustomAsyncRunnableExceptionExceptionHandler())); }; } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEvent.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEvent.java deleted file mode 100644 index b795f0264..000000000 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEvent.java +++ /dev/null @@ -1,36 +0,0 @@ -package cn.axzo.workflow.core.engine.event; - -import org.flowable.common.engine.api.delegate.event.FlowableEvent; -import org.flowable.common.engine.api.delegate.event.FlowableEventType; -import org.flowable.job.api.JobInfo; - -import static cn.axzo.workflow.core.engine.event.ErrorInfoEventType.NEW_ERROR; - -/** - * 用于异步任务执行过程中出现的异常搜集和告警 - * - * @author wangli - * @since 2024/4/17 16:45 - */ -public class ErrorInfoEvent implements FlowableEvent { - private final JobInfo job; - private final Throwable throwable; - - public ErrorInfoEvent(JobInfo job, Throwable throwable) { - this.job = job; - this.throwable = throwable; - } - - @Override - public FlowableEventType getType() { - return NEW_ERROR; - } - - public JobInfo getJob() { - return job; - } - - public Throwable getThrowable() { - return throwable; - } -} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEventType.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEventType.java deleted file mode 100644 index c8675ebdd..000000000 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/event/ErrorInfoEventType.java +++ /dev/null @@ -1,48 +0,0 @@ -package cn.axzo.workflow.core.engine.event; - -import org.apache.commons.lang3.StringUtils; -import org.flowable.common.engine.api.FlowableIllegalArgumentException; -import org.flowable.common.engine.api.delegate.event.FlowableEventType; - -import java.util.ArrayList; -import java.util.List; - -/** - * 异步任务异常事件类型 - * - * @author wangli - * @since 2024/4/17 16:47 - */ -public enum ErrorInfoEventType implements FlowableEventType { - - NEW_ERROR; - - public static final ErrorInfoEventType[] EMPTY_ARRAY = new ErrorInfoEventType[]{}; - - /** - * @param string the string containing a comma-separated list of event-type names - * @return a list of FlowableEngineEventType based on the given list. - * @throws FlowableIllegalArgumentException when one of the given string is not a valid type name - */ - public static ErrorInfoEventType[] getTypesFromString(String string) { - List result = new ArrayList<>(); - if (string != null && !string.isEmpty()) { - String[] split = StringUtils.split(string, ","); - for (String typeName : split) { - boolean found = false; - for (ErrorInfoEventType type : values()) { - if (typeName.toUpperCase().equals(type.name())) { - result.add(type); - found = true; - break; - } - } - if (!found) { - throw new FlowableIllegalArgumentException("Invalid event-type: " + typeName); - } - } - } - - return result.toArray(EMPTY_ARRAY); - } -} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handle/CustomAsyncRunnableExceptionExceptionHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handle/CustomAsyncRunnableExceptionExceptionHandler.java new file mode 100644 index 000000000..3bb4a264b --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handle/CustomAsyncRunnableExceptionExceptionHandler.java @@ -0,0 +1,71 @@ +package cn.axzo.workflow.core.engine.job.exception.handle; + +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher; +import org.flowable.common.engine.impl.interceptor.Command; +import org.flowable.common.engine.impl.interceptor.CommandConfig; +import org.flowable.common.engine.impl.interceptor.CommandContext; +import org.flowable.job.api.JobInfo; +import org.flowable.job.service.InternalJobCompatibilityManager; +import org.flowable.job.service.JobServiceConfiguration; +import org.flowable.job.service.event.impl.FlowableJobEventBuilder; +import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler; +import org.flowable.job.service.impl.asyncexecutor.FailedJobCommandFactory; +import org.flowable.job.service.impl.persistence.entity.AbstractRuntimeJobEntity; + +/** + * 默认的异步任务执行异常处理器 + * + * @author wangli + * @since 2024/4/29 17:43 + */ +@Slf4j +public class CustomAsyncRunnableExceptionExceptionHandler implements AsyncRunnableExecutionExceptionHandler { + + @Override + public boolean handleException(final JobServiceConfiguration jobServiceConfiguration, final JobInfo job, final Throwable exception) { + jobServiceConfiguration.getCommandExecutor().execute(new Command() { + + @Override + public Void execute(CommandContext commandContext) { + + // Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed + String message = "Job " + job.getId() + " failed"; + log.info(message, exception); + + if (job instanceof AbstractRuntimeJobEntity) { + AbstractRuntimeJobEntity runtimeJob = (AbstractRuntimeJobEntity) job; + InternalJobCompatibilityManager internalJobCompatibilityManager = jobServiceConfiguration.getInternalJobCompatibilityManager(); + if (internalJobCompatibilityManager != null && internalJobCompatibilityManager.isFlowable5Job(runtimeJob)) { + internalJobCompatibilityManager.handleFailedV5Job(runtimeJob, exception); + return null; + } + } + + CommandConfig commandConfig = jobServiceConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew(); + FailedJobCommandFactory failedJobCommandFactory = jobServiceConfiguration.getFailedJobCommandFactory(); + Command cmd = failedJobCommandFactory.getCommand(job.getId(), exception); + + log.info("Using FailedJobCommandFactory '{}' and command of type '{}'", failedJobCommandFactory.getClass(), cmd.getClass()); + jobServiceConfiguration.getCommandExecutor().execute(commandConfig, cmd); + + // Dispatch an event, indicating job execution failed in a + // try-catch block, to prevent the original exception to be swallowed + FlowableEventDispatcher eventDispatcher = jobServiceConfiguration.getEventDispatcher(); + if (eventDispatcher != null && eventDispatcher.isEnabled()) { + try { + eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityExceptionEvent( + FlowableEngineEventType.JOB_EXECUTION_FAILURE, job, exception), jobServiceConfiguration.getEngineName()); + } catch (Throwable ignore) { + log.warn("Exception occurred while dispatching job failure event, ignoring.", ignore); + } + } + + return null; + } + }); + + return true; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handler/CustomAsyncRunnableExecutionExceptionHandler.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handler/CustomAsyncRunnableExecutionExceptionHandler.java deleted file mode 100644 index 72dda789c..000000000 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/job/exception/handler/CustomAsyncRunnableExecutionExceptionHandler.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.axzo.workflow.core.engine.job.exception.handler; - -import cn.axzo.workflow.core.engine.event.ErrorInfoEvent; -import lombok.extern.slf4j.Slf4j; -import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher; -import org.flowable.job.api.JobInfo; -import org.flowable.job.service.JobServiceConfiguration; -import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler; - -/** - * 自定义的异步任务执行异常处理器 - * - * @author wangli - * @since 2024/4/17 16:03 - */ -@Slf4j -public class CustomAsyncRunnableExecutionExceptionHandler implements AsyncRunnableExecutionExceptionHandler { - - @Override - public boolean handleException(JobServiceConfiguration jobServiceConfiguration, JobInfo job, Throwable exception) { - log.info("discover exception, jobId: {}, exception: {}", job.getId(), exception.getMessage()); - - FlowableEventDispatcher eventDispatcher = jobServiceConfiguration.getEventDispatcher(); - - eventDispatcher.dispatchEvent(new ErrorInfoEvent(job, exception), jobServiceConfiguration.getEngineName()); - - return false; - } -} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncExecutionErrorEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncExecutionErrorEventListener.java deleted file mode 100644 index 4e56f8ea7..000000000 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncExecutionErrorEventListener.java +++ /dev/null @@ -1,67 +0,0 @@ -package cn.axzo.workflow.core.engine.listener; - -import cn.axzo.workflow.core.engine.event.ErrorInfoEvent; -import cn.axzo.workflow.core.engine.event.ErrorInfoEventType; -import cn.axzo.workflow.core.listener.BpmnAsyncExecutionErrorEventListener; -import com.google.common.collect.ImmutableSet; -import lombok.extern.slf4j.Slf4j; -import org.flowable.common.engine.api.delegate.event.FlowableEvent; -import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import static cn.axzo.workflow.core.engine.event.ErrorInfoEventType.NEW_ERROR; - -/** - * TODO - * - * @author wangli - * @since 2024/4/17 16:52 - */ -@Slf4j -@Component -public class EngineAsyncExecutionErrorEventListener extends AbstractFlowableEngineEventListener { - - @Resource - ObjectProvider> errorEventListeners; - - public static final Set ACCEPT_EVENTS = - ImmutableSet.builder() - .add(NEW_ERROR) - .build(); - - @Override - public void onEvent(FlowableEvent flowableEvent) { - if (flowableEvent instanceof ErrorInfoEvent) { - ErrorInfoEvent event = (ErrorInfoEvent) flowableEvent; - ErrorInfoEventType eventType = (ErrorInfoEventType) flowableEvent.getType(); - - if (ACCEPT_EVENTS.contains(eventType)) { - switch (eventType) { - case NEW_ERROR: - getOrderedListeners().forEach(i -> i.notify(event.getJob(), event.getThrowable())); - break; - default: - break; - } - } - } - } - - private List getOrderedListeners() { - List orderListeners = new ArrayList<>(); - errorEventListeners.ifAvailable(orderListeners::addAll); - return orderListeners; - } - - @Override - public boolean isFailOnException() { - return true; - } - -} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java new file mode 100644 index 000000000..590af2764 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/listener/EngineAsyncJobEventListener.java @@ -0,0 +1,65 @@ +package cn.axzo.workflow.core.engine.listener; + +import cn.axzo.workflow.core.listener.BpmnAsyncJobEventListener; +import com.google.common.collect.ImmutableSet; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.stereotype.Component; +import org.springframework.util.StopWatch; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * 引擎异步任务事件监听器 + * + * @author wangli + * @since 2024/4/29 16:48 + */ +@Slf4j +@Component +public class EngineAsyncJobEventListener extends AbstractFlowableEngineEventListener { + + @Resource + ObjectProvider> asyncJobListeners; + + public static final Set ASYNC_JOB_EVENTS = + ImmutableSet.builder() + .add(FlowableEngineEventType.JOB_CANCELED) + .add(FlowableEngineEventType.JOB_EXECUTION_SUCCESS) + .add(FlowableEngineEventType.JOB_EXECUTION_FAILURE) + .add(FlowableEngineEventType.JOB_RETRIES_DECREMENTED) + .add(FlowableEngineEventType.JOB_REJECTED) + .add(FlowableEngineEventType.JOB_RESCHEDULED) + .add(FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) + .build(); + + public EngineAsyncJobEventListener() { + super(ASYNC_JOB_EVENTS); + } + + @Override + public void onEvent(FlowableEvent flowableEvent) { + StopWatch stopWatch = new StopWatch("EngineAsyncJobEventListener"); + stopWatch.start("async-job-event-listener"); + if (ASYNC_JOB_EVENTS.contains(flowableEvent.getType())) { + getOrderedListeners().forEach(i -> { + if (i.support((FlowableEngineEventType) flowableEvent.getType())) { + i.notify(flowableEvent); + } + }); + } + stopWatch.stop(); + } + + private List getOrderedListeners() { + List orderListeners = new ArrayList<>(); + asyncJobListeners.ifAvailable(orderListeners::addAll); + return orderListeners; + } +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncExecutionErrorEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncExecutionErrorEventListener.java deleted file mode 100644 index 15a3d805b..000000000 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncExecutionErrorEventListener.java +++ /dev/null @@ -1,15 +0,0 @@ -package cn.axzo.workflow.core.listener; - -import org.flowable.job.api.JobInfo; -import org.springframework.core.Ordered; - -/** - * 对引擎内异步任务执行异常的事件扩展监听接口 - * - * @author wangli - * @since 2024/4/17 16:54 - */ -public interface BpmnAsyncExecutionErrorEventListener extends Ordered { - - void notify(JobInfo jobInfo, Throwable throwable); -} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncJobEventListener.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncJobEventListener.java new file mode 100644 index 000000000..100bc647c --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/listener/BpmnAsyncJobEventListener.java @@ -0,0 +1,30 @@ +package cn.axzo.workflow.core.listener; + +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.springframework.core.Ordered; + +/** + * 对引擎内异步任务执行异常的事件扩展监听接口 + * + * @author wangli + * @since 2024/4/29 16:57 + */ +public interface BpmnAsyncJobEventListener extends Ordered { + + /** + * 是否支持处理 + * + * @param eventType + * @return + */ + boolean support(FlowableEngineEventType eventType); + + /** + * 具体的处理逻辑 + * + * @param flowableEvent + */ + void notify(FlowableEvent flowableEvent); + +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/error/ErrorReporterEventListener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/error/ErrorReporterEventListener.java index fb3711b10..508632156 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/error/ErrorReporterEventListener.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/controller/listener/error/ErrorReporterEventListener.java @@ -1,13 +1,20 @@ package cn.axzo.workflow.server.controller.listener.error; -import cn.axzo.workflow.core.listener.BpmnAsyncExecutionErrorEventListener; +import cn.axzo.workflow.core.listener.BpmnAsyncJobEventListener; import cn.axzo.workflow.server.common.annotation.ReporterType; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.impl.event.FlowableEntityExceptionEventImpl; import org.flowable.job.api.JobInfo; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.util.Objects; + +import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.JOB_EXECUTION_FAILURE; + /** * 异步任务执行异常的扩展监听器 * @@ -16,25 +23,37 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -public class ErrorReporterEventListener implements BpmnAsyncExecutionErrorEventListener { +public class ErrorReporterEventListener implements BpmnAsyncJobEventListener { @Value("${spring.profiles.active}") private String profile; @Value("${workflow.sendDingTalk:true}") private Boolean sendDingTalk; @Override - public void notify(JobInfo jobInfo, Throwable throwable) { + public boolean support(FlowableEngineEventType eventType) { + return Objects.equals(JOB_EXECUTION_FAILURE, eventType); + } + + @Override + public void notify(FlowableEvent flowableEvent) { + if (!(flowableEvent instanceof FlowableEntityExceptionEventImpl)) { + return; + } + FlowableEntityExceptionEventImpl event = (FlowableEntityExceptionEventImpl) flowableEvent; + JobInfo job = (JobInfo) event.getEntity(); + Throwable throwable = event.getCause(); ReporterType reporterType = ReporterType.ONLY_LOG; if (Lists.newArrayList("dev", "test", "pre").contains(profile)) { reporterType = ReporterType.BOTH; } else if (Lists.newArrayList("live", "master").contains(profile)) { reporterType = ReporterType.ONLY_LOG; } - reporterType.executeAction(profile, "异步任务执行异常, 剩余重试次数:" + jobInfo.getRetries(), sendDingTalk, new Object[]{jobInfo}, "act_ru_job", throwable, false); + reporterType.executeAction(profile, "异步任务执行异常, 剩余重试次数:" + job.getRetries(), sendDingTalk, new Object[]{job}, "act_ru_job", throwable, false); } @Override public int getOrder() { return Integer.MIN_VALUE; } + }