REQ-2596-异步审批执行job添加锁

This commit is contained in:
yangqicheng 2024-07-19 16:36:09 +08:00
parent 9b55b354e6
commit 73913fc980
7 changed files with 98 additions and 11 deletions

View File

@ -0,0 +1,44 @@
package cn.axzo.workflow.core.engine.job;
import cn.axzo.workflow.core.common.utils.SpringContextUtils;
import cn.axzo.workflow.core.repository.entity.ExtAxProperty;
import cn.axzo.workflow.core.service.ExtAxPropertyService;
import org.apache.commons.lang.StringUtils;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.job.service.JobHandler;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.variable.api.delegate.VariableScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;
public abstract class AbstractExecuteWithLockJobHandler extends AbstractJobHandler implements JobHandler {
private static final Logger log = LoggerFactory.getLogger(AbstractExecuteWithLockJobHandler.class);
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
ExtAxPropertyService extAxPropertyService = SpringContextUtils.getBean(ExtAxPropertyService.class);
String processInstanceId = job.getProcessInstanceId(), jobId = job.getId();
if (StringUtils.isBlank(processInstanceId) || StringUtils.isBlank(jobId)) {
log.warn("processInstanceId or lockOwner is empty,cannot execute with lock,jobId:{},processInstanceId:{}", job.getId(), job.getProcessInstanceId());
executeInternal(job, configuration, variableScope, commandContext);
return;
}
//todo 处理超时时间超过一定时间锁还存在删除锁
try {
ExtAxProperty extAxProperty = new ExtAxProperty();
extAxProperty.setName(processInstanceId);
extAxProperty.setValue(jobId);
extAxPropertyService.add(extAxProperty);
executeInternal(job, configuration, variableScope, commandContext);
} catch (DuplicateKeyException e) {
log.error("executeWithLock error,lock by another job,jobId:{},processInstanceId:{}", job.getId(), job.getProcessInstanceId(), e);
throw e;
} finally {
extAxPropertyService.delete(processInstanceId, jobId);
}
}
abstract void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext);
}

View File

@ -12,13 +12,6 @@ import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.api.Task;
import org.flowable.variable.api.delegate.VariableScope;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Objects;
import static cn.axzo.workflow.common.enums.BpmnFlowNodeType.NODE_STARTER;
@ -30,7 +23,7 @@ import static cn.axzo.workflow.common.enums.BpmnFlowNodeType.NODE_STARTER;
* @since 2024/4/15 22:41
*/
@Slf4j
public class AsyncApproveTaskJobHandler extends AbstractJobHandler implements JobHandler {
public class AsyncApproveTaskJobHandler extends AbstractExecuteWithLockJobHandler implements JobHandler {
public static final String TYPE = "async-approve-task";
@Override
@ -39,7 +32,7 @@ public class AsyncApproveTaskJobHandler extends AbstractJobHandler implements Jo
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.info("AsyncApproveTaskJobHandler executing...");
log(job);
ProcessEngineConfigurationImpl processEngineConfiguration =

View File

@ -19,7 +19,7 @@ import org.flowable.variable.api.delegate.VariableScope;
* @since 2024/4/16 11:11
*/
@Slf4j
public class AsyncRejectTaskJobHandler extends AbstractJobHandler implements JobHandler {
public class AsyncRejectTaskJobHandler extends AbstractExecuteWithLockJobHandler implements JobHandler {
public static final String TYPE = "async-reject-task";
private final ExtAxHiTaskInstService extAxHiTaskInstService;
@ -33,7 +33,7 @@ public class AsyncRejectTaskJobHandler extends AbstractJobHandler implements Job
}
@Override
public void execute(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
public void executeInternal(JobEntity job, String configuration, VariableScope variableScope, CommandContext commandContext) {
log.info("AsyncRejectTaskJobHandler executing...");
log(job);
ProcessEngineConfigurationImpl processEngineConfiguration =

View File

@ -3,8 +3,10 @@ package cn.axzo.workflow.core.repository.entity;
import cn.axzo.framework.data.mybatisplus.model.BaseEntity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
@ -17,6 +19,8 @@ import lombok.ToString;
@TableName(value = "ext_ax_property", autoResultMap = true)
@Data
@ToString(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
public class ExtAxProperty extends BaseEntity<ExtAxProperty> {
private static final long serialVersionUID = 1L;

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.service;
import cn.axzo.workflow.core.repository.entity.ExtAxProperty;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* 引擎服务持久配置信息表操作 Service
@ -17,4 +18,17 @@ public interface ExtAxPropertyService {
ExtAxProperty update(ExtAxProperty property);
Optional<ExtAxProperty> getByName(String name);
int delete(String name, String value);
/**
* 删除指定时间段之前创建的数据
*
* @param name
* @param timeOut
* @param timeUnit
* @return
*/
int deleteByNameWithDuration(String name, Long timeOut, TimeUnit timeUnit);
}

View File

@ -9,7 +9,12 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* 引擎服务持久配置信息表操作 Service 实现
@ -44,4 +49,29 @@ public class ExtAxPropertyServiceImpl implements ExtAxPropertyService {
.eq("name", name);
return Optional.ofNullable(mapper.selectOne(queryWrapper));
}
@Override
public int delete(String name, String value) {
if (!StringUtils.hasText(name)) {
return 0;
}
Map<String, Object> deleteMap = new HashMap<>();
deleteMap.put("name", name);
if (StringUtils.hasText(value)) {
deleteMap.put("value", value);
}
return mapper.deleteByMap(deleteMap);
}
public int deleteByNameWithDuration(String name, Long timeOut, TimeUnit timeUnit) {
if (!StringUtils.hasText(name) || timeOut == null || timeUnit == null) {
log.error("argument not valid,name:{},timeOut:{},timeUnit:{}", name, timeOut, timeUnit);
throw new IllegalArgumentException("argument not valid");
}
LocalDateTime startTime = LocalDateTime.now().minus(timeUnit.toMillis(timeOut), ChronoUnit.MILLIS);
QueryWrapper<ExtAxProperty> queryWrapper = new QueryWrapper<ExtAxProperty>()
.eq("name", name)
.le("create_at", startTime);
return mapper.delete(queryWrapper);
}
}

View File

@ -0,0 +1,2 @@
ALTER TABLE ext_ax_property ADD UNIQUE INDEX name_unique_index (name);