feat(REQ-2924) - 新增 Starter 支持不配置 Rocket 使用,所有的异步行为都将自动修正为同步

This commit is contained in:
wangli 2024-09-10 16:01:41 +08:00
parent 3d213a9cab
commit 26fc14cce9
6 changed files with 50 additions and 37 deletions

View File

@ -26,6 +26,7 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@ -44,6 +45,7 @@ import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_E
* @since 2024/6/5 17:39
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
public class StarterBroadcastMQConfiguration {
private final Logger log = LoggerFactory.getLogger(StarterBroadcastMQConfiguration.class);
public static final String BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME = "broadcastEventHandlerRepository";
@ -84,10 +86,10 @@ public class StarterBroadcastMQConfiguration {
@Component
@Conditional(NonContainerEnvironmentCondition.class)
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_${GID_SEGMENT}_consumer",
consumeMode = ConsumeMode.ORDERLY,
maxReconsumeTimes = 0,
nameServer = "${rocketmq.name-server}"
consumerGroup = "GID_${spring.application.name}_workflow_engine_${GID_SEGMENT}_consumer",
consumeMode = ConsumeMode.ORDERLY,
maxReconsumeTimes = 0,
nameServer = "${rocketmq.name-server}"
)
public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt>, InitializingBean {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineBroadcastConsumer.class);
@ -116,9 +118,9 @@ public class StarterBroadcastMQConfiguration {
@Override
public void afterPropertiesSet() {
this.filters = ImmutableList.of(
new InnerFilterMQOwnerShip(starterProperties, applicationName),
new InnerFilterDefinitionKey(starterProperties),
new InnerFilterExtension(businessFilterProvider));
new InnerFilterMQOwnerShip(starterProperties, applicationName),
new InnerFilterDefinitionKey(starterProperties),
new InnerFilterExtension(businessFilterProvider));
}
}

View File

@ -14,6 +14,7 @@ import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.openfeign.FeignClientFactoryBean;
@ -54,12 +55,14 @@ import java.util.Map;
public class StarterFeignClientConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(FeignClientFactoryBean.class)
@EnableFeignClients(clients = WorkflowCoreService.class)
public static class WorkflowCoreServiceClient {
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "workflow.engine.starter", value = "manageable", havingValue = "true")
@ConditionalOnClass(FeignClientFactoryBean.class)
@EnableFeignClients(clients = WorkflowManageService.class)
public static class WorkflowManageServiceClient {
}

View File

@ -28,6 +28,7 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@ -49,6 +50,7 @@ import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID;
* @since 2024/5/30 14:05
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
public class StarterRPCInvokeMQConfiguration {
private final Logger log = LoggerFactory.getLogger(StarterRPCInvokeMQConfiguration.class);
public static final String WORKFLOW_ENGINE_STARTER_EVENT_PRODUCER_BEAN_NAME = "workflowEngineStarterEventProducer";

View File

@ -52,7 +52,7 @@ import java.util.List;
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WorkflowEngineStarterProperties.class)
@Import({StarterFeignClientConfiguration.class, StarterBroadcastMQConfiguration.class, StarterRPCInvokeMQConfiguration.class, MetaFeignClientEnableSelector.class})
@Import({StarterFeignClientConfiguration.class, MetaFeignClientEnableSelector.class})
public class WorkflowEngineStarterAutoConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterAutoConfiguration.class);

View File

