Compare commits

...

9 Commits

8 changed files with 798 additions and 750 deletions

View File

@ -21,6 +21,7 @@ public enum BpmnFlowNodeType {
NODE_ROBOT("NODE_ROBOT", "机器人节点"),
NODE_COMMENT("NODE_COMMENT", "评论节点"),
NODE_ABORT("NODE_ABORT", "中止节点"),
NODE_ALTER("NODE_ALTER", "告警节点"),
NODE_CANCEL("NODE_CANCEL", "撤回节点"),
NODE_EMPTY("NODE_EMPTY", "空节点"),
;

View File

@ -18,6 +18,8 @@ public class WorkflowEngineStarterRpcInvokeDTO implements Serializable {
private List<String> parameters;
private String parameterTypesMd5;
public String getClassName() {
return className;
}
@ -41,4 +43,12 @@ public class WorkflowEngineStarterRpcInvokeDTO implements Serializable {
public void setParameters(List<String> parameters) {
this.parameters = parameters;
}
public String getParameterTypesMd5() {
return parameterTypesMd5;
}
public void setParameterTypesMd5(String parameterTypesMd5) {
this.parameterTypesMd5 = parameterTypesMd5;
}
}

View File

@ -1,65 +1,42 @@
package cn.axzo.workflow.starter.api;
import cn.axzo.workflow.starter.feign.ext.WorkflowEngineStarterFeignConfiguration;
import cn.axzo.workflow.common.util.ThreadUtil;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.ASYNC;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
import cn.axzo.workflow.client.annotation.WorkflowEngineFeignClient;
import cn.axzo.workflow.common.annotation.Manageable;
import cn.azxo.framework.common.model.CommonResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import cn.axzo.workflow.common.annotation.InvokeMode;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAbortDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCancelDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCarbonCopyDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceLogQueryDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceQueryDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnOptionalNodeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnRobotTaskCompleteDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnRobotTaskCreateDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAttachmentDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskBackAuditDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskCommentDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskCountersignDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskPageSearchDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskRemindDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskTransferDTO;
import cn.axzo.workflow.common.model.response.BpmPageResult;
import cn.axzo.workflow.common.model.response.bpmn.BatchOperationResultVO;
import cn.axzo.workflow.common.model.response.bpmn.task.BpmnHistoricTaskInstanceGroupVO;
import cn.axzo.workflow.common.model.response.bpmn.task.BpmnHistoricTaskInstanceVO;
import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskDonePageItemVO;
import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskInstanceVO;
import cn.axzo.workflow.common.model.response.bpmn.task.BpmnTaskTodoPageItemVO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceLogVO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceVO;
import cn.axzo.workflow.common.util.ThreadUtil;
import cn.axzo.workflow.starter.feign.ext.WorkflowEngineStarterFeignConfiguration;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.Nullable;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import java.util.List;
import java.util.Map;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAbortDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceAdminPageReqVO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCancelDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCarbonCopyDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCheckApproverDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateWithFormDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceLogQueryDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceMyPageReqVO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceQueryDTO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceAdminPageItemVO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceLogVO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstancePageItemVO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceVO;
import cn.axzo.workflow.common.model.response.bpmn.process.ProcessNodeDetailVO;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PutMapping;
import javax.validation.constraints.NotNull;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutCallbackDTO;
import cn.axzo.workflow.common.model.request.bpmn.activity.BpmnActivityTimeoutTriggerDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivitySetAssigneeDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.ASYNC;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
/**
* Workflow Engine Starter Core Service
@ -71,6 +48,135 @@ import cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO;
@org.springframework.cloud.openfeign.FeignClient(name = "workflow-engine-starter-core", url = "${axzo.service.workflow-engine:http://workflow-engine:8080}", configuration = WorkflowEngineStarterFeignConfiguration.class)
public interface WorkflowCoreService {
/**
* 业务节点唤醒, 该节点废弃请换成 {@link ProcessActivityApi#trigger(cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO)} 接口
* <p>
* 当模型中使用了业务节点且设置了不设置审批人模式则当业务监听到 PROCESS_ACTIVITY_START 事件时可通过该接口推动流程继续运行
*/
@Deprecated
@GetMapping("/api/process/activity/trigger")
Boolean trigger(@NotBlank(message = "触发 ID 不能为空") @RequestParam String triggerId);
/**
* 业务节点唤醒
*
* @param dto
* @return
*/
@PostMapping("/api/process/activity/trigger")
Boolean trigger(@Validated @RequestBody BpmnActivityTriggerDTO dto);
/**
* 业务节点设置审批人, 不支持重复设置
* <p>
* 当模型中使用了业务节点且设置了业务指定审批人模式则当业务监听到 PROCESS_ACTIVITY_WAIT_ASSIGNEE 事件时可通过该接口设置动态设置审批人
* <p>
* <strong color=orange>注意如果调用接口时传入的审批人集合为空流程引擎将对该审批流程实例自动中止</strong>
*
* @param dto
* @return
*/
@PostMapping("/api/process/activity/assignee/set")
@Operation(summary = "业务节点设置审批人,不支持重复调用设置审批人,需一次性传入所有审批人")
Boolean setAssignee(@Validated @RequestBody BpmnActivitySetAssigneeDTO dto);
/**
* 创建审批流程
*
* <pre>
* MQ 触发规则:
* 1. 当前流程实例会依次触发 process-instance-created process-instance-started 事件
* 2. 第一个审批任务会依次触发 process-task-assigned process-task-created 事件
* </pre>
*
* @param dto {@link BpmnProcessInstanceCreateDTO}
*/
@Operation(summary = "创建审批流程, MQ 触发规则:1. 当前流程实例会依次触发 process-instance-created 和 process-instance-started 事件,2. 第一个审批任务会依次触发 process-task-assigned 和 process-task-created 事件")
@PostMapping("/api/process/instance/create")
@InvokeMode(SYNC)
String createProcessInstance(@Validated @RequestBody BpmnProcessInstanceCreateDTO dto);
/**
* 发起人主动撤回审核
*
* <pre>
* MQ 触发规则:
* 1. 当前流程实例中现存的任务会依次触发 process-task-deleted 事件
* 2. 当前流程实例会触发 process-instance-cancelled 事件
* </pre>
*
* @param dto {@link BpmnProcessInstanceCancelDTO}
* @return
*/
@Operation(summary = "发起人主动撤回审核,MQ 触发规则:1. 当前流程实例中现存的任务会依次触发 process-task-deleted 事件,2. 当前流程实例会触发 process-instance-cancelled 事件")
@DeleteMapping("/api/process/instance/cancel")
Boolean cancelProcessInstance(@Validated @RequestBody BpmnProcessInstanceCancelDTO dto);
/**
* 中止流程实例
*
* @param dto
* @return
*/
@Operation(summary = "中止流程实例")
@DeleteMapping("/api/process/instance/abort")
Boolean abortProcessInstance(@Validated @RequestBody BpmnProcessInstanceAbortDTO dto);
/**
* 批量中止流程实例
*
* @param dtos
* @return
*/
@Operation(summary = "批量中止流程实例")
@DeleteMapping("/api/process/instance/batch/abort")
BatchOperationResultVO batchAbortProcessInstance(@Validated @RequestBody List<BpmnProcessInstanceAbortDTO> dtos);
/**
* 抄送流程实例未实现
*
* @param dto
* @return
*/
@Operation(summary = "抄送流程实例")
@PostMapping("/api/process/instance/carbon-copy")
@Deprecated
Boolean carbonCopyProcessInstance(@Validated @RequestBody BpmnProcessInstanceCarbonCopyDTO dto);
/**
* 获得流程实例
*
* @param dto {@link BpmnProcessInstanceQueryDTO} 可根据 Id,BusinessKey进行查询
* @return 流程实例, 租户Id不必传
*/
@Operation(summary = "获得流程实例")
@GetMapping("/api/process/instance/get")
@InvokeMode(SYNC)
BpmnProcessInstanceVO getProcessInstanceVO(@Validated @RequestBody BpmnProcessInstanceQueryDTO dto);
/**
* 获取指定流程实例的流程变量
*
* @param processInstanceId
* @param tenantId
* @return
*/
@Operation(summary = "获取指定流程实例的流程变量")
@GetMapping("/api/process/instance/cooperation-org")
@InvokeMode(SYNC)
Map<String, Object> getProcessVariables(@NotBlank(message = "流程实例 ID 不能为空") @RequestParam String processInstanceId, @Nullable @RequestParam(required = false) String tenantId);
/**
* 获取指定流程的日志
*
* @param dto
* @return
*/
@Operation(summary = "获取指定流程的日志")
@PostMapping("/api/process/instance/logs")
@InvokeMode(SYNC)
BpmnProcessInstanceLogVO getProcessInstanceLogs(@Validated @RequestBody BpmnProcessInstanceLogQueryDTO dto);
/**
* 同意
*
@ -196,135 +302,6 @@ public interface WorkflowCoreService {
@PostMapping("/api/process/task/robot/complete")
Boolean completeRobotTask(@Validated @RequestBody BpmnRobotTaskCompleteDTO dto);
/**
* 创建审批流程
*
* <pre>
* MQ 触发规则:
* 1. 当前流程实例会依次触发 process-instance-created process-instance-started 事件
* 2. 第一个审批任务会依次触发 process-task-assigned process-task-created 事件
* </pre>
*
* @param dto {@link BpmnProcessInstanceCreateDTO}
*/
@Operation(summary = "创建审批流程, MQ 触发规则:1. 当前流程实例会依次触发 process-instance-created 和 process-instance-started 事件,2. 第一个审批任务会依次触发 process-task-assigned 和 process-task-created 事件")
@PostMapping("/api/process/instance/create")
@InvokeMode(SYNC)
String createProcessInstance(@Validated @RequestBody BpmnProcessInstanceCreateDTO dto);
/**
* 发起人主动撤回审核
*
* <pre>
* MQ 触发规则:
* 1. 当前流程实例中现存的任务会依次触发 process-task-deleted 事件
* 2. 当前流程实例会触发 process-instance-cancelled 事件
* </pre>
*
* @param dto {@link BpmnProcessInstanceCancelDTO}
* @return
*/
@Operation(summary = "发起人主动撤回审核,MQ 触发规则:1. 当前流程实例中现存的任务会依次触发 process-task-deleted 事件,2. 当前流程实例会触发 process-instance-cancelled 事件")
@DeleteMapping("/api/process/instance/cancel")
Boolean cancelProcessInstance(@Validated @RequestBody BpmnProcessInstanceCancelDTO dto);
/**
* 中止流程实例
*
* @param dto
* @return
*/
@Operation(summary = "中止流程实例")
@DeleteMapping("/api/process/instance/abort")
Boolean abortProcessInstance(@Validated @RequestBody BpmnProcessInstanceAbortDTO dto);
/**
* 批量中止流程实例
*
* @param dtos
* @return
*/
@Operation(summary = "批量中止流程实例")
@DeleteMapping("/api/process/instance/batch/abort")
BatchOperationResultVO batchAbortProcessInstance(@Validated @RequestBody List<BpmnProcessInstanceAbortDTO> dtos);
/**
* 抄送流程实例未实现
*
* @param dto
* @return
*/
@Operation(summary = "抄送流程实例")
@PostMapping("/api/process/instance/carbon-copy")
@Deprecated
Boolean carbonCopyProcessInstance(@Validated @RequestBody BpmnProcessInstanceCarbonCopyDTO dto);
/**
* 获得流程实例
*
* @param dto {@link BpmnProcessInstanceQueryDTO} 可根据 Id,BusinessKey进行查询
* @return 流程实例, 租户Id不必传
*/
@Operation(summary = "获得流程实例")
@GetMapping("/api/process/instance/get")
@InvokeMode(SYNC)
BpmnProcessInstanceVO getProcessInstanceVO(@Validated @RequestBody BpmnProcessInstanceQueryDTO dto);
/**
* 获取指定流程实例的流程变量
*
* @param processInstanceId
* @param tenantId
* @return
*/
@Operation(summary = "获取指定流程实例的流程变量")
@GetMapping("/api/process/instance/cooperation-org")
@InvokeMode(SYNC)
Map<String, Object> getProcessVariables(@NotBlank(message = "流程实例 ID 不能为空") @RequestParam String processInstanceId, @Nullable @RequestParam(required = false) String tenantId);
/**
* 获取指定流程的日志
*
* @param dto
* @return
*/
@Operation(summary = "获取指定流程的日志")
@PostMapping("/api/process/instance/logs")
@InvokeMode(SYNC)
BpmnProcessInstanceLogVO getProcessInstanceLogs(@Validated @RequestBody BpmnProcessInstanceLogQueryDTO dto);
/**
* 业务节点唤醒, 该节点废弃请换成 {@link ProcessActivityApi#trigger(cn.axzo.workflow.common.model.request.bpmn.task.BpmnActivityTriggerDTO)} 接口
* <p>
* 当模型中使用了业务节点且设置了不设置审批人模式则当业务监听到 PROCESS_ACTIVITY_START 事件时可通过该接口推动流程继续运行
*/
@Deprecated
@GetMapping("/api/process/activity/trigger")
Boolean trigger(@NotBlank(message = "触发 ID 不能为空") @RequestParam String triggerId);
/**
* 业务节点唤醒
*
* @param dto
* @return
*/
@PostMapping("/api/process/activity/trigger")
Boolean trigger(@Validated @RequestBody BpmnActivityTriggerDTO dto);
/**
* 业务节点设置审批人, 不支持重复设置
* <p>
* 当模型中使用了业务节点且设置了业务指定审批人模式则当业务监听到 PROCESS_ACTIVITY_WAIT_ASSIGNEE 事件时可通过该接口设置动态设置审批人
* <p>
* <strong color=orange>注意如果调用接口时传入的审批人集合为空流程引擎将对该审批流程实例自动中止</strong>
*
* @param dto
* @return
*/
@PostMapping("/api/process/activity/assignee/set")
@Operation(summary = "业务节点设置审批人,不支持重复调用设置审批人,需一次性传入所有审批人")
Boolean setAssignee(@Validated @RequestBody BpmnActivitySetAssigneeDTO dto);
/**
* 强制使用异步模式调用该方法请在调用真实方法前调用该方法
* <pre>

View File

@ -5,6 +5,7 @@ import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeD
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import cn.axzo.workflow.starter.util.MD5;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
import feign.Client;
@ -18,6 +19,7 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.http.HttpStatus;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@ -36,6 +38,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -46,6 +49,7 @@ import java.util.Objects;
import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static cn.axzo.workflow.common.constant.StarterConstants.STARTER_INVOKE_MODE;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
@ -102,6 +106,9 @@ public class ComplexInvokeClient implements Client {
event.setClassName(metadata.targetType().getName());
event.setMethodName(metadata.method().getName());
Class<?>[] parameterTypes = metadata.method().getParameterTypes();
event.setParameterTypesMd5(MD5.encrypt(StringUtils.collectionToCommaDelimitedString(Arrays.stream(parameterTypes).map(Class::getName).collect(Collectors.toList()))));
List<String> args = new ArrayList<>();
event.setParameters(args);
buildArgs(request, metadata, args);

View File

@ -11,6 +11,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.GetMapping;
@ -53,6 +54,9 @@ public class WorkflowEngineStarterMQMonitorController {
private String applicationName;
@Value("${spring.profiles.active}")
private String activeProfile;
@Resource
@Qualifier("serviceVersion")
private String serviceVersion;
public static String BROADCAST_CONSUMER_GROUP = "GID_%s_workflow_engine_%s_consumer";
public static String RPC_RETRY_CONSUMER_GROUP = "GID_%s_workflow_engine_starter_%s_consumer";
@ -144,4 +148,13 @@ public class WorkflowEngineStarterMQMonitorController {
return CommonResponse.success("以关闭 DLQ 钉钉通知");
}
}
@GetMapping("/v")
public CommonResponse<String> consoleVersion(@RequestParam(value = "fix", required = false, defaultValue = "false") Boolean showLastFix) {
String str = "当前 workflow-engine version: " + serviceVersion;
if (showLastFix) {
str += "fix: cn.axzo.workflow.starter.mq.retry.consumer.WorkflowEngineStarterRetryEventListener.multiMethodCache";
}
return CommonResponse.success(str);
}
}

View File

@ -10,6 +10,7 @@ import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
import cn.axzo.workflow.starter.api.WorkflowManageService;
import cn.axzo.workflow.starter.common.exception.WorkflowNoMethodException;
import cn.axzo.workflow.starter.util.MD5;
import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import org.slf4j.Logger;
@ -20,6 +21,7 @@ import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
@ -27,10 +29,12 @@ import org.springframework.web.bind.annotation.RequestParam;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
@ -48,7 +52,7 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
private final String currentApplicationName;
private WorkflowCoreService workflowCoreService;
private WorkflowManageService workflowManageService;
private final Map<String, InterfaceMapping> methodCache = new HashMap<>();
private final Map<String, Map<String, InterfaceMapping>> multiMethodCache = new HashMap<>();
class InterfaceMapping {
private final Object interfaceObject;
@ -97,7 +101,10 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
Method[] methods = coreService.getDeclaredMethods();
for (Method method : methods) {
methodCache.put(method.getName(), new InterfaceMapping(workflowCoreService, method));
String parameterTypesMd5 = MD5.encrypt(StringUtils.collectionToCommaDelimitedString(Arrays.stream(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList())));
Map<String, InterfaceMapping> methodCache = multiMethodCache.getOrDefault(method.getName(), new HashMap<>());
methodCache.put(parameterTypesMd5, new InterfaceMapping(workflowCoreService, method));
multiMethodCache.put(method.getName(), methodCache);
}
}
@ -110,7 +117,10 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
Method[] methods = manageService.getDeclaredMethods();
for (Method method : methods) {
methodCache.put(method.getName(), new InterfaceMapping(workflowManageService, method));
String parameterTypesMd5 = MD5.encrypt(StringUtils.collectionToCommaDelimitedString(Arrays.stream(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList())));
Map<String, InterfaceMapping> methodCache = multiMethodCache.getOrDefault(method.getName(), new HashMap<>());
methodCache.put(parameterTypesMd5, new InterfaceMapping(workflowManageService, method));
multiMethodCache.put(method.getName(), methodCache);
}
}
@ -120,7 +130,11 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
log.info("WorkflowEngineClientRetryEventListener onEvent: {}", event.toPrettyJsonString());
WorkflowEngineStarterRpcInvokeDTO dto = event.normalizedData(WorkflowEngineStarterRpcInvokeDTO.class);
InterfaceMapping mapping = methodCache.getOrDefault(dto.getMethodName(), null);
Map<String, InterfaceMapping> methodCache = multiMethodCache.getOrDefault(dto.getMethodName(), new HashMap<>());
if (CollectionUtils.isEmpty(methodCache)) {
throw new WorkflowNoMethodException("Not methodCache found: " + dto.getMethodName());
}
InterfaceMapping mapping = methodCache.getOrDefault(dto.getParameterTypesMd5(), null);
if (Objects.isNull(mapping)) {
throw new WorkflowNoMethodException("Not method found: " + dto.getMethodName());
}
@ -135,6 +149,7 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
if (log.isDebugEnabled()) {
log.debug("Event Invoke Result: {}", JSON.toJSONString(invoke));
}
log.info("WorkflowEngineClientRetryEventListener onEvent end!");
} catch (Throwable e) {
// 能抛出异常目前只有两种情况, 一个是网络异常, 另一个是对端服务内部异常
Throwable cause = getRealCause(e);

View File

@ -0,0 +1,46 @@
package cn.axzo.workflow.starter.util;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* Starter 专用的 MD5小工具
*
* @author wangli
* @since 2025-03-05 16:45
*/
public class MD5 {
public static String encrypt(String data) {
if (data == null) {
return null;
}
try {
// 获取 MD5 算法实例
MessageDigest md = MessageDigest.getInstance("MD5");
// 将输入字符串转换为字节数组并进行加密
byte[] digest = md.digest(data.getBytes(StandardCharsets.UTF_8));
// 将字节数组转换为 BigInteger
BigInteger bigInt = new BigInteger(1, digest);
// BigInteger 转换为十六进制字符串
String hashText = bigInt.toString(16);
// 不足 32 位时前面补 0
while (hashText.length() < 32) {
hashText = "0" + hashText;
}
return hashText;
} catch (NoSuchAlgorithmException e) {
// 处理算法不可用的异常
e.printStackTrace();
return null;
}
}
public static void main(String[] args) {
String encrypt = encrypt("123");
System.out.println("encrypt = " + encrypt);
}
}