feat:feature-REQ/2129 增加mq,发送消息后发送mq

This commit is contained in:
lilong 2024-03-26 11:14:19 +08:00
parent 19745cde64
commit 980ab93693
14 changed files with 227 additions and 55 deletions

View File

@ -57,8 +57,6 @@ public class AsyncSendMessageParam {
*/ */
private List<SendMessageParam.JumpData> jumpData; private List<SendMessageParam.JumpData> jumpData;
private SendMessageParam.PushData pushData;
/** /**
* 封面图 * 封面图
*/ */

View File

@ -54,8 +54,6 @@ public class SendMessageParam {
*/ */
private List<JumpData> jumpData; private List<JumpData> jumpData;
private PushData pushData;
/** /**
* 封面图 * 封面图
*/ */
@ -82,33 +80,6 @@ public class SendMessageParam {
private String url; private String url;
} }
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class PushData {
/**
* 声音文件
*/
private String voiceFile;
/**
* 提醒方式voice声音vibrate(震动)
*/
private String ability;
/**
* push类型system系统消息op(运营消息)
*/
private String type;
/**
* 声音类型custom自定义system系统
*/
private String voiceType;
}
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public enum JumpPlatform { public enum JumpPlatform {

View File

@ -57,9 +57,6 @@ public class SendTemplateMessageParam {
*/ */
private String bizId; private String bizId;
private SendMessageParam.PushData pushData;
@Data @Data
@Builder @Builder
@AllArgsConstructor @AllArgsConstructor

View File

@ -1,6 +1,7 @@
package cn.axzo.im; package cn.axzo.im;
import cn.axzo.framework.data.mybatisplus.config.MybatisPlusAutoConfiguration; import cn.axzo.framework.data.mybatisplus.config.MybatisPlusAutoConfiguration;
import cn.axzo.im.config.RocketMQEventConfiguration;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
@ -8,6 +9,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
@Slf4j @Slf4j
@ -15,6 +17,7 @@ import org.springframework.core.env.Environment;
@EnableFeignClients(basePackages = {"cn.axzo"}) @EnableFeignClients(basePackages = {"cn.axzo"})
@MapperScan(value = {"cn.axzo.im.dao.mapper"}) @MapperScan(value = {"cn.axzo.im.dao.mapper"})
@EnableDiscoveryClient @EnableDiscoveryClient
@Import(RocketMQEventConfiguration.class)
public class Application { public class Application {
public static void main(String[] args) { public static void main(String[] args) {

View File

@ -0,0 +1,90 @@
package cn.axzo.im.config;
import cn.axzo.framework.rocketmq.BaseListener;
import cn.axzo.framework.rocketmq.DefaultEventConsumer;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandlerRepository;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
/**
* @Author: liyong.tian
* @Date: 2023/7/25 14:43
* @Description:
*/
@Slf4j
public class RocketMQEventConfiguration {
@Value("${spring.application.name}")
private String appName;
@Value("${topic}")
private String topic;
@Bean
public RocketMQTemplate ser(){
return new RocketMQTemplate();
}
@Bean
EventProducer eventProducer(RocketMQTemplate rocketMQTemplate) {
return new RocketMQEventProducer(rocketMQTemplate,
"im-center",
appName,
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
.topic(topic)
.build())
.build(),
null
);
}
@Bean
EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
Consumer<EventConsumer.EventWrapper> callback = (eventWrapper) -> {
if (eventWrapper.isHandled()) {
// 只收集被App真正消费的消息.
//String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
}
};
return new DefaultEventConsumer(appName, eventHandlerRepository, callback);
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic_im_center_${spring.profiles.active}",
consumerGroup = "GID_topic_im_center_${spring.application.name}_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}"
)
public static class DefaultListener extends BaseListener implements RocketMQListener<MessageExt> {
@Autowired
private EventConsumer eventConsumer;
@Override
public void onMessage(MessageExt message) {
super.onEvent(message, eventConsumer);
}
}
@Bean
EventHandlerRepository eventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {
log.warn("MQ, handle warning {}", logText, ex);
});
}
}

View File

@ -196,7 +196,6 @@ public class MessageController implements MessageApi {
// 全员发送是不常用的场景不应该由业务处理所以把配置放在bizData里面 // 全员发送是不常用的场景不应该由业务处理所以把配置放在bizData里面
.allPerson(sendMessageParam.isAllPerson()) .allPerson(sendMessageParam.isAllPerson())
.appTypes(sendMessageParam.getAppTypes()) .appTypes(sendMessageParam.getAppTypes())
.pushData(sendMessageParam.getPushData())
.build(); .build();
Date now = new Date(); Date now = new Date();
return MessageTask.builder() return MessageTask.builder()
@ -218,7 +217,6 @@ public class MessageController implements MessageApi {
MessageTask.BizData bizData = MessageTask.BizData.builder() MessageTask.BizData bizData = MessageTask.BizData.builder()
.jumpData(sendMessageParam.getJumpData()) .jumpData(sendMessageParam.getJumpData())
.pushData(sendMessageParam.getPushData())
.build(); .build();
Date now = new Date(); Date now = new Date();
return MessageTask.builder() return MessageTask.builder()

View File

@ -150,8 +150,6 @@ public class MessageTask {
* @See cn.axzo.im.center.common.enums.AppTypeEnum * @See cn.axzo.im.center.common.enums.AppTypeEnum
*/ */
private List<AppTypeEnum> appTypes; private List<AppTypeEnum> appTypes;
private SendMessageParam.PushData pushData;
} }
@Data @Data

View File

@ -0,0 +1,32 @@
package com.fiture.rbd.erp.connector.order.event.inner;
import cn.axzo.framework.rocketmq.Event;
import lombok.Getter;
/**
* @Classname EventTypeEnum
* @Date 2021/2/7 6:05 下午
* @Created by lilong
*/
@Getter
public enum EventTypeEnum {
MESSAGE_HISTORY_CREATED("message-history", "message-history-created", "发送记录创建"),
MESSAGE_HISTORY_UPDATED("message-history", "message-history-updated", "发送记录修改")
;
EventTypeEnum(String model, String name, String desc) {
this.eventCode = Event.EventCode.builder()
.module(model)
.name(name)
.build();
this.model = model;
this.name = name;
this.desc = desc;
}
private String model;
private String name;
private String desc;
private Event.EventCode eventCode;
}

View File

@ -0,0 +1,18 @@
package cn.axzo.im.event.payload;
import cn.axzo.im.entity.MessageHistory;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageHistoryCreatedPayload implements Serializable {
private MessageHistory messageHistory;
}

View File

@ -0,0 +1,19 @@
package cn.axzo.im.event.payload;
import cn.axzo.im.entity.MessageHistory;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageHistoryUpdatedPayload implements Serializable {
private MessageHistory newMessageHistory;
private MessageHistory oldMessageHistory;
}

View File

@ -26,6 +26,10 @@ public interface MessageHistoryService extends IService<MessageHistory> {
void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories); void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories);
void createBatch(List<MessageHistory> messageHistories);
void updateBatch(List<MessageHistory> messageHistories);
@SuperBuilder @SuperBuilder
@Data @Data
@NoArgsConstructor @NoArgsConstructor

View File

@ -2,11 +2,15 @@ package cn.axzo.im.service.impl;
import cn.axzo.basics.profiles.api.UserProfileServiceApi; import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto; import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.im.channel.IMChannelProvider; import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest; import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse; import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
import cn.axzo.im.dao.mapper.MessageHistoryMapper; import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.entity.MessageHistory; import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.event.payload.MessageHistoryCreatedPayload;
import cn.axzo.im.event.payload.MessageHistoryUpdatedPayload;
import cn.axzo.im.service.MessageHistoryService; import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.maokai.api.client.OrganizationalUnitApi; import cn.axzo.maokai.api.client.OrganizationalUnitApi;
import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery; import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
@ -28,6 +32,7 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.Collections; import java.util.Collections;
@ -40,6 +45,8 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL; import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_CREATED;
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_UPDATED;
@Slf4j @Slf4j
@Service @Service
@ -54,6 +61,8 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
private RateLimiterClient rateLimiterClient; private RateLimiterClient rateLimiterClient;
@Autowired @Autowired
private IMChannelProvider imChannelProvider; private IMChannelProvider imChannelProvider;
@Autowired
private EventProducer<?> eventProducer;
@Value("${send.message.limiter.permits:1}") @Value("${send.message.limiter.permits:1}")
private int permits; private int permits;
@ -76,6 +85,8 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
*/ */
private static final long DEFAULT_TIME_OUT_MILLIS = 60 * 1000; private static final long DEFAULT_TIME_OUT_MILLIS = 60 * 1000;
private static final String TARGET_TYPE = "messageHistoryId";
@Override @Override
public Page<MessageHistoryDTO> page(PageMessageHistoryParam param) { public Page<MessageHistoryDTO> page(PageMessageHistoryParam param) {
QueryWrapper<MessageHistory> wrapper = QueryWrapperHelper.fromBean(param, MessageHistory.class); QueryWrapper<MessageHistory> wrapper = QueryWrapperHelper.fromBean(param, MessageHistory.class);
@ -105,6 +116,49 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
} }
@Override @Override
@Transactional(rollbackFor = Exception.class)
public void createBatch(List<MessageHistory> messageHistories) {
this.saveBatch(messageHistories);
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
.forEach(messageHistory -> {
eventProducer.send(Event.builder()
.targetId(String.valueOf(messageHistory.getId()))
.targetType(TARGET_TYPE)
.eventCode(MESSAGE_HISTORY_CREATED.getEventCode())
.data(MessageHistoryCreatedPayload.builder()
.messageHistory(messageHistory)
.build())
.build());
});
}
@Override
@Transactional(rollbackFor = Exception.class)
public void updateBatch(List<MessageHistory> messageHistories) {
Map<Long, MessageHistory> oldMessageHistories = this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
.stream()
.collect(Collectors.toMap(MessageHistory::getId, Function.identity()));
this.updateBatchById(messageHistories);
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId)).forEach(messageHistory -> {
eventProducer.send(Event.builder()
.targetId(String.valueOf(messageHistory.getId()))
.targetType(TARGET_TYPE)
.eventCode(MESSAGE_HISTORY_UPDATED.getEventCode())
.data(MessageHistoryUpdatedPayload.builder()
.newMessageHistory(messageHistory)
.oldMessageHistory(oldMessageHistories.get(messageHistory.getId()))
.build())
.build());
});
}
@Override
@Transactional(rollbackFor = Exception.class)
public void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories) { public void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories) {
if (CollectionUtils.isEmpty(messageHistories)) { if (CollectionUtils.isEmpty(messageHistories)) {
@ -147,7 +201,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
return messageHistory; return messageHistory;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
this.updateBatchById(updateMessageHistories); this.updateBatch(updateMessageHistories);
return; return;
} }
@ -158,7 +212,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
.status(MessageHistory.Status.FAILED) .status(MessageHistory.Status.FAILED)
.build()) .build())
.collect(Collectors.toList()); .collect(Collectors.toList());
this.updateBatchById(failedMessageHistories); this.updateBatch(failedMessageHistories);
} }

