update - 重新调整异步任务异常时的处理

This commit is contained in:
wangli 2024-04-29 18:14:45 +08:00
parent 7f116877bd
commit 78865751cf
10 changed files with 192 additions and 201 deletions

View File

@ -4,7 +4,7 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
import cn.axzo.workflow.core.engine.job.exception.handler.CustomAsyncRunnableExecutionExceptionHandler;
import cn.axzo.workflow.core.engine.job.exception.handle.CustomAsyncRunnableExceptionExceptionHandler;
import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import com.google.common.collect.Lists;
@ -52,7 +52,8 @@ public class FlowableConfiguration {
// 异步任务异常重试时间间隔
configuration.setDefaultFailedJobWaitTime(30);
configuration.setAsyncFailedJobWaitTime(30);
configuration.setCustomAsyncRunnableExecutionExceptionHandlers(Lists.newArrayList(new CustomAsyncRunnableExecutionExceptionHandler()));
configuration.setAddDefaultExceptionHandler(false);
configuration.setCustomAsyncRunnableExecutionExceptionHandlers(Lists.newArrayList(new CustomAsyncRunnableExceptionExceptionHandler()));
};
}

View File

@ -1,36 +0,0 @@
package cn.axzo.workflow.core.engine.event;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import org.flowable.job.api.JobInfo;
import static cn.axzo.workflow.core.engine.event.ErrorInfoEventType.NEW_ERROR;
/**
* 用于异步任务执行过程中出现的异常搜集和告警
*
* @author wangli
* @since 2024/4/17 16:45
*/
public class ErrorInfoEvent implements FlowableEvent {
private final JobInfo job;
private final Throwable throwable;
public ErrorInfoEvent(JobInfo job, Throwable throwable) {
this.job = job;
this.throwable = throwable;
}
@Override
public FlowableEventType getType() {
return NEW_ERROR;
}
public JobInfo getJob() {
return job;
}
public Throwable getThrowable() {
return throwable;
}
}

View File

@ -1,48 +0,0 @@
package cn.axzo.workflow.core.engine.event;
import org.apache.commons.lang3.StringUtils;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import java.util.ArrayList;
import java.util.List;
/**
* 异步任务异常事件类型
*
* @author wangli
* @since 2024/4/17 16:47
*/
public enum ErrorInfoEventType implements FlowableEventType {
NEW_ERROR;
public static final ErrorInfoEventType[] EMPTY_ARRAY = new ErrorInfoEventType[]{};
/**
* @param string the string containing a comma-separated list of event-type names
* @return a list of FlowableEngineEventType based on the given list.
* @throws FlowableIllegalArgumentException when one of the given string is not a valid type name
*/
public static ErrorInfoEventType[] getTypesFromString(String string) {
List<ErrorInfoEventType> result = new ArrayList<>();
if (string != null && !string.isEmpty()) {
String[] split = StringUtils.split(string, ",");
for (String typeName : split) {
boolean found = false;
for (ErrorInfoEventType type : values()) {
if (typeName.toUpperCase().equals(type.name())) {
result.add(type);
found = true;
break;
}
}
if (!found) {
throw new FlowableIllegalArgumentException("Invalid event-type: " + typeName);
}
}
}
return result.toArray(EMPTY_ARRAY);
}
}

View File

@ -0,0 +1,71 @@
package cn.axzo.workflow.core.engine.job.exception.handle;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandConfig;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.api.JobInfo;
import org.flowable.job.service.InternalJobCompatibilityManager;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler;
import org.flowable.job.service.impl.asyncexecutor.FailedJobCommandFactory;
import org.flowable.job.service.impl.persistence.entity.AbstractRuntimeJobEntity;
/**
* 默认的异步任务执行异常处理器
*
* @author wangli
* @since 2024/4/29 17:43
*/
@Slf4j
public class CustomAsyncRunnableExceptionExceptionHandler implements AsyncRunnableExecutionExceptionHandler {
@Override
public boolean handleException(final JobServiceConfiguration jobServiceConfiguration, final JobInfo job, final Throwable exception) {
jobServiceConfiguration.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
// Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed
String message = "Job " + job.getId() + " failed";
log.info(message, exception);
if (job instanceof AbstractRuntimeJobEntity) {
AbstractRuntimeJobEntity runtimeJob = (AbstractRuntimeJobEntity) job;
InternalJobCompatibilityManager internalJobCompatibilityManager = jobServiceConfiguration.getInternalJobCompatibilityManager();
if (internalJobCompatibilityManager != null && internalJobCompatibilityManager.isFlowable5Job(runtimeJob)) {
internalJobCompatibilityManager.handleFailedV5Job(runtimeJob, exception);
return null;
}
}
CommandConfig commandConfig = jobServiceConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew();
FailedJobCommandFactory failedJobCommandFactory = jobServiceConfiguration.getFailedJobCommandFactory();
Command<Object> cmd = failedJobCommandFactory.getCommand(job.getId(), exception);
log.info("Using FailedJobCommandFactory '{}' and command of type '{}'", failedJobCommandFactory.getClass(), cmd.getClass());
jobServiceConfiguration.getCommandExecutor().execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a
// try-catch block, to prevent the original exception to be swallowed
FlowableEventDispatcher eventDispatcher = jobServiceConfiguration.getEventDispatcher();
if (eventDispatcher != null && eventDispatcher.isEnabled()) {
try {
eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityExceptionEvent(
FlowableEngineEventType.JOB_EXECUTION_FAILURE, job, exception), jobServiceConfiguration.getEngineName());
} catch (Throwable ignore) {
log.warn("Exception occurred while dispatching job failure event, ignoring.", ignore);
}
}
return null;
}
});
return true;
}
}

