update - 将更新 extHiTaskInst 表数据的逻辑,改成使用引擎的异步处理,结合重试的能力

This commit is contained in:
wangli 2024-04-30 11:28:46 +08:00
parent 40b999fe8e
commit 1f5bc93e07
2 changed files with 31 additions and 17 deletions

View File

@ -1,7 +1,7 @@
package cn.axzo.workflow.core.conf;
import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.id.TimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
@ -45,8 +45,8 @@ public class FlowableConfiguration {
configuration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_TRUE);
configuration.setEnableSafeBpmnXml(false);
// configuration.setCreateDiagramOnDeploy(false);
configuration.setIdGenerator(new TimeBasedIdGenerator());
// configuration.setIdGenerator(new DistributedTimeBasedIdGenerator(stringRedisTemplate));
// configuration.setIdGenerator(new TimeBasedIdGenerator());
configuration.setIdGenerator(new DistributedTimeBasedIdGenerator(stringRedisTemplate));
configuration.setHistoricProcessInstanceDataManager(new CustomMybatisHistoricProcessInstanceDataManager(configuration));
// 自定义的异步任务处理器
configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler());

View File

@ -13,22 +13,13 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.ManagementService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.api.AcquiredExternalWorkerJob;
import org.flowable.job.api.Job;
import org.flowable.job.service.JobService;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.springframework.scheduling.annotation.Async;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.CREATE;
@ -47,7 +38,6 @@ import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE;
@Component
@Slf4j
public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
private final SpringProcessEngineConfiguration processEngineConfiguration;
private final ExtAxHiTaskInstService extAxHiTaskInstService;
public static final Set<ReceiveTaskEventType> SUPPORT_EVENTS =
ImmutableSet.<ReceiveTaskEventType>builder()
@ -55,7 +45,6 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
.add(UPDATE)
.build();
@Async
@Override
public void onEvent(FlowableEvent flowableEvent) {
if (flowableEvent instanceof ExtTaskInstEvent) {
@ -94,6 +83,31 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
}
private void updateExtTaskInst(ExtTaskInstUpdateEvent event) {
// externalJob(event);
// 利用引擎异步任务的重试能力, 去正确更新扩展任务表的状态
createAsyncJob(event);
}
private void createAsyncJob(ExtTaskInstUpdateEvent event) {
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
processEngineConfiguration.getCommandExecutor().execute(commandContext -> {
JobService jobService = CommandContextUtil.getJobService(commandContext);
JobEntity job = jobService.createJob();
// job.setExecutionId(event.getExecutionId());
job.setProcessInstanceId(event.getProcessInstanceId());
job.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE);
// 携带自定义的数据
job.setCustomValues(JSONUtil.toJsonStr(event));
// 创建异步任务并调度
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
return null;
});
}
/*private void externalJob(ExtTaskInstUpdateEvent event) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
ManagementService managementService = processEngineConfiguration.getManagementService();
commandExecutor.execute((Command<Void>) commandContext -> {
@ -116,5 +130,5 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
acquiredJobs.forEach(i -> {
managementService.createExternalWorkerCompletionBuilder(i.getId(), "testWorker").complete();
});
}
}*/
}