Merge branch 'refs/heads/1.3.2-SNAPSHOT' into REQ-2393

This commit is contained in:
wangli 2024-04-30 11:29:44 +08:00
commit 1cf0b33444
14 changed files with 166 additions and 39 deletions

View File

@ -0,0 +1,39 @@
package cn.axzo.workflow.core.common.code;
import cn.axzo.framework.domain.web.code.IModuleRespCode;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 异步任务相关响应码
*
* @author wangli
* @since 2024/4/29 20:30
*/
@Getter
@AllArgsConstructor
public enum AsyncJobRespCode implements IModuleRespCode {
DATA_NOT_EXISTS("001", "ExtTaskInst 数据不存在, instId: {}, taskId: {}"),
;
private String code;
private String message;
@Override
public String getModuleCode() {
return "11";
}
@Override
public String getProjectCode() {
return "998";
}
public void setCode(String code) {
this.code = code;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -12,7 +12,7 @@ import java.util.Objects;
import java.util.function.Supplier;
/**
* TODO
* 事件处理器中通用的上下文
*
* @author wangli
* @since 2024/4/9 21:00

View File

@ -2,7 +2,7 @@ package cn.axzo.workflow.core.common.context;
/**
* TODO
* 操作上下文接口
*
* @author wangli
* @since 2024/4/9 10:05

View File

@ -146,7 +146,6 @@ public final class BpmnJsonConverterUtil {
ExtensionAttribute serverVersion = new ExtensionAttribute();
serverVersion.setName(FLOW_SERVER_VERSION);
//FIXME: 尽量在每次版本迭代时, 都修改这里的版本号, 用于以后特殊场景下消息流程相关数据时, 能区别不同时期的处理办法,还需要动态起来
serverVersion.setValue(serverVersionStr);
ExtensionAttribute jsonMetaValue = new ExtensionAttribute();
jsonMetaValue.setName(FLOW_NODE_JSON);

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf;
import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
import cn.axzo.workflow.core.engine.job.exception.handle.CustomAsyncRunnableExceptionExceptionHandler;
import cn.axzo.workflow.core.engine.persistence.CustomMybatisHistoricProcessInstanceDataManager;
@ -44,11 +45,13 @@ public class FlowableConfiguration {
configuration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_TRUE);
configuration.setEnableSafeBpmnXml(false);
// configuration.setCreateDiagramOnDeploy(false);
// configuration.setIdGenerator(new TimeBasedIdGenerator());
configuration.setIdGenerator(new DistributedTimeBasedIdGenerator(stringRedisTemplate));
configuration.setHistoricProcessInstanceDataManager(new CustomMybatisHistoricProcessInstanceDataManager(configuration));
// 自定义的异步任务处理器
configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler());
configuration.addCustomJobHandler(new AsyncRejectTaskJobHandler(extAxHiTaskInstService));
configuration.addCustomJobHandler(new AsyncExtTaskInstJobHandler(extAxHiTaskInstService));
// 异步任务异常重试时间间隔
configuration.setDefaultFailedJobWaitTime(30);
configuration.setAsyncFailedJobWaitTime(30);

View File

@ -12,7 +12,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* TODO
* 自定义线程配置器
*
* @author wangli
* @since 2024/4/9 23:10

View File

@ -2,6 +2,7 @@ package cn.axzo.workflow.core.engine.event;
import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
/**
* 接收任务的扩展任务记录表创建的事件
@ -10,6 +11,8 @@ import org.flowable.common.engine.api.delegate.event.FlowableEvent;
* @since 2024/2/5 18:26
*/
public interface ExtTaskInstEvent extends FlowableEvent {
@Override
FlowableEventType getType();
String getProcessInstanceId();

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.core.engine.event;
import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum;
import lombok.Data;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE;
@ -11,6 +12,7 @@ import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE;
* @author wangli
* @since 27/03/2024 15:02
*/
@Data
public class ExtTaskInstUpdateEvent implements ExtTaskInstEvent {
private final String processInstanceId;
private final String activityId;
@ -35,31 +37,6 @@ public class ExtTaskInstUpdateEvent implements ExtTaskInstEvent {
this.resultEnum = resultEnum;
}
@Override
public String getProcessInstanceId() {
return processInstanceId;
}
@Override
public String getActivityId() {
return activityId;
}
@Override
public String getTaskId() {
return taskId;
}
@Override
public String getAssignee() {
return assignee;
}
@Override
public BpmnProcessInstanceResultEnum getResultEnum() {
return resultEnum;
}
@Override
public FlowableEventType getType() {
return UPDATE;

View File

@ -0,0 +1,58 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent;
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
import java.util.Objects;
import static cn.axzo.workflow.core.common.code.AsyncJobRespCode.DATA_NOT_EXISTS;
/**
* 异步操作扩展任务表的处理器
*
* @author wangli
* @since 2024/4/29 20:22
*/
@Slf4j
public class AsyncExtTaskInstJobHandler implements JobHandler {
public static final String TYPE = "async-update-ext-task-inst";
private final ExtAxHiTaskInstService extAxHiTaskInstService;
public AsyncExtTaskInstJobHandler(ExtAxHiTaskInstService extAxHiTaskInstService) {
this.extAxHiTaskInstService = extAxHiTaskInstService;
}
@Override
public String getType() {
return TYPE;
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
ExtTaskInstUpdateEvent event = JSONObject.parseObject(job.getCustomValues(), ExtTaskInstUpdateEvent.class);
String taskId = event.getTaskId();
String processInstanceId = event.getProcessInstanceId();
String assignee = event.getAssignee();
BpmnProcessInstanceResultEnum resultEnum = event.getResultEnum();
ExtAxHiTaskInst existence = extAxHiTaskInstService.getByTaskId(taskId, processInstanceId);
if (Objects.nonNull(existence)) {
log.info("更新扩展任务实例表数据: taskId:{}, instanceId:{}, currentStatus:{}, anticipateStatus: {}",
taskId, processInstanceId, existence.getStatus(), resultEnum.getStatus());
} else {
log.warn("无法正确更新扩展任务表数据: taskId:{}, instanceId:{}, anticipateStatus: {}",
taskId, processInstanceId, resultEnum.getStatus());
throw new WorkflowEngineException(DATA_NOT_EXISTS, processInstanceId, taskId);
}
extAxHiTaskInstService.updateByTaskIdAndInstanceId(taskId, processInstanceId, assignee, resultEnum);
}
}

View File

@ -57,8 +57,8 @@ public class EngineTaskEventListener implements TaskListener {
default:
}
});
stopWatch.stop();
log.info("StopWatch '" + stopWatch.currentTaskName() + "': running time = " + stopWatch.getTotalTimeSeconds() + " s");
stopWatch.stop();
}

View File

@ -7,7 +7,7 @@ import org.slf4j.MDC;
import static cn.azxo.framework.common.constatns.Constants.CTX_LOG_ID_MDC;
/**
* TODO
* 操作上下文的抽象实现
*
* @author wangli
* @since 2024/4/9 14:21

View File

@ -4,6 +4,7 @@ import cn.axzo.workflow.core.engine.event.ExtTaskInstCreateEvent;
import cn.axzo.workflow.core.engine.event.ExtTaskInstEvent;
import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent;
import cn.axzo.workflow.core.engine.event.ReceiveTaskEventType;
import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import cn.hutool.json.JSONUtil;
@ -12,7 +13,10 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.springframework.scheduling.annotation.Async;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@ -41,11 +45,10 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
.add(UPDATE)
.build();
@Async
@Override
public void onEvent(FlowableEvent flowableEvent) {
if (flowableEvent instanceof ExtTaskInstEvent) {
log.info("Ext Task Inst Event : {}", JSONUtil.toJsonStr(flowableEvent));
log.info("Ext Task Inst Event : {}, eventType: {}", JSONUtil.toJsonStr(flowableEvent), flowableEvent.getType());
ExtTaskInstEvent event = (ExtTaskInstEvent) flowableEvent;
ReceiveTaskEventType eventType = (ReceiveTaskEventType) flowableEvent.getType();
if (SUPPORT_EVENTS.contains(eventType)) {
@ -80,9 +83,52 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
}
private void updateExtTaskInst(ExtTaskInstUpdateEvent event) {
extAxHiTaskInstService.updateByTaskIdAndInstanceId(event.getTaskId(),
event.getProcessInstanceId(),
event.getAssignee(),
event.getResultEnum());
// externalJob(event);
// 利用引擎异步任务的重试能力, 去正确更新扩展任务表的状态
createAsyncJob(event);
}
private void createAsyncJob(ExtTaskInstUpdateEvent event) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
processEngineConfiguration.getCommandExecutor().execute(commandContext -> {
JobService jobService = CommandContextUtil.getJobService(commandContext);
JobEntity job = jobService.createJob();
// job.setExecutionId(event.getExecutionId());
job.setProcessInstanceId(event.getProcessInstanceId());
job.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE);
// 携带自定义的数据
job.setCustomValues(JSONUtil.toJsonStr(event));
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
return null;
});
}
/*private void externalJob(ExtTaskInstUpdateEvent event) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
ManagementService managementService = processEngineConfiguration.getManagementService();
commandExecutor.execute((Command<Void>) commandContext -> {
JobService jobService = CommandContextUtil.getJobService(commandContext);
JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration();
ExternalWorkerJobEntity workerJob = jobService.createExternalWorkerJob();
workerJob.setJobHandlerConfiguration("extHiTaskInst");
workerJob.setJobType(Job.JOB_TYPE_EXTERNAL_WORKER);
workerJob.setRetries(jobServiceConfiguration.getAsyncExecutorNumberOfRetries());
workerJob.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE);
workerJob.setProcessInstanceId(event.getProcessInstanceId());
workerJob.setCustomValues(JSONUtil.toJsonStr(event));
jobService.insertExternalWorkerJob(workerJob);
return null;
});
List<AcquiredExternalWorkerJob> acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder()
.topic("extHiTaskInst", Duration.ofMinutes(30))
.acquireAndLock(3, "testWorker");
acquiredJobs.forEach(i -> {
managementService.createExternalWorkerCompletionBuilder(i.getId(), "testWorker").complete();
});
}*/
}

View File

@ -7,7 +7,7 @@ import java.util.List;
import java.util.Map;
/**
* TODO
* 适用于通用的操作 mapper
*
* @author wangli
* @since 2024/4/28 14:44

View File

@ -268,6 +268,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<Ta
}
private void autoReject(DelegateTask delegateTask) {
// TODO 应该用异步任务实现
log.info("AutoOperatorEventListener#autoReject...{}", delegateTask.getTaskDefinitionKey());
taskService.addComment(delegateTask.getId(), delegateTask.getProcessInstanceId(), COMMENT_TYPE_OPERATION_DESC
, "自动驳回");
@ -294,6 +295,7 @@ public class AutoOperatorEvent_101_Listener extends AbstractBpmnEventListener<Ta
private void autoPass(DelegateTask delegateTask) {
// TODO 用异步任务实现
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
BpmnTaskDelegateAssigner assigner = BpmnTaskDelegateAssigner.toObjectCompatible(