REQ-1898: 分批发送消息

This commit is contained in:
yanglin 2024-01-16 09:23:25 +08:00
parent 45a1df94e1
commit 29c126454f
6 changed files with 123 additions and 27 deletions

View File

@ -0,0 +1,42 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3.im;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
/**
* @author yanglin
*/
public class BatchSendResult {
private final Set<Long> requestedPersonIds;
private final Map<Long, MessageDispatchResp> personId2SendResult;
BatchSendResult(Set<String> requestedPersonIds) {
this(requestedPersonIds, Collections.emptyList());
}
BatchSendResult(Set<String> requestedPersonIds, Collection<MessageDispatchResp> respList) {
this.requestedPersonIds = requestedPersonIds.stream()
.map(Long::valueOf)
.collect(toSet());
this.personId2SendResult = respList.stream()
.collect(toMap(r -> Long.valueOf(r.getPersonId()), identity()));
}
public MessageDispatchResp findSendResult(Long personId) {
return personId2SendResult.get(personId);
}
public Set<Long> getRequestedPersonIds() {
return requestedPersonIds;
}
}

View File

@ -1,15 +1,20 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3;
package cn.axzo.msg.center.inside.notices.service.impl.v3.im;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.msg.center.common.utils.BeanConvertUtils;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Iterables;
import com.taobao.api.internal.util.NamedThreadFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -31,14 +36,28 @@ public class ImClient {
private final Semaphore semaphore = new Semaphore(200);
private final MessageApi messageApi;
public void asyncSend(MessageInfo req, BiConsumer<Exception, List<MessageDispatchResp>> callback) {
enqueue(req, callback, 0);
@Value("${msg-v3.im.batch-size:1000}")
private int batchSize;
@Value("${msg-v3.im.retry-count:3}")
private int retryCount;
@Value("${msg-v3.im.retry-wait-seconds:2}")
private long retryWaitSeconds;
@Value("${msg-v3.im.enqueue-wait-seconds:10}")
private long enqueueWaitSeconds;
public void asyncSend(MessageInfo req, BiConsumer<Exception, BatchSendResult> callback) {
AssertUtil.notEmpty(req.getToPersonIdList(), "receiver person ids can't be empty");
for (List<String> personIds : Iterables.partition(req.getToPersonIdList(), batchSize)) {
MessageInfo copy = BeanConvertUtils.copyBean(req, MessageInfo.class);
copy.setToPersonIdList(new HashSet<>(personIds));
enqueue(copy, callback, 0);
}
}
private void enqueue(
MessageInfo req, BiConsumer<Exception, List<MessageDispatchResp>> callback, int retryCount) {
MessageInfo req, BiConsumer<Exception, BatchSendResult> callback, int retryCount) {
try {
if (semaphore.tryAcquire(30, TimeUnit.SECONDS)) {
if (semaphore.tryAcquire(enqueueWaitSeconds, TimeUnit.SECONDS)) {
scheduleExecutor.execute(() -> {
try {
doSend(req, callback, retryCount);
@ -60,11 +79,12 @@ public class ImClient {
* 本地简单重试
*/
private void doSend(
MessageInfo req, BiConsumer<Exception, List<MessageDispatchResp>> callback, int retryCount) {
MessageInfo req, BiConsumer<Exception, BatchSendResult> callback, int retryCount) {
if (retryCount > 0) {
log.warn("retry sending im message. templateId={}, header={}, receiverPersonIds={}, retryCount={}",
req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount);
}
BatchSendResult failBatchSendResult = new BatchSendResult(req.getToPersonIdList());
try {
ApiResult<List<MessageDispatchResp>> sendResult = messageApi.sendMessage(req);
log.info("sending im message result, req={}, resp={}", JSON.toJSONString(req), JSON.toJSONString(sendResult));
@ -74,7 +94,7 @@ public class ImClient {
sendResult.getCode(), sendResult.getMsg(), req.getMsgTemplateId(), req.getMsgHeader(),
JSON.toJSONString(req.getToPersonIdList()), retryCount, sendResult);
log.warn(error);
callback.accept(new RuntimeException(sendResult.getMsg()), null);
callback.accept(new RuntimeException(sendResult.getMsg()), failBatchSendResult);
return;
}
if (sendResult.getData() == null || sendResult.getData().isEmpty()) {
@ -83,19 +103,20 @@ public class ImClient {
sendResult.getCode(), sendResult.getMsg(), req.getMsgTemplateId(), req.getMsgHeader(),
JSON.toJSONString(req.getToPersonIdList()), retryCount, sendResult);
log.warn(error);
callback.accept(new RuntimeException("im response with empty data"), null);
callback.accept(new RuntimeException("im response with empty data"), failBatchSendResult);
} else {
callback.accept(null, sendResult.getData());
callback.accept(null, new BatchSendResult(req.getToPersonIdList(), sendResult.getData()));
}
} catch (Exception e) {
if (retryCount > 3) {
callback.accept(e, null);
} else {
if (this.retryCount > 0 && retryCount <= this.retryCount) {
// 尽量在消息端进行重试, 因为是按template的纬度调用IM进行批量发送的
// 如果直接让上游调用端进行重试的话, 有重复发送的风险
// TODO(yl): 加入幂等控制
scheduleExecutor.schedule(
() -> enqueue(req, callback, retryCount + 1),
2L * (retryCount + 1), TimeUnit.SECONDS);
retryWaitSeconds * (retryCount + 1), TimeUnit.SECONDS);
} else {
callback.accept(e, failBatchSendResult);
}
}
}

View File

@ -8,10 +8,12 @@ import cn.axzo.msg.center.api.response.v3.TemplateSendResultV3;
import cn.axzo.msg.center.dal.MessageRecordV3Dao;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.inside.notices.service.impl.v3.EventMappingProcessor;
import cn.axzo.msg.center.inside.notices.service.impl.v3.ImClient;
import cn.axzo.msg.center.inside.notices.service.impl.v3.im.BatchSendResult;
import cn.axzo.msg.center.inside.notices.service.impl.v3.im.ImClient;
import cn.axzo.msg.center.inside.notices.utils.FunctionalTransactionTemplate;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -19,11 +21,13 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
*/
@Slf4j
@Component
@Scope("prototype")
@RequiredArgsConstructor
@ -51,12 +55,13 @@ public class MessageMappingProcessor implements EventMappingProcessor {
List<AppTypeEnum> appTypes = terminalAppMapping
.toImTypes(template.getTemplate().getPushTerminals());
MessageInfo imRequest = template.buildImRequest(templateParser, appTypes);
imClient.asyncSend(imRequest, (e, respList) -> {
imClient.asyncSend(imRequest, (e, batchResult) -> {
if (e != null) {
messageRecordV3Dao.batchSetSendFailed(template.getMessageIds(), e.getMessage());
List<Long> messageIds = template.getMessageIdOfReceivers(batchResult.getRequestedPersonIds());
messageRecordV3Dao.batchSetSendFailed(messageIds, e.getMessage());
} else {
// 不使用@Transactional不然还要把代码挪到其它类中
transactionTemplate.exec(() -> maybeSetSendSuccess(template, respList));
transactionTemplate.exec(() -> maybeSetSendSuccess(template, batchResult));
}
});
}
@ -73,12 +78,16 @@ public class MessageMappingProcessor implements EventMappingProcessor {
return templateResult;
}
private void maybeSetSendSuccess(TemplateMessage batch, List<MessageDispatchResp> respList) {
Map<String, MessageDispatchResp> receiverId2SendResult = respList.stream()
.collect(toMap(MessageDispatchResp::getPersonId, Function.identity()));
for (MessageRecordV3 message : batch.getMessageRecords()) {
MessageDispatchResp sendResult = receiverId2SendResult.get(
String.valueOf(message.getReceiverPersonId()));
private void maybeSetSendSuccess(TemplateMessage batch, BatchSendResult batchResult) {
Map<Long, MessageRecordV3> receiverId2Message = batch.getMessageRecords().stream()
.collect(toMap(MessageRecordV3::getReceiverPersonId, identity()));
for (Long personId : batchResult.getRequestedPersonIds()) {
MessageRecordV3 message = receiverId2Message.get(personId);
if (message == null) {
log.warn("can't find message with im person id? should never happen.... personId={}", personId);
continue;
}
MessageDispatchResp sendResult = batchResult.findSendResult(personId);
if (sendResult != null) {
// 把im端的id也存起来
messageRecordV3Dao.setSendSuccess(message.getId(), sendResult.getMsgid());

View File

@ -8,6 +8,7 @@ import cn.axzo.msg.center.api.request.v3.MessageSendReqV3;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.service.bizevent.request.ReachDto;
@ -23,6 +24,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
@ -59,6 +62,18 @@ public class TemplateMessage {
.collect(toList());
}
List<Long> getMessageIdOfReceivers(Set<Long> receiverPersonIds) {
return getMessageOfReceivers(receiverPersonIds).stream()
.map(BaseEntityExt::getId)
.collect(Collectors.toList());
}
List<MessageRecordV3> getMessageOfReceivers(Set<Long> receiverPersonIds) {
return getMessageRecords().stream()
.filter(m -> receiverPersonIds.contains(m.getReceiverPersonId()))
.collect(toList());
}
List<MessageRecordV3> getMessageRecords() {
if (records != null) {
return records;

View File

@ -5,6 +5,7 @@ import cn.axzo.msg.center.dal.mapper.MessageRecordV3Mapper;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
import java.util.Collection;
@ -40,6 +41,9 @@ public class MessageRecordV3Dao extends ServiceImpl<MessageRecordV3Mapper, Messa
}
public void batchSetSendFailed(Collection<Long> messageIds, String cause) {
if (CollectionUtils.isEmpty(messageIds)) {
return;
}
lambdaUpdate()
.eq(MessageRecordV3::getState, MsgStateV3Enum.UNSENT)
.in(MessageRecordV3::getId, messageIds)

View File

@ -5,7 +5,6 @@ import cn.axzo.msg.center.api.request.v3.MessageSendReqV3;
import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
import cn.axzo.msg.center.common.utils.MiscUtils;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Test;
@ -30,6 +29,15 @@ class MessageServiceV3Test {
*/
@Test
void send() {
for (int i = 0; i < 3; i++) {
sendImpl();
}
// 使用异步发送, 避免测试结束
MiscUtils.sleepQuietly(10, TimeUnit.SECONDS);
}
private void sendImpl() {
JSONObject bizExtParams = new JSONObject();
bizExtParams.put("name", "杨林");
@ -59,9 +67,6 @@ class MessageServiceV3Test {
MessageSendRespV3 resp = messageServiceV3.send(req);
String json = JSON.toJSONString(resp);
System.out.println(json);
// 使用异步发送, 避免测试结束
MiscUtils.sleepQuietly(10, TimeUnit.SECONDS);
}
}