View File

@ -1,29 +0,0 @@
package cn.axzo.workflow.core.engine.job.exception.handler;
import cn.axzo.workflow.core.engine.event.ErrorInfoEvent;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
import org.flowable.job.api.JobInfo;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler;
/**
* 自定义的异步任务执行异常处理器
*
* @author wangli
* @since 2024/4/17 16:03
*/
@Slf4j
public class CustomAsyncRunnableExecutionExceptionHandler implements AsyncRunnableExecutionExceptionHandler {
@Override
public boolean handleException(JobServiceConfiguration jobServiceConfiguration, JobInfo job, Throwable exception) {
log.info("discover exception, jobId: {}, exception: {}", job.getId(), exception.getMessage());
FlowableEventDispatcher eventDispatcher = jobServiceConfiguration.getEventDispatcher();
eventDispatcher.dispatchEvent(new ErrorInfoEvent(job, exception), jobServiceConfiguration.getEngineName());
return false;
}
}

View File

@ -1,67 +0,0 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.workflow.core.engine.event.ErrorInfoEvent;
import cn.axzo.workflow.core.engine.event.ErrorInfoEventType;
import cn.axzo.workflow.core.listener.BpmnAsyncExecutionErrorEventListener;
import com.google.common.collect.ImmutableSet;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static cn.axzo.workflow.core.engine.event.ErrorInfoEventType.NEW_ERROR;
/**
* TODO
*
* @author wangli
* @since 2024/4/17 16:52
*/
@Slf4j
@Component
public class EngineAsyncExecutionErrorEventListener extends AbstractFlowableEngineEventListener {
@Resource
ObjectProvider<List<BpmnAsyncExecutionErrorEventListener>> errorEventListeners;
public static final Set<ErrorInfoEventType> ACCEPT_EVENTS =
ImmutableSet.<ErrorInfoEventType>builder()
.add(NEW_ERROR)
.build();
@Override
public void onEvent(FlowableEvent flowableEvent) {
if (flowableEvent instanceof ErrorInfoEvent) {
ErrorInfoEvent event = (ErrorInfoEvent) flowableEvent;
ErrorInfoEventType eventType = (ErrorInfoEventType) flowableEvent.getType();
if (ACCEPT_EVENTS.contains(eventType)) {
switch (eventType) {
case NEW_ERROR:
getOrderedListeners().forEach(i -> i.notify(event.getJob(), event.getThrowable()));
break;
default:
break;
}
}
}
}
private List<BpmnAsyncExecutionErrorEventListener> getOrderedListeners() {
List<BpmnAsyncExecutionErrorEventListener> orderListeners = new ArrayList<>();
errorEventListeners.ifAvailable(orderListeners::addAll);
return orderListeners;
}
@Override
public boolean isFailOnException() {
return true;
}
}

View File

