feat:REQ-1419 消息发送支持批量发送

This commit is contained in:
zuoqinbo 2023-10-25 18:01:20 +08:00
parent d6f96ca959
commit 59dcd11716
13 changed files with 325 additions and 22 deletions

View File

@ -8,6 +8,7 @@ import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* IM消息信息
@ -39,7 +40,7 @@ public class MessageInfo {
* 消息接收用户Id列表
*/
@NotNull(message = "接收消息用户personIdList不能为空")
private List<String> toPersonIdList;
private Set<String> toPersonIdList;
/**
* 消息标题

View File

@ -1,5 +1,6 @@
package cn.axzo.im.center.api.vo.resp;
import lombok.Builder;
import lombok.Data;
/**
* im-center
@ -15,6 +16,7 @@ public class MessageDispatchResp {
* 消息ID
*/
private String msgid;
/**
* 消息发送的时间戳
*/
@ -50,4 +52,10 @@ public class MessageDispatchResp {
* 接收人personId
*/
private String personId;
/**
* IM系统是否已注册
*/
private boolean registered;
}

View File

@ -22,6 +22,22 @@
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: cn.hutool:hutool-all:5.8.4" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-spring-boot2:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.vavr:vavr:0.10.2" level="project" />
<orderEntry type="library" name="Maven: io.vavr:vavr-match:0.10.2" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.32" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-spring:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-annotations:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-consumer:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-core:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-circularbuffer:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-framework-common:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-circuitbreaker:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-ratelimiter:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-retry:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-bulkhead:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-timelimiter:1.7.0" level="project" />
<orderEntry type="library" name="Maven: io.github.resilience4j:resilience4j-micrometer:1.7.0" level="project" />
<orderEntry type="library" name="Maven: cn.axzo.framework:axzo-web-spring-boot-starter:1.0.0-SNAPSHOT" level="project" />
<orderEntry type="library" name="Maven: cn.axzo.framework:axzo-spring-boot-starter:1.0.0-SNAPSHOT" level="project" />
<orderEntry type="library" name="Maven: cn.axzo.framework.jackson:jackson-starter:1.0.0-SNAPSHOT" level="project" />
@ -171,7 +187,6 @@
<orderEntry type="library" name="Maven: io.reactivex.rxjava3:rxjava:3.0.12" level="project" />
<orderEntry type="library" name="Maven: org.jboss.marshalling:jboss-marshalling-river:2.0.11.Final" level="project" />
<orderEntry type="library" name="Maven: org.jboss.marshalling:jboss-marshalling:2.0.11.Final" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.32" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.12.7" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.12.7" level="project" />
<orderEntry type="library" name="Maven: net.bytebuddy:byte-buddy:1.10.22" level="project" />

View File

@ -26,6 +26,11 @@
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>cn.axzo.framework</groupId>
<artifactId>axzo-web-spring-boot-starter</artifactId>

View File

@ -28,6 +28,14 @@ public interface IMChannelProvider {
*/
MessageDispatchResponse dispatchMessage(@Valid MessageDispatchRequest messageInfo);
/**
* 批量IM消息派发
*
* @param messageInfo 注册请求
* @return 返回注册成功信息
*/
MessageBatchDispatchResponse dispatchBatchMessage(@Valid MessageBatchDispatchRequest messageInfo);
/**
* 获取AppKey
*

View File

@ -7,7 +7,9 @@ import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -38,11 +40,17 @@ public class NimChannelService implements IMChannelProvider {
private static final String NIM_MESSAGE_DISPATCH_URL = "https://api.netease.im/nimserver/msg/sendMsg.action";
private static final String NIM_MESSAGE_BATCH_DISPATCH_URL = "https://api.netease.im/nimserver/msg/sendBatchMsg.action";
private static final int SUCCESS_CODE = 200;
private static final String NIM_ACCOUNT_ALREADY_REGISTER = "already register";
private static final String PROVIDER_NAME = "NIM";
//消息接收者的网易云信 IM 账号accid列表 500 返回msgid最多100人,目前设置100
private static final int NIM_MAX_BATCH_MSG = 100;
//目前支持 100自定义消息
private static final int MSG_TYPE = 100;
@Resource
private AppKeyUtil appKeyUtil;
@ -117,11 +125,11 @@ public class NimChannelService implements IMChannelProvider {
@Override
public MessageDispatchResponse dispatchMessage(@Valid MessageDispatchRequest messageInfo) {
if (messageInfo == null) {
throw new ServiceException("发送消息请求,参数不能为空!");
throw new ServiceException("发送单聊消息请求,请求参数不能为空!");
} else {
//目前支持的默认值 0单聊消息 100自定义消息
messageInfo.setOpe(0);
messageInfo.setType(100);
messageInfo.setType(MSG_TYPE);
}
HashMap<String, Object> paramMap = Maps.newHashMap();
paramMap.put("from", messageInfo.getFrom());
@ -145,6 +153,49 @@ public class NimChannelService implements IMChannelProvider {
return MessageDispatchResponse.builder().desc("请求网易云信Server异常,请联系管理员!").build();
}
@Override
@RateLimiter(name = "dispatchBatchMessage", fallbackMethod = "fallback")
public MessageBatchDispatchResponse dispatchBatchMessage(@Valid MessageBatchDispatchRequest messageInfo) {
if (messageInfo == null) {
throw new ServiceException("批量发送单聊消息请求,请求参数不能为空!");
} else {
//目前支持的默认值 0单聊消息 100自定义消息
messageInfo.setType(100);
}
if (messageInfo.getToAccids() == null) {
throw new ServiceException("批量发送单聊消息请求,请求参数[消息接收人]不能为空!");
}
if (messageInfo.getToAccids().size() > NIM_MAX_BATCH_MSG) {
throw new ServiceException("批量发送单聊消息请求,单批次消息数超过最大值:" + NIM_MAX_BATCH_MSG);
}
HashMap<String, Object> paramMap = Maps.newHashMap();
paramMap.put("fromAccid", messageInfo.getFromAccid());
paramMap.put("body", messageInfo.getBody());
JSONArray arrayAccids = JSONUtil.parseArray(JSONUtil.toJsonStr(messageInfo.getToAccids()));
paramMap.put("toAccids", arrayAccids.toString());
paramMap.put("type", messageInfo.getType());
//是否需要返回消息ID false不返回消息ID默认值 true返回消息IDtoAccids 包含的账号数量不可以超过 100
paramMap.put("returnMsgid",true);
Map<String, String> authHeaderMap = buildAuthHeader(getProviderAppKey(), getProviderAppSecret());
log.info("im-center dispatchBatchMessage 请求网易云信Server:{},请求参数:{}", NIM_MESSAGE_BATCH_DISPATCH_URL, JSONUtil.toJsonStr(paramMap));
HttpResponse response = HttpRequest.post(NIM_MESSAGE_BATCH_DISPATCH_URL).addHeaders(authHeaderMap)
.form(paramMap).timeout(5000).execute();
if (response.getStatus() == SUCCESS_CODE) {
MessageBatchDispatchResponse dispatchResponse = JSONUtil.toBean(response.body(), MessageBatchDispatchResponse.class);
if (dispatchResponse.getCode() != SUCCESS_CODE) {
log.warn("im-center请求网易云信Server:{},返回异常:{}", NIM_MESSAGE_BATCH_DISPATCH_URL, response.body());
}
return dispatchResponse;
} else {
log.error("im-center请求网易云信Server:{},异常:{}", NIM_MESSAGE_BATCH_DISPATCH_URL, response.body());
}
return MessageBatchDispatchResponse.builder().desc("请求网易云信Server异常,请联系管理员!").build();
}
private MessageBatchDispatchResponse fallback(@Valid MessageBatchDispatchRequest messageInfo, IllegalArgumentException e) {
return MessageBatchDispatchResponse.builder().desc("请求网易云信Server异常,已触发流控fallback,请联系管理员!").build();
}
@Override
public RegisterResponse updateAccountProfile(@Valid RegisterUpdateRequest updateRequest) {
HashMap<String, Object> paramMap = Maps.newHashMap();

View File

@ -0,0 +1,50 @@
package cn.axzo.im.channel.netease.dto;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.util.List;
/**
* 文档地址:https://doc.yunxin.163.com/messaging/docs/DQ2NTg4ODE?platform=server
* @version V1.0
* @author zuoqinbo
* @date 2023/10/10 18:38
*/
@Data
public class MessageBatchDispatchRequest {
/**
* 发送者的网易云信 IM 账号accid最大 32 字符
* 必须保证一个APP内唯一
*/
@NotNull
private String fromAccid;
/**
* 消息接收者的网易云信 IM 账号accid列表示例["accid1","accid2","accid3"]JSONArray对应的 accid如果解析出错会报 414 错误 500
*/
@NotNull
private List<String> toAccids;
/**
* 0文本消息
* 1图片消息
* 2语音消息
* 3视频消息
* 4地理位置消息
* 6文件消息
* 10提示消息
* 100自定义消息
*/
@NotNull
private int type;
/**
* 最大长度 5000 字符JSON 格式
* 具体请参见消息格式示例
*/
@NotNull
private String body;
}

View File

@ -0,0 +1,40 @@
package cn.axzo.im.channel.netease.dto;
import lombok.Builder;
import lombok.Data;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 批量发送消息返回响应
* 示例
* {
* "code":200
* "unregister":["123123","111422"] // toAccids列表中存在的未注册用户
* "timetag":123124124124 // 消息发送的时间戳
* "msgids":{ // 发送给每个accid的消息id列表
* "12345":1200510468189, // 发送给用户12345的消息消息id为1200510468189
* "55213":3141251231231 // 发送给用户55213的消息消息id为3141251231231
* }
* }
* @author zuoqinbo
* @version V1.0
* @date 2023/10/11 17:14
*/
@Data
@Builder
public class MessageBatchDispatchResponse {
private int code;
private Set<String> unregister;
private Long timetag;
private Map<String,Long> msgids;
private String desc;
}

View File

@ -2,6 +2,8 @@ package cn.axzo.im.channel.netease.dto;
import lombok.Data;
import java.util.List;
/**
* IM账户信息
*
@ -25,5 +27,11 @@ public class MessageRespInfo {
*/
private boolean antispam;
/**
* 用户未注册IM账户列表
*/
private List<String> unregister;
}

View File

@ -4,19 +4,17 @@ import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.service.MessageService;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* IM消息派发相关
@ -40,4 +38,10 @@ public class MessageController implements MessageApi {
return ApiResult.ok(messageRespList);
}
@ExceptionHandler({ RequestNotPermitted.class })
@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
public ApiResult<String> handleRequestNotPermitted() {
return ApiResult.err("服务器资源繁忙,请求被拒绝!");
}
}

View File

@ -9,9 +9,7 @@ import cn.axzo.im.center.api.vo.resp.UserAccountResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.channel.IMChannelProvider;
import cn.axzo.im.channel.netease.NimMsgTypeEnum;
import cn.axzo.im.channel.netease.dto.MessageBody;
import cn.axzo.im.channel.netease.dto.MessageDispatchRequest;
import cn.axzo.im.channel.netease.dto.MessageDispatchResponse;
import cn.axzo.im.channel.netease.dto.*;
import cn.axzo.im.dao.repository.AccountRegisterDao;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.AccountRegister;
@ -19,20 +17,19 @@ import cn.axzo.im.entity.MessageHistory;
import cn.hutool.json.JSONUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* im-center
@ -62,6 +59,25 @@ public class MessageService {
private MessageHistoryDao messageHistoryDao;
/**
* 单次请求最多支持2000个用户接收消息
*/
@Value("${im-center.message.batch.receiver.limit}")
public int msgReceiverLimit = 2000;
/**
* 超过该用户数量就走批量发送接口
*/
@Value("${im-center.message.batch.receiver.threshold}")
public int msgReceiverThreshold = 10;
/**
* 网易云信IM批量发送-每批次发送给多少用户
*/
@Value("${im-center.message.batch.receiver.once}")
public int msgPersonSendOfOneBatch = 100;
@Transactional
public List<MessageDispatchResp> sendMessage(MessageInfo messageInfo) {
String msgTemplateId = messageInfo.getMsgTemplateId();
@ -109,6 +125,93 @@ public class MessageService {
} else {
throw new ServiceException("暂未支持普通用户[" + messageInfo.getPersonId() + "]发送IM消息");
}
if (messageInfo.getToPersonIdList().size() > msgReceiverLimit) {
throw new ServiceException("消息接收用户数量超过上限:[" + msgReceiverLimit + "]");
}
List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
int personCount = messageInfo.getToPersonIdList().size();
if (personCount <= msgReceiverThreshold) {
messageDispatchRespList = sendOneByOneMessage(messageInfo, messageRequest);
} else {
messageDispatchRespList = sendBatchMessage(messageInfo, messageRequest);
}
insertImMessage(messageDispatchRespList, body);
return messageDispatchRespList;
}
private List<MessageDispatchResp> sendBatchMessage(MessageInfo messageInfo, MessageDispatchRequest messageRequest) {
//如果消息模板是针对多App端则分开进行发送消息
List<AppTypeEnum> appTypeList = messageInfo.getAppTypeList();
String fromAccid = messageRequest.getFrom();
List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
appTypeList.forEach(appTypeEnum -> {
String appType = appTypeEnum.getCode();
if (appType == null || AppTypeEnum.isValidAppType(appType) == null) {
throw new ServiceException("当前服务器不支持该appType类型!");
}
List<String> toPersonList = Lists.newArrayList(messageInfo.getToPersonIdList());
//自动添加IM账户进行批量发送,这样会导致其中部分IM账户是未注册的
toPersonList = toPersonList.stream().map(personId -> personId.concat("_")
.concat(appType)).collect(Collectors.toList());
//消息接收者分页,然后进行批量发送
List<List<String>> personPage = Lists.partition(toPersonList, msgPersonSendOfOneBatch);
MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
batchDispatchRequest.setBody(messageRequest.getBody());
batchDispatchRequest.setFromAccid(fromAccid);
Set<String> unregisterSets = Sets.newHashSet();
personPage.stream().forEach(personList -> {
batchDispatchRequest.setToAccids(personList);
MessageBatchDispatchResponse response = imChannel.dispatchBatchMessage(batchDispatchRequest);
if (response != null) {
Map<String, Long> userMsgResponseMap = response.getMsgids();
//未注册的IM账户
Set<String> unregisterAccount = response.getUnregister();
if (CollectionUtils.isNotEmpty(unregisterAccount)) {
unregisterAccount = unregisterAccount.stream()
.map(account -> account.replace("\"", "")).collect(Collectors.toSet());
unregisterSets.addAll(unregisterAccount);
}
if (userMsgResponseMap != null) {
userMsgResponseMap.forEach((imAccount, messageId) -> {
MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(String.valueOf(messageId), fromAccid,
imAccount, appType, response.getTimetag());
messageDispatchRespList.add(messageDispatchResp);
});
} else {
MessageDispatchResp messageDispatchResp = new MessageDispatchResp();
messageDispatchResp.setDesc(response.getDesc());
messageDispatchRespList.add(messageDispatchResp);
}
}
unregisterSets.forEach(unregisterImAccount -> {
MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(null, fromAccid,
unregisterImAccount, appType, response.getTimetag());
messageDispatchResp.setRegistered(false);
messageDispatchRespList.add(messageDispatchResp);
});
});
});
return messageDispatchRespList;
}
private MessageDispatchResp buildMessageDispatchResp(String messageId, String fromImAccount, String toImAccount,
String appType, long timeTag) {
MessageDispatchResp messageDispatchResp = new MessageDispatchResp();
messageDispatchResp.setMsgid(String.valueOf(messageId));
messageDispatchResp.setAppType(appType);
messageDispatchResp.setFromImAccount(fromImAccount);
messageDispatchResp.setToImAccount(toImAccount);
messageDispatchResp.setTimetag(timeTag);
messageDispatchResp.setRegistered(true);
String[] personAndAppType = toImAccount.split("_");
messageDispatchResp.setPersonId(personAndAppType[0]);
if (personAndAppType.length > 1) {
messageDispatchResp.setAppType(personAndAppType[1]);
}
return messageDispatchResp;
}
private List<MessageDispatchResp> sendOneByOneMessage(MessageInfo messageInfo, MessageDispatchRequest messageRequest) {
//如果消息模板是针对多App端则分开进行发送消息
List<AppTypeEnum> appTypeList = messageInfo.getAppTypeList();
List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
@ -116,7 +219,8 @@ public class MessageService {
if (appType == null || AppTypeEnum.isValidAppType(appType.getCode()) == null) {
throw new ServiceException("当前服务器不支持该appType类型!");
}
List<String> toPersonList = messageInfo.getToPersonIdList();
List<String> toPersonList = Lists.newArrayList(messageInfo.getToPersonIdList());
List<String> personUnregisterList = Lists.newArrayList();
//进行接收用户IM账户校验 目前支持单个用户进行IM消息发送,多个IM用户进行消息接收
toPersonList.forEach(personId -> {
AccountRegister accountRegister = accountRegisterDao.lambdaQuery().eq(AccountRegister::getIsDelete, 0)
@ -126,6 +230,7 @@ public class MessageService {
if (accountRegister == null || StringUtils.isEmpty(accountRegister.getImAccount())) {
log.warn("接收用户账户[" + personId + "],appType[" + appType.getCode() + "],未注册IM账户!");
personUnregisterList.add(personId);
return;
}
messageRequest.setTo(accountRegister.getImAccount());
@ -144,15 +249,13 @@ public class MessageService {
messageDispatchRespList.add(messageDispatchResp);
});
});
insertImMessage(messageDispatchRespList, body);
return messageDispatchRespList;
}
private void insertImMessage(List<MessageDispatchResp> messageRespList, String messageBody) {
String requestId = UUID.randomUUID().toString().replace("-", "");
log.info("异步持久化IM消息发送到数据库:{},请求信息:{}", messageBody, JSONUtil.toJsonStr(messageRespList));
CompletableFuture.runAsync(() -> messageRespList.stream().forEach(messageDispatchResp -> {
CompletableFuture.runAsync(() -> messageRespList.forEach(messageDispatchResp -> {
MessageHistory messageHistory = new MessageHistory();
messageHistory.setBizId(requestId);
messageHistory.setFromAccount(messageDispatchResp.getFromImAccount());
@ -167,6 +270,7 @@ public class MessageService {
}
try {
messageHistoryDao.saveOrUpdate(messageHistory);
Thread.sleep(50);
} catch (Exception e) {
log.error("持久化IM消息到数据库异常:{},", JSONUtil.toJsonStr(messageHistory), e);
}

View File

@ -23,6 +23,14 @@ mybatis-plus:
logic-not-delete-value: 0 #逻辑未删除值(默认为 0)
logic-delete-field: isDelete #逻辑删除字段
resilience4j.ratelimiter:
instances:
dispatchBatchMessage:
limitForPeriod: 5
limitRefreshPeriod: 60s
timeoutDuration: 0
registerHealthIndicator: true
eventConsumerBufferSize: 100
trade:
config:

View File

@ -53,3 +53,4 @@ management:
resp:
error-code:
non-base-mapping: {"[*]": OK}