update(REQ-2516) - 完整测试整个 RPC 同异步模式,且支持通过 Properties 配置调整默认同异步模式。

This commit is contained in:
wangli 2024-05-31 14:25:36 +08:00
parent bfe7819565
commit a347cc45fe
8 changed files with 151 additions and 53 deletions

View File

@ -39,7 +39,7 @@ public enum WorkflowEngineEventEnum {
}
public String getTag() {
return tag;
return eventCode.getName();
}
public String getDesc() {

View File

@ -131,13 +131,20 @@ public class TestController {
}
@GetMapping("/test5")
public CommonResponse<Boolean> test5() {
public CommonResponse<String> test5(@RequestParam(required = false) Boolean sync) {
BpmnProcessInstanceCreateDTO dto = new BpmnProcessInstanceCreateDTO();
dto.setProcessDefinitionKey("1");
dto.setCooperationOrg(new CooperationOrgDTO());
dto.setBusinessKey("businessKey");
dto.setInitiator(new BpmnTaskDelegateAssigner());
workflowCoreService.async().createProcessInstance(dto);
return CommonResponse.success(true);
if (Objects.nonNull(sync)) {
if (sync) {
workflowCoreService.sync();
} else {
workflowCoreService.async();
}
}
CommonResponse<String> processInstance = workflowCoreService.createProcessInstance(dto);
return processInstance;
}
}

View File

@ -28,7 +28,7 @@ import java.util.List;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WorkflowEngineStarterProperties.class)
@EnableFeignClients(clients = {WorkflowCoreService.class})
@Import(RocketMQConfiguration.class)
@Import(WorkflowEngineStarterRocketMQConfiguration.class)
public class WorkflowEngineStarterAutoConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterAutoConfiguration.class);

View File

