Merge branch 'refs/heads/feature/REQ-2596' into feature/merge-all

This commit is contained in:
yangqicheng 2024-07-30 09:54:32 +08:00
commit 9ebfa8df91

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.repository.entity.ExtAxProperty;
import cn.axzo.workflow.core.service.ExtAxPropertyService;
import com.alibaba.nacos.common.utils.ThreadUtils;
import org.apache.commons.lang.StringUtils;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler;
@ -12,34 +13,56 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;
import java.util.concurrent.TimeUnit;
public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandler implements JobHandler {
private static final Logger log = LoggerFactory.getLogger(AbstractExecuteWithLockJobHandler.class);
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
ExtAxPropertyService extAxPropertyService = SpringContextUtils.getBean(ExtAxPropertyService.class);
String processInstanceId = job.getProcessInstanceId(), jobId = job.getId();
if (StringUtils.isBlank(processInstanceId) || StringUtils.isBlank(jobId)) {
log.warn("processInstanceId or lockOwner is empty,cannot execute with lock,jobId:{},processInstanceId:{}", job.getId(), job.getProcessInstanceId());
executeInternal(job, configuration, variableScope, commandContext);
return;
}
//todo 处理超时时间超过一定时间锁还存在删除锁
try {
ExtAxProperty extAxProperty = new ExtAxProperty();
extAxProperty.setName(processInstanceId);
extAxProperty.setValue(jobId);
extAxPropertyService.add(extAxProperty);
log.info("job acquire lock success,processInstanceId:{},jobId:{}", processInstanceId, jobId);
executeInternal(job, configuration, variableScope, commandContext);
} catch (DuplicateKeyException e) {
log.error("executeWithLock error,lock by another job,jobId:{},processInstanceId:{}", job.getId(), job.getProcessInstanceId(), e);
throw e;
if (acquireLock(processInstanceId, jobId)) {
log.info("job acquire lock success,processInstanceId:{},jobId:{}", processInstanceId, jobId);
executeInternal(job, configuration, variableScope, commandContext);
}
} finally {
extAxPropertyService.delete(processInstanceId, jobId);
releaseLock(processInstanceId, jobId);
}
}
private boolean acquireLock(String processInstanceId, String jobId) {
int attemptNo = 3, acquireCount = 0;
ExtAxPropertyService extAxPropertyService = SpringContextUtils.getBean(ExtAxPropertyService.class);
do {
try {
ExtAxProperty extAxProperty = new ExtAxProperty();
extAxProperty.setName(processInstanceId);
extAxProperty.setValue(jobId);
extAxPropertyService.add(extAxProperty);
return true;
} catch (DuplicateKeyException e) {
//删除15分钟前的数据,相当于超时时间是15分钟
extAxPropertyService.deleteByNameWithDuration(processInstanceId, 15L, TimeUnit.MINUTES);
//睡眠2秒
ThreadUtils.sleep(2 * 1000);
acquireCount++;
log.error("acquireLock error,processInstanceId:{},jobId:{},acquireCount:{}", processInstanceId, jobId, acquireCount);
}
} while (acquireCount < attemptNo);
return false;
}
private void releaseLock(String processInstanceId, String jobId) {
ExtAxPropertyService extAxPropertyService = SpringContextUtils.getBean(ExtAxPropertyService.class);
extAxPropertyService.delete(processInstanceId, jobId);
}
abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext);
}