hotfix/improve-batch-operation - 限制同时操作同一个实例数据

This commit is contained in:
wangli 2024-07-01 15:16:33 +08:00
parent 2bebe80382
commit e23dd75c3c
7 changed files with 220 additions and 5 deletions

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf;
import cn.axzo.workflow.core.engine.behavior.CustomActivityBehaviorFactory;
import cn.axzo.workflow.core.engine.cmd.CustomCommandContextFactory;
import cn.axzo.workflow.core.engine.id.BasedNacosSnowflakeIdGenerator;
import cn.axzo.workflow.core.engine.interceptor.CustomLockProcessInstanceInterceptor;
import cn.axzo.workflow.core.engine.job.AsyncAbortProcessInstanceHandler;
import cn.axzo.workflow.core.engine.job.AsyncApproveTaskJobHandler;
import cn.axzo.workflow.core.engine.job.AsyncBpmnProcessActivityJobHandler;
@ -30,6 +31,8 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.time.Duration;
import java.util.List;
@ -56,7 +59,8 @@ public class FlowableConfiguration {
BpmnProcessActivityService bpmnProcessActivityService,
List<JobProcessor> jobProcessors,
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
NacosDiscoveryProperties nacosDiscoveryProperties,
StringRedisTemplate redisTemplate) {
return configuration -> {
configuration.setEnableHistoricTaskLogging(true);
configuration.setHistoryLevel(HistoryLevel.AUDIT);
@ -92,6 +96,9 @@ public class FlowableConfiguration {
new CustomWorkflowEngineExceptionHandler(),
new CustomAsyncRunnableExceptionExceptionHandler()));
configuration.setCommandContextFactory(new CustomCommandContextFactory());
configuration.setCustomPreCommandInterceptors(Lists.newArrayList(
new CustomLockProcessInstanceInterceptor(redisTemplate)
));
};
}

View File

@ -0,0 +1,12 @@
package cn.axzo.workflow.core.engine.cmd;
/**
* TODO
*
* @author wangli
* @since 2024/7/1 13:59
*/
public abstract class AbstractCommand {
public abstract String getProcessInstanceId();
}

View File

@ -44,7 +44,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.createVir
* @author wangli
* @since 2024/1/2 18:19
*/
public class CustomAbortProcessInstanceCmd implements Command<Void>, Serializable {
public class CustomAbortProcessInstanceCmd extends AbstractCommand implements Command<Void>, Serializable {
private final String processInstanceId;
private final String tenantId;
private final String reason;
@ -102,4 +102,12 @@ public class CustomAbortProcessInstanceCmd implements Command<Void>, Serializabl
runtimeService.setVariable(task.getProcessInstanceId(), TASK_COMPLETE_OPERATION_TYPE + task.getId(), ABORTED);
return null;
}
@Override
public String getProcessInstanceId() {
if (StringUtils.hasText(processInstanceId)) {
return processInstanceId;
}
return null;
}
}

View File

