feat:feature-REQ/2129 增加mq,发送消息后发送mq
This commit is contained in:
parent
be52b41716
commit
2949e9ca42
@ -57,8 +57,6 @@ public class AsyncSendMessageParam {
|
||||
*/
|
||||
private List<SendMessageParam.JumpData> jumpData;
|
||||
|
||||
private SendMessageParam.PushData pushData;
|
||||
|
||||
/**
|
||||
* 封面图
|
||||
*/
|
||||
|
||||
@ -54,8 +54,6 @@ public class SendMessageParam {
|
||||
*/
|
||||
private List<JumpData> jumpData;
|
||||
|
||||
private PushData pushData;
|
||||
|
||||
/**
|
||||
* 封面图
|
||||
*/
|
||||
@ -82,33 +80,6 @@ public class SendMessageParam {
|
||||
private String url;
|
||||
}
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public static class PushData {
|
||||
|
||||
/**
|
||||
* 声音文件
|
||||
*/
|
||||
private String voiceFile;
|
||||
|
||||
/**
|
||||
* 提醒方式:voice(声音)、vibrate(震动)
|
||||
*/
|
||||
private String ability;
|
||||
|
||||
/**
|
||||
* push类型:system(系统消息)、op(运营消息)
|
||||
*/
|
||||
private String type;
|
||||
|
||||
/**
|
||||
* 声音类型:custom(自定义)、system(系统
|
||||
*/
|
||||
private String voiceType;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum JumpPlatform {
|
||||
|
||||
@ -57,9 +57,6 @@ public class SendTemplateMessageParam {
|
||||
*/
|
||||
private String bizId;
|
||||
|
||||
private SendMessageParam.PushData pushData;
|
||||
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package cn.axzo.im;
|
||||
|
||||
import cn.axzo.framework.data.mybatisplus.config.MybatisPlusAutoConfiguration;
|
||||
import cn.axzo.im.config.RocketMQEventConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.mybatis.spring.annotation.MapperScan;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
@ -8,6 +9,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.core.env.Environment;
|
||||
|
||||
@Slf4j
|
||||
@ -15,6 +17,7 @@ import org.springframework.core.env.Environment;
|
||||
@EnableFeignClients(basePackages = {"cn.axzo"})
|
||||
@MapperScan(value = {"cn.axzo.im.dao.mapper"})
|
||||
@EnableDiscoveryClient
|
||||
@Import(RocketMQEventConfiguration.class)
|
||||
public class Application {
|
||||
public static void main(String[] args) {
|
||||
|
||||
|
||||
@ -0,0 +1,90 @@
|
||||
package cn.axzo.im.config;
|
||||
|
||||
import cn.axzo.framework.rocketmq.BaseListener;
|
||||
import cn.axzo.framework.rocketmq.DefaultEventConsumer;
|
||||
import cn.axzo.framework.rocketmq.EventConsumer;
|
||||
import cn.axzo.framework.rocketmq.EventHandlerRepository;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* @Author: liyong.tian
|
||||
* @Date: 2023/7/25 14:43
|
||||
* @Description:
|
||||
*/
|
||||
@Slf4j
|
||||
public class RocketMQEventConfiguration {
|
||||
|
||||
@Value("${spring.application.name}")
|
||||
private String appName;
|
||||
|
||||
@Value("${topic}")
|
||||
private String topic;
|
||||
|
||||
@Bean
|
||||
public RocketMQTemplate ser(){
|
||||
return new RocketMQTemplate();
|
||||
}
|
||||
@Bean
|
||||
EventProducer eventProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
return new RocketMQEventProducer(rocketMQTemplate,
|
||||
"im-center",
|
||||
appName,
|
||||
EventProducer.Context.<RocketMQEventProducer.RocketMQMessageMeta>builder()
|
||||
.meta(RocketMQEventProducer.RocketMQMessageMeta.builder()
|
||||
.topic(topic)
|
||||
.build())
|
||||
.build(),
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
@Bean
|
||||
EventConsumer eventConsumer(EventHandlerRepository eventHandlerRepository) {
|
||||
Consumer<EventConsumer.EventWrapper> callback = (eventWrapper) -> {
|
||||
if (eventWrapper.isHandled()) {
|
||||
// 只收集被App真正消费的消息.
|
||||
//String topic = (String) eventWrapper.getExt().get(EVENT_TOPIC_KEY);
|
||||
|
||||
}
|
||||
};
|
||||
return new DefaultEventConsumer(appName, eventHandlerRepository, callback);
|
||||
}
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = "topic_im_center_${spring.profiles.active}",
|
||||
consumerGroup = "GID_topic_im_center_${spring.application.name}_${spring.profiles.active}",
|
||||
consumeMode = ConsumeMode.ORDERLY,
|
||||
nameServer = "${rocketmq.name-server}"
|
||||
)
|
||||
public static class DefaultListener extends BaseListener implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Autowired
|
||||
private EventConsumer eventConsumer;
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt message) {
|
||||
super.onEvent(message, eventConsumer);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
EventHandlerRepository eventHandlerRepository() {
|
||||
return new EventHandlerRepository((ex, logText) -> {
|
||||
log.warn("MQ, handle warning {}", logText, ex);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -196,7 +196,6 @@ public class MessageController implements MessageApi {
|
||||
// 全员发送是不常用的场景,不应该由业务处理,所以把配置放在bizData里面
|
||||
.allPerson(sendMessageParam.isAllPerson())
|
||||
.appTypes(sendMessageParam.getAppTypes())
|
||||
.pushData(sendMessageParam.getPushData())
|
||||
.build();
|
||||
Date now = new Date();
|
||||
return MessageTask.builder()
|
||||
@ -218,7 +217,6 @@ public class MessageController implements MessageApi {
|
||||
|
||||
MessageTask.BizData bizData = MessageTask.BizData.builder()
|
||||
.jumpData(sendMessageParam.getJumpData())
|
||||
.pushData(sendMessageParam.getPushData())
|
||||
.build();
|
||||
Date now = new Date();
|
||||
return MessageTask.builder()
|
||||
|
||||
@ -150,8 +150,6 @@ public class MessageTask {
|
||||
* @See cn.axzo.im.center.common.enums.AppTypeEnum
|
||||
*/
|
||||
private List<AppTypeEnum> appTypes;
|
||||
|
||||
private SendMessageParam.PushData pushData;
|
||||
}
|
||||
|
||||
@Data
|
||||
|
||||
@ -0,0 +1,32 @@
|
||||
package com.fiture.rbd.erp.connector.order.event.inner;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @Classname EventTypeEnum
|
||||
* @Date 2021/2/7 6:05 下午
|
||||
* @Created by lilong
|
||||
*/
|
||||
@Getter
|
||||
public enum EventTypeEnum {
|
||||
|
||||
MESSAGE_HISTORY_CREATED("message-history", "message-history-created", "发送记录创建"),
|
||||
MESSAGE_HISTORY_UPDATED("message-history", "message-history-updated", "发送记录修改")
|
||||
;
|
||||
|
||||
EventTypeEnum(String model, String name, String desc) {
|
||||
this.eventCode = Event.EventCode.builder()
|
||||
.module(model)
|
||||
.name(name)
|
||||
.build();
|
||||
this.model = model;
|
||||
this.name = name;
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
private String model;
|
||||
private String name;
|
||||
private String desc;
|
||||
private Event.EventCode eventCode;
|
||||
}
|
||||
@ -0,0 +1,18 @@
|
||||
package cn.axzo.im.event.payload;
|
||||
|
||||
import cn.axzo.im.entity.MessageHistory;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class MessageHistoryCreatedPayload implements Serializable {
|
||||
|
||||
private MessageHistory messageHistory;
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
package cn.axzo.im.event.payload;
|
||||
|
||||
import cn.axzo.im.entity.MessageHistory;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class MessageHistoryUpdatedPayload implements Serializable {
|
||||
|
||||
private MessageHistory newMessageHistory;
|
||||
private MessageHistory oldMessageHistory;
|
||||
}
|
||||
@ -26,6 +26,10 @@ public interface MessageHistoryService extends IService<MessageHistory> {
|
||||
|
||||
void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories);
|
||||
|
||||
void createBatch(List<MessageHistory> messageHistories);
|
||||
|
||||
void updateBatch(List<MessageHistory> messageHistories);
|
||||
|
||||
@SuperBuilder
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
|
||||
@ -2,11 +2,15 @@ package cn.axzo.im.service.impl;
|
||||
|
||||
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
|
||||
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.im.channel.IMChannelProvider;
|
||||
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchRequest;
|
||||
import cn.axzo.im.channel.netease.dto.MessageBatchDispatchResponse;
|
||||
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
|
||||
import cn.axzo.im.entity.MessageHistory;
|
||||
import cn.axzo.im.event.payload.MessageHistoryCreatedPayload;
|
||||
import cn.axzo.im.event.payload.MessageHistoryUpdatedPayload;
|
||||
import cn.axzo.im.service.MessageHistoryService;
|
||||
import cn.axzo.maokai.api.client.OrganizationalUnitApi;
|
||||
import cn.axzo.maokai.api.vo.request.OrganizationalUnitQuery;
|
||||
@ -28,6 +32,7 @@ import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
@ -40,6 +45,8 @@ import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static cn.axzo.im.config.BizResultCode.ACQUIRE_RATE_LIMITER_FAIL;
|
||||
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_CREATED;
|
||||
import static com.fiture.rbd.erp.connector.order.event.inner.EventTypeEnum.MESSAGE_HISTORY_UPDATED;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -54,6 +61,8 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
||||
private RateLimiterClient rateLimiterClient;
|
||||
@Autowired
|
||||
private IMChannelProvider imChannelProvider;
|
||||
@Autowired
|
||||
private EventProducer<?> eventProducer;
|
||||
|
||||
@Value("${send.message.limiter.permits:1}")
|
||||
private int permits;
|
||||
@ -76,6 +85,8 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
||||
*/
|
||||
private static final long DEFAULT_TIME_OUT_MILLIS = 60 * 1000;
|
||||
|
||||
private static final String TARGET_TYPE = "messageHistoryId";
|
||||
|
||||
@Override
|
||||
public Page<MessageHistoryDTO> page(PageMessageHistoryParam param) {
|
||||
QueryWrapper<MessageHistory> wrapper = QueryWrapperHelper.fromBean(param, MessageHistory.class);
|
||||
@ -105,6 +116,49 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void createBatch(List<MessageHistory> messageHistories) {
|
||||
|
||||
this.saveBatch(messageHistories);
|
||||
|
||||
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
|
||||
.forEach(messageHistory -> {
|
||||
eventProducer.send(Event.builder()
|
||||
.targetId(String.valueOf(messageHistory.getId()))
|
||||
.targetType(TARGET_TYPE)
|
||||
.eventCode(MESSAGE_HISTORY_CREATED.getEventCode())
|
||||
.data(MessageHistoryCreatedPayload.builder()
|
||||
.messageHistory(messageHistory)
|
||||
.build())
|
||||
.build());
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void updateBatch(List<MessageHistory> messageHistories) {
|
||||
|
||||
Map<Long, MessageHistory> oldMessageHistories = this.listByIds(Lists.transform(messageHistories, MessageHistory::getId))
|
||||
.stream()
|
||||
.collect(Collectors.toMap(MessageHistory::getId, Function.identity()));
|
||||
|
||||
this.updateBatchById(messageHistories);
|
||||
|
||||
this.listByIds(Lists.transform(messageHistories, MessageHistory::getId)).forEach(messageHistory -> {
|
||||
eventProducer.send(Event.builder()
|
||||
.targetId(String.valueOf(messageHistory.getId()))
|
||||
.targetType(TARGET_TYPE)
|
||||
.eventCode(MESSAGE_HISTORY_UPDATED.getEventCode())
|
||||
.data(MessageHistoryUpdatedPayload.builder()
|
||||
.newMessageHistory(messageHistory)
|
||||
.oldMessageHistory(oldMessageHistories.get(messageHistory.getId()))
|
||||
.build())
|
||||
.build());
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void sendMessage(List<MessageHistoryService.MessageHistoryDTO> messageHistories) {
|
||||
|
||||
if (CollectionUtils.isEmpty(messageHistories)) {
|
||||
@ -147,7 +201,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
||||
return messageHistory;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
this.updateBatchById(updateMessageHistories);
|
||||
this.updateBatch(updateMessageHistories);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -158,7 +212,7 @@ public class MessageHistoryServiceImpl extends ServiceImpl<MessageHistoryMapper,
|
||||
.status(MessageHistory.Status.FAILED)
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
this.updateBatchById(failedMessageHistories);
|
||||
this.updateBatch(failedMessageHistories);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -172,7 +172,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
|
||||
List<MessageHistory> messageHistories = absentReceivePersons.stream()
|
||||
.map(receivePerson -> resolveMessageHistory(messageTask, receivePerson, imAccounts, accountRegisters))
|
||||
.collect(Collectors.toList());
|
||||
messageHistoryService.saveBatch(messageHistories);
|
||||
messageHistoryService.createBatch(messageHistories);
|
||||
}
|
||||
|
||||
private MessageHistory resolveMessageHistory(MessageTask messageTask, MessageTask.ReceivePerson receivePerson, Map<String, AccountRegisterService.AccountRegisterDTO> imAccounts, Map<String, String> accountRegisters) {
|
||||
@ -299,8 +299,6 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
|
||||
defaultExtMap.put("msgTemplateId", bizData.getMsgTemplateId());
|
||||
} else {
|
||||
JSONObject msgBody = new JSONObject()
|
||||
// 必填字段,非模板消息的业务没有这个参数
|
||||
.fluentPut("isHighlight", false)
|
||||
.fluentPut("cardTitle", messageTask.getTitle())
|
||||
.fluentPut("cardContent", messageTask.getContent())
|
||||
.fluentPut("cardBannerUrl", messageTask.getCardBannerUrl());
|
||||
@ -326,6 +324,8 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
|
||||
msgBody.fluentPut("cardDetailButton", new JSONObject()
|
||||
.fluentPut("title", "查看详情")
|
||||
.fluentPut("action", "JUMP")
|
||||
// 必填字段,非模板消息的业务没有这个参数
|
||||
.fluentPut("isHighlight", false)
|
||||
.fluentPut("actionPaths", actionPaths));
|
||||
}
|
||||
|
||||
@ -337,6 +337,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
|
||||
// 传入app版本号,保证消息能被正常打开
|
||||
defaultExtMap.putIfAbsent("minAppVersion", "2.1.0");
|
||||
messageBody.setMessageExtension(defaultExtMap);
|
||||
|
||||
return JSONUtil.toJsonStr(messageBody);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,27 +1,16 @@
|
||||
package cn.axzo.im.service;
|
||||
|
||||
import cn.axzo.im.Application;
|
||||
import cn.axzo.im.center.common.enums.AppTypeEnum;
|
||||
import cn.axzo.im.controller.AccountController;
|
||||
import cn.axzo.im.entity.AccountRegister;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.TestPropertySources;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.test.web.servlet.MockMvc;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@TestPropertySource(properties = {
|
||||
"NACOS_HOST=https://dev-nacos.axzo.cn",
|
||||
"xxl.job.admin.addresses=http://dev-xxl-job.axzo.cn/xxl-job-admin",
|
||||
@ -45,7 +34,7 @@ class AccountServiceTest {
|
||||
|
||||
AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder()
|
||||
.build();
|
||||
List<AccountRegister> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
|
||||
List<AccountRegisterService.AccountRegisterDTO> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
|
||||
System.out.println(accountRegisters);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user