@ -3,7 +3,7 @@ package cn.axzo.workflow.starter;
import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.SYNC;
import static cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum.ASYNC;
/**
* Workflow Engine Starter Properties
@ -32,7 +32,7 @@ public class WorkflowEngineStarterProperties {
* 如果是同步调用则直接通过普通 FeignClient 进行调用,
* 否则将通过 MQ RPC 调用进行解耦
*/
private RpcInvokeModeEnum invokeMode = SYNC;
private RpcInvokeModeEnum invokeMode = ASYNC;
/**
* 该属性还不确定能否实现

View File

@ -2,6 +2,7 @@ package cn.axzo.workflow.starter;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.DefaultEventConsumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandlerRepository;
import cn.axzo.framework.rocketmq.EventProducer;
@ -17,8 +18,9 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@ -37,8 +39,8 @@ import java.util.function.Consumer;
* @since 2024/5/30 14:05
*/
@Configuration(proxyBeanMethods = false)
public class RocketMQConfiguration {
private final Logger log = LoggerFactory.getLogger(RocketMQConfiguration.class);
public class WorkflowEngineStarterRocketMQConfiguration {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterRocketMQConfiguration.class);
private static final String DEFAULT_MODULE = "workflowEngine";
private static final String DEFAULT_EVENT = "topic_workflow_engine_";
@ -48,6 +50,50 @@ public class RocketMQConfiguration {
@Value("${spring.profiles.active:dev}")
private String activeProfile;
//================================= Workflow Engine Broadcast MQ =================================//
@Bean
@ConditionalOnMissingBean(EventHandlerRepository.class)
public EventHandlerRepository eventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {
log.warn("MQ, handle warning {}", logText, ex);
if (Objects.nonNull(ex)) {
throw new RuntimeException(ex);
}
});
}
@Bean
@ConditionalOnMissingBean(EventProducer.class)
public EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
Consumer<EventConsumer.EventWrapper> callback = eventWrapper -> {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
//String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
}
};
return new DefaultEventConsumer(applicationName, eventHandlerRepository, callback);
}
@Component
@Conditional(NonContainerEnvironmentCondition.class)
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_${MY_POD_NAMESPACE:debugging}_consumer",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}"
)
public class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt> {
@Resource(name = "eventConsumer")
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
}
//======================================= RPC Invoke MQ ========================================//
/**
* 客户端 RPC Retry 事件生产者
*
@ -58,7 +104,7 @@ public class RocketMQConfiguration {
public EventProducer workflowEngineClientEventProducer(RocketMQTemplate rocketMQTemplate) {
return new RocketMQEventProducer(rocketMQTemplate,
DEFAULT_MODULE,
applicationName,
applicationName + "Starter",
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(DEFAULT_EVENT + activeProfile)
@ -79,14 +125,14 @@ public class RocketMQConfiguration {
}
@Bean
public RpcInvokeEventProducer rpcInvokeEventProducer(EventProducer workflowEngineClientEventProducer,
public RpcInvokeEventProducer rpcInvokeEventProducer(@Qualifier("workflowEngineClientEventProducer") EventProducer workflowEngineClientEventProducer,
Environment environment) {
return new RpcInvokeEventProducer(workflowEngineClientEventProducer, environment);
}
@Bean
@ConditionalOnClass(EventHandlerRepository.class)
public EventHandlerRepository eventHandlerRepository() {
@ConditionalOnMissingBean(name = "workflowEngineStarterEventHandlerRepository")
public EventHandlerRepository workflowEngineStarterEventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {
log.warn("MQ, handle warning {}", logText, ex);
if (Objects.nonNull(ex)) {
@ -96,32 +142,16 @@ public class RocketMQConfiguration {
}
@Bean
@ConditionalOnClass(EventConsumer.class)
public EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
@ConditionalOnMissingBean(name = "workflowEngineStarterEventConsumer")
public EventConsumer workflowEngineStarterEventConsumer(@Qualifier("workflowEngineStarterEventHandlerRepository") EventHandlerRepository workflowEngineStarterEventHandlerRepository) {
Consumer<EventConsumer.EventWrapper> callback = eventWrapper -> {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
//String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
Event event = eventWrapper.getEvent();
log.info("MQ, handled event: {}", event.toPrettyJsonString());
}
};
return new DefaultEventConsumer(applicationName, eventHandlerRepository, callback);
}
@Component
@Conditional(NonContainerEnvironmentCondition.class)
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_${MY_POD_NAMESPACE:debugging}_consumer",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}"
)
public class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener<MessageExt> {
@Resource
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
return new DefaultEventConsumer(applicationName + "Starter", workflowEngineStarterEventHandlerRepository, callback);
}
@Component
@ -129,23 +159,24 @@ public class RocketMQConfiguration {
@RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_workflow_engine_starter_${MY_POD_NAMESPACE:debugging}_consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
replyTimeout = 10000,
nameServer = "${rocketmq.name-server}"
)
public class WorkflowEngineClientRetryConsumer extends BaseListener implements RocketMQListener<MessageExt> {
@Resource
private EventConsumer eventConsumer;
@Resource(name = "workflowEngineStarterEventConsumer")
private EventConsumer workflowEngineStarterEventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
super.onEvent(message, workflowEngineStarterEventConsumer);
}
}
@Bean
public WorkflowEngineStarterRetryEventListener workflowEngineClientRetryEventListener(EventConsumer eventConsumer,
public WorkflowEngineStarterRetryEventListener workflowEngineClientRetryEventListener(@Qualifier("workflowEngineStarterEventConsumer") EventConsumer workflowEngineStarterEventConsumer,
Environment environment,
WorkflowCoreService workflowCoreService) {
return new WorkflowEngineStarterRetryEventListener(eventConsumer, environment, workflowCoreService);
return new WorkflowEngineStarterRetryEventListener(workflowEngineStarterEventConsumer, environment, workflowCoreService);
}
}

View File

@ -19,4 +19,8 @@ public class ThreadUtil {
public static RpcInvokeModeEnum get() {
return threadLocal.get();
}
public static void clear() {
threadLocal.remove();
}
}

View File

@ -6,6 +6,8 @@ import cn.axzo.workflow.starter.common.enums.RpcInvokeModeEnum;
import cn.axzo.workflow.starter.common.exception.WorkflowEngineStarterException;
import cn.axzo.workflow.starter.common.util.ThreadUtil;
import cn.axzo.workflow.starter.mq.retry.producer.RpcInvokeEventProducer;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
import feign.Client;
import feign.Request;
import feign.Response;
@ -13,7 +15,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
@ -52,21 +59,43 @@ public class ComplexInvokeClient implements Client {
@Override
public Response execute(Request request, Request.Options options) throws IOException {
log.info("ComplexInvokeClient execute");
log.info("ComplexInvokeClient execute...");
RpcInvokeModeEnum currentInvokeModeEnum = ThreadUtil.get();
if (Objects.isNull(currentInvokeModeEnum)) {
log.info("Without calling the async or sync methods of the WorkflowCoreService, the default configuration in StarterProperties will be loaded.");
currentInvokeModeEnum = getDefaultInvokeMode(request);
}
log.info("final InvokeMode : {}", currentInvokeModeEnum);
if (Objects.equals(SYNC, currentInvokeModeEnum)) {
log.info("[sync] invoke...");
ThreadUtil.clear();
return feignClient.execute(request, options);
}
Map<String, Collection<String>> headers = request.headers();
headers.forEach((k, v) -> log.info("ComplexInvokeClient header: {} = {}", k, v));
// TODO 发送 RPC 调用动作的 MQ 事件
asyncInvoke(request);
return Response.builder()
.status(HttpStatus.OK.value())
.reason(HttpStatus.OK.getReasonPhrase())
.headers(headers)
.request(request)
.body(body)
.build();
}
/**
* 发送 RPC 调用动作的 MQ 事件
*
* @param request
*/
private void asyncInvoke(Request request) {
log.info("[async] invoke...");
WorkflowEngineStarterRpcInvokeDTO event = new WorkflowEngineStarterRpcInvokeDTO();
event.setClassName(request.requestTemplate().feignTarget().type().getName());
event.setMethodName(request.requestTemplate().methodMetadata().method().getName());
@ -78,12 +107,6 @@ public class ComplexInvokeClient implements Client {
event.setOriginUrl(request.url());
event.setBody(new String(request.body(), StandardCharsets.UTF_8));
rpcInvokeEventProducer.send(WORKFLOW_ENGINE_STARTER, event);
return Response.builder()
.status(HttpStatus.OK.value())
.body(new byte[0])
.request(request)
.build();
}
private RpcInvokeModeEnum getDefaultInvokeMode(Request request) {
@ -96,4 +119,34 @@ public class ComplexInvokeClient implements Client {
}
return RpcInvokeModeEnum.valueOf(invokeModel.iterator().next());
}
static Response.Body body = new Response.Body() {
final ByteArrayInputStream inputStream = new ByteArrayInputStream(JSON.toJSONString(CommonResponse.success("Send MQ Success"))
.getBytes(StandardCharsets.UTF_8));
@Override
public Integer length() {
return null;
}
@Override
public boolean isRepeatable() {
return false;
}
@Override
public InputStream asInputStream() throws IOException {
return inputStream;
}
@Override
public Reader asReader(Charset charset) throws IOException {
return new InputStreamReader(asInputStream(), charset);
}
@Override
public void close() throws IOException {
inputStream.close();
}
};
}

