feat(REQ-3115) - 配合Riven 的钉钉消息处理

This commit is contained in:
wangli 2024-10-25 18:48:12 +08:00
parent 5f72e96ee2
commit 521805c2ef
5 changed files with 184 additions and 0 deletions

View File

@ -145,6 +145,11 @@
<artifactId>easy-es-annotation</artifactId> <artifactId>easy-es-annotation</artifactId>
<version>${easy-es.version}</version> <version>${easy-es.version}</version>
</dependency> </dependency>
<dependency>
<groupId>cn.axzo</groupId>
<artifactId>riven-api</artifactId>
<version>${axzo-dependencies.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

View File

@ -138,6 +138,10 @@
<groupId>com.xuxueli</groupId> <groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId> <artifactId>xxl-job-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>cn.axzo</groupId>
<artifactId>riven-api</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,80 @@
package cn.axzo.workflow.server.outside.mq;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
/**
* 钉钉消息 Rocket 配置
*
* @author wangli
* @since 2024-10-25 13:39
*/
@Slf4j
@Configuration(proxyBeanMethods = false)
public class DingtalkRocketConfiguration {
@Value("${spring.profiles.active:dev}")
private String activeProfile;
private static final String DEFAULT_MODULE = "workflowEngine";
private static final String DEFAULT_EVENT = "topic_third_party_sync_event_";
private static final String MODULE_NAME_SUFFIX = "_dingtalk_message";
@Bean
public DingtalkSendProducer dingtalkSendProducer(RocketMQTemplate rocketMQTemplate) {
return new DingtalkSendProducer(rocketMQTemplate,
DEFAULT_MODULE,
DEFAULT_MODULE + MODULE_NAME_SUFFIX,
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(DEFAULT_EVENT + activeProfile)
.build())
.headers(new HashMap<>())
.syncSending(Boolean.TRUE)
.exceptionHandler(context -> {
log.error("MQ, send event error: {}, event: {}",
context.getThrowable().getCause().getMessage(),
context.getEvent().toPrettyJsonString(),
context.getThrowable());
})
.build(),
null
);
}
@Component
@ConditionalOnProperty(name = "rocketmq.name-server")
@RocketMQMessageListener(topic = "topic_third_party_sync_event_${spring.profiles.active}",
consumerGroup = "GID_${spring.application.name}_riven_consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorType = SelectorType.TAG,
selectorExpression = "riven-dingtalk-receive",
maxReconsumeTimes = 0,
nameServer = "${rocketmq.name-server}"
)
public static class ReplyMessageRocketConsumer extends BaseListener implements RocketMQListener<MessageExt> {
@Resource
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
}
}

View File

@ -0,0 +1,56 @@
package cn.axzo.workflow.server.outside.mq.consumer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandler;
import cn.axzo.riven.client.common.enums.DingtalkEventEnum;
import cn.axzo.riven.client.model.DingtalkReceiveMqModel;
import cn.axzo.riven.client.model.DingtalkSendMqModel;
import cn.axzo.riven.client.model.SampleText;
import cn.axzo.workflow.server.outside.mq.producer.DingtalkSendProducer;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 监听钉钉群消息的事件
*
* @author wangli
* @since 2024-10-25 11:16
*/
@Slf4j
@Component
public class DingtalkReceiveListener implements EventHandler, InitializingBean {
@Resource
private EventConsumer eventConsumer;
@Resource
private DingtalkSendProducer dingtalkSendProducer;
@Override
public void onEvent(Event event, EventConsumer.Context context) {
log.info("receive dingding message: {}", event.getTargetId());
DingtalkReceiveMqModel data = event.normalizedData(DingtalkReceiveMqModel.class);
if (log.isDebugEnabled()) {
log.debug("message data: {}", JSON.toJSONString(data));
}
DingtalkSendMqModel<SampleText> sendModel = new DingtalkSendMqModel<>();
sendModel.setTraceId(data.getTraceId());
sendModel.setConversationId(data.getConversationId());
sendModel.setMsgId(data.getMsgId());
sendModel.setRobotCode(data.getRobotCode());
sendModel.setMessage(SampleText.from("由 WorkflowEngine 处理的消息: " + data.getContent()));
dingtalkSendProducer.send(sendModel);
}
@Override
public void afterPropertiesSet() throws Exception {
eventConsumer.registerHandler(DingtalkEventEnum.receive.getEventCode(), this);
}
}

View File

@ -0,0 +1,39 @@
package cn.axzo.workflow.server.outside.mq.producer;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.riven.client.common.enums.DingtalkEventEnum;
import cn.axzo.riven.client.model.DingtalkSendMqModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.function.BiConsumer;
/**
* 回复钉钉消息给 Riven 的事件生产者
*
* @author wangli
* @since 2024-10-25 11:33
*/
@Slf4j
@Component
public class DingtalkSendProducer extends RocketMQEventProducer {
@Value("${spring.application.name}")
private String applicationName;
public DingtalkSendProducer(RocketMQTemplate rocketMQTemplate, String defaultModule, String appName, Context<RocketMQMessageMeta> defaultContext, BiConsumer<Event, Context<RocketMQMessageMeta>> sendCallback) {
super(rocketMQTemplate, defaultModule, appName, defaultContext, sendCallback);
}
public void send(DingtalkSendMqModel model) {
send(Event.builder()
.shardingKey(applicationName)
.eventCode(DingtalkEventEnum.send.getEventCode())
.targetId(model.getTraceId())
.targetType(DingtalkEventEnum.send.getTag())
.data(model)
.build());
}
}