From dbfc0607913c616b9034425a06714ad89b209950 Mon Sep 17 00:00:00 2001 From: wangli Date: Fri, 13 Sep 2024 23:21:23 +0800 Subject: [PATCH] =?UTF-8?q?feat(REQ-2924)=20-=20=E7=A7=BB=E9=99=A4eventpro?= =?UTF-8?q?ducer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../starter/feign/ext/ComplexInvokeClient.java | 16 +++++++--------- .../WorkflowEngineStarterFeignConfiguration.java | 4 ++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java index 2f290bf81..ea262e8de 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java @@ -1,6 +1,5 @@ package cn.axzo.workflow.starter.feign.ext; -import cn.axzo.framework.rocketmq.EventProducer; import cn.axzo.workflow.common.enums.RpcInvokeModeEnum; import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeDTO; import cn.axzo.workflow.starter.WorkflowEngineStarterProperties; @@ -43,6 +42,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -64,11 +64,11 @@ public class ComplexInvokeClient implements Client { private final Logger log = LoggerFactory.getLogger(ComplexInvokeClient.class); private final WorkflowEngineStarterProperties starterProperties; - private final ObjectProvider optEventProducer; + private final ObjectProvider optEventProducer; private final Client feignClient; public ComplexInvokeClient(WorkflowEngineStarterProperties starterProperties, - ObjectProvider optEventProducer, + ObjectProvider optEventProducer, Client feignClient) { this.starterProperties = starterProperties; this.optEventProducer = optEventProducer; //(RpcInvokeEventProducer) eventProducer; @@ -93,13 +93,10 @@ public class ComplexInvokeClient implements Client { * @param options */ private Response asyncInvoke(Request request, Request.Options options) throws IOException { - EventProducer ifAvailable = optEventProducer.getIfAvailable(() -> null); - if (ifAvailable == null) { + Optional opt = Optional.ofNullable(optEventProducer.getIfAvailable()); + if (!opt.isPresent()) { return feignClient.execute(request, options); } -// if (!optEventProducer.isPresent()) { -// return feignClient.execute(request, options); -// } optEventProducer.ifAvailable(eventProducer -> { WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO(); MethodMetadata metadata = request.requestTemplate().methodMetadata(); @@ -110,8 +107,9 @@ public class ComplexInvokeClient implements Client { event.setParameters(args); buildArgs(request, metadata, args); log.debug("[async-invoke] sourceEvent: {}", JSON.toJSONString(event)); - ((RpcInvokeEventProducer) eventProducer).send(WORKFLOW_ENGINE_STARTER, event); + eventProducer.send(WORKFLOW_ENGINE_STARTER, event); }); + Map> headers = request.headers(); headers.forEach((k, v) -> log.debug("ComplexInvokeClient Header: {} = {}", k, v)); return Response.builder() diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java index 4c0aab4f5..d411c6875 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java @@ -1,9 +1,9 @@ package cn.axzo.workflow.starter.feign.ext; -import cn.axzo.framework.rocketmq.EventProducer; import cn.axzo.workflow.common.enums.RpcInvokeModeEnum; import cn.axzo.workflow.common.util.ThreadUtil; import cn.axzo.workflow.starter.WorkflowEngineStarterProperties; +import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer; import cn.azxo.framework.common.constatns.Constants; import feign.Client; import feign.RequestInterceptor; @@ -55,7 +55,7 @@ public class WorkflowEngineStarterFeignConfiguration { @Bean public Client complexInvokeClient(WorkflowEngineStarterProperties starterProperties, - ObjectProvider producerObjectProvider, + ObjectProvider producerObjectProvider, Client feignClient) { return new ComplexInvokeClient(starterProperties, producerObjectProvider, feignClient); }