update - REQ-2364-增加deadletter-job重试的xxl-job

This commit is contained in:
yangqicheng 2024-05-20 18:22:47 +08:00
parent e087697f15
commit 165dd2e525
6 changed files with 98 additions and 4 deletions

View File

@ -14,7 +14,8 @@ import lombok.Getter;
@AllArgsConstructor
public enum AsyncJobRespCode implements IModuleRespCode {
DATA_NOT_EXISTS("001", "ExtTaskInst 数据不存在, instId: {}, taskId: {}"),
JOB_NOT_EXISTS("002","【{}】对应任务不存在")
JOB_NOT_EXISTS_JOB_ID("002", "id为【{}】对应任务不存在"),
JOB_NOT_EXISTS_PROC_INST_ID("003", "流程实例id为【{}】对应任务不存在")
;
private String code;
private String message;

View File

@ -11,6 +11,7 @@ public enum BpmnVariablesRespCode implements IModuleRespCode {
PROCESS_VARIABLE_EXIST("002", "变量【{}】已存在流程实例【{}】中"),
PROCESS_VARIABLE_SCOPE_ERROR("003", "变量【{}】的作用域为【{}】不支持"),
PROCESS_VARIABLE_NAME_MUST_BE_NOT_NULL("004", "变量的名称不能为空"),
PROCESS_VARIABLE_VARS_NOT_NULL("005", "流程变量列表参数不能为空"),
;

View File

@ -22,6 +22,14 @@ public interface BpmnProcessVariableService {
*/
void updateVariable(String executionId, RestBpmnProcessVariable restVariable);
/**
* 批量更新流程变量如果流程变量不存在会抛出异常
*
* @param executionId 流程实例id
* @param restVariables 需要更新的流程列表
*/
void updateVariables(String executionId, List<RestBpmnProcessVariable> restVariables);
/**
* 删除流程变量
*

View File

@ -12,7 +12,8 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import static cn.axzo.workflow.core.common.code.AsyncJobRespCode.JOB_NOT_EXISTS;
import static cn.axzo.workflow.core.common.code.AsyncJobRespCode.JOB_NOT_EXISTS_JOB_ID;
import static cn.axzo.workflow.core.common.code.AsyncJobRespCode.JOB_NOT_EXISTS_PROC_INST_ID;
@Service
@Slf4j
@ -41,7 +42,7 @@ public class BpmnProcessJobServiceImp implements BpmnProcessJobService {
protected Job getDeadLetterJobById(String jobId) {
Job job = managementService.createDeadLetterJobQuery().jobId(jobId).singleResult();
if (job == null) {
throw new WorkflowEngineException(JOB_NOT_EXISTS, "id为 '" + jobId + "'.");
throw new WorkflowEngineException(JOB_NOT_EXISTS_JOB_ID, jobId);
}
return job;
}
@ -49,7 +50,7 @@ public class BpmnProcessJobServiceImp implements BpmnProcessJobService {
protected List<Job> getDeadLetterProcInstId(String procInstId) {
List<Job> jobs = managementService.createDeadLetterJobQuery().processInstanceId(procInstId).list();
if (CollectionUtils.isEmpty(jobs)) {
throw new WorkflowEngineException(JOB_NOT_EXISTS, "流程实例id为 '" + procInstId + "'.");
throw new WorkflowEngineException(JOB_NOT_EXISTS_PROC_INST_ID, procInstId);
}
return jobs;
}

View File

@ -4,6 +4,7 @@ import cn.axzo.workflow.common.model.request.bpmn.RestBpmnProcessVariable;
import cn.axzo.workflow.core.common.exception.WorkflowEngineException;
import cn.axzo.workflow.core.service.BpmnProcessVariableService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.runtime.Execution;
import org.springframework.stereotype.Service;
@ -16,6 +17,7 @@ import static cn.axzo.workflow.core.common.code.BpmnVariablesRespCode.PROCESS_VA
import static cn.axzo.workflow.core.common.code.BpmnVariablesRespCode.PROCESS_VARIABLE_NAME_MUST_BE_NOT_NULL;
import static cn.axzo.workflow.core.common.code.BpmnVariablesRespCode.PROCESS_VARIABLE_NOT_EXIST;
import static cn.axzo.workflow.core.common.code.BpmnVariablesRespCode.PROCESS_VARIABLE_SCOPE_ERROR;
import static cn.axzo.workflow.core.common.code.BpmnVariablesRespCode.PROCESS_VARIABLE_VARS_NOT_NULL;
@Service
@Slf4j
@ -36,6 +38,17 @@ public class BpmnProcessVariableServiceImp implements BpmnProcessVariableService
setSimpleVariable(restVariable, execution, false);
}
@Override
public void updateVariables(String executionId, List<RestBpmnProcessVariable> restVariables) {
if (CollectionUtils.isEmpty(restVariables)) {
throw new WorkflowEngineException(PROCESS_VARIABLE_VARS_NOT_NULL);
}
Execution execution = getExecutionFromRequest(executionId);
for (RestBpmnProcessVariable restVariable : restVariables) {
setSimpleVariable(restVariable, execution, false);
}
}
@Override
public void deleteVariables(String executionId, List<String> variableNames, String scope) {
Execution execution = getExecutionFromRequest(executionId);

View File

@ -0,0 +1,70 @@
package cn.axzo.workflow.server.xxljob;
import cn.axzo.workflow.common.model.request.bpmn.RestBpmnProcessVariable;
import cn.axzo.workflow.core.service.BpmnProcessJobService;
import cn.axzo.workflow.core.service.BpmnProcessVariableService;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.StringUtils;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static com.xxl.job.core.biz.model.ReturnT.FAIL_CODE;
/**
* deadletter-job重试
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class AsyncJobRetryHandler extends IJobHandler {
@Resource
private BpmnProcessVariableService variableService;
@Resource
private BpmnProcessJobService jobService;
@Override
@XxlJob("asyncJobRetry")
public ReturnT<String> execute(String param) {
if (StringUtils.isBlank(param)) {
return new ReturnT<>(FAIL_CODE, "参数不能为空");
}
try {
AsyncJobRetryDto bean = JSONUtil.toBean(param, AsyncJobRetryDto.class);
if (bean == null) {
throw new NullPointerException("反序列化实体为空");
}
if (StringUtils.isBlank(bean.getProcessInstanceId())) {
throw new IllegalArgumentException("流程实例id不能为空");
}
//设置变量
if (bean.getVariables() != null && !bean.getVariables().isEmpty()) {
variableService.updateVariables(bean.getProcessInstanceId(), bean.variables);
}
//进行重试
jobService.executeDeadLetterJobActionByProcInstId(bean.getProcessInstanceId());
return ReturnT.SUCCESS;
} catch (Exception e) {
return new ReturnT<>(FAIL_CODE, e.getMessage());
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static final class AsyncJobRetryDto {
private String processInstanceId;
private List<RestBpmnProcessVariable> variables;
}
}