REQ-3345: 如果聊天消息只有一个人的情况下,同步发送

This commit is contained in:
yanglin 2025-02-25 17:57:30 +08:00
parent b830369413
commit 638a6fd8fb
9 changed files with 84 additions and 40 deletions

View File

@ -32,6 +32,11 @@ public class SendChatMessageRequest extends SendMessageRequest {
@NotEmpty(message = "消息体不能为空")
private Map<String, Object> messageBody;
/**
* 尝试同步发送
*/
private boolean trySyncSend;
/**
* 发送文本消息
*

View File

@ -27,16 +27,20 @@ import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse;
import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult;
import cn.axzo.im.center.api.vo.resp.UserAccountResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.channel.netease.dto.MessageDispatchResponse;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageTask;
import cn.axzo.im.enums.MessageHistoryStatus;
import cn.axzo.im.enums.MessageTaskStatus;
import cn.axzo.im.send.handler.CommonSendOneHandler;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.im.service.AccountService;
import cn.axzo.im.service.CustomMessageService;
import cn.axzo.im.service.MessageHistoryService;
import cn.axzo.im.service.MessageTaskService;
import cn.axzo.im.service.RobotMsgTemplateService;
import cn.axzo.im.service.impl.MessageHistoryServiceImpl;
import cn.axzo.im.updatable.UpdatableMessageManager;
import cn.axzo.im.updatable.UpdatableMessageQueryService;
import cn.axzo.im.utils.BizAssertions;
@ -72,7 +76,7 @@ import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_NOT_FOUND;
import static cn.axzo.im.config.BizResultCode.SEND_PERSSON_ERROR;
/**
* IM消息派发相关
* IM消息派发相关
*
* @author zuoqinbo
* @version V1.0
@ -93,7 +97,7 @@ public class MessageController implements MessageApi {
@Autowired
private AccountRegisterService accountRegisterService;
@Autowired
private MessageHistoryService messageHistoryService;
private MessageHistoryServiceImpl messageHistoryService;
@Autowired
private CustomMessageService customMessageService;
@Autowired
@ -102,6 +106,8 @@ public class MessageController implements MessageApi {
private UpdatableMessageQueryService updatableMessageQueryService;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private CommonSendOneHandler commonSendOneHandler;
@Override
@ -116,7 +122,7 @@ public class MessageController implements MessageApi {
return ApiResult.ok(messageRespList);
}
@ExceptionHandler({ RequestNotPermitted.class })
@ExceptionHandler({RequestNotPermitted.class})
@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
public ApiResult<String> handleRequestNotPermitted() {
return ApiResult.err("服务器资源繁忙,请求被拒绝!");
@ -126,6 +132,7 @@ public class MessageController implements MessageApi {
/**
* 发送消息时只是存储在messageTask中通过xxlJob或者mq异步去处理
* 因为1为了提高接口响应性能2第三方接口有限流控制防止被限流后阻塞业务
*
* @param sendMessageParam 发送消息请求参数
* @return
*/
@ -244,10 +251,14 @@ public class MessageController implements MessageApi {
String sendImAccount = accountService.registerAccountIfAbsent(
sender.getPersonId(), sender.getOuId(), sender.getAppType());
BizAssertions.assertNotNull(sendImAccount, "创建账号失败");
boolean syncSend = request.isTrySyncSend()
&& CollectionUtils.isEmpty(request.receivePersonsOrEmpty())
&& request.getImReceiveAccounts() != null
&& request.getImReceiveAccounts().size() == 1;
MessageTask.BizData bizData = MessageTask.BizData.builder()
.messageBody(JSON.toJSONString(request.getMessageBody()))
.isSenderRobot(false)
.historyCreated(true)
.syncSend(syncSend)
.senderPersonId(request.determineSenderPersonId())
.nimMessageType(request.getMessageType())
.build();
@ -263,7 +274,7 @@ public class MessageController implements MessageApi {
.build());
}
}
MessageTask messageTask = transactionTemplate.execute(unused -> {
Long taskId = transactionTemplate.execute(unused -> {
MessageTask task = messageTaskService.create(MessageTask.builder()
.bizId(request.getBizId())
.sendImAccount(sendImAccount)
@ -275,12 +286,21 @@ public class MessageController implements MessageApi {
.sendPriority(SendPriority.CHAT_MESSAGE.getPriority())
.apiChannel(ApiChannel.COMMON_MESSAGE)
.build());
task = messageTaskService.getById(task.getId());
messageTaskService.createMessageHistory(task);
return task;
if (syncSend) {
task = messageTaskService.getById(task.getId());
List<Long> historyIds = messageTaskService.createMessageHistory(task);
MessageHistory history = messageHistoryService.getById(historyIds.get(0));
MessageDispatchResponse response = commonSendOneHandler.send(history);
if (response.isSuccess()) {
messageHistoryService.setSendSuccess(history, response.getMsgid(), null);
} else {
log.warn("sendChatMessage, send failed, historyId={}, taskId={}, bizId={}, failReason={}",
history.getId(), history.getImMessageTaskId(), history.getBizId(), response.getDesc());
}
}
return task.getId();
});
//noinspection DataFlowIssue
return ApiResult.ok(messageTask.getId());
return ApiResult.ok(taskId);
}
private void ensureImAccountNotBlank(String imAccount) {

View File

@ -37,4 +37,5 @@ public class HistoryRecordExt {
private Long workspaceId;
private Integer nimMessageType;
private boolean syncSend;
}

View File

@ -170,7 +170,7 @@ public class MessageTask {
private NimMessageType nimMessageType;
private boolean historyCreated;
private boolean syncSend;
public boolean determineIsSenderRobot() {
return isSenderRobot != null && isSenderRobot;

View File

@ -59,9 +59,6 @@ public class CreateMessageHistoryJob extends IJobHandler {
Page<MessageTask> page = messageTaskService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
page.getRecords().forEach(messageTask -> {
MessageTask.BizData bizData = messageTask.getBizData();
if (bizData != null && bizData.isHistoryCreated())
return;
count.set(count.get() + 1);
try {
messageTaskService.createMessageHistory(messageTask);

View File

@ -13,6 +13,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author yanglin
@ -27,24 +28,28 @@ public class CommonSendBatchHandler extends SendBatchHandler {
@Override
public void sendAndSubmitUpdate(SendExecutor<List<MessageHistory>> executor, List<MessageHistory> histories) {
if (CollectionUtils.isEmpty(histories)) return;
List<MessageHistory> effectiveHistories = histories.stream()
.filter(h -> !h.getOrCreateRecordExt().isSyncSend())
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(effectiveHistories))
return;
executor.log("batchSendMessage - request record size: {}, batchNo={}",
histories.size(), histories.get(0).determineBatchNo().orElse(null));
MessageHistory sample = histories.get(0);
effectiveHistories.size(), effectiveHistories.get(0).determineBatchNo().orElse(null));
MessageHistory sample = effectiveHistories.get(0);
MessageBatchDispatchRequest batchRequest = new MessageBatchDispatchRequest();
batchRequest.setBody(sample.getMessageBody());
batchRequest.setFromAccid(sample.getFromAccount());
batchRequest.setToAccids(Lists.transform(histories, MessageHistory::getToAccount));
batchRequest.setToAccids(Lists.transform(effectiveHistories, MessageHistory::getToAccount));
batchRequest.setPayload(sample.getOrCreateRecordExt().getPayload());
batchRequest.populateOption();
messageHistoryNimLogger.logAsync(histories, batchRequest);
messageHistoryNimLogger.logAsync(effectiveHistories, batchRequest);
MessageBatchDispatchResponse response = imChannelProvider.dispatchBatchMessage(batchRequest);
if (response.isRateLimited())
executor.scheduleRetrySend(histories, null);
executor.scheduleRetrySend(effectiveHistories, null);
else if (response.isSuccess())
executor.setBatchSendSuccess(histories, response, null);
executor.setBatchSendSuccess(effectiveHistories, response, null);
else
executor.setSendFail(histories, response.getDesc(), null);
executor.setSendFail(effectiveHistories, response.getDesc(), null);
}
@Override

View File

@ -24,9 +24,21 @@ public class CommonSendOneHandler extends SendOneHandler {
@Override
public void sendAndSubmitUpdate(SendExecutor<MessageHistory> executor, MessageHistory history) {
if (history.getOrCreateRecordExt().isSyncSend())
return;
executor.log("sendMessage - historyId={}, taskId={}, bizId={}, batchNo={}",
history.getId(), history.getImMessageTaskId(),
history.getBizId(), history.determineBatchNo().orElse(null));
MessageDispatchResponse response = send(history);
if (response.isRateLimited())
executor.scheduleRetrySend(history, null);
else if (response.isSuccess())
executor.submitSetSendSuccess(history, response.getMsgid());
else
executor.submitSetSendFail(history, response.getDesc());
}
public MessageDispatchResponse send(MessageHistory history) {
MessageDispatchRequest sendRequest = new MessageDispatchRequest();
sendRequest.setFrom(history.getFromAccount());
sendRequest.setOpe(history.isSendToGroup() ? 1 : 0);
@ -40,13 +52,7 @@ public class CommonSendOneHandler extends SendOneHandler {
sendRequest.setPayload(history.getOrCreateRecordExt().getPayload());
sendRequest.populateOption();
messageHistoryNimLogger.logAsync(history, sendRequest);
MessageDispatchResponse response = imChannelProvider.dispatchMessage(sendRequest);
if (response.isRateLimited())
executor.scheduleRetrySend(history, null);
else if (response.isSuccess())
executor.submitSetSendSuccess(history, response.getMsgid());
else
executor.submitSetSendFail(history, response.getDesc());
return imChannelProvider.dispatchMessage(sendRequest);
}
@Override

View File

@ -22,7 +22,7 @@ public interface MessageTaskService extends IService<MessageTask> {
Page<MessageTask> page(PageMessageTaskParam param);
void createMessageHistory(MessageTask messageTask);
List<Long> createMessageHistory(MessageTask messageTask);
void update(UpdateMessageTaskParam param);

View File

@ -45,7 +45,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -105,7 +105,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
@Override
@Transactional
public void createMessageHistory(MessageTask messageTask) {
public List<Long> createMessageHistory(MessageTask messageTask) {
this.update(UpdateMessageTaskParam.builder()
.id(messageTask.getId())
@ -113,12 +113,13 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
.build());
MessageTask.BizData bizData = messageTask.getBizData();
List<Long> historyIds;
if (bizData.isAllPerson()) {
log.info("发送全员消息, taskId={}", messageTask.getId());
doSendAll(messageTask, bizData);
historyIds = doSendAll(messageTask, bizData);
} else {
log.info("发送非全员消息, taskId={}", messageTask.getId());
doSendNotAll(messageTask);
historyIds = doSendNotAll(messageTask);
}
this.update(UpdateMessageTaskParam.builder()
@ -126,6 +127,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
.action(MessageTask.ActionEnum.SUCCESS)
.finishedTime(new Date())
.build());
return historyIds;
}
@Override
@ -145,9 +147,10 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
this.updateById(updateMessageTask);
}
private void doSendAll(MessageTask messageTask, MessageTask.BizData bizData) {
private List<Long> doSendAll(MessageTask messageTask, MessageTask.BizData bizData) {
Integer pageNumber = 1;
String batchNo = UUIDUtil.uuidString();
ArrayList<Long> historyIds = new ArrayList<>();
while (true) {
Page<AccountRegisterDTO> page = accountRegisterService.page(AccountRegisterService.PageAccountRegisterParam.builder()
.accountType(AccountTypeEnum.USER.getCode())
@ -159,24 +162,27 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
List<MessageTask.ReceivePerson> receivePersons = page.getRecords().stream()
.map(e -> MessageTask.ReceivePerson.builder().imAccount(e.getImAccount()).build())
.collect(Collectors.toList());
saveMessageHistory(batchNo, receivePersons, messageTask, false);
historyIds.addAll(saveMessageHistory(batchNo, receivePersons, messageTask, false));
}
if (!page.hasNext()) {
break;
}
}
return historyIds;
}
private void doSendNotAll(MessageTask messageTask) {
private List<Long> doSendNotAll(MessageTask messageTask) {
int totalPersonSize = messageTask.getReceivePersons().size();
String batchNo = UUIDUtil.uuidString();
// 防止sql过长
List<List<MessageTask.ReceivePerson>> receivePersons = Lists.partition(messageTask.getReceivePersons(), DEFAULT_PAGE_SIZE);
receivePersons.forEach(e -> saveMessageHistory(batchNo, e, messageTask, totalPersonSize <= 2));
return receivePersons.stream()
.flatMap(e -> saveMessageHistory(batchNo, e, messageTask, totalPersonSize <= 2).stream())
.collect(Collectors.toList());
}
private void saveMessageHistory(String batchNo, List<MessageTask.ReceivePerson> receivePersons,
private List<Long> saveMessageHistory(String batchNo, List<MessageTask.ReceivePerson> receivePersons,
MessageTask messageTask, boolean tryCreateAccount) {
// 排除已经发送成功的记录防止重复发送
Set<String> existPersons = listExistPerson(receivePersons, messageTask);
@ -197,7 +203,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
if (CollectionUtils.isEmpty(absentReceivePersons)) {
log.info("messageTask,{}, receivePersons,{},已经存在", JSONObject.toJSONString(messageTask),
JSONObject.toJSONString(receivePersons));
return;
return Collections.emptyList();
}
Map<String, String> accountRegisters = listAccountRegisters(absentReceivePersons);
@ -213,6 +219,7 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
.map(MessageHistory::getId)
.collect(Collectors.toList());
updatableMessageManager.onHistoryCreated(historyIds);
return historyIds;
}
private MessageHistory resolveMessageHistory(String batchNo,
@ -233,6 +240,9 @@ public class MessageTaskServiceImpl extends ServiceImpl<MessageTaskMapper, Messa
messageHistory.setBatchNo(batchNo);
messageHistory.setSendPriority(messageTask.getSendPriority());
messageHistory.setApiChannel(messageTask.getApiChannel());
if (messageTask.getBizData() != null) {
messageHistory.getOrCreateRecordExt().setSyncSend(messageTask.getBizData().isSyncSend());
}
if (StringUtils.isNotBlank(receivePerson.getImAccount())) {
AccountRegisterDTO imAccount = imAccounts.get(receivePerson.getImAccount());