REQ-2596-job执行加锁,增加重试机制,获取锁失败,执行删除过期锁操作

This commit is contained in:
yangqicheng 2024-07-29 11:56:20 +08:00
parent 78151d27af
commit 24c06f3f8c

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.core.common.utils.SpringContextUtils; import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.repository.entity.ExtAxProperty; import cn.axzo.workflow.core.repository.entity.ExtAxProperty;
import cn.axzo.workflow.core.service.ExtAxPropertyService; import cn.axzo.workflow.core.service.ExtAxPropertyService;
import com.alibaba.nacos.common.utils.ThreadUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.flowable.common.engine.impl.interceptor.CommandContext; import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler; import org.flowable.job.service.JobHandler;
@ -12,33 +13,55 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.DuplicateKeyException;
import java.util.concurrent.TimeUnit;
public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandler implements JobHandler { public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandler implements JobHandler {
private static final Logger log = LoggerFactory.getLogger(AbstractExecuteWithLockJobHandler.class); private static final Logger log = LoggerFactory.getLogger(AbstractExecuteWithLockJobHandler.class);
@Override @Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) { public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
ExtAxPropertyService extAxPropertyService = SpringContextUtils.getBean(ExtAxPropertyService.class);
String processInstanceId = job.getProcessInstanceId(), jobId = job.getId(); String processInstanceId = job.getProcessInstanceId(), jobId = job.getId();
if (StringUtils.isBlank(processInstanceId) || StringUtils.isBlank(jobId)) { if (StringUtils.isBlank(processInstanceId) || StringUtils.isBlank(jobId)) {
log.warn("processInstanceId or lockOwner is empty,cannot execute with lock,jobId:{},processInstanceId:{}", job.getId(), job.getProcessInstanceId()); log.warn("processInstanceId or lockOwner is empty,cannot execute with lock,jobId:{},processInstanceId:{}", job.getId(), job.getProcessInstanceId());
executeInternal(job, configuration, variableScope, commandContext); executeInternal(job, configuration, variableScope, commandContext);
return; return;
} }
//todo 处理超时时间超过一定时间锁还存在删除锁 try {
if (acquireLock(processInstanceId, jobId)) {
log.info("job acquire lock success,processInstanceId:{},jobId:{}", processInstanceId, jobId);
executeInternal(job, configuration, variableScope, commandContext);
}
} finally {
releaseLock(processInstanceId, jobId);
}
}
private boolean acquireLock(String processInstanceId, String jobId) {
int attemptNo = 3, acquireCount = 0;
ExtAxPropertyService extAxPropertyService = SpringContextUtils.getBean(ExtAxPropertyService.class);
do {
try { try {
ExtAxProperty extAxProperty = new ExtAxProperty(); ExtAxProperty extAxProperty = new ExtAxProperty();
extAxProperty.setName(processInstanceId); extAxProperty.setName(processInstanceId);
extAxProperty.setValue(jobId); extAxProperty.setValue(jobId);
extAxPropertyService.add(extAxProperty); extAxPropertyService.add(extAxProperty);
log.info("job acquire lock success,processInstanceId:{},jobId:{}", processInstanceId, jobId); return true;
executeInternal(job, configuration, variableScope, commandContext);
} catch (DuplicateKeyException e) { } catch (DuplicateKeyException e) {
log.error("executeWithLock error,lock by another job,jobId:{},processInstanceId:{}", job.getId(), job.getProcessInstanceId(), e); //删除15分钟前的数据,相当于超时时间是15分钟
throw e; extAxPropertyService.deleteByNameWithDuration(processInstanceId, 15L, TimeUnit.MINUTES);
} finally { //睡眠2秒
extAxPropertyService.delete(processInstanceId, jobId); ThreadUtils.sleep(2 * 1000);
acquireCount++;
log.error("acquireLock error,processInstanceId:{},jobId:{},acquireCount:{}", processInstanceId, jobId, acquireCount);
} }
} while (acquireCount < attemptNo);
return false;
}
private void releaseLock(String processInstanceId, String jobId) {
ExtAxPropertyService extAxPropertyService = SpringContextUtils.getBean(ExtAxPropertyService.class);
extAxPropertyService.delete(processInstanceId, jobId);
} }
abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext); abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext);