From 638a6fd8fbde881807cf9ed0665078c647c7f4ff Mon Sep 17 00:00:00 2001 From: yanglin Date: Tue, 25 Feb 2025 17:57:30 +0800 Subject: [PATCH] =?UTF-8?q?REQ-3345:=20=E5=A6=82=E6=9E=9C=E8=81=8A?= =?UTF-8?q?=E5=A4=A9=E6=B6=88=E6=81=AF=E5=8F=AA=E6=9C=89=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E4=BA=BA=E7=9A=84=E6=83=85=E5=86=B5=E4=B8=8B=EF=BC=8C=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/vo/req/SendChatMessageRequest.java | 5 +++ .../axzo/im/controller/MessageController.java | 40 ++++++++++++++----- .../cn/axzo/im/entity/HistoryRecordExt.java | 1 + .../java/cn/axzo/im/entity/MessageTask.java | 2 +- .../axzo/im/job/CreateMessageHistoryJob.java | 3 -- .../send/handler/CommonSendBatchHandler.java | 21 ++++++---- .../im/send/handler/CommonSendOneHandler.java | 20 ++++++---- .../axzo/im/service/MessageTaskService.java | 2 +- .../service/impl/MessageTaskServiceImpl.java | 30 +++++++++----- 9 files changed, 84 insertions(+), 40 deletions(-) diff --git a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendChatMessageRequest.java b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendChatMessageRequest.java index 4ab93f7..abbde16 100644 --- a/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendChatMessageRequest.java +++ b/im-center-api/src/main/java/cn/axzo/im/center/api/vo/req/SendChatMessageRequest.java @@ -32,6 +32,11 @@ public class SendChatMessageRequest extends SendMessageRequest { @NotEmpty(message = "消息体不能为空") private Map messageBody; + /** + * 尝试同步发送 + */ + private boolean trySyncSend; + /** * 发送文本消息 * diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java index acdfb12..bdd7fb0 100644 --- a/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java +++ b/im-center-server/src/main/java/cn/axzo/im/controller/MessageController.java @@ -27,16 +27,20 @@ import cn.axzo.im.center.api.vo.resp.MessageUpdateResponse; import cn.axzo.im.center.api.vo.resp.UpdatableMessageSendResult; import cn.axzo.im.center.api.vo.resp.UserAccountResp; import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.im.channel.netease.dto.MessageDispatchResponse; import cn.axzo.im.entity.AccountRegister; +import cn.axzo.im.entity.MessageHistory; import cn.axzo.im.entity.MessageTask; import cn.axzo.im.enums.MessageHistoryStatus; import cn.axzo.im.enums.MessageTaskStatus; +import cn.axzo.im.send.handler.CommonSendOneHandler; import cn.axzo.im.service.AccountRegisterService; import cn.axzo.im.service.AccountService; import cn.axzo.im.service.CustomMessageService; import cn.axzo.im.service.MessageHistoryService; import cn.axzo.im.service.MessageTaskService; import cn.axzo.im.service.RobotMsgTemplateService; +import cn.axzo.im.service.impl.MessageHistoryServiceImpl; import cn.axzo.im.updatable.UpdatableMessageManager; import cn.axzo.im.updatable.UpdatableMessageQueryService; import cn.axzo.im.utils.BizAssertions; @@ -72,7 +76,7 @@ import static cn.axzo.im.config.BizResultCode.SEND_IM_ACCOUNT_NOT_FOUND; import static cn.axzo.im.config.BizResultCode.SEND_PERSSON_ERROR; /** - * IM消息派发相关 + * IM消息派发相关 * * @author zuoqinbo * @version V1.0 @@ -93,7 +97,7 @@ public class MessageController implements MessageApi { @Autowired private AccountRegisterService accountRegisterService; @Autowired - private MessageHistoryService messageHistoryService; + private MessageHistoryServiceImpl messageHistoryService; @Autowired private CustomMessageService customMessageService; @Autowired @@ -102,6 +106,8 @@ public class MessageController implements MessageApi { private UpdatableMessageQueryService updatableMessageQueryService; @Autowired private TransactionTemplate transactionTemplate; + @Autowired + private CommonSendOneHandler commonSendOneHandler; @Override @@ -116,7 +122,7 @@ public class MessageController implements MessageApi { return ApiResult.ok(messageRespList); } - @ExceptionHandler({ RequestNotPermitted.class }) + @ExceptionHandler({RequestNotPermitted.class}) @ResponseStatus(HttpStatus.TOO_MANY_REQUESTS) public ApiResult handleRequestNotPermitted() { return ApiResult.err("服务器资源繁忙,请求被拒绝!"); @@ -126,6 +132,7 @@ public class MessageController implements MessageApi { /** * 发送消息时只是存储在messageTask中,通过xxlJob或者mq异步去处理 * 因为:1、为了提高接口响应性能。2、第三方接口有限流控制,防止被限流后阻塞业务 + * * @param sendMessageParam 发送消息请求参数 * @return */ @@ -244,10 +251,14 @@ public class MessageController implements MessageApi { String sendImAccount = accountService.registerAccountIfAbsent( sender.getPersonId(), sender.getOuId(), sender.getAppType()); BizAssertions.assertNotNull(sendImAccount, "创建账号失败"); + boolean syncSend = request.isTrySyncSend() + && CollectionUtils.isEmpty(request.receivePersonsOrEmpty()) + && request.getImReceiveAccounts() != null + && request.getImReceiveAccounts().size() == 1; MessageTask.BizData bizData = MessageTask.BizData.builder() .messageBody(JSON.toJSONString(request.getMessageBody())) .isSenderRobot(false) - .historyCreated(true) + .syncSend(syncSend) .senderPersonId(request.determineSenderPersonId()) .nimMessageType(request.getMessageType()) .build(); @@ -263,7 +274,7 @@ public class MessageController implements MessageApi { .build()); } } - MessageTask messageTask = transactionTemplate.execute(unused -> { + Long taskId = transactionTemplate.execute(unused -> { MessageTask task = messageTaskService.create(MessageTask.builder() .bizId(request.getBizId()) .sendImAccount(sendImAccount) @@ -275,12 +286,21 @@ public class MessageController implements MessageApi { .sendPriority(SendPriority.CHAT_MESSAGE.getPriority()) .apiChannel(ApiChannel.COMMON_MESSAGE) .build()); - task = messageTaskService.getById(task.getId()); - messageTaskService.createMessageHistory(task); - return task; + if (syncSend) { + task = messageTaskService.getById(task.getId()); + List historyIds = messageTaskService.createMessageHistory(task); + MessageHistory history = messageHistoryService.getById(historyIds.get(0)); + MessageDispatchResponse response = commonSendOneHandler.send(history); + if (response.isSuccess()) { + messageHistoryService.setSendSuccess(history, response.getMsgid(), null); + } else { + log.warn("sendChatMessage, send failed, historyId={}, taskId={}, bizId={}, failReason={}", + history.getId(), history.getImMessageTaskId(), history.getBizId(), response.getDesc()); + } + } + return task.getId(); }); - //noinspection DataFlowIssue - return ApiResult.ok(messageTask.getId()); + return ApiResult.ok(taskId); } private void ensureImAccountNotBlank(String imAccount) { diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java b/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java index 621c8f8..0119c8a 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/HistoryRecordExt.java @@ -37,4 +37,5 @@ public class HistoryRecordExt { private Long workspaceId; private Integer nimMessageType; + private boolean syncSend; } \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java index 0cc8094..c3b00fd 100644 --- a/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java +++ b/im-center-server/src/main/java/cn/axzo/im/entity/MessageTask.java @@ -170,7 +170,7 @@ public class MessageTask { private NimMessageType nimMessageType; - private boolean historyCreated; + private boolean syncSend; public boolean determineIsSenderRobot() { return isSenderRobot != null && isSenderRobot; diff --git a/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java b/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java index 2591e81..885755d 100644 --- a/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java +++ b/im-center-server/src/main/java/cn/axzo/im/job/CreateMessageHistoryJob.java @@ -59,9 +59,6 @@ public class CreateMessageHistoryJob extends IJobHandler { Page page = messageTaskService.page(req); if (CollectionUtils.isNotEmpty(page.getRecords())) { page.getRecords().forEach(messageTask -> { - MessageTask.BizData bizData = messageTask.getBizData(); - if (bizData != null && bizData.isHistoryCreated()) - return; count.set(count.get() + 1); try { messageTaskService.createMessageHistory(messageTask); diff --git a/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendBatchHandler.java b/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendBatchHandler.java index 6cb65f0..ca50b59 100644 --- a/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendBatchHandler.java +++ b/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendBatchHandler.java @@ -13,6 +13,7 @@ import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; import java.util.List; +import java.util.stream.Collectors; /** * @author yanglin @@ -27,24 +28,28 @@ public class CommonSendBatchHandler extends SendBatchHandler { @Override public void sendAndSubmitUpdate(SendExecutor> executor, List histories) { - if (CollectionUtils.isEmpty(histories)) return; + List effectiveHistories = histories.stream() + .filter(h -> !h.getOrCreateRecordExt().isSyncSend()) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(effectiveHistories)) + return; executor.log("batchSendMessage - request record size: {}, batchNo={}", - histories.size(), histories.get(0).determineBatchNo().orElse(null)); - MessageHistory sample = histories.get(0); + effectiveHistories.size(), effectiveHistories.get(0).determineBatchNo().orElse(null)); + MessageHistory sample = effectiveHistories.get(0); MessageBatchDispatchRequest batchRequest = new MessageBatchDispatchRequest(); batchRequest.setBody(sample.getMessageBody()); batchRequest.setFromAccid(sample.getFromAccount()); - batchRequest.setToAccids(Lists.transform(histories, MessageHistory::getToAccount)); + batchRequest.setToAccids(Lists.transform(effectiveHistories, MessageHistory::getToAccount)); batchRequest.setPayload(sample.getOrCreateRecordExt().getPayload()); batchRequest.populateOption(); - messageHistoryNimLogger.logAsync(histories, batchRequest); + messageHistoryNimLogger.logAsync(effectiveHistories, batchRequest); MessageBatchDispatchResponse response = imChannelProvider.dispatchBatchMessage(batchRequest); if (response.isRateLimited()) - executor.scheduleRetrySend(histories, null); + executor.scheduleRetrySend(effectiveHistories, null); else if (response.isSuccess()) - executor.setBatchSendSuccess(histories, response, null); + executor.setBatchSendSuccess(effectiveHistories, response, null); else - executor.setSendFail(histories, response.getDesc(), null); + executor.setSendFail(effectiveHistories, response.getDesc(), null); } @Override diff --git a/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendOneHandler.java b/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendOneHandler.java index 285632c..daa7ddb 100644 --- a/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendOneHandler.java +++ b/im-center-server/src/main/java/cn/axzo/im/send/handler/CommonSendOneHandler.java @@ -24,9 +24,21 @@ public class CommonSendOneHandler extends SendOneHandler { @Override public void sendAndSubmitUpdate(SendExecutor executor, MessageHistory history) { + if (history.getOrCreateRecordExt().isSyncSend()) + return; executor.log("sendMessage - historyId={}, taskId={}, bizId={}, batchNo={}", history.getId(), history.getImMessageTaskId(), history.getBizId(), history.determineBatchNo().orElse(null)); + MessageDispatchResponse response = send(history); + if (response.isRateLimited()) + executor.scheduleRetrySend(history, null); + else if (response.isSuccess()) + executor.submitSetSendSuccess(history, response.getMsgid()); + else + executor.submitSetSendFail(history, response.getDesc()); + } + + public MessageDispatchResponse send(MessageHistory history) { MessageDispatchRequest sendRequest = new MessageDispatchRequest(); sendRequest.setFrom(history.getFromAccount()); sendRequest.setOpe(history.isSendToGroup() ? 1 : 0); @@ -40,13 +52,7 @@ public class CommonSendOneHandler extends SendOneHandler { sendRequest.setPayload(history.getOrCreateRecordExt().getPayload()); sendRequest.populateOption(); messageHistoryNimLogger.logAsync(history, sendRequest); - MessageDispatchResponse response = imChannelProvider.dispatchMessage(sendRequest); - if (response.isRateLimited()) - executor.scheduleRetrySend(history, null); - else if (response.isSuccess()) - executor.submitSetSendSuccess(history, response.getMsgid()); - else - executor.submitSetSendFail(history, response.getDesc()); + return imChannelProvider.dispatchMessage(sendRequest); } @Override diff --git a/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java b/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java index 5faa80b..78a1e84 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/MessageTaskService.java @@ -22,7 +22,7 @@ public interface MessageTaskService extends IService { Page page(PageMessageTaskParam param); - void createMessageHistory(MessageTask messageTask); + List createMessageHistory(MessageTask messageTask); void update(UpdateMessageTaskParam param); diff --git a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java index 173f74f..6847869 100644 --- a/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java +++ b/im-center-server/src/main/java/cn/axzo/im/service/impl/MessageTaskServiceImpl.java @@ -45,7 +45,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; -import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -105,7 +105,7 @@ public class MessageTaskServiceImpl extends ServiceImpl createMessageHistory(MessageTask messageTask) { this.update(UpdateMessageTaskParam.builder() .id(messageTask.getId()) @@ -113,12 +113,13 @@ public class MessageTaskServiceImpl extends ServiceImpl historyIds; if (bizData.isAllPerson()) { log.info("发送全员消息, taskId={}", messageTask.getId()); - doSendAll(messageTask, bizData); + historyIds = doSendAll(messageTask, bizData); } else { log.info("发送非全员消息, taskId={}", messageTask.getId()); - doSendNotAll(messageTask); + historyIds = doSendNotAll(messageTask); } this.update(UpdateMessageTaskParam.builder() @@ -126,6 +127,7 @@ public class MessageTaskServiceImpl extends ServiceImpl doSendAll(MessageTask messageTask, MessageTask.BizData bizData) { Integer pageNumber = 1; String batchNo = UUIDUtil.uuidString(); + ArrayList historyIds = new ArrayList<>(); while (true) { Page page = accountRegisterService.page(AccountRegisterService.PageAccountRegisterParam.builder() .accountType(AccountTypeEnum.USER.getCode()) @@ -159,24 +162,27 @@ public class MessageTaskServiceImpl extends ServiceImpl receivePersons = page.getRecords().stream() .map(e -> MessageTask.ReceivePerson.builder().imAccount(e.getImAccount()).build()) .collect(Collectors.toList()); - saveMessageHistory(batchNo, receivePersons, messageTask, false); + historyIds.addAll(saveMessageHistory(batchNo, receivePersons, messageTask, false)); } if (!page.hasNext()) { break; } } + return historyIds; } - private void doSendNotAll(MessageTask messageTask) { + private List doSendNotAll(MessageTask messageTask) { int totalPersonSize = messageTask.getReceivePersons().size(); String batchNo = UUIDUtil.uuidString(); // 防止sql过长 List> receivePersons = Lists.partition(messageTask.getReceivePersons(), DEFAULT_PAGE_SIZE); - receivePersons.forEach(e -> saveMessageHistory(batchNo, e, messageTask, totalPersonSize <= 2)); + return receivePersons.stream() + .flatMap(e -> saveMessageHistory(batchNo, e, messageTask, totalPersonSize <= 2).stream()) + .collect(Collectors.toList()); } - private void saveMessageHistory(String batchNo, List receivePersons, + private List saveMessageHistory(String batchNo, List receivePersons, MessageTask messageTask, boolean tryCreateAccount) { // 排除已经发送成功的记录,防止重复发送 Set existPersons = listExistPerson(receivePersons, messageTask); @@ -197,7 +203,7 @@ public class MessageTaskServiceImpl extends ServiceImpl accountRegisters = listAccountRegisters(absentReceivePersons); @@ -213,6 +219,7 @@ public class MessageTaskServiceImpl extends ServiceImpl