add - 集成 RocketMQ

This commit is contained in:
wangli 2023-09-04 21:09:37 +08:00
parent def59f9098
commit d16b6fadda
9 changed files with 259 additions and 2 deletions

View File

@ -56,5 +56,12 @@
<version>${flowable.version}</version>
</dependency>
<dependency>
<groupId>cn.axzo.framework.rocketmq</groupId>
<artifactId>axzo-common-rocketmq</artifactId>
<optional>true</optional>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,48 @@
package cn.axzo.workflow.core.common.enums;
import cn.axzo.framework.rocketmq.Event;
/**
* Flowable Event Enum For RocketMQ
*
* @author wangli
* @since 2023/9/4 10:38
*/
public enum FlowableEventModuleEnum {
PROCESS_CREATED("process", "process-created", "流程创建"),
;
private final String module;
private final String tag;
private final String desc;
private final Event.EventCode eventCode;
FlowableEventModuleEnum(String module, String tag, String desc) {
this.module = module;
this.tag = tag;
this.desc = desc;
this.eventCode = Event.EventCode.builder()
.module(module)
.name(tag)
.build();
}
public String getModule() {
return module;
}
public String getTag() {
return tag;
}
public String getDesc() {
return desc;
}
public Event.EventCode getEventCode() {
return eventCode;
}
}

View File

@ -3,6 +3,7 @@ package cn.axzo.workflow.core.conf;
import cn.axzo.workflow.core.service.engine.behavior.CustomActivityBehaviorFactory;
import com.google.common.collect.Lists;
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
import org.flowable.form.spring.SpringFormEngineConfiguration;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.spring.boot.EngineConfigurationConfigurer;
import org.springframework.beans.factory.ObjectProvider;
@ -33,6 +34,11 @@ public class FlowableConfiguration {
};
}
@Bean
public EngineConfigurationConfigurer<SpringFormEngineConfiguration> formEngineConfigurer() {
return configuration -> configuration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_FALSE);
}
@Bean
public CustomActivityBehaviorFactory customActivityBehaviorFactory() {
return new CustomActivityBehaviorFactory();

View File

@ -0,0 +1,82 @@
package cn.axzo.workflow.core.conf;
import cn.axzo.framework.rocketmq.*;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;
/**
* RocketMQ 全局配置
*
* @author wangli
* @since 2023/9/4 10:03
*/
@Slf4j
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RocketMQTemplate.class)
public class RocketMqEventConfiguration {
@Value("${spring.application.name}")
private String applicationName;
@Value("${spring.profiles.active:dev}")
private String activeProfile;
private static final String DEFAULT_MODULE = "flowable";
private static final String DEFAULT_EVENT = "flowable-default-event-";
@Bean
EventProducer eventProducer(RocketMQTemplate rocketMQTemplate) {
return new RocketMQEventProducer(rocketMQTemplate,
DEFAULT_MODULE,
applicationName,
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(DEFAULT_EVENT + activeProfile)
.build())
.build(),
null
);
}
@Bean
EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
Consumer<EventConsumer.EventWrapper> callback = (eventWrapper) -> {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
//String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
}
};
return new DefaultEventConsumer(applicationName, eventHandlerRepository, callback);
}
/**
* 这里注释是为了处理嵌入式和微服务兼容情况,由引入方自主配置 topic 监听
*/
// @Configuration(proxyBeanMethods = false)
// @ConditionalOnClass(RocketMQTemplate.class)
// @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
// consumerGroup = "${spring.application.name}-${spring.profiles.active}"
// )
// public static class DefaultListener extends BaseListener implements RocketMQListener<MessageExt> {
//
// @Autowired
// private EventConsumer eventConsumer;
//
// @Override
// public void onMessage(MessageExt message) {
// super.onEvent(message, eventConsumer);
// }
// }
@Bean
EventHandlerRepository eventHandlerRepository() {
return new EventHandlerRepository((ex, logText) ->
log.warn("MQ, handle warning {} , Exception: {}", logText, JSON.toJSONString(ex)));
}
}

View File

@ -0,0 +1,40 @@
## RocketMQ 使用说明
1. 服务方需要主动引入二方包
> ```xml
> <dependency>
> <groupId>cn.axzo.framework.rocketmq</groupId>
> <artifactId>axzo-common-rocketmq</artifactId>
> </dependency>
> ```
2. 服务方主动配置Topic 监听器
> ```java
> @Configuration(proxyBeanMethods = false)
> @ConditionalOnClass(RocketMQTemplate.class)
> @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}",
> consumerGroup = "${spring.application.name}-${spring.profiles.active}"
> )
> public static class DefaultListener extends BaseListener implements RocketMQListener<MessageExt> {
>
> @Autowired
> private EventConsumer eventConsumer;
>
> @Override
> public void onMessage(MessageExt message) {
> super.onEvent(message, eventConsumer);
> }
> }
>```
3. 配置 application.yml
> ```yaml
> rocketmq:
> name-server: 114.116.202.128:9876
> producer:
> group: yoke-dev
> send-message-timeout: 10000
> ```

View File

@ -67,7 +67,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.framework.rocketmq</groupId>
<artifactId>axzo-common-rocketmq</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -3,13 +3,14 @@ package cn.axzo.workflow.server;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@MapperScan({"cn.axzo.workflow.core.**.mapper"})
@ComponentScan({"cn.axzo.workflow"})
@SpringBootApplication
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
@EnableTransactionManagement
public class WorkflowEnginApplication {

View File

@ -0,0 +1,34 @@
package cn.axzo.workflow.server.mq;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandler;
import cn.axzo.workflow.core.common.enums.FlowableEventModuleEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* TODO
*
* @author wangli
* @since 2023/9/4 10:51
*/
@Slf4j
@Component
public class ProcessCreateListener implements EventHandler, InitializingBean {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onEvent(Event event, EventConsumer.Context context) {
log.info("rocketmq event: {}", event);
}
@Override
public void afterPropertiesSet() throws Exception {
eventConsumer.registerHandler(FlowableEventModuleEnum.PROCESS_CREATED.getEventCode(), this);
}
}

View File

@ -0,0 +1,36 @@
package cn.axzo.workflow.server.mq;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.workflow.core.common.enums.FlowableEventModuleEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* TODO
*
* @author wangli
* @since 2023/9/4 10:31
*/
@RequestMapping("/web/v1/test")
@RestController
public class TestController {
@Autowired
private EventProducer eventProducer;
@GetMapping("/producer")
public void producer() {
eventProducer.send(Event.builder()
.shardingKey(FlowableEventModuleEnum.PROCESS_CREATED.getModule())
.targetId("1")
.targetType(FlowableEventModuleEnum.PROCESS_CREATED.getModule())
.eventCode(FlowableEventModuleEnum.PROCESS_CREATED.getEventCode())
.eventModule(FlowableEventModuleEnum.PROCESS_CREATED.getModule())
.eventName(FlowableEventModuleEnum.PROCESS_CREATED.getTag())
.data("自定义数据")
.build());
}
}