update - 解决多个任务同时操作同一个实例,出现的事务问题

This commit is contained in:
wangli 2024-07-02 19:04:01 +08:00
parent 8a0affbf44
commit 78cf58423d
2 changed files with 56 additions and 140 deletions

View File

@ -23,6 +23,7 @@ import com.alibaba.cloud.nacos.NacosServiceManager;
import com.google.common.collect.Lists;
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
import org.flowable.common.engine.impl.history.HistoryLevel;
import org.flowable.common.engine.impl.interceptor.RetryInterceptor;
import org.flowable.form.spring.SpringFormEngineConfiguration;
import org.flowable.job.service.JobProcessor;
import org.flowable.spring.SpringProcessEngineConfiguration;
@ -97,7 +98,7 @@ public class FlowableConfiguration {
new CustomAsyncRunnableExceptionExceptionHandler()));
configuration.setCommandContextFactory(new CustomCommandContextFactory());
configuration.setCustomPreCommandInterceptors(Lists.newArrayList(
// new CustomLockProcessInstanceInterceptor(redisTemplate)
new CustomLockProcessInstanceInterceptor()
));
};
}

View File

@ -1,25 +1,12 @@
package cn.axzo.workflow.core.engine.interceptor;
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
import liquibase.pro.packaged.T;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.exceptions.PersistenceException;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.impl.interceptor.AbstractCommandInterceptor;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandConfig;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.UUID;
/**
* 增加一个拦截器,用来控制多个操作,不允许同时操作同一个实例
@ -27,141 +14,69 @@ import java.util.UUID;
* @author wangli
* @since 2024/7/1 13:51
*/
@Slf4j
public class CustomLockProcessInstanceInterceptor extends AbstractCommandInterceptor {
private static final String KEY_PREFIX = "operation:processInstance:";
/**
* 解锁脚本原子操作
*/
private static final String unlockScript =
"if redis.call(\"get\",KEYS[1]) == ARGV[1]\n"
+ "then\n"
+ " return redis.call(\"del\",KEYS[1])\n"
+ "else\n"
+ " return 0\n"
+ "end";
private static final Logger log = LoggerFactory.getLogger(CustomLockProcessInstanceInterceptor.class);
private final StringRedisTemplate redisTemplate;
public CustomLockProcessInstanceInterceptor(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
protected int numOfRetries = 3;
protected int waitTimeInMs = 50;
protected int waitIncreaseFactor = 5;
@Override
public final <T> T execute(CommandConfig config, Command<T> command, CommandExecutor commandExecutor) {
if (AbstractCommand.class.isAssignableFrom(command.getClass())) {
AbstractCommand abstractCommand = (AbstractCommand) command;
String token = null;
try {
do {
token = getLock(KEY_PREFIX + abstractCommand.getProcessInstanceId(), 10 * 1000, 11 * 1000);
log.info(" threadName: {}, token: {}", Thread.currentThread().getName(), token);
} while (Objects.isNull(token));
return next.execute(config, command, commandExecutor);
} finally {
log.info("finally unlock threadName: {}, token: {}", Thread.currentThread().getName(), token);
if (token != null) {
unlock(KEY_PREFIX + abstractCommand.getProcessInstanceId(), token);
}
}
}
return next.execute(config, command, commandExecutor);
}
public <T> T execute(CommandConfig config, Command<T> command, CommandExecutor commandExecutor) {
long waitTime = waitTimeInMs;
int failedAttempts = 0;
/**
* 加锁有阻塞
*
* @param name key的值
* @param expire 过期时间
* @param timeout 加锁执行超时时间
* @return
*/
public String getLock(String name, long expire, long timeout) {
long startTime = System.currentTimeMillis(); //获取开始时间
String token;
//规定的时间内循环获取有值的token
do {
token = tryGetLock(name, expire); //获取秘钥Key
if (token == null) {
if ((System.currentTimeMillis() - startTime) > (timeout - 50))
break;
try {
Thread.sleep(50); //try 50毫秒 per sec milliseconds
} catch (InterruptedException e) {
log.warn("getLock error: {}", e.getMessage(), e);
return null;
}
if (failedAttempts > 0) {
log.warn("Waiting for {}ms before retrying the command.", waitTime);
waitBeforeRetry(waitTime);
waitTime *= waitIncreaseFactor;
}
} while (token == null);
return token;
}
/**
* 加锁无阻塞
*
* @param name 设置key
* @param expire
* @return
*/
public String tryGetLock(String name, long expire) {
//获取UUID值为value
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
Boolean result = conn.set(name.getBytes(StandardCharsets.UTF_8), //设置name为key
token.getBytes(StandardCharsets.UTF_8), //设置token为value
// Expiration.from(expire, TimeUnit.MILLISECONDS), //设置过期时间MILLISECONDS毫秒
Expiration.persistent(),
RedisStringCommands.SetOption.SET_IF_ABSENT); //如果name不存在创建
log.info("tryGetLock result: {}", result);
if (Objects.isNull(result)) {
throw new IllegalStateException("tryGetLock processInstance fail");
}
if (result)
return token;
} catch (Exception e) {
log.warn("tryGetLock error: {}", e.getMessage(), e);
return null;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
}
return null;
}
/**
* 功能描述:使用Lua脚本解锁
*
* @MethodName: unlock
* @MethodParam: [name, token]
* @Return: boolean
* @Author: yyalin
* @CreateDate: 2023/7/17 18:41
*/
public boolean unlock(String name, String token) {
byte[][] keysAndArgs = new byte[2][];
keysAndArgs[0] = name.getBytes(Charset.forName("UTF-8")); //lock_key
keysAndArgs[1] = token.getBytes(Charset.forName("UTF-8")); //token的值也即唯一标识符
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
Long result = (Long) conn.scriptingCommands().eval(unlockScript.getBytes(Charset.forName("UTF-8")),
ReturnType.INTEGER, 1, keysAndArgs);
if (Objects.isNull(result)) {
throw new IllegalStateException("unLock processInstance fail");
}
return result > 0;
} catch (Exception e) {
log.warn("unlock error: {}", e.getMessage(), e);
try {
Thread.sleep(1000L);
} catch (InterruptedException ex) {
// ignore
// try to execute the command
return next.execute(config, command, commandExecutor);
} catch (PersistenceException e) {
log.error("Caught persistence exception: {}", e.getMessage(), e);
}
return unlock(name, token);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
failedAttempts++;
} while (failedAttempts <= numOfRetries);
throw new FlowableException(numOfRetries + " retries failed with FlowableOptimisticLockingException. Giving up.");
}
protected void waitBeforeRetry(long waitTime) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
log.error("I am interrupted while waiting for a retry.");
}
}
public void setNumOfRetries(int numOfRetries) {
this.numOfRetries = numOfRetries;
}
public void setWaitIncreaseFactor(int waitIncreaseFactor) {
this.waitIncreaseFactor = waitIncreaseFactor;
}
public void setWaitTimeInMs(int waitTimeInMs) {
this.waitTimeInMs = waitTimeInMs;
}
public int getNumOfRetries() {
return numOfRetries;
}
public int getWaitIncreaseFactor() {
return waitIncreaseFactor;
}
public int getWaitTimeInMs() {
return waitTimeInMs;
}
}