diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java index 15302a561..1860b1b7b 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/FlowableConfiguration.java @@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf; import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory; import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory; import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator; +import cn.axzo.workflow.core.engine.interceptor.CustomLockProcessInstanceInterceptor; import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceHandler; import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler; import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler; @@ -30,6 +31,8 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import java.time.Duration; import java.util.List; @@ -56,7 +59,8 @@ public class FlowableConfiguration { BpmnProcessActivityService bpmnProcessActivityService, List jobProcessors, NacosServiceManager nacosServiceManager, - NacosDiscoveryProperties nacosDiscoveryProperties) { + NacosDiscoveryProperties nacosDiscoveryProperties, + StringRedisTemplate redisTemplate) { return configuration -> { configuration.setEnableHistoricTaskLogging(true); configuration.setHistoryLevel(HistoryLevel.AUDIT); @@ -92,6 +96,9 @@ public class FlowableConfiguration { new CustomWorkflowEngineExceptionHandler(), new CustomAsyncRunnableExceptionExceptionHandler())); configuration.setCommandContextFactory(new CustomCommandContextFactory()); + configuration.setCustomPreCommandInterceptors(Lists.newArrayList( + new CustomLockProcessInstanceInterceptor(redisTemplate) + )); }; } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/AbstractCommand.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/AbstractCommand.java new file mode 100644 index 000000000..ed5aae60b --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/AbstractCommand.java @@ -0,0 +1,12 @@ +package cn.axzo.workflow.core.engine.cmd; + +/** + * TODO + * + * @author wangli + * @since 2024/7/1 13:59 + */ +public abstract class AbstractCommand { + + public abstract String getProcessInstanceId(); +} diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomAbortProcessInstanceCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomAbortProcessInstanceCmd.java index 15a121697..0bd434a7a 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomAbortProcessInstanceCmd.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomAbortProcessInstanceCmd.java @@ -44,7 +44,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.createVir * @author wangli * @since 2024/1/2 18:19 */ -public class CustomAbortProcessInstanceCmd implements Command, Serializable { +public class CustomAbortProcessInstanceCmd extends AbstractCommand implements Command, Serializable { private final String processInstanceId; private final String tenantId; private final String reason; @@ -102,4 +102,12 @@ public class CustomAbortProcessInstanceCmd implements Command, Serializabl runtimeService.setVariable(task.getProcessInstanceId(), TASK_COMPLETE_OPERATION_TYPE + task.getId(), ABORTED); return null; } + + @Override + public String getProcessInstanceId() { + if (StringUtils.hasText(processInstanceId)) { + return processInstanceId; + } + return null; + } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomApproveTaskCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomApproveTaskCmd.java index 9bdaabe5f..f891b67a4 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomApproveTaskCmd.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomApproveTaskCmd.java @@ -10,6 +10,7 @@ import org.flowable.common.engine.impl.interceptor.CommandContext; import org.flowable.engine.RuntimeService; import org.flowable.engine.TaskService; import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.context.Context; import org.flowable.engine.impl.util.CommandContextUtil; import org.flowable.task.api.Task; import org.flowable.task.api.history.HistoricTaskInstance; @@ -38,7 +39,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask * @author wangli * @since 2024/1/4 15:50 */ -public class CustomApproveTaskCmd implements Command, Serializable { +public class CustomApproveTaskCmd extends AbstractCommand implements Command, Serializable { private static final Logger log = LoggerFactory.getLogger(CustomApproveTaskCmd.class); private final String taskId; @@ -146,4 +147,17 @@ public class CustomApproveTaskCmd implements Command, Serializable { } } + @Override + public String getProcessInstanceId() { + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(Context.getCommandContext()); + HistoricTaskInstanceQuery taskQuery = + processEngineConfiguration.getHistoryService().createHistoricTaskInstanceQuery(); + + HistoricTaskInstance historicTaskInstance = taskQuery.taskId(taskId).singleResult(); + if (Objects.isNull(historicTaskInstance)) { + return null; + } + return historicTaskInstance.getProcessInstanceId(); + } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomCancelProcessInstanceCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomCancelProcessInstanceCmd.java index 93c447265..540378165 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomCancelProcessInstanceCmd.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomCancelProcessInstanceCmd.java @@ -1,5 +1,6 @@ package cn.axzo.workflow.core.engine.cmd; +import cn.axzo.framework.context.validation.SpringValidator; import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner; import cn.axzo.workflow.core.common.exception.WorkflowEngineException; import cn.axzo.workflow.core.engine.operation.DeleteProcessInstanceOperation; @@ -12,6 +13,7 @@ import org.flowable.engine.history.HistoricProcessInstance; import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; import org.flowable.engine.impl.util.CommandContextUtil; import org.flowable.task.api.Task; +import org.springframework.util.StringUtils; import java.io.Serializable; import java.util.HashMap; @@ -41,7 +43,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.createVir * @author wangli * @since 2024/1/2 18:19 */ -public class CustomCancelProcessInstanceCmd implements Command, Serializable { +public class CustomCancelProcessInstanceCmd extends AbstractCommand implements Command, Serializable { private final String processInstanceId; private final String tenantId; private final String reason; @@ -102,4 +104,12 @@ public class CustomCancelProcessInstanceCmd implements Command, Serializab addComment(commandContext, task, COMMENT_TYPE_OPERATION_DESC, "已撤回"); return null; } + + @Override + public String getProcessInstanceId() { + if (StringUtils.hasText(processInstanceId)) { + return processInstanceId; + } + return null; + } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomRejectionTaskCmd.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomRejectionTaskCmd.java index b1f559d6c..7b47c6c1d 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomRejectionTaskCmd.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/cmd/CustomRejectionTaskCmd.java @@ -11,6 +11,7 @@ import org.flowable.common.engine.impl.interceptor.CommandContext; import org.flowable.engine.RuntimeService; import org.flowable.engine.TaskService; import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl; +import org.flowable.engine.impl.context.Context; import org.flowable.engine.impl.util.CommandContextUtil; import org.flowable.task.api.Task; import org.flowable.task.api.history.HistoricTaskInstance; @@ -21,6 +22,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC; import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_DELETE_PROCESS_FLAG; @@ -42,7 +44,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask * @author wangli * @since 2024/1/4 13:36 */ -public class CustomRejectionTaskCmd implements Command, Serializable { +public class CustomRejectionTaskCmd extends AbstractCommand implements Command, Serializable { private final String taskId; private final String advice; @@ -99,4 +101,18 @@ public class CustomRejectionTaskCmd implements Command, Serializable { .planOperation(new DeleteProcessInstanceOperation(commandContext, task.getProcessInstanceId(), extAxHiTaskInstService)); } + + @Override + public String getProcessInstanceId() { + ProcessEngineConfigurationImpl processEngineConfiguration = + CommandContextUtil.getProcessEngineConfiguration(Context.getCommandContext()); + HistoricTaskInstanceQuery taskQuery = + processEngineConfiguration.getHistoryService().createHistoricTaskInstanceQuery(); + + HistoricTaskInstance historicTaskInstance = taskQuery.taskId(taskId).singleResult(); + if (Objects.isNull(historicTaskInstance)) { + return null; + } + return historicTaskInstance.getProcessInstanceId(); + } } diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/interceptor/CustomLockProcessInstanceInterceptor.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/interceptor/CustomLockProcessInstanceInterceptor.java new file mode 100644 index 000000000..cb9f25af6 --- /dev/null +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/engine/interceptor/CustomLockProcessInstanceInterceptor.java @@ -0,0 +1,148 @@ +package cn.axzo.workflow.core.engine.interceptor; + +import cn.axzo.workflow.core.engine.cmd.AbstractCommand; +import org.flowable.common.engine.impl.interceptor.AbstractCommandInterceptor; +import org.flowable.common.engine.impl.interceptor.Command; +import org.flowable.common.engine.impl.interceptor.CommandConfig; +import org.flowable.common.engine.impl.interceptor.CommandExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.core.RedisConnectionUtils; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.types.Expiration; + +import java.nio.charset.Charset; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * 增加一个拦截器,用来控制多个操作,不允许同时操作同一个实例 + * + * @author wangli + * @since 2024/7/1 13:51 + */ +public class CustomLockProcessInstanceInterceptor extends AbstractCommandInterceptor { + private String key_prefix = "operation:processInstance:"; + /** + * 解锁脚本,原子操作 + */ + private static final String unlockScript = + "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" + + "then\n" + + " return redis.call(\"del\",KEYS[1])\n" + + "else\n" + + " return 0\n" + + "end"; + private static final Logger log = LoggerFactory.getLogger(CustomLockProcessInstanceInterceptor.class); + private final StringRedisTemplate redisTemplate; + + public CustomLockProcessInstanceInterceptor(StringRedisTemplate redisTemplate) { + this.redisTemplate = redisTemplate; + } + + @Override + public T execute(CommandConfig config, Command command, CommandExecutor commandExecutor) { + if (AbstractCommand.class.isAssignableFrom(command.getClass())) { + AbstractCommand abstractCommand = (AbstractCommand) command; + String token = null; + try { + do { + token = getLock(abstractCommand.getProcessInstanceId(), 10 * 1000, 11 * 1000); + } while (Objects.isNull(token)); + return next.execute(config, command, commandExecutor); + } finally { + if (token != null) { + unlock(abstractCommand.getProcessInstanceId(), token); + } + } + } + return next.execute(config, command, commandExecutor); + } + + /** + * 加锁,有阻塞 + * + * @param name key的值 + * @param expire 过期时间 + * @param timeout 加锁执行超时时间 + * @return + */ + public String getLock(String name, long expire, long timeout) { + long startTime = System.currentTimeMillis(); //获取开始时间 + String token; + //规定的时间内,循环获取有值的token + do { + token = tryGetLock(name, expire); //获取秘钥Key + if (token == null) { + if ((System.currentTimeMillis() - startTime) > (timeout - 50)) + break; + try { + Thread.sleep(50); //try 50毫秒 per sec milliseconds + } catch (InterruptedException e) { + log.warn("getLock error: {}", e.getMessage(), e); + return null; + } + } + } while (token == null); + return token; + } + + /** + * 加锁,无阻塞 + * + * @param name 设置key + * @param expire + * @return + */ + public String tryGetLock(String name, long expire) { + //获取UUID值为value + String token = UUID.randomUUID().toString(); + RedisConnectionFactory factory = redisTemplate.getConnectionFactory(); + RedisConnection conn = factory.getConnection(); + try { + Boolean result = conn.set(name.getBytes(Charset.forName("UTF-8")), //设置name为key + token.getBytes(Charset.forName("UTF-8")), //设置token为value +// Expiration.from(expire, TimeUnit.MILLISECONDS), //设置过期时间:MILLISECONDS毫秒 + Expiration.persistent(), + RedisStringCommands.SetOption.SET_IF_ABSENT); //如果name不存在创建 + if (result != null && result) + return token; + } finally { + RedisConnectionUtils.releaseConnection(conn, factory); + } + return null; + } + + /** + * 功能描述:使用Lua脚本解锁 + * + * @MethodName: unlock + * @MethodParam: [name, token] + * @Return: boolean + * @Author: yyalin + * @CreateDate: 2023/7/17 18:41 + */ + public boolean unlock(String name, String token) { + byte[][] keysAndArgs = new byte[2][]; + keysAndArgs[0] = name.getBytes(Charset.forName("UTF-8")); //lock_key + keysAndArgs[1] = token.getBytes(Charset.forName("UTF-8")); //token的值,也即唯一标识符 + RedisConnectionFactory factory = redisTemplate.getConnectionFactory(); + RedisConnection conn = factory.getConnection(); + try { + Long result = (Long) conn.scriptingCommands().eval(unlockScript.getBytes(Charset.forName("UTF-8")), + ReturnType.INTEGER, 1, keysAndArgs); + if (result != null && result > 0) + return true; + } finally { + RedisConnectionUtils.releaseConnection(conn, factory); + } + return false; + } + + +}