From 78cf58423d95e0f5b980bc4d7cdec1464bdb2e3d Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Tue, 2 Jul 2024 19:04:01 +0800 Subject: [PATCH] =?UTF-8?q?update=20-=20=E8=A7=A3=E5=86=B3=E5=A4=9A?= =?UTF-8?q?=E4=B8=AA=E4=BB=BB=E5=8A=A1=E5=90=8C=E6=97=B6=E6=93=8D=E4=BD=9C?= =?UTF-8?q?=E5=90=8C=E4=B8=80=E4=B8=AA=E5=AE=9E=E4=BE=8B,=E5=87=BA?= =?UTF-8?q?=E7=8E=B0=E7=9A=84=E4=BA=8B=E5=8A=A1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/conf/FlowableConfiguration.java | 3 +- .../CustomLockProcessInstanceInterceptor.java | 193 +++++------------- 2 files changed, 56 insertions(+), 140 deletions(-) diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java index fe8f1ea90..769d3651a 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java @@ -23,6 +23,7 @@ import com.alibaba.cloud.nacos.NacosServiceManager; import com.google.common.collect.Lists; import org.flowable.common.engine.api.delegate.event.FlowableEventListener; import org.flowable.common.engine.impl.history.HistoryLevel; +import org.flowable.common.engine.impl.interceptor.RetryInterceptor; import org.flowable.form.spring.SpringFormEngineConfiguration; import org.flowable.job.service.JobProcessor; import org.flowable.spring.SpringProcessEngineConfiguration; @@ -97,7 +98,7 @@ public class FlowableConfiguration { new CustomAsyncRunnableExceptionExceptionHandler())); configuration.setCommandContextFactory(new CustomCommandContextFactory()); configuration.setCustomPreCommandInterceptors(Lists.newArrayList( -// new CustomLockProcessInstanceInterceptor(redisTemplate) + new CustomLockProcessInstanceInterceptor() )); }; } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/interceptor/CustomLockProcessInstanceInterceptor.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/interceptor/CustomLockProcessInstanceInterceptor.java index fedcdaa15..f15ab3040 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/interceptor/CustomLockProcessInstanceInterceptor.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/interceptor/CustomLockProcessInstanceInterceptor.java @@ -1,25 +1,12 @@ package cn.axzo.workflow.core.engine.interceptor; -import cn.axzo.workflow.core.engine.cmd.AbstractCommand; -import liquibase.pro.packaged.T; +import lombok.extern.slf4j.Slf4j; +import org.apache.ibatis.exceptions.PersistenceException; +import org.flowable.common.engine.api.FlowableException; import org.flowable.common.engine.impl.interceptor.AbstractCommandInterceptor; import org.flowable.common.engine.impl.interceptor.Command; import org.flowable.common.engine.impl.interceptor.CommandConfig; import org.flowable.common.engine.impl.interceptor.CommandExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.RedisStringCommands; -import org.springframework.data.redis.connection.ReturnType; -import org.springframework.data.redis.core.RedisConnectionUtils; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.core.types.Expiration; - -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.Objects; -import java.util.UUID; /** * 增加一个拦截器,用来控制多个操作,不允许同时操作同一个实例 @@ -27,141 +14,69 @@ import java.util.UUID; * @author wangli * @since 2024/7/1 13:51 */ +@Slf4j public class CustomLockProcessInstanceInterceptor extends AbstractCommandInterceptor { - private static final String KEY_PREFIX = "operation:processInstance:"; - /** - * 解锁脚本,原子操作 - */ - private static final String unlockScript = - "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" - + "then\n" - + " return redis.call(\"del\",KEYS[1])\n" - + "else\n" - + " return 0\n" - + "end"; - private static final Logger log = LoggerFactory.getLogger(CustomLockProcessInstanceInterceptor.class); - private final StringRedisTemplate redisTemplate; - public CustomLockProcessInstanceInterceptor(StringRedisTemplate redisTemplate) { - this.redisTemplate = redisTemplate; - } + protected int numOfRetries = 3; + protected int waitTimeInMs = 50; + protected int waitIncreaseFactor = 5; @Override - public final T execute(CommandConfig config, Command command, CommandExecutor commandExecutor) { - if (AbstractCommand.class.isAssignableFrom(command.getClass())) { - AbstractCommand abstractCommand = (AbstractCommand) command; - String token = null; - try { - do { - token = getLock(KEY_PREFIX + abstractCommand.getProcessInstanceId(), 10 * 1000, 11 * 1000); - log.info(" threadName: {}, token: {}", Thread.currentThread().getName(), token); - } while (Objects.isNull(token)); - return next.execute(config, command, commandExecutor); - } finally { - log.info("finally unlock threadName: {}, token: {}", Thread.currentThread().getName(), token); - if (token != null) { - unlock(KEY_PREFIX + abstractCommand.getProcessInstanceId(), token); - } - } - } - return next.execute(config, command, commandExecutor); - } + public T execute(CommandConfig config, Command command, CommandExecutor commandExecutor) { + long waitTime = waitTimeInMs; + int failedAttempts = 0; - /** - * 加锁,有阻塞 - * - * @param name key的值 - * @param expire 过期时间 - * @param timeout 加锁执行超时时间 - * @return - */ - public String getLock(String name, long expire, long timeout) { - long startTime = System.currentTimeMillis(); //获取开始时间 - String token; - //规定的时间内,循环获取有值的token do { - token = tryGetLock(name, expire); //获取秘钥Key - if (token == null) { - if ((System.currentTimeMillis() - startTime) > (timeout - 50)) - break; - try { - Thread.sleep(50); //try 50毫秒 per sec milliseconds - } catch (InterruptedException e) { - log.warn("getLock error: {}", e.getMessage(), e); - return null; - } + if (failedAttempts > 0) { + log.warn("Waiting for {}ms before retrying the command.", waitTime); + waitBeforeRetry(waitTime); + waitTime *= waitIncreaseFactor; } - } while (token == null); - return token; - } - /** - * 加锁,无阻塞 - * - * @param name 设置key - * @param expire - * @return - */ - public String tryGetLock(String name, long expire) { - //获取UUID值为value - String token = UUID.randomUUID().toString(); - RedisConnectionFactory factory = redisTemplate.getConnectionFactory(); - RedisConnection conn = factory.getConnection(); - try { - Boolean result = conn.set(name.getBytes(StandardCharsets.UTF_8), //设置name为key - token.getBytes(StandardCharsets.UTF_8), //设置token为value -// Expiration.from(expire, TimeUnit.MILLISECONDS), //设置过期时间:MILLISECONDS毫秒 - Expiration.persistent(), - RedisStringCommands.SetOption.SET_IF_ABSENT); //如果name不存在创建 - log.info("tryGetLock result: {}", result); - if (Objects.isNull(result)) { - throw new IllegalStateException("tryGetLock processInstance fail"); - } - if (result) - return token; - } catch (Exception e) { - log.warn("tryGetLock error: {}", e.getMessage(), e); - return null; - } finally { - RedisConnectionUtils.releaseConnection(conn, factory); - } - return null; - } - - /** - * 功能描述:使用Lua脚本解锁 - * - * @MethodName: unlock - * @MethodParam: [name, token] - * @Return: boolean - * @Author: yyalin - * @CreateDate: 2023/7/17 18:41 - */ - public boolean unlock(String name, String token) { - byte[][] keysAndArgs = new byte[2][]; - keysAndArgs[0] = name.getBytes(Charset.forName("UTF-8")); //lock_key - keysAndArgs[1] = token.getBytes(Charset.forName("UTF-8")); //token的值,也即唯一标识符 - RedisConnectionFactory factory = redisTemplate.getConnectionFactory(); - RedisConnection conn = factory.getConnection(); - try { - Long result = (Long) conn.scriptingCommands().eval(unlockScript.getBytes(Charset.forName("UTF-8")), - ReturnType.INTEGER, 1, keysAndArgs); - if (Objects.isNull(result)) { - throw new IllegalStateException("unLock processInstance fail"); - } - return result > 0; - } catch (Exception e) { - log.warn("unlock error: {}", e.getMessage(), e); try { - Thread.sleep(1000L); - } catch (InterruptedException ex) { - // ignore + + // try to execute the command + return next.execute(config, command, commandExecutor); + + } catch (PersistenceException e) { + log.error("Caught persistence exception: {}", e.getMessage(), e); } - return unlock(name, token); - } finally { - RedisConnectionUtils.releaseConnection(conn, factory); + + failedAttempts++; + } while (failedAttempts <= numOfRetries); + + throw new FlowableException(numOfRetries + " retries failed with FlowableOptimisticLockingException. Giving up."); + } + + protected void waitBeforeRetry(long waitTime) { + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + log.error("I am interrupted while waiting for a retry."); } } + public void setNumOfRetries(int numOfRetries) { + this.numOfRetries = numOfRetries; + } + public void setWaitIncreaseFactor(int waitIncreaseFactor) { + this.waitIncreaseFactor = waitIncreaseFactor; + } + + public void setWaitTimeInMs(int waitTimeInMs) { + this.waitTimeInMs = waitTimeInMs; + } + + public int getNumOfRetries() { + return numOfRetries; + } + + public int getWaitIncreaseFactor() { + return waitIncreaseFactor; + } + + public int getWaitTimeInMs() { + return waitTimeInMs; + } }