View File

@ -172,7 +172,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
List<MessageHistory> messageHistories = absentReceivePersons.stream() List<MessageHistory> messageHistories = absentReceivePersons.stream()
.map(receivePerson -> resolveMessageHistory(messageTask, receivePerson, imAccounts, accountRegisters)) .map(receivePerson -> resolveMessageHistory(messageTask, receivePerson, imAccounts, accountRegisters))
.collect(Collectors.toList()); .collect(Collectors.toList());
messageHistoryService.saveBatch(messageHistories); messageHistoryService.createBatch(messageHistories);
} }
private MessageHistory resolveMessageHistory(MessageTask messageTask, MessageTask.ReceivePerson receivePerson, Map<String, AccountRegisterService.AccountRegisterDTO> imAccounts, Map<String, String> accountRegisters) { private MessageHistory resolveMessageHistory(MessageTask messageTask, MessageTask.ReceivePerson receivePerson, Map<String, AccountRegisterService.AccountRegisterDTO> imAccounts, Map<String, String> accountRegisters) {
@ -299,8 +299,6 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId()); defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId());
} else { } else {
JSONObject msgBody = new JSONObject() JSONObject msgBody = new JSONObject()
// 必填字段非模板消息的业务没有这个参数
.fluentPut("isHighlight", false)
.fluentPut("cardTitle", messageTask.getTitle()) .fluentPut("cardTitle", messageTask.getTitle())
.fluentPut("cardContent", messageTask.getContent()) .fluentPut("cardContent", messageTask.getContent())
.fluentPut("cardBannerUrl", messageTask.getCardBannerUrl()); .fluentPut("cardBannerUrl", messageTask.getCardBannerUrl());
@ -326,6 +324,8 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
msgBody.fluentPut("cardDetailButton", new JSONObject() msgBody.fluentPut("cardDetailButton", new JSONObject()
.fluentPut("title", "查看详情") .fluentPut("title", "查看详情")
.fluentPut("action", "JUMP") .fluentPut("action", "JUMP")
// 必填字段非模板消息的业务没有这个参数
.fluentPut("isHighlight", false)
.fluentPut("actionPaths", actionPaths)); .fluentPut("actionPaths", actionPaths));
} }
@ -337,6 +337,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
// 传入app版本号保证消息能被正常打开 // 传入app版本号保证消息能被正常打开
defaultExtMap.putIfAbsent("minAppVersion", "2.1.0"); defaultExtMap.putIfAbsent("minAppVersion", "2.1.0");
messageBody.setMessageExtension(defaultExtMap); messageBody.setMessageExtension(defaultExtMap);
return JSONUtil.toJsonStr(messageBody); return JSONUtil.toJsonStr(messageBody);
} }
} }

View File

@ -1,27 +1,16 @@
package cn.axzo.im.service; package cn.axzo.im.service;
import cn.axzo.im.Application; import cn.axzo.im.Application;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.controller.AccountController; import cn.axzo.im.controller.AccountController;
import cn.axzo.im.entity.AccountRegister;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.TestPropertySources;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MockMvc;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
@TestPropertySource(properties = { @TestPropertySource(properties = {
"NACOS_HOST=https://dev-nacos.axzo.cn", "NACOS_HOST=https://dev-nacos.axzo.cn",
"xxl.job.admin.addresses=http://dev-xxl-job.axzo.cn/xxl-job-admin", "xxl.job.admin.addresses=http://dev-xxl-job.axzo.cn/xxl-job-admin",
@ -45,7 +34,7 @@ class AccountServiceTest {
AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder() AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder()
.build(); .build();
List<AccountRegister> accountRegisters = accountRegisterService.list(listAccountRegisterParam); List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
System.out.println(accountRegisters); System.out.println(accountRegisters);
} }
} }