update(REQ-2516) - 针对 RPC 调用的同步异步,进行功能优化

This commit is contained in:
wangli 2024-06-01 22:59:50 +08:00
parent c471d787f6
commit 30b151ca9a
10 changed files with 82 additions and 23 deletions

View File

@ -0,0 +1,28 @@
package cn.axzo.workflow.common.annotation;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.ASYNC;
/**
* 用于标记接口的方法调用模式
*
* @author wangli
* @since 2024/6/1 19:01
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface InvokeMode {
/**
* 调用模式, 默认异步
*/
RpcInvokeModeEnum value() default ASYNC;
}

View File

@ -1,7 +1,7 @@
package cn.axzo.workflow.starter.common.enums;
package cn.axzo.workflow.common.enums;
/**
* TODO
* PRC 调用模式枚举
*
* @author wangli
* @since 2024/5/29 10:27

View File

@ -1,6 +1,7 @@
package cn.axzo.workflow.starter.common.util;
package cn.axzo.workflow.common.util;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
/**
* TODO

View File

@ -1,11 +1,13 @@
package cn.axzo.workflow.starter;
import cn.axzo.workflow.common.annotation.InvokeMode;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
import cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.ASYNC;
import static cn.axzo.workflow.starter.common.enums.FailHandleTypeEnum.FAIL_OVER;
import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.ASYNC;
/**
* Workflow Engine Starter Properties
@ -21,20 +23,25 @@ public class WorkflowEngineStarterProperties {
private Boolean manageable = false;
/**
* 本地启动是否消费容器的广播事件
* <h3>该参数只针对<strong color=orange></strong>容器环境生效</h3>
* 本地启动时,是否将本地的 MQ 消费者加入集群消费组
* <pre>
* 默认 false, 本地启动应用时, 将创建消息组名称中含有"debugging"的消费组.
* 否则, 本地启动应用时, 将与容器环境中所有实例共同消费广播事件.
* 否则, 本地启动应用时, 消费者将加入容器环境, 进行集群消费.
* </pre>
*/
private Boolean localConsumer = false;
private Boolean joinContainerGroup = false;
/**
* WorkflowCoreService 类中所有方法调用时默认采用的模式
* WorkflowCoreService 类中所有方法未标记{@link InvokeMode}注解的方法调用时, 默认采用的模式
*
* <pre>
* 如果是同步调用则直接通过普通 FeignClient 进行调用,
* 否则将通过 MQ RPC 调用进行解耦
* </pre>
* <p>
* 如果方法上有{@link InvokeMode}注解, 则以注解上的模式优先, 如果还想覆盖注解中的模式,
* 则可以通过 {@link WorkflowCoreService#sync()}{@link WorkflowCoreService#async()}方法进行覆盖
*/
private RpcInvokeModeEnum invokeMode = ASYNC;
@ -77,12 +84,12 @@ public class WorkflowEngineStarterProperties {
this.manageable = manageable;
}
public Boolean getLocalConsumer() {
return localConsumer;
public Boolean getJoinContainerGroup() {
return joinContainerGroup;
}
public void setLocalConsumer(Boolean localConsumer) {
this.localConsumer = localConsumer;
public void setJoinContainerGroup(Boolean joinContainerGroup) {
this.joinContainerGroup = joinContainerGroup;
}
public RpcInvokeModeEnum getInvokeMode() {

View File

@ -1,10 +1,11 @@
package cn.axzo.workflow.starter.api;
import cn.axzo.workflow.common.annotation.InvokeMode;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceCreateDTO;
import cn.axzo.workflow.common.model.request.bpmn.process.BpmnProcessInstanceQueryDTO;
import cn.axzo.workflow.common.model.request.bpmn.task.BpmnTaskAuditDTO;
import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessInstanceVO;
import cn.axzo.workflow.starter.common.util.ThreadUtil;
import cn.axzo.workflow.common.util.ThreadUtil;
import cn.axzo.workflow.starter.feign.ext.WorkflowEngineStarterFeignConfiguration;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.cloud.openfeign.FeignClient;
@ -13,8 +14,9 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.ASYNC;
import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.SYNC;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.ASYNC;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
/**
* 模拟生成的受限访问的接口定义
@ -28,6 +30,7 @@ import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.SYNC;
public interface WorkflowCoreService {
@PostMapping("/api/process/instance/create")
@InvokeMode(SYNC)
String createProcessInstance(@Validated @RequestBody BpmnProcessInstanceCreateDTO dto);
@Operation(summary = "同意MQ 触发规则:1. 当前审批任务会依次触发 process-task-completed 和 process-task-deleted 事件(如果有下一级审批,则会触发第 2.1 点中的事件,如果当前审核任务最后一级审批,则会触发第 2.2 点中的事件)2.1. 下一级审批任务会依次触发 process-task-assigned 和 process-task-created 事件2.2. 流程实例正常结束会触发 process-instance-completed 事件")

View File

@ -8,4 +8,7 @@ package cn.axzo.workflow.starter.common.constant;
*/
public interface StarterConstants {
String STARTER_INVOKE_MODE = "WORKFLOW-ENGINE-STARTER-INVOKE-MODE";
String DEBUGGING_MQ_SUFFIX = "_debugging";
String K8S_POD_NAME_SPACE = "MY_POD_NAMESPACE";
String MQ_GID_NAME_SEGMENT = "GID_SEGMENT";
}

View File

@ -1,10 +1,10 @@
package cn.axzo.workflow.starter.feign.ext;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeDTO;
import cn.axzo.workflow.common.util.ThreadUtil;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException;
import cn.axzo.workflow.starter.common.util.ThreadUtil;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
@ -31,9 +31,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
import static cn.axzo.workflow.common.enums.WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER;
import static cn.axzo.workflow.starter.common.constant.StarterConstants.STARTER_INVOKE_MODE;
import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.SYNC;
/**
* 适用于 Starter 中复合型的 FeignClient 实现

View File

@ -1,8 +1,8 @@
package cn.axzo.workflow.starter.feign.ext;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.common.util.ThreadUtil;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.starter.common.util.ThreadUtil;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import feign.Client;
import feign.RequestInterceptor;

View File

@ -1,12 +1,16 @@
package cn.axzo.workflow.starter.feign.ext;
import cn.axzo.workflow.common.annotation.InvokeMode;
import cn.axzo.workflow.common.util.ThreadUtil;
import feign.InvocationHandlerFactory;
import feign.Target;
import org.springframework.core.annotation.AnnotationUtils;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Objects;
/**
* Workflow Engine Starter Core Service Invocation Handler
@ -40,9 +44,18 @@ class WorkflowEngineStarterInvocationHandler implements InvocationHandler {
return toString();
}
parseInvokeModelAnnotation(method);
return dispatch.get(method).invoke(args);
}
private static void parseInvokeModelAnnotation(Method method) {
InvokeMode annotation = AnnotationUtils.getAnnotation(method, InvokeMode.class);
if (Objects.nonNull(annotation)) {
ThreadUtil.set(annotation.value());
}
}
@Override
public boolean equals(Object obj) {
if (obj instanceof WorkflowEngineStarterInvocationHandler) {

View File

@ -5,6 +5,7 @@ import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandler;
import cn.axzo.workflow.common.enums.WorkflowEngineEventEnum;
import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeDTO;
import cn.axzo.workflow.common.util.ThreadUtil;
import cn.axzo.workflow.starter.api.WorkflowCoreService;
import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
@ -21,6 +22,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
/**
* RPC 动作事件的 MQ 消费者集成业务系统中的自产自销
*
@ -84,7 +87,8 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
throw new IllegalStateException("Not found method" + dto.getMethodName());
}
try {
workflowCoreService.sync();
// 事件处理 RPC 请求, 强制使用同步模式
ThreadUtil.set(SYNC);
// TODO 入参需要根据类型进行转换
Class<?> parameterType = method.getParameterTypes()[0];
Object invoke = method.invoke(workflowCoreService, JSON.parseObject((String) dto.getBody(), parameterType));