@ -0,0 +1,65 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.workflow.core.listener.BpmnAsyncJobEventListener;
import com.google.common.collect.ImmutableSet;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* 引擎异步任务事件监听器
*
* @author wangli
* @since 2024/4/29 16:48
*/
@Slf4j
@Component
public class EngineAsyncJobEventListener extends AbstractFlowableEngineEventListener {
@Resource
ObjectProvider<List<BpmnAsyncJobEventListener>> asyncJobListeners;
public static final Set<FlowableEngineEventType> ASYNC_JOB_EVENTS =
ImmutableSet.<FlowableEngineEventType>builder()
.add(FlowableEngineEventType.JOB_CANCELED)
.add(FlowableEngineEventType.JOB_EXECUTION_SUCCESS)
.add(FlowableEngineEventType.JOB_EXECUTION_FAILURE)
.add(FlowableEngineEventType.JOB_RETRIES_DECREMENTED)
.add(FlowableEngineEventType.JOB_REJECTED)
.add(FlowableEngineEventType.JOB_RESCHEDULED)
.add(FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER)
.build();
public EngineAsyncJobEventListener() {
super(ASYNC_JOB_EVENTS);
}
@Override
public void onEvent(FlowableEvent flowableEvent) {
StopWatch stopWatch = new StopWatch("EngineAsyncJobEventListener");
stopWatch.start("async-job-event-listener");
if (ASYNC_JOB_EVENTS.contains(flowableEvent.getType())) {
getOrderedListeners().forEach(i -> {
if (i.support((FlowableEngineEventType) flowableEvent.getType())) {
i.notify(flowableEvent);
}
});
}
stopWatch.stop();
}
private List<BpmnAsyncJobEventListener> getOrderedListeners() {
List<BpmnAsyncJobEventListener> orderListeners = new ArrayList<>();
asyncJobListeners.ifAvailable(orderListeners::addAll);
return orderListeners;
}
}

View File

@ -1,15 +0,0 @@
package cn.axzo.workflow.core.listener;
import org.flowable.job.api.JobInfo;
import org.springframework.core.Ordered;
/**
* 对引擎内异步任务执行异常的事件扩展监听接口
*
* @author wangli
* @since 2024/4/17 16:54
*/
public interface BpmnAsyncExecutionErrorEventListener extends Ordered {
void notify(JobInfo jobInfo, Throwable throwable);
}

View File

@ -0,0 +1,30 @@
package cn.axzo.workflow.core.listener;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.springframework.core.Ordered;
/**
* 对引擎内异步任务执行异常的事件扩展监听接口
*
* @author wangli
* @since 2024/4/29 16:57
*/
public interface BpmnAsyncJobEventListener extends Ordered {
/**
* 是否支持处理
*
* @param eventType
* @return
*/
boolean support(FlowableEngineEventType eventType);
/**
* 具体的处理逻辑
*
* @param flowableEvent
*/
void notify(FlowableEvent flowableEvent);
}

View File

@ -1,13 +1,20 @@
package cn.axzo.workflow.server.controller.listener.error;
import cn.axzo.workflow.core.listener.BpmnAsyncExecutionErrorEventListener;
import cn.axzo.workflow.core.listener.BpmnAsyncJobEventListener;
import cn.axzo.workflow.server.common.annotation.ReporterType;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.impl.event.FlowableEntityExceptionEventImpl;
import org.flowable.job.api.JobInfo;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Objects;
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.JOB_EXECUTION_FAILURE;
/**
* 异步任务执行异常的扩展监听器
*
@ -16,25 +23,37 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
public class ErrorReporterEventListener implements BpmnAsyncExecutionErrorEventListener {
public class ErrorReporterEventListener implements BpmnAsyncJobEventListener {
@Value("${spring.profiles.active}")
private String profile;
@Value("${workflow.sendDingTalk:true}")
private Boolean sendDingTalk;
@Override
public void notify(JobInfo jobInfo, Throwable throwable) {
public boolean support(FlowableEngineEventType eventType) {
return Objects.equals(JOB_EXECUTION_FAILURE, eventType);
}
@Override
public void notify(FlowableEvent flowableEvent) {
if (!(flowableEvent instanceof FlowableEntityExceptionEventImpl)) {
return;
}
FlowableEntityExceptionEventImpl event = (FlowableEntityExceptionEventImpl) flowableEvent;
JobInfo job = (JobInfo) event.getEntity();
Throwable throwable = event.getCause();
ReporterType reporterType = ReporterType.ONLY_LOG;
if (Lists.newArrayList("dev", "test", "pre").contains(profile)) {
reporterType = ReporterType.BOTH;
} else if (Lists.newArrayList("live", "master").contains(profile)) {
reporterType = ReporterType.ONLY_LOG;
}
reporterType.executeAction(profile, "异步任务执行异常, 剩余重试次数:" + jobInfo.getRetries(), sendDingTalk, new Object[]{jobInfo}, "act_ru_job", throwable, false);
reporterType.executeAction(profile, "异步任务执行异常, 剩余重试次数:" + job.getRetries(), sendDingTalk, new Object[]{job}, "act_ru_job", throwable, false);
}
@Override
public int getOrder() {
return Integer.MIN_VALUE;
}
}