update - 异步添加任务异常处理器

This commit is contained in:
wangli 2024-04-17 17:33:31 +08:00
parent 664d621d23
commit 2f92461d47
10 changed files with 230 additions and 8 deletions

View File

@ -4,6 +4,7 @@ import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
import cn.axzo.workflow.core.engine.job.exception.handler.CustomAsyncRunnableExecutionExceptionHandler;
import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import com.google.common.collect.Lists;
@ -47,6 +48,7 @@ public class FlowableConfiguration {
configuration.setHistoricProcessInstanceDataManager(new CustomMybatisHistoricProcessInstanceDataManager(configuration));
configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler());
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
configuration.setCustomAsyncRunnableExecutionExceptionHandlers(Lists.newArrayList(new CustomAsyncRunnableExecutionExceptionHandler()));
};
}

View File

@ -0,0 +1,36 @@
package cn.axzo.workflow.core.engine.event;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import org.flowable.job.api.JobInfo;
import static cn.axzo.workflow.core.engine.event.ErrorInfoEventType.NEW_ERROR;
/**
* TODO
*
* @author wangli
* @since 2024/4/17 16:45
*/
public class ErrorInfoEvent implements FlowableEvent {
private final JobInfo job;
private final Throwable throwable;
public ErrorInfoEvent(JobInfo job, Throwable throwable) {
this.job = job;
this.throwable = throwable;
}
@Override
public FlowableEventType getType() {
return NEW_ERROR;
}
public JobInfo getJob() {
return job;
}
public Throwable getThrowable() {
return throwable;
}
}

View File

@ -0,0 +1,48 @@
package cn.axzo.workflow.core.engine.event;
import org.apache.commons.lang3.StringUtils;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import java.util.ArrayList;
import java.util.List;
/**
* TODO
*
* @author wangli
* @since 2024/4/17 16:47
*/
public enum ErrorInfoEventType implements FlowableEventType {
NEW_ERROR;
public static final ErrorInfoEventType[] EMPTY_ARRAY = new ErrorInfoEventType[]{};
/**
* @param string the string containing a comma-separated list of event-type names
* @return a list of FlowableEngineEventType based on the given list.
* @throws FlowableIllegalArgumentException when one of the given string is not a valid type name
*/
public static ErrorInfoEventType[] getTypesFromString(String string) {
List<ErrorInfoEventType> result = new ArrayList<>();
if (string != null && !string.isEmpty()) {
String[] split = StringUtils.split(string, ",");
for (String typeName : split) {
boolean found = false;
for (ErrorInfoEventType type : values()) {
if (typeName.toUpperCase().equals(type.name())) {
result.add(type);
found = true;
break;
}
}
if (!found) {
throw new FlowableIllegalArgumentException("Invalid event-type: " + typeName);
}
}
}
return result.toArray(EMPTY_ARRAY);
}
}

View File

@ -1,5 +1,7 @@
package cn.axzo.workflow.core.engine.job.exception.handler;
import cn.axzo.workflow.core.engine.event.ErrorInfoEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
import org.flowable.job.api.JobInfo;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler;
@ -11,8 +13,14 @@ import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExcepti
* @since 2024/4/17 16:03
*/
public class CustomAsyncRunnableExecutionExceptionHandler implements AsyncRunnableExecutionExceptionHandler {
@Override
public boolean handleException(JobServiceConfiguration jobServiceConfiguration, JobInfo job, Throwable exception) {
return false;
FlowableEventDispatcher eventDispatcher = jobServiceConfiguration.getEventDispatcher();
eventDispatcher.dispatchEvent(new ErrorInfoEvent(job, exception), jobServiceConfiguration.getEngineName());
return true;
}
}

View File

@ -0,0 +1,67 @@
package cn.axzo.workflow.core.engine.listener;
import cn.axzo.workflow.core.engine.event.ErrorInfoEvent;
import cn.axzo.workflow.core.engine.event.ErrorInfoEventType;
import cn.axzo.workflow.core.listener.BpmnAsyncExecutionErrorEventListener;
import com.google.common.collect.ImmutableSet;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static cn.axzo.workflow.core.engine.event.ErrorInfoEventType.NEW_ERROR;
/**
* TODO
*
* @author wangli
* @since 2024/4/17 16:52
*/
@Slf4j
@Component
public class EngineAsyncExecutionErrorEventListener extends AbstractFlowableEngineEventListener {
@Resource
ObjectProvider<List<BpmnAsyncExecutionErrorEventListener>> errorEventListeners;
public static final Set<ErrorInfoEventType> ACCEPT_EVENTS =
ImmutableSet.<ErrorInfoEventType>builder()
.add(NEW_ERROR)
.build();
@Override
public void onEvent(FlowableEvent flowableEvent) {
if (flowableEvent instanceof ErrorInfoEvent) {
ErrorInfoEvent event = (ErrorInfoEvent) flowableEvent;
ErrorInfoEventType eventType = (ErrorInfoEventType) flowableEvent.getType();
if (ACCEPT_EVENTS.contains(eventType)) {
switch (eventType) {
case NEW_ERROR:
getOrderedListeners().forEach(i -> i.notify(event.getJob(), event.getThrowable()));
break;
default:
break;
}
}
}
}
private List<BpmnAsyncExecutionErrorEventListener> getOrderedListeners() {
List<BpmnAsyncExecutionErrorEventListener> orderListeners = new ArrayList<>();
errorEventListeners.ifAvailable(orderListeners::addAll);
return orderListeners;
}
@Override
public boolean isFailOnException() {
return true;
}
}

