From 26fc14cce92dc3a4fd299322c0bf4a80dde6b3da Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Tue, 10 Sep 2024 16:01:41 +0800 Subject: [PATCH] =?UTF-8?q?feat(REQ-2924)=20-=20=E6=96=B0=E5=A2=9E=20Start?= =?UTF-8?q?er=20=E6=94=AF=E6=8C=81=E4=B8=8D=E9=85=8D=E7=BD=AE=20Rocket=20?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=EF=BC=8C=E6=89=80=E6=9C=89=E7=9A=84=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E8=A1=8C=E4=B8=BA=E9=83=BD=E5=B0=86=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E4=B8=BA=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../StarterBroadcastMQConfiguration.java | 16 ++--- .../StarterFeignClientConfiguration.java | 3 + .../StarterRPCInvokeMQConfiguration.java | 2 + ...orkflowEngineStarterAutoConfiguration.java | 2 +- .../feign/ext/ComplexInvokeClient.java | 61 ++++++++++--------- ...rkflowEngineStarterFeignConfiguration.java | 3 +- 6 files changed, 50 insertions(+), 37 deletions(-) diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java index 9ac804265..7a1848191 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterBroadcastMQConfiguration.java @@ -26,6 +26,7 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -44,6 +45,7 @@ import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_E * @since 2024/6/5 17:39 */ @Configuration(proxyBeanMethods = false) +@ConditionalOnProperty(prefix = "rocketmq", value = "name-server") public class StarterBroadcastMQConfiguration { private final Logger log = LoggerFactory.getLogger(StarterBroadcastMQConfiguration.class); public static final String BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME = "broadcastEventHandlerRepository"; @@ -84,10 +86,10 @@ public class StarterBroadcastMQConfiguration { @Component @Conditional(NonContainerEnvironmentCondition.class) @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}", - consumerGroup = "GID_${spring.application.name}_workflow_engine_${GID_SEGMENT}_consumer", - consumeMode = ConsumeMode.ORDERLY, - maxReconsumeTimes = 0, - nameServer = "${rocketmq.name-server}" + consumerGroup = "GID_${spring.application.name}_workflow_engine_${GID_SEGMENT}_consumer", + consumeMode = ConsumeMode.ORDERLY, + maxReconsumeTimes = 0, + nameServer = "${rocketmq.name-server}" ) public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener, InitializingBean { private final Logger log = LoggerFactory.getLogger(WorkflowEngineBroadcastConsumer.class); @@ -116,9 +118,9 @@ public class StarterBroadcastMQConfiguration { @Override public void afterPropertiesSet() { this.filters = ImmutableList.of( - new InnerFilterMQOwnerShip(starterProperties, applicationName), - new InnerFilterDefinitionKey(starterProperties), - new InnerFilterExtension(businessFilterProvider)); + new InnerFilterMQOwnerShip(starterProperties, applicationName), + new InnerFilterDefinitionKey(starterProperties), + new InnerFilterExtension(businessFilterProvider)); } } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterFeignClientConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterFeignClientConfiguration.java index 91d071282..663778004 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterFeignClientConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterFeignClientConfiguration.java @@ -14,6 +14,7 @@ import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.FeignClientFactoryBean; @@ -54,12 +55,14 @@ import java.util.Map; public class StarterFeignClientConfiguration { @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(FeignClientFactoryBean.class) @EnableFeignClients(clients = WorkflowCoreService.class) public static class WorkflowCoreServiceClient { } @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(prefix = "workflow.engine.starter", value = "manageable", havingValue = "true") + @ConditionalOnClass(FeignClientFactoryBean.class) @EnableFeignClients(clients = WorkflowManageService.class) public static class WorkflowManageServiceClient { } diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java index aa47f3f3d..5b9b39094 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/StarterRPCInvokeMQConfiguration.java @@ -28,6 +28,7 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -49,6 +50,7 @@ import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID; * @since 2024/5/30 14:05 */ @Configuration(proxyBeanMethods = false) +@ConditionalOnProperty(prefix = "rocketmq", value = "name-server") public class StarterRPCInvokeMQConfiguration { private final Logger log = LoggerFactory.getLogger(StarterRPCInvokeMQConfiguration.class); public static final String WORKFLOW_ENGINE_STARTER_EVENT_PRODUCER_BEAN_NAME = "workflowEngineStarterEventProducer"; diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java index 466f464d9..01a19b309 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/WorkflowEngineStarterAutoConfiguration.java @@ -52,7 +52,7 @@ import java.util.List; */ @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(WorkflowEngineStarterProperties.class) -@Import({StarterFeignClientConfiguration.class, StarterBroadcastMQConfiguration.class, StarterRPCInvokeMQConfiguration.class, MetaFeignClientEnableSelector.class}) +@Import({StarterFeignClientConfiguration.class, MetaFeignClientEnableSelector.class}) public class WorkflowEngineStarterAutoConfiguration { private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterAutoConfiguration.class); diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java index 017237e0e..2e3f733a6 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/ComplexInvokeClient.java @@ -42,13 +42,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static cn.axzo.workflow.common.constant.StarterConstants.STARTER_INVOKE_MODE; import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC; import static cn.axzo.workflow.common.enums.WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER; -import static cn.axzo.workflow.common.constant.StarterConstants.STARTER_INVOKE_MODE; import static java.nio.charset.StandardCharsets.UTF_8; /** @@ -63,14 +64,14 @@ public class ComplexInvokeClient implements Client { private final Logger log = LoggerFactory.getLogger(ComplexInvokeClient.class); private final WorkflowEngineStarterProperties starterProperties; - private final RpcInvokeEventProducer eventProducer; + private final Optional optEventProducer; private final Client feignClient; public ComplexInvokeClient(WorkflowEngineStarterProperties starterProperties, - EventProducer eventProducer, + Optional optEventProducer, Client feignClient) { this.starterProperties = starterProperties; - this.eventProducer = (RpcInvokeEventProducer) eventProducer; + this.optEventProducer = optEventProducer; //(RpcInvokeEventProducer) eventProducer; this.feignClient = feignClient; } @@ -78,40 +79,44 @@ public class ComplexInvokeClient implements Client { public Response execute(Request request, Request.Options options) throws IOException { log.debug("ComplexInvokeClient execute... Url: {}", request.url()); RpcInvokeModeEnum currentInvokeModeEnum = getInvokeMode(request); - Map> headers = request.headers(); - headers.forEach((k, v) -> log.debug("ComplexInvokeClient Header: {} = {}", k, v)); - log.debug("[{}] invoke url: {}", currentInvokeModeEnum, request.url()); if (Objects.equals(SYNC, currentInvokeModeEnum)) { return feignClient.execute(request, options); } - asyncInvoke(request); - - return Response.builder() - .status(HttpStatus.OK.value()) - .reason(HttpStatus.OK.getReasonPhrase()) - .headers(headers) - .request(request) - .body(body) - .build(); + return asyncInvoke(request, options); } /** * 发送 RPC 调用动作的 MQ 事件 * * @param request + * @param options */ - private void asyncInvoke(Request request) { - WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO(); - MethodMetadata metadata = request.requestTemplate().methodMetadata(); - event.setClassName(metadata.targetType().getName()); - event.setMethodName(metadata.method().getName()); + private Response asyncInvoke(Request request, Request.Options options) throws IOException { + if (!optEventProducer.isPresent()) { + return feignClient.execute(request, options); + } + optEventProducer.ifPresent(eventProducer -> { + WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO(); + MethodMetadata metadata = request.requestTemplate().methodMetadata(); + event.setClassName(metadata.targetType().getName()); + event.setMethodName(metadata.method().getName()); - List args = new ArrayList<>(); - event.setParameters(args); - buildArgs(request, metadata, args); - log.debug("[async-invoke] sourceEvent: {}", JSON.toJSONString(event)); - eventProducer.send(WORKFLOW_ENGINE_STARTER, event); + List args = new ArrayList<>(); + event.setParameters(args); + buildArgs(request, metadata, args); + log.debug("[async-invoke] sourceEvent: {}", JSON.toJSONString(event)); + ((RpcInvokeEventProducer) eventProducer).send(WORKFLOW_ENGINE_STARTER, event); + }); + Map> headers = request.headers(); + headers.forEach((k, v) -> log.debug("ComplexInvokeClient Header: {} = {}", k, v)); + return Response.builder() + .status(HttpStatus.OK.value()) + .reason(HttpStatus.OK.getReasonPhrase()) + .headers(headers) + .request(request) + .body(body) + .build(); } @SneakyThrows @@ -204,7 +209,7 @@ public class ComplexInvokeClient implements Client { private RpcInvokeModeEnum getInvokeMode(Request request) { Collection invokeModel = request.headers().getOrDefault(STARTER_INVOKE_MODE, - Collections.singletonList(starterProperties.getInvokeMode().name())); + Collections.singletonList(starterProperties.getInvokeMode().name())); if (CollectionUtils.isEmpty(invokeModel)) { return starterProperties.getInvokeMode(); } else if (invokeModel.size() > 1) { @@ -215,7 +220,7 @@ public class ComplexInvokeClient implements Client { static Response.Body body = new Response.Body() { final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success(HttpStatus.OK.value(), "Send MQ Success", null)) - .getBytes(UTF_8)); + .getBytes(UTF_8)); @Override public Integer length() { diff --git a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java index 19a406cb3..5d610f981 100644 --- a/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java +++ b/workflow-engine-spring-boot-starter/src/main/java/cn/axzo/workflow/starter/feign/ext/WorkflowEngineStarterFeignConfiguration.java @@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletRequest; import java.util.Collection; import java.util.Collections; import java.util.Objects; +import java.util.Optional; //import static cn.axzo.framework.web.filter.BasicRecordExceptionFilter.MICRO_SERVER_RECORD_ERROR_GET_PARAM_NAME; import static cn.axzo.workflow.client.config.WorkflowRequestInterceptor.HEADER_API_VERSION; @@ -58,7 +59,7 @@ public class WorkflowEngineStarterFeignConfiguration { public Client complexInvokeClient(WorkflowEngineStarterProperties starterProperties, @Qualifier("workflowEngineStarterEventProducer") EventProducer eventProducer, Client feignClient) { - return new ComplexInvokeClient(starterProperties, eventProducer, feignClient); + return new ComplexInvokeClient(starterProperties, Optional.ofNullable(eventProducer), feignClient); } @Bean