diff --git a/pom.xml b/pom.xml index 17d35aa02..11ea072b1 100644 --- a/pom.xml +++ b/pom.xml @@ -145,6 +145,11 @@ easy-es-annotation ${easy-es.version} + + cn.axzo + riven-api + ${axzo-dependencies.version} + diff --git a/workflow-engine-server/pom.xml b/workflow-engine-server/pom.xml index 84ef5f73a..934ed9487 100644 --- a/workflow-engine-server/pom.xml +++ b/workflow-engine-server/pom.xml @@ -138,6 +138,10 @@ com.xuxueli xxl-job-core + + cn.axzo + riven-api + diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/DingtalkRocketConfiguration.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/DingtalkRocketConfiguration.java new file mode 100644 index 000000000..d6b9f917f --- /dev/null +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/DingtalkRocketConfiguration.java @@ -0,0 +1,80 @@ +package cn.axzo.workflow.server.outside.mq; + +import cn.axzo.framework.rocketmq.BaseListener; +import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.framework.rocketmq.EventProducer; +import cn.axzo.framework.rocketmq.RocketMQEventProducer; +import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.HashMap; + +/** + * 钉钉消息 Rocket 配置 + * + * @author wangli + * @since 2024-10-25 13:39 + */ +@Slf4j +@Configuration(proxyBeanMethods = false) +public class DingtalkRocketConfiguration { + @Value("${spring.profiles.active:dev}") + private String activeProfile; + private static final String DEFAULT_MODULE = "workflowEngine"; + private static final String DEFAULT_EVENT = "topic_third_party_sync_event_"; + private static final String MODULE_NAME_SUFFIX = "_dingtalk_message"; + + @Bean + public DingtalkSendProducer dingtalkSendProducer(RocketMQTemplate rocketMQTemplate) { + return new DingtalkSendProducer(rocketMQTemplate, + DEFAULT_MODULE, + DEFAULT_MODULE + MODULE_NAME_SUFFIX, + EventProducer.Context.builder() + .meta(RocketMQEventProducer.RocketMQMessageMeta.builder() + .topic(DEFAULT_EVENT + activeProfile) + .build()) + .headers(new HashMap<>()) + .syncSending(Boolean.TRUE) + .exceptionHandler(context -> { + log.error("MQ, send event error: {}, event: {}", + context.getThrowable().getCause().getMessage(), + context.getEvent().toPrettyJsonString(), + context.getThrowable()); + }) + .build(), + null + ); + } + + @Component + @ConditionalOnProperty(name = "rocketmq.name-server") + @RocketMQMessageListener(topic = "topic_third_party_sync_event_${spring.profiles.active}", + consumerGroup = "GID_${spring.application.name}_riven_consumer", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorType = SelectorType.TAG, + selectorExpression = "riven-dingtalk-receive", + maxReconsumeTimes = 0, + nameServer = "${rocketmq.name-server}" + ) + public static class ReplyMessageRocketConsumer extends BaseListener implements RocketMQListener { + @Resource + private EventConsumer eventConsumer; + + @Override + public void onMessage(MessageExt message) { + super.onEvent(message, eventConsumer); + } + } +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/consumer/DingtalkReceiveListener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/consumer/DingtalkReceiveListener.java new file mode 100644 index 000000000..7a62cb4af --- /dev/null +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/consumer/DingtalkReceiveListener.java @@ -0,0 +1,56 @@ +package cn.axzo.workflow.server.outside.mq.consumer; + +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.EventConsumer; +import cn.axzo.framework.rocketmq.EventHandler; +import cn.axzo.riven.client.common.enums.DingtalkEventEnum; +import cn.axzo.riven.client.model.DingtalkReceiveMqModel; +import cn.axzo.riven.client.model.DingtalkSendMqModel; +import cn.axzo.riven.client.model.SampleText; +import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 监听钉钉群消息的事件 + * + * @author wangli + * @since 2024-10-25 11:16 + */ +@Slf4j +@Component +public class DingtalkReceiveListener implements EventHandler, InitializingBean { + + @Resource + private EventConsumer eventConsumer; + @Resource + private DingtalkSendProducer dingtalkSendProducer; + + @Override + public void onEvent(Event event, EventConsumer.Context context) { + log.info("receive dingding message: {}", event.getTargetId()); + + DingtalkReceiveMqModel data = event.normalizedData(DingtalkReceiveMqModel.class); + if (log.isDebugEnabled()) { + log.debug("message data: {}", JSON.toJSONString(data)); + } + + DingtalkSendMqModel sendModel = new DingtalkSendMqModel<>(); + sendModel.setTraceId(data.getTraceId()); + sendModel.setConversationId(data.getConversationId()); + sendModel.setMsgId(data.getMsgId()); + sendModel.setRobotCode(data.getRobotCode()); + sendModel.setMessage(SampleText.from("由 WorkflowEngine 处理的消息: " + data.getContent())); + + dingtalkSendProducer.send(sendModel); + } + + @Override + public void afterPropertiesSet() throws Exception { + eventConsumer.registerHandler(DingtalkEventEnum.receive.getEventCode(), this); + } +} diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/producer/DingtalkSendProducer.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/producer/DingtalkSendProducer.java new file mode 100644 index 000000000..88831e231 --- /dev/null +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/outside/mq/producer/DingtalkSendProducer.java @@ -0,0 +1,39 @@ +package cn.axzo.workflow.server.outside.mq.producer; + +import cn.axzo.framework.rocketmq.Event; +import cn.axzo.framework.rocketmq.RocketMQEventProducer; +import cn.axzo.riven.client.common.enums.DingtalkEventEnum; +import cn.axzo.riven.client.model.DingtalkSendMqModel; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.function.BiConsumer; + +/** + * 回复钉钉消息给 Riven 的事件生产者 + * + * @author wangli + * @since 2024-10-25 11:33 + */ +@Slf4j +@Component +public class DingtalkSendProducer extends RocketMQEventProducer { + @Value("${spring.application.name}") + private String applicationName; + public DingtalkSendProducer(RocketMQTemplate rocketMQTemplate, String defaultModule, String appName, Context defaultContext, BiConsumer> sendCallback) { + super(rocketMQTemplate, defaultModule, appName, defaultContext, sendCallback); + } + + public void send(DingtalkSendMqModel model) { + send(Event.builder() + .shardingKey(applicationName) + .eventCode(DingtalkEventEnum.send.getEventCode()) + .targetId(model.getTraceId()) + .targetType(DingtalkEventEnum.send.getTag()) + .data(model) + .build()); + } + +}