feat:REQ-1419 代码逻辑优化

This commit is contained in:
zuoqinbo 2023-10-26 12:05:02 +08:00
parent 20e16b576b
commit 458cceb68c
5 changed files with 99 additions and 75 deletions

View File

@ -21,10 +21,15 @@ import java.util.List;
@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
public interface MessageApi {
/**
* 发送消息
* 发送消息,单条消息批量发送消息统一入口
* 1.该接口一次请求接收人支持最大2000人
* 2.网易云信一分钟支持120次调用,每次调用IM中心设置100个账户(能返回msgId最大支持100人)
* 3.IM中心接收人有工人端和管理端账户,故当接收人最大2000人时需要调用网易云信发送4000条消息
* 4.按照每批次发送100条消息,需要发送40次
* 5.因此该接口一分钟内最大支持3次接收人为2000人的请求
*
* @param messageInfo 消息请求参数
* @return 消息响应
* @param messageInfo 发送消息请求参数
* @return 发送消息请求响应
*/
@PostMapping("api/im/message/dispatch")
ApiResult<List<MessageDispatchResp>> sendMessage(@RequestBody @Validated MessageInfo messageInfo);

View File

@ -32,7 +32,7 @@ public class MessageInfo {
/**
* 发送用户Id
* 发送用户Id,目前暂不支持非机器人发送消息
*/
private String personId;

View File

@ -47,14 +47,19 @@ public class NimChannelService implements IMChannelProvider {
private static final String PROVIDER_NAME = "NIM";
/**
* 消息接收者的网易云信 IM 账号accid列表 500 返回msgid最多100人,目前设置100
* 消息接收者的网易云信IM账号accid列表上限是500人,但是返回msgid最多100人,故目前设置100人
*/
private static final int NIM_MAX_BATCH_MSG = 100;
private static final int NIM_MSG_BATCH_MAX_NUM = 100;
/**
* 目前支持 100自定义消息
*/
private static final int MSG_TYPE = 100;
/**
* 目前支持 0单聊消息
*/
private static final int MSG_OPE = 0;
@Resource
private AppKeyUtil appKeyUtil;
@ -137,11 +142,10 @@ public class NimChannelService implements IMChannelProvider {
public MessageDispatchResponse dispatchMessage(@Valid MessageDispatchRequest messageInfo) {
if (messageInfo == null) {
throw new ServiceException("发送单聊消息请求,请求参数不能为空!");
} else {
//目前支持的默认值 0单聊消息 100自定义消息
messageInfo.setOpe(0);
messageInfo.setType(MSG_TYPE);
}
//目前支持的默认值 0单聊消息 100自定义消息
messageInfo.setOpe(MSG_OPE);
messageInfo.setType(MSG_TYPE);
HashMap<String, Object> paramMap = Maps.newHashMap();
paramMap.put("from", messageInfo.getFrom());
paramMap.put("body", messageInfo.getBody());
@ -166,19 +170,18 @@ public class NimChannelService implements IMChannelProvider {
}
@Override
@RateLimiter(name = "dispatchBatchMessage", fallbackMethod = "fallback")
@RateLimiter(name = "dispatchBatchMessage")
public MessageBatchDispatchResponse dispatchBatchMessage(@Valid MessageBatchDispatchRequest messageInfo) {
if (messageInfo == null) {
throw new ServiceException("批量发送单聊消息请求,请求参数不能为空!");
} else {
//目前支持的默认值 0单聊消息 100自定义消息
messageInfo.setType(100);
}
//目前支持的默认值 100自定义消息
messageInfo.setType(MSG_TYPE);
if (messageInfo.getToAccids() == null) {
throw new ServiceException("批量发送单聊消息请求,请求参数[消息接收人]不能为空!");
}
if (messageInfo.getToAccids().size() > NIM_MAX_BATCH_MSG) {
throw new ServiceException("批量发送单聊消息请求,单批次消息数超过最大值:" + NIM_MAX_BATCH_MSG);
if (messageInfo.getToAccids().size() > NIM_MSG_BATCH_MAX_NUM) {
throw new ServiceException("批量发送单聊消息请求,单批次消息数超过最大值:" + NIM_MSG_BATCH_MAX_NUM);
}
HashMap<String, Object> paramMap = Maps.newHashMap();
paramMap.put("fromAccid", messageInfo.getFromAccid());

View File

@ -33,9 +33,7 @@ public class MessageController implements MessageApi {
@Override
public ApiResult<List<MessageDispatchResp>> sendMessage(MessageInfo messageInfo) {
List<MessageDispatchResp> messageRespList = messageService.sendMessage(messageInfo);
return ApiResult.ok(messageRespList);
}

View File

@ -75,28 +75,41 @@ public class MessageService {
* 网易云信IM批量发送-每批次发送给多少用户
*/
@Value("${im-center.message.batch.receiver.once}")
public int msgPersonSendOfOneBatch = 100;
public int msgSendPersonOfOneBatch = 100;
@Transactional
public List<MessageDispatchResp> sendMessage(MessageInfo messageInfo) {
String msgTemplateId = messageInfo.getMsgTemplateId();
MessageDispatchRequest messageRequest = new MessageDispatchRequest();
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
messageBody.setMsgContent(messageInfo.getMsgContent());
messageBody.setMsgHeader(messageInfo.getMsgHeader());
messageBody.setMsgBody(messageInfo.getMsgTemplateContent());
Map<String, String> defaultExtMap = Maps.newHashMap();
defaultExtMap.put("msgTemplateId", messageInfo.getMsgTemplateId());
if (messageInfo.getExtendsInfo() != null) {
defaultExtMap.putAll(messageInfo.getExtendsInfo());
}
messageBody.setExt(defaultExtMap);
String body = JSONUtil.toJsonStr(messageBody);
messageRequest.setBody(body);
MessageDispatchRequest messageRequest = buildMessageDispatchRequest(messageInfo);
buildMsgFromAccount(messageRequest, msgTemplateId);
//设置IM消息发送者账号
if (StringUtils.isNotBlank(msgTemplateId) && StringUtils.isBlank(messageInfo.getPersonId())) {
if (messageInfo.getToPersonIdList().size() > msgReceiverLimit) {
throw new ServiceException("IM消息接收用户数量超过上限:[" + msgReceiverLimit + "]");
}
List<MessageDispatchResp> messageDispatchRespList;
int personCount = messageInfo.getToPersonIdList().size();
//小于阈值就走单个IM消息发送接口
if (personCount <= msgReceiverThreshold) {
messageDispatchRespList = sendOneByOneMessage(messageInfo, messageRequest);
} else {
messageDispatchRespList = sendBatchMessage(messageInfo, messageRequest);
}
insertImMessage(messageDispatchRespList, messageRequest.getBody());
return messageDispatchRespList;
}
/**
* 设置IM消息发送者账户目前只支持机器人账户发送
* 暂未支持普通用户发送IM消息
*
* @param messageRequest IM请求结构体
* @param msgTemplateId 消息模板ID
*/
private void buildMsgFromAccount(MessageDispatchRequest messageRequest, String msgTemplateId) {
if (StringUtils.isBlank(msgTemplateId) || messageRequest == null) {
throw new ServiceException("请求参数为空");
} else {
List<String> robotIdList = robotMsgTemplateService.queryRobotIdByTemplate(msgTemplateId);
if (CollectionUtils.isEmpty(robotIdList)) {
throw new ServiceException("消息模板ID[" + msgTemplateId + "],还未维护机器人账户!");
@ -122,27 +135,31 @@ public class MessageService {
}
//目前发送用户为机器人
messageRequest.setFrom(robotImAccount);
} else {
throw new ServiceException("暂未支持普通用户[" + messageInfo.getPersonId() + "]发送IM消息");
}
if (messageInfo.getToPersonIdList().size() > msgReceiverLimit) {
throw new ServiceException("消息接收用户数量超过上限:[" + msgReceiverLimit + "]");
}
private MessageDispatchRequest buildMessageDispatchRequest(MessageInfo messageInfo) {
MessageDispatchRequest messageRequest = new MessageDispatchRequest();
MessageBody messageBody = new MessageBody();
messageBody.setMsgType(NimMsgTypeEnum.TEMPLATE.getCode());
messageBody.setMsgContent(messageInfo.getMsgContent());
messageBody.setMsgHeader(messageInfo.getMsgHeader());
messageBody.setMsgBody(messageInfo.getMsgTemplateContent());
Map<String, String> defaultExtMap = Maps.newHashMap();
defaultExtMap.put("msgTemplateId", messageInfo.getMsgTemplateId());
if (messageInfo.getExtendsInfo() != null) {
defaultExtMap.putAll(messageInfo.getExtendsInfo());
}
List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
int personCount = messageInfo.getToPersonIdList().size();
if (personCount <= msgReceiverThreshold) {
messageDispatchRespList = sendOneByOneMessage(messageInfo, messageRequest);
} else {
messageDispatchRespList = sendBatchMessage(messageInfo, messageRequest);
}
insertImMessage(messageDispatchRespList, body);
return messageDispatchRespList;
messageBody.setExt(defaultExtMap);
String body = JSONUtil.toJsonStr(messageBody);
messageRequest.setBody(body);
return messageRequest;
}
private List<MessageDispatchResp> sendBatchMessage(MessageInfo messageInfo, MessageDispatchRequest messageRequest) {
//如果消息模板是针对多App端分开进行发送消息
//消息模板是针对多App端,则单个用户有多个IM账户,需要分开进行发送消息
List<AppTypeEnum> appTypeList = messageInfo.getAppTypeList();
String fromAccid = messageRequest.getFrom();
String fromAccId = messageRequest.getFrom();
List<MessageDispatchResp> messageDispatchRespList = Lists.newArrayList();
appTypeList.forEach(appTypeEnum -> {
String appType = appTypeEnum.getCode();
@ -150,45 +167,48 @@ public class MessageService {
throw new ServiceException("当前服务器不支持该appType类型!");
}
List<String> toPersonList = Lists.newArrayList(messageInfo.getToPersonIdList());
//自动添加IM账户进行批量发送,这样会导致其中部分IM账户是未注册的
//1.首先自动添加IM账户进行批量发送,返回其中IM账户未注册部分
//2.如下是默认IM账户规则
toPersonList = toPersonList.stream().map(personId -> personId.concat("_")
.concat(appType)).collect(Collectors.toList());
//消息接收者分页,然后进行批量发送
List<List<String>> personPage = Lists.partition(toPersonList, msgPersonSendOfOneBatch);
List<List<String>> personPage = Lists.partition(toPersonList, msgSendPersonOfOneBatch);
MessageBatchDispatchRequest batchDispatchRequest = new MessageBatchDispatchRequest();
batchDispatchRequest.setBody(messageRequest.getBody());
batchDispatchRequest.setFromAccid(fromAccid);
Set<String> unregisterSets = Sets.newHashSet();
personPage.stream().forEach(personList -> {
batchDispatchRequest.setFromAccid(fromAccId);
personPage.forEach(personList -> {
batchDispatchRequest.setToAccids(personList);
MessageBatchDispatchResponse response = imChannel.dispatchBatchMessage(batchDispatchRequest);
if (response != null) {
Map<String, Long> userMsgResponseMap = response.getMsgids();
//未注册的IM账户
Set<String> unregisterAccount = response.getUnregister();
if (CollectionUtils.isNotEmpty(unregisterAccount)) {
unregisterAccount = unregisterAccount.stream()
MessageBatchDispatchResponse batchResponse = imChannel.dispatchBatchMessage(batchDispatchRequest);
if (batchResponse != null) {
Map<String, Long> userMsgResponseMap = batchResponse.getMsgids();
//返回未注册的IM账户
Set<String> unregisterAccountSets = batchResponse.getUnregister();
//字符串转义字符处理
if (CollectionUtils.isNotEmpty(unregisterAccountSets)) {
unregisterAccountSets = unregisterAccountSets.stream()
.map(account -> account.replace("\"", "")).collect(Collectors.toSet());
unregisterSets.addAll(unregisterAccount);
}
if (userMsgResponseMap != null) {
//遍历批量返回中每一个账户对应的MessageId
userMsgResponseMap.forEach((imAccount, messageId) -> {
MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(String.valueOf(messageId), fromAccid,
imAccount, appType, response.getTimetag());
MessageDispatchResp messageDispatchResp =
buildMessageDispatchResp(String.valueOf(messageId), fromAccId,
imAccount, appType, batchResponse.getTimetag());
messageDispatchRespList.add(messageDispatchResp);
});
} else {
MessageDispatchResp messageDispatchResp = new MessageDispatchResp();
messageDispatchResp.setDesc(response.getDesc());
messageDispatchRespList.add(messageDispatchResp);
log.error("dispatchBatchMessage请求返回出现异常:{}", JSONUtil.toJsonStr(batchResponse));
}
unregisterAccountSets.forEach(unregisterAccount -> {
MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(null, fromAccId,
unregisterAccount, appType, batchResponse.getTimetag());
messageDispatchResp.setRegistered(false);
messageDispatchRespList.add(messageDispatchResp);
});
} else {
log.error("dispatchBatchMessage请求返回出现异常");
}
unregisterSets.forEach(unregisterImAccount -> {
MessageDispatchResp messageDispatchResp = buildMessageDispatchResp(null, fromAccid,
unregisterImAccount, appType, response.getTimetag());
messageDispatchResp.setRegistered(false);
messageDispatchRespList.add(messageDispatchResp);
});
});
});
return messageDispatchRespList;
@ -220,7 +240,6 @@ public class MessageService {
throw new ServiceException("当前服务器不支持该appType类型!");
}
List<String> toPersonList = Lists.newArrayList(messageInfo.getToPersonIdList());
List<String> personUnregisterList = Lists.newArrayList();
//进行接收用户IM账户校验 目前支持单个用户进行IM消息发送,多个IM用户进行消息接收
toPersonList.forEach(personId -> {
AccountRegister accountRegister = accountRegisterDao.lambdaQuery().eq(AccountRegister::getIsDelete, 0)
@ -230,7 +249,6 @@ public class MessageService {
if (accountRegister == null || StringUtils.isEmpty(accountRegister.getImAccount())) {
log.warn("接收用户账户[" + personId + "],appType[" + appType.getCode() + "],未注册IM账户!");
personUnregisterList.add(personId);
return;
}
messageRequest.setTo(accountRegister.getImAccount());