@ -42,13 +42,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static cn.axzo.workflow.common.constant.StarterConstants.STARTER_INVOKE_MODE;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
import static cn.axzo.workflow.common.enums.WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER;
import static cn.axzo.workflow.common.constant.StarterConstants.STARTER_INVOKE_MODE;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
@ -63,14 +64,14 @@ public class ComplexInvokeClient implements Client {
private final Logger log = LoggerFactory.getLogger(ComplexInvokeClient.class);
private final WorkflowEngineStarterProperties starterProperties;
private final RpcInvokeEventProducer eventProducer;
private final Optional<EventProducer> optEventProducer;
private final Client feignClient;
public ComplexInvokeClient(WorkflowEngineStarterProperties starterProperties,
EventProducer eventProducer,
Optional<EventProducer> optEventProducer,
Client feignClient) {
this.starterProperties = starterProperties;
this.eventProducer = (RpcInvokeEventProducer) eventProducer;
this.optEventProducer = optEventProducer; //(RpcInvokeEventProducer) eventProducer;
this.feignClient = feignClient;
}
@ -78,40 +79,44 @@ public class ComplexInvokeClient implements Client {
public Response execute(Request request, Request.Options options) throws IOException {
log.debug("ComplexInvokeClient execute... Url: {}", request.url());
RpcInvokeModeEnum currentInvokeModeEnum = getInvokeMode(request);
Map<String, Collection<String>> headers = request.headers();
headers.forEach((k, v) -> log.debug("ComplexInvokeClient Header: {} = {}", k, v));
log.debug("[{}] invoke url: {}", currentInvokeModeEnum, request.url());
if (Objects.equals(SYNC, currentInvokeModeEnum)) {
return feignClient.execute(request, options);
}
asyncInvoke(request);
return Response.builder()
.status(HttpStatus.OK.value())
.reason(HttpStatus.OK.getReasonPhrase())
.headers(headers)
.request(request)
.body(body)
.build();
return asyncInvoke(request, options);
}
/**
* 发送 RPC 调用动作的 MQ 事件
*
* @param request
* @param options
*/
private void asyncInvoke(Request request) {
WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO();
MethodMetadata metadata = request.requestTemplate().methodMetadata();
event.setClassName(metadata.targetType().getName());
event.setMethodName(metadata.method().getName());
private Response asyncInvoke(Request request, Request.Options options) throws IOException {
if (!optEventProducer.isPresent()) {
return feignClient.execute(request, options);
}
optEventProducer.ifPresent(eventProducer -> {
WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO();
MethodMetadata metadata = request.requestTemplate().methodMetadata();
event.setClassName(metadata.targetType().getName());
event.setMethodName(metadata.method().getName());
List<String> args = new ArrayList<>();
event.setParameters(args);
buildArgs(request, metadata, args);
log.debug("[async-invoke] sourceEvent: {}", JSON.toJSONString(event));
eventProducer.send(WORKFLOW_ENGINE_STARTER, event);
List<String> args = new ArrayList<>();
event.setParameters(args);
buildArgs(request, metadata, args);
log.debug("[async-invoke] sourceEvent: {}", JSON.toJSONString(event));
((RpcInvokeEventProducer) eventProducer).send(WORKFLOW_ENGINE_STARTER, event);
});
Map<String, Collection<String>> headers = request.headers();
headers.forEach((k, v) -> log.debug("ComplexInvokeClient Header: {} = {}", k, v));
return Response.builder()
.status(HttpStatus.OK.value())
.reason(HttpStatus.OK.getReasonPhrase())
.headers(headers)
.request(request)
.body(body)
.build();
}
@SneakyThrows
@ -204,7 +209,7 @@ public class ComplexInvokeClient implements Client {
private RpcInvokeModeEnum getInvokeMode(Request request) {
Collection<String> invokeModel = request.headers().getOrDefault(STARTER_INVOKE_MODE,
Collections.singletonList(starterProperties.getInvokeMode().name()));
Collections.singletonList(starterProperties.getInvokeMode().name()));
if (CollectionUtils.isEmpty(invokeModel)) {
return starterProperties.getInvokeMode();
} else if (invokeModel.size() > 1) {
@ -215,7 +220,7 @@ public class ComplexInvokeClient implements Client {
static Response.Body body = new Response.Body() {
final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success(HttpStatus.OK.value(), "Send MQ Success", null))
.getBytes(UTF_8));
.getBytes(UTF_8));
@Override
public Integer length() {

View File

@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
//import static cn.axzo.framework.web.filter.BasicRecordExceptionFilter.MICRO_SERVER_RECORD_ERROR_GET_PARAM_NAME;
import static cn.axzo.workflow.client.config.WorkflowRequestInterceptor.HEADER_API_VERSION;
@ -58,7 +59,7 @@ public class WorkflowEngineStarterFeignConfiguration {
public Client complexInvokeClient(WorkflowEngineStarterProperties starterProperties,
@Qualifier("workflowEngineStarterEventProducer") EventProducer eventProducer,
Client feignClient) {
return new ComplexInvokeClient(starterProperties, eventProducer, feignClient);
return new ComplexInvokeClient(starterProperties, Optional.ofNullable(eventProducer), feignClient);
}
@Bean