View File

@ -29,14 +29,16 @@ import java.util.Objects;
*/
public class WorkflowEngineStarterRetryEventListener implements EventHandler, InitializingBean {
private final Logger log = LoggerFactory.getLogger(WorkflowEngineStarterRetryEventListener.class);
private final EventConsumer eventConsumer;
private final EventConsumer workflowEngineStarterEventConsumer;
private final Environment environment;
private final String currentApplicationName;
private final WorkflowCoreService workflowCoreService;
private final Map<String, Method> methodCache = new HashMap<>();
public WorkflowEngineStarterRetryEventListener(EventConsumer eventConsumer, Environment environment, WorkflowCoreService workflowCoreService) {
this.eventConsumer = eventConsumer;
public WorkflowEngineStarterRetryEventListener(EventConsumer workflowEngineStarterEventConsumer,
Environment environment,
WorkflowCoreService workflowCoreService) {
this.workflowEngineStarterEventConsumer = workflowEngineStarterEventConsumer;
this.environment = environment;
this.currentApplicationName = environment.getProperty("spring.application.name");
this.workflowCoreService = workflowCoreService;
@ -82,15 +84,16 @@ public class WorkflowEngineStarterRetryEventListener implements EventHandler, In
throw new IllegalStateException("Not found method" + dto.getMethodName());
}
try {
workflowCoreService.sync();
Object invoke = method.invoke(workflowCoreService, JSON.parseObject((String) dto.getBody(), BpmnProcessInstanceCreateDTO.class));
log.info("Event Invoke Result: {}", JSON.toJSONString(invoke));
} catch (Exception e) {
log.error("Event Invoke Exception: {}", e.getMessage());
log.error("Event Invoke Exception: {}", e.getMessage(), e);
}
}
@Override
public void afterPropertiesSet() {
eventConsumer.registerHandler(WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER.getEventCode(currentApplicationName), this);
workflowEngineStarterEventConsumer.registerHandler(WorkflowEngineEventEnum.WORKFLOW_ENGINE_STARTER.getEventCode(currentApplicationName), this);
}
}