update - 调整 ExtHiTaskInst 变更状态的逻辑,借用引擎的异步事件的重试能力,达到正确更新的作用. 这不是解决的问题根本,还需要观察异常数据产生的原因

This commit is contained in:
wangli 2024-04-29 23:23:15 +08:00
parent 5a631bf6f1
commit 40b999fe8e
11 changed files with 25 additions and 47 deletions

View File

@ -12,7 +12,7 @@ import java.util.Objects;
import java.util.function.Supplier;
/**
* TODO
* 事件处理器中通用的上下文
*
* @author wangli
* @since 2024/4/9 21:00

View File

@ -2,7 +2,7 @@ package cn.axzo.workflow.core.common.context;
/**
* TODO
* 操作上下文接口
*
* @author wangli
* @since 2024/4/9 10:05

View File

@ -169,7 +169,6 @@ public final class BpmnJsonConverterUtil {
ExtensionAttribute serverVersion = new ExtensionAttribute();
serverVersion.setName(FLOW_SERVER_VERSION);
//FIXME: 尽量在每次版本迭代时, 都修改这里的版本号, 用于以后特殊场景下消息流程相关数据时, 能区别不同时期的处理办法,还需要动态起来
serverVersion.setValue(serverVersionStr);
ExtensionAttribute jsonMetaValue = new ExtensionAttribute();
jsonMetaValue.setName(FLOW_NODE_JSON);

View File

@ -1,7 +1,7 @@
package cn.axzo.workflow.core.conf;
import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.id.DistributedTimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.id.TimeBasedIdGenerator;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncExtTaskInstJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncRejectTaskJobHandler;
@ -45,7 +45,8 @@ public class FlowableConfiguration {
configuration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_TRUE);
configuration.setEnableSafeBpmnXml(false);
// configuration.setCreateDiagramOnDeploy(false);
configuration.setIdGenerator(new DistributedTimeBasedIdGenerator(stringRedisTemplate));
configuration.setIdGenerator(new TimeBasedIdGenerator());
// configuration.setIdGenerator(new DistributedTimeBasedIdGenerator(stringRedisTemplate));
configuration.setHistoricProcessInstanceDataManager(new CustomMybatisHistoricProcessInstanceDataManager(configuration));
// 自定义的异步任务处理器
configuration.addCustomJobHandler(new AsyncApproveTaskJobHandler());

View File

@ -12,7 +12,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* TODO
* 自定义线程配置器
*
* @author wangli
* @since 2024/4/9 23:10

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.core.engine.event;
import cn.axzo.workflow.common.enums.BpmnProcessInstanceResultEnum;
import lombok.Data;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE;
@ -11,6 +12,7 @@ import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.UPDATE;
* @author wangli
* @since 27/03/2024 15:02
*/
@Data
public class ExtTaskInstUpdateEvent implements ExtTaskInstEvent {
private final String processInstanceId;
private final String activityId;
@ -35,31 +37,6 @@ public class ExtTaskInstUpdateEvent implements ExtTaskInstEvent {
this.resultEnum = resultEnum;
}
@Override
public String getProcessInstanceId() {
return processInstanceId;
}
@Override
public String getActivityId() {
return activityId;
}
@Override
public String getTaskId() {
return taskId;
}
@Override
public String getAssignee() {
return assignee;
}
@Override
public BpmnProcessInstanceResultEnum getResultEnum() {
return resultEnum;
}
@Override
public FlowableEventType getType() {
return UPDATE;

View File

@ -5,7 +5,7 @@ import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.engine.event.ExtTaskInstUpdateEvent;
import cn.axzo.workflow.core.repository.entity.ExtAxHiTaskInst;
import cn.axzo.workflow.core.service.ExtAxHiTaskInstService;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler;
@ -38,7 +38,7 @@ public class AsyncExtTaskInstJobHandler implements JobHandler {
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
ExtTaskInstUpdateEvent event = JSONUtil.toBean(job.getCustomValues(), ExtTaskInstUpdateEvent.class);
ExtTaskInstUpdateEvent event = JSONObject.parseObject(job.getCustomValues(), ExtTaskInstUpdateEvent.class);
String taskId = event.getTaskId();
String processInstanceId = event.getProcessInstanceId();
String assignee = event.getAssignee();

View File

@ -57,8 +57,8 @@ public class EngineTaskEventListener implements TaskListener {
default:
}
});
stopWatch.stop();
log.info("StopWatch '" + stopWatch.currentTaskName() + "': running time = " + stopWatch.getTotalTimeSeconds() + " s");
stopWatch.stop();
}

View File

@ -7,7 +7,7 @@ import org.slf4j.MDC;
import static cn.azxo.framework.common.constatns.Constants.CTX_LOG_ID_MDC;
/**
* TODO
* 操作上下文的抽象实现
*
* @author wangli
* @since 2024/4/9 14:21

View File

@ -15,19 +15,20 @@ import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListen
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.flowable.engine.ManagementService;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.job.api.AcquiredExternalWorkerJob;
import org.flowable.job.api.Job;
import org.flowable.job.service.JobService;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor;
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import static cn.axzo.workflow.core.engine.event.ReceiveTaskEventType.CREATE;
@ -94,26 +95,26 @@ public class ExtTaskInstEventListener extends AbstractFlowableEventListener {
private void updateExtTaskInst(ExtTaskInstUpdateEvent event) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
ManagementService managementService = processEngineConfiguration.getManagementService();
commandExecutor.execute((Command<Void>) commandContext -> {
JobService jobService = CommandContextUtil.getJobService(commandContext);
JobServiceConfiguration jobServiceConfiguration = processEngineConfiguration.getJobServiceConfiguration();
ExternalWorkerJobEntity workerJob = jobService.createExternalWorkerJob();
workerJob.setJobHandlerConfiguration("extHiTaskInst");
workerJob.setJobType(Job.JOB_TYPE_EXTERNAL_WORKER);
workerJob.setRetries(jobServiceConfiguration.getAsyncExecutorNumberOfRetries());
workerJob.setJobHandlerType(AsyncExtTaskInstJobHandler.TYPE);
workerJob.setProcessInstanceId(event.getProcessInstanceId());
GregorianCalendar gregorianCalendar = new GregorianCalendar();
gregorianCalendar.setTime(jobServiceConfiguration.getClock().getCurrentTime());
AsyncExecutor asyncExecutor = jobServiceConfiguration.getAsyncExecutor();
gregorianCalendar.add(Calendar.MILLISECOND, asyncExecutor.getAsyncJobLockTimeInMillis());
workerJob.setLockExpirationTime(gregorianCalendar.getTime());
workerJob.setLockOwner(asyncExecutor.getLockOwner());
workerJob.setCustomValues(JSONUtil.toJsonStr(event));
jobService.insertExternalWorkerJob(workerJob);
return null;
});
List<AcquiredExternalWorkerJob> acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder()
.topic("extHiTaskInst", Duration.ofMinutes(30))
.acquireAndLock(3, "testWorker");
acquiredJobs.forEach(i -> {
managementService.createExternalWorkerCompletionBuilder(i.getId(), "testWorker").complete();
});
}
}

View File

@ -7,7 +7,7 @@ import java.util.List;
import java.util.Map;
/**
* TODO
* 适用于通用的操作 mapper
*
* @author wangli
* @since 2024/4/28 14:44