View File

@ -37,7 +37,7 @@ public class EngineNoticeEventListener extends AbstractFlowableEventListener {
@Resource
ObjectProvider<List<BpmnMessagePushEventListener>> processEventListeners;
public static final Set<MessagePushEventType> NOTICE_PUSH_EVENTS =
public static final Set<MessagePushEventType> ACCEPT_EVENTS =
ImmutableSet.<MessagePushEventType>builder()
.add(NOTICE)
.add(PENDING_PUSH)
@ -55,7 +55,7 @@ public class EngineNoticeEventListener extends AbstractFlowableEventListener {
MessagePushEvent event = (MessagePushEvent) flowableEvent;
MessagePushEventType pushEventType = (MessagePushEventType) flowableEvent.getType();
if (NOTICE_PUSH_EVENTS.contains(pushEventType)) {
if (ACCEPT_EVENTS.contains(pushEventType)) {
switch (pushEventType) {
case NOTICE:
stopWatch.start("PUSH-NOTICE Event Execution Time");

View File

@ -0,0 +1,15 @@
package cn.axzo.workflow.core.listener;
import org.flowable.job.api.JobInfo;
import org.springframework.core.Ordered;
/**
* TODO
*
* @author wangli
* @since 2024/4/17 16:54
*/
public interface BpmnAsyncExecutionErrorEventListener extends Ordered {
void notify(JobInfo jobInfo, Throwable throwable);
}

View File

@ -19,7 +19,7 @@ public enum ReporterType {
*/
ONLY_DING_TALK {
@Override
public void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Exception e) {
public void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Throwable e) {
if (sendDingTalk) {
DingTalkUtils.sendDingTalk(profile, title, argsArrayToString(args), e);
}
@ -31,7 +31,7 @@ public enum ReporterType {
*/
ONLY_LOG {
@Override
public void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Exception e) {
public void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Throwable e) {
LogUtil.error(LogUtil.ErrorType.ERROR_BUSINESS, shortString, e.getMessage(), e);
}
},
@ -40,7 +40,7 @@ public enum ReporterType {
*/
BOTH {
@Override
public void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Exception e) {
public void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Throwable e) {
if (sendDingTalk) {
DingTalkUtils.sendDingTalk(profile, title, argsArrayToString(args), e);
}
@ -48,6 +48,14 @@ public enum ReporterType {
}
};
public abstract void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Exception e);
/**
* @param profile 当前应用激活的环境
* @param title 告警消息标题
* @param sendDingTalk 是否调用钉钉
* @param args 请求所有的入参
* @param shortString 用于日志输出的关键信息
* @param e 异常对象
*/
public abstract void executeAction(String profile, String title, Boolean sendDingTalk, Object[] args, String shortString, Throwable e);
}

View File

@ -27,7 +27,7 @@ public class DingTalkUtils {
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
markdown.setTitle("Notice " + title + ", Env: " + profile);
markdown.setText("#### [" + profile + "]" + title + "\n" +
"> 创建参数: " + JSONUtil.toJsonStr(req) + "\n\n" +
"> 入参: " + JSONUtil.toJsonStr(req) + "\n\n" +
"> ###### 异常信息: " + JSONUtil.toJsonStr(Objects.isNull(throwable.getCause()) ? throwable.getMessage() :
throwable.getCause().getMessage()) + " \n");
request.setMarkdown(markdown);

View File

@ -0,0 +1,38 @@
package cn.axzo.workflow.server.controller.listener.error;
import cn.axzo.workflow.core.listener.BpmnAsyncExecutionErrorEventListener;
import cn.axzo.workflow.server.common.annotation.ReporterType;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.flowable.job.api.JobInfo;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author wangli
* @since 2024/4/17 16:59
*/
@Slf4j
@Component
public class ErrorReporterEventListener implements BpmnAsyncExecutionErrorEventListener {
@Value("${spring.profiles.active}")
private String profile;
@Value("${workflow.sendDingTalk:true}")
private Boolean sendDingTalk;
@Override
public void notify(JobInfo jobInfo, Throwable throwable) {
ReporterType reporterType = ReporterType.ONLY_LOG;
if (Lists.newArrayList("dev", "test", "pre").contains(profile)) {
reporterType = ReporterType.BOTH;
} else if (Lists.newArrayList("live", "master").contains(profile)) {
reporterType = ReporterType.ONLY_LOG;
}
reporterType.executeAction(profile, "异步任务执行异常", sendDingTalk, new Object[]{jobInfo}, "act_ru_job", throwable);
}
@Override
public int getOrder() {
return Integer.MIN_VALUE;
}
}