@ -10,6 +10,7 @@ import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.context.Context;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.api.Task;
import org.flowable.task.api.history.HistoricTaskInstance;
@ -38,7 +39,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask
* @author wangli
* @since 2024/1/4 15:50
*/
public class CustomApproveTaskCmd implements Command<Void>, Serializable {
public class CustomApproveTaskCmd extends AbstractCommand implements Command<Void>, Serializable {
private static final Logger log = LoggerFactory.getLogger(CustomApproveTaskCmd.class);
private final String taskId;
@ -146,4 +147,17 @@ public class CustomApproveTaskCmd implements Command<Void>, Serializable {
}
}
@Override
public String getProcessInstanceId() {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(Context.getCommandContext());
HistoricTaskInstanceQuery taskQuery =
processEngineConfiguration.getHistoryService().createHistoricTaskInstanceQuery();
HistoricTaskInstance historicTaskInstance = taskQuery.taskId(taskId).singleResult();
if (Objects.isNull(historicTaskInstance)) {
return null;
}
return historicTaskInstance.getProcessInstanceId();
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.workflow.core.engine.cmd;
import cn.axzo.framework.context.validation.SpringValidator;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskDelegateAssigner;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.engine.operation.DeleteProcessInstanceOperation;
@ -12,6 +13,7 @@ import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.api.Task;
import org.springframework.util.StringUtils;
import java.io.Serializable;
import java.util.HashMap;
@ -41,7 +43,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.createVir
* @author wangli
* @since 2024/1/2 18:19
*/
public class CustomCancelProcessInstanceCmd implements Command<Void>, Serializable {
public class CustomCancelProcessInstanceCmd extends AbstractCommand implements Command<Void>, Serializable {
private final String processInstanceId;
private final String tenantId;
private final String reason;
@ -102,4 +104,12 @@ public class CustomCancelProcessInstanceCmd implements Command<Void>, Serializab
addComment(commandContext, task, COMMENT_TYPE_OPERATION_DESC, "已撤回");
return null;
}
@Override
public String getProcessInstanceId() {
if (StringUtils.hasText(processInstanceId)) {
return processInstanceId;
}
return null;
}
}

View File

@ -11,6 +11,7 @@ import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.context.Context;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.task.api.Task;
import org.flowable.task.api.history.HistoricTaskInstance;
@ -21,6 +22,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static cn.axzo.workflow.common.constant.BpmnConstants.COMMENT_TYPE_OPERATION_DESC;
import static cn.axzo.workflow.common.constant.BpmnConstants.INTERNAL_DELETE_PROCESS_FLAG;
@ -42,7 +44,7 @@ import static cn.axzo.workflow.core.engine.cmd.helper.CustomTaskHelper.validTask
* @author wangli
* @since 2024/1/4 13:36
*/
public class CustomRejectionTaskCmd implements Command<Void>, Serializable {
public class CustomRejectionTaskCmd extends AbstractCommand implements Command<Void>, Serializable {
private final String taskId;
private final String advice;
@ -99,4 +101,18 @@ public class CustomRejectionTaskCmd implements Command<Void>, Serializable {
.planOperation(new DeleteProcessInstanceOperation(commandContext, task.getProcessInstanceId(),
extAxHiTaskInstService));
}
@Override
public String getProcessInstanceId() {
ProcessEngineConfigurationImpl processEngineConfiguration =
CommandContextUtil.getProcessEngineConfiguration(Context.getCommandContext());
HistoricTaskInstanceQuery taskQuery =
processEngineConfiguration.getHistoryService().createHistoricTaskInstanceQuery();
HistoricTaskInstance historicTaskInstance = taskQuery.taskId(taskId).singleResult();
if (Objects.isNull(historicTaskInstance)) {
return null;
}
return historicTaskInstance.getProcessInstanceId();
}
}

View File

@ -0,0 +1,148 @@
package cn.axzo.workflow.core.engine.interceptor;
import cn.axzo.workflow.core.engine.cmd.AbstractCommand;
import org.flowable.common.engine.impl.interceptor.AbstractCommandInterceptor;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandConfig;
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 增加一个拦截器,用来控制多个操作,不允许同时操作同一个实例
*
* @author wangli
* @since 2024/7/1 13:51
*/
public class CustomLockProcessInstanceInterceptor extends AbstractCommandInterceptor {
private String key_prefix = "operation:processInstance:";
/**
* 解锁脚本原子操作
*/
private static final String unlockScript =
"if redis.call(\"get\",KEYS[1]) == ARGV[1]\n"
+ "then\n"
+ " return redis.call(\"del\",KEYS[1])\n"
+ "else\n"
+ " return 0\n"
+ "end";
private static final Logger log = LoggerFactory.getLogger(CustomLockProcessInstanceInterceptor.class);
private final StringRedisTemplate redisTemplate;
public CustomLockProcessInstanceInterceptor(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public <T> T execute(CommandConfig config, Command<T> command, CommandExecutor commandExecutor) {
if (AbstractCommand.class.isAssignableFrom(command.getClass())) {
AbstractCommand abstractCommand = (AbstractCommand) command;
String token = null;
try {
do {
token = getLock(abstractCommand.getProcessInstanceId(), 10 * 1000, 11 * 1000);
} while (Objects.isNull(token));
return next.execute(config, command, commandExecutor);
} finally {
if (token != null) {
unlock(abstractCommand.getProcessInstanceId(), token);
}
}
}
return next.execute(config, command, commandExecutor);
}
/**
* 加锁有阻塞
*
* @param name key的值
* @param expire 过期时间
* @param timeout 加锁执行超时时间
* @return
*/
public String getLock(String name, long expire, long timeout) {
long startTime = System.currentTimeMillis(); //获取开始时间
String token;
//规定的时间内循环获取有值的token
do {
token = tryGetLock(name, expire); //获取秘钥Key
if (token == null) {
if ((System.currentTimeMillis() - startTime) > (timeout - 50))
break;
try {
Thread.sleep(50); //try 50毫秒 per sec milliseconds
} catch (InterruptedException e) {
log.warn("getLock error: {}", e.getMessage(), e);
return null;
}
}
} while (token == null);
return token;
}
/**
* 加锁无阻塞
*
* @param name 设置key
* @param expire
* @return
*/
public String tryGetLock(String name, long expire) {
//获取UUID值为value
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
Boolean result = conn.set(name.getBytes(Charset.forName("UTF-8")), //设置name为key
token.getBytes(Charset.forName("UTF-8")), //设置token为value
// Expiration.from(expire, TimeUnit.MILLISECONDS), //设置过期时间MILLISECONDS毫秒
Expiration.persistent(),
RedisStringCommands.SetOption.SET_IF_ABSENT); //如果name不存在创建
if (result != null && result)
return token;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
}
return null;
}
/**
* 功能描述:使用Lua脚本解锁
*
* @MethodName: unlock
* @MethodParam: [name, token]
* @Return: boolean
* @Author: yyalin
* @CreateDate: 2023/7/17 18:41
*/
public boolean unlock(String name, String token) {
byte[][] keysAndArgs = new byte[2][];
keysAndArgs[0] = name.getBytes(Charset.forName("UTF-8")); //lock_key
keysAndArgs[1] = token.getBytes(Charset.forName("UTF-8")); //token的值也即唯一标识符
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
Long result = (Long) conn.scriptingCommands().eval(unlockScript.getBytes(Charset.forName("UTF-8")),
ReturnType.INTEGER, 1, keysAndArgs);
if (result != null && result > 0)
return true;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
}
return false;
}
}