feat(REQ-2924) - 调整 Starter 中的 Bean 注册逻辑

This commit is contained in:
wangli 2024-09-14 12:15:57 +08:00
parent fc36c31bfd
commit 748e8b1906
9 changed files with 61 additions and 39 deletions

View File

@ -31,6 +31,7 @@ public interface ProcessJobApi {
* @return
*/
@GetMapping("/dead-letter/exception/stacktrace")
@Manageable
String getDeadLetterJobExceptionStacktrace(@RequestParam String procInstId);
/**
@ -39,5 +40,6 @@ public interface ProcessJobApi {
* @return
*/
@GetMapping("/dead-letter/exception/stacktrace/byId")
@Manageable
String getDeadLetterJobExceptionStacktraceByJobId(@RequestParam String jobId);
}

View File

@ -19,12 +19,14 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@ -37,6 +39,7 @@ import java.util.List;
import java.util.function.Consumer;
import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_EVENT;
import static org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME;
/**
* 配置监听流程引擎服务广播消息的 RocketMQ 相关配置
@ -45,7 +48,7 @@ import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.DEFAULT_E
* @since 2024/6/5 17:39
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public class StarterBroadcastMQConfiguration {
private final Logger log = LoggerFactory.getLogger(StarterBroadcastMQConfiguration.class);
public static final String BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME = "broadcastEventHandlerRepository";
@ -57,7 +60,6 @@ public class StarterBroadcastMQConfiguration {
//================================= Workflow Engine Broadcast MQ =================================//
@Bean(BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnMissingBean(name = BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME)
public EventHandlerRepository broadcastEventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {
@ -70,7 +72,6 @@ public class StarterBroadcastMQConfiguration {
}
@Bean(BROADCAST_EVENT_CONSUMER_BEAN_NAME)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnMissingBean(name = BROADCAST_EVENT_CONSUMER_BEAN_NAME)
public EventConsumer broadcastEventConsumer(@Qualifier(BROADCAST_EVENT_HANDLER_REPOSITORY_BEAN_NAME) EventHandlerRepository eventHandlerRepository) {
Consumer<EventConsumer.EventWrapper> callback = eventWrapper -> {
@ -86,7 +87,7 @@ public class StarterBroadcastMQConfiguration {
}
@Component
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnBean(RocketMQTemplate.class)
@Conditional(NonContainerEnvironmentCondition.class)
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_${GID_SEGMENT}_consumer",
@ -128,7 +129,6 @@ public class StarterBroadcastMQConfiguration {
}
@Bean("workflowEngineBroadcastEventListener")
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
public WorkflowEngineBroadcastEventListener workflowEngineBroadcastEventListener(@Qualifier(BROADCAST_EVENT_CONSUMER_BEAN_NAME) EventConsumer broadcastEventConsumer,
WorkflowEngineStarterProperties workflowEngineStarterProperties,
List<InnerWorkflowListener> listenerProvider) {

View File

@ -27,6 +27,7 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@ -42,6 +43,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID;
import static org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME;
/**
* 配置 RPC 动作的 RocketMQ 消息的发送方和消息方等配置信息
@ -50,7 +52,7 @@ import static cn.axzo.framework.rocketmq.RocketMQEventProducer.MQ_MESSAGE_ID;
* @since 2024/5/30 14:05
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public class StarterRPCInvokeMQConfiguration {
private final Logger log = LoggerFactory.getLogger(StarterRPCInvokeMQConfiguration.class);
public static final String WORKFLOW_ENGINE_STARTER_EVENT_PRODUCER_BEAN_NAME = "workflowEngineStarterEventProducer";
@ -74,7 +76,7 @@ public class StarterRPCInvokeMQConfiguration {
* @return
*/
@Bean(WORKFLOW_ENGINE_STARTER_EVENT_PRODUCER_BEAN_NAME)
public EventProducer workflowEngineStarterEventProducer(RocketMQTemplate rocketMQTemplate) {
public RpcInvokeEventProducer workflowEngineStarterEventProducer(RocketMQTemplate rocketMQTemplate) {
return new RpcInvokeEventProducer(rocketMQTemplate,
DEFAULT_MODULE,
applicationName + MODULE_NAME_SUFFIX,
@ -139,7 +141,6 @@ public class StarterRPCInvokeMQConfiguration {
}
@Bean(WORKFLOW_ENGINE_STARTER_EVENT_HANDLER_REPOSITORY_BEAN_NAME)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnMissingBean(name = WORKFLOW_ENGINE_STARTER_EVENT_HANDLER_REPOSITORY_BEAN_NAME)
public EventHandlerRepository workflowEngineStarterEventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {
@ -151,7 +152,6 @@ public class StarterRPCInvokeMQConfiguration {
}
@Bean(WORKFLOW_ENGINE_STARTER_EVENT_CONSUMER_BEAN_NAME)
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnMissingBean(name = WORKFLOW_ENGINE_STARTER_EVENT_CONSUMER_BEAN_NAME)
public EventConsumer workflowEngineStarterEventConsumer(@Qualifier(WORKFLOW_ENGINE_STARTER_EVENT_HANDLER_REPOSITORY_BEAN_NAME) EventHandlerRepository workflowEngineStarterEventHandlerRepository) {
Consumer<EventConsumer.EventWrapper> callback = eventWrapper -> {
@ -165,7 +165,7 @@ public class StarterRPCInvokeMQConfiguration {
}
@Component
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@ConditionalOnBean(RocketMQTemplate.class)
@Conditional(NonContainerEnvironmentCondition.class)
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${GID_SEGMENT}_consumer",
@ -200,7 +200,6 @@ public class StarterRPCInvokeMQConfiguration {
}
@Bean("workflowEngineClientRetryEventListener")
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
public WorkflowEngineStarterRetryEventListener workflowEngineClientRetryEventListener(@Qualifier(WORKFLOW_ENGINE_STARTER_EVENT_CONSUMER_BEAN_NAME) EventConsumer workflowEngineStarterEventConsumer,
Environment environment,
WorkflowEngineStarterProperties starterProperties,

View File

@ -30,10 +30,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
@ -52,7 +54,7 @@ import java.util.List;
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WorkflowEngineStarterProperties.class)
@Import({StarterFeignClientConfiguration.class, MetaFeignClientEnableSelector.class})
@Import({StarterFeignClientConfiguration.class, MetaFeignClientEnableSelector.class, StarterRPCInvokeMQConfiguration.class, StarterBroadcastMQConfiguration.class})
public class WorkflowEngineStarterAutoConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterAutoConfiguration.class);
@ -116,6 +118,7 @@ public class WorkflowEngineStarterAutoConfiguration {
}
@Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt")
@ConditionalOnBean(RocketMQTemplate.class)
@ConditionalOnProperty(prefix = "workflow.engine.starter", value = "enable-dlq-monitor", havingValue = "true", matchIfMissing = true)
public DefaultMQAdminExt defaultMQAdminExt(Environment environment) {
String namesrvAddress = environment.getProperty("rocketmq.name-server");
@ -140,6 +143,7 @@ public class WorkflowEngineStarterAutoConfiguration {
}
@Bean
@ConditionalOnBean(RocketMQTemplate.class)
@ConditionalOnProperty(prefix = "workflow.engine.starter", value = "enable-dlq-monitor", havingValue = "true", matchIfMissing = true)
public WorkflowEngineStarterDefaultMQMonitor workflowEngineStarterDefaultMQMonitor(ObjectProvider<DefaultMQAdminExt> mqAdminExtObjectProvider,
ObjectProvider<BroadcastDLQReporter> broadcastDLQProcessorObjectProvider,

View File

@ -66,7 +66,7 @@ import javax.validation.constraints.NotEmpty;
* <p>
* Auto generation by workflow engine, It cannot be manually modified
*/
@org.springframework.cloud.openfeign.FeignClient(name = "workflow-engine-starter-core", url = "${axzo.service.workflow-engine.starter:http://workflow-engine:8080}", configuration = WorkflowEngineStarterFeignConfiguration.class)
@org.springframework.cloud.openfeign.FeignClient(name = "workflow-engine-starter-core", url = "${axzo.service.workflow-engine:http://workflow-engine:8080}", configuration = WorkflowEngineStarterFeignConfiguration.class)
public interface WorkflowCoreService {
/**
@ -198,6 +198,22 @@ public interface WorkflowCoreService {
@InvokeMode(SYNC)
BpmnProcessInstanceLogVO getProcessInstanceLogs(@Validated @RequestBody BpmnProcessInstanceLogQueryDTO dto);
/**
* 查询死信消息数据
* @param procInstId 流程实例id
* @return
*/
@GetMapping("/dead-letter/exception/stacktrace")
String getDeadLetterJobExceptionStacktrace(@RequestParam String procInstId);
/**
* 查询死信消息数据
* @param jobId 死信job的id
* @return
*/
@GetMapping("/dead-letter/exception/stacktrace/byId")
String getDeadLetterJobExceptionStacktraceByJobId(@RequestParam String jobId);
/**
* 同意
*

View File

@ -86,7 +86,7 @@ import cn.axzo.workflow.common.model.response.bpmn.process.BpmnProcessDefinition
* <p>
* Auto generation by workflow engine, It cannot be manually modified
*/
@org.springframework.cloud.openfeign.FeignClient(name = "workflow-engine-starter-manage", url = "${axzo.service.workflow-engine.starter:http://workflow-engine:8080}", configuration = WorkflowEngineStarterFeignConfiguration.class)
@org.springframework.cloud.openfeign.FeignClient(name = "workflow-engine-starter-manage", url = "${axzo.service.workflow-engine:http://workflow-engine:8080}", configuration = WorkflowEngineStarterFeignConfiguration.class)
public interface WorkflowManageService {
/**

View File

@ -1,5 +1,6 @@
package cn.axzo.workflow.starter.feign.ext;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.workflow.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.common.model.response.mq.WorkflowEngineStarterRpcInvokeDTO;
import cn.axzo.workflow.starter.WorkflowEngineStarterProperties;
@ -14,7 +15,7 @@ import feign.Response;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.http.HttpStatus;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.DeleteMapping;
@ -42,7 +43,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -50,6 +50,7 @@ import java.util.regex.Pattern;
import static cn.axzo.workflow.common.constant.StarterConstants.STARTER_INVOKE_MODE;
import static cn.axzo.workflow.common.enums.RpcInvokeModeEnum.SYNC;
import static cn.axzo.workflow.common.enums.WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER;
import static cn.axzo.workflow.starter.StarterRPCInvokeMQConfiguration.WORKFLOW_ENGINE_STARTER_EVENT_PRODUCER_BEAN_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
@ -64,14 +65,14 @@ public class ComplexInvokeClient implements Client {
private final Logger log = LoggerFactory.getLogger(ComplexInvokeClient.class);
private final WorkflowEngineStarterProperties starterProperties;
private final ObjectProvider<RpcInvokeEventProducer> optEventProducer;
private final BeanFactory beanFactory;
private final Client feignClient;
public ComplexInvokeClient(WorkflowEngineStarterProperties starterProperties,
ObjectProvider<RpcInvokeEventProducer> optEventProducer,
BeanFactory beanFactory,
Client feignClient) {
this.starterProperties = starterProperties;
this.optEventProducer = optEventProducer; //(RpcInvokeEventProducer) eventProducer;
this.beanFactory = beanFactory; //(RpcInvokeEventProducer) eventProducer;
this.feignClient = feignClient;
}
@ -93,11 +94,9 @@ public class ComplexInvokeClient implements Client {
* @param options
*/
private Response asyncInvoke(Request request, Request.Options options) throws IOException {
Optional<RpcInvokeEventProducer> opt = Optional.ofNullable(optEventProducer.getIfAvailable());
if (!opt.isPresent()) {
return feignClient.execute(request, options);
}
optEventProducer.ifAvailable(eventProducer -> {
RpcInvokeEventProducer producer = null;
try {
producer = beanFactory.getBean(WORKFLOW_ENGINE_STARTER_EVENT_PRODUCER_BEAN_NAME, RpcInvokeEventProducer.class);
WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO();
MethodMetadata metadata = request.requestTemplate().methodMetadata();
event.setClassName(metadata.targetType().getName());
@ -107,18 +106,19 @@ public class ComplexInvokeClient implements Client {
event.setParameters(args);
buildArgs(request, metadata, args);
log.debug("[async-invoke] sourceEvent: {}", JSON.toJSONString(event));
eventProducer.send(WORKFLOW_ENGINE_STARTER, event);
});
Map<String, Collection<String>> headers = request.headers();
headers.forEach((k, v) -> log.debug("ComplexInvokeClient Header: {} = {}", k, v));
return Response.builder()
.status(HttpStatus.OK.value())
.reason(HttpStatus.OK.getReasonPhrase())
.headers(headers)
.request(request)
.body(body)
.build();
producer.send(WORKFLOW_ENGINE_STARTER, event);
Map<String, Collection<String>> headers = request.headers();
headers.forEach((k, v) -> log.debug("ComplexInvokeClient Header: {} = {}", k, v));
return Response.builder()
.status(HttpStatus.OK.value())
.reason(HttpStatus.OK.getReasonPhrase())
.headers(headers)
.request(request)
.body(body)
.build();
} catch (Exception e) {
return feignClient.execute(request, options);
}
}
@SneakyThrows

View File

@ -14,6 +14,7 @@ import feign.codec.Decoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
@ -55,9 +56,9 @@ public class WorkflowEngineStarterFeignConfiguration {
@Bean
public Client complexInvokeClient(WorkflowEngineStarterProperties starterProperties,
ObjectProvider<RpcInvokeEventProducer> producerObjectProvider,
BeanFactory beanFactory,
Client feignClient) {
return new ComplexInvokeClient(starterProperties, producerObjectProvider, feignClient);
return new ComplexInvokeClient(starterProperties, beanFactory, feignClient);
}
@Bean

View File

@ -70,7 +70,7 @@ class WorkflowEngineStarterInvocationHandler implements InvocationHandler {
private void parseInvokeModelAnnotation(Method method) {
InvokeMode annotation = AnnotationUtils.getAnnotation(method, InvokeMode.class);
if (Objects.nonNull(annotation)) {
if (Objects.nonNull(annotation) && Objects.isNull(ThreadUtil.get())) {
ThreadUtil.set(annotation.value());
}
}