REQ-2129: 使用新的api发送im消息

This commit is contained in:
yanglin 2024-03-21 16:40:39 +08:00
parent 13f1f0bba4
commit 3d65a4f1ee
6 changed files with 47 additions and 251 deletions

View File

@ -1,45 +0,0 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3.im;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toSet;
/**
* @author yanglin
*/
public class BatchSendResult {
private final Set<Long> requestPersonIds;
private final Map<Long, List<MessageDispatchResp>> personId2SendResult;
BatchSendResult(Set<String> requestPersonIds) {
this(requestPersonIds, Collections.emptyList());
}
BatchSendResult(Set<String> requestPersonIds, Collection<MessageDispatchResp> respList) {
this.requestPersonIds = requestPersonIds.stream()
.map(Long::valueOf)
.collect(toSet());
this.personId2SendResult = respList.stream()
// [2024.1.16]: im端在处理逐条发送和批量发送时的逻辑不一致, 批量发送时没有返回msgId -_-
.filter(r -> r.getPersonId() != null)
.collect(groupingBy(r -> Long.valueOf(r.getPersonId())));
}
public Optional<List<MessageDispatchResp>> findSendResult(Long personId) {
return Optional.ofNullable(personId2SendResult.get(personId));
}
public Set<Long> getRequestPersonIds() {
return requestPersonIds;
}
}

View File

@ -1,123 +0,0 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3.im;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.msg.center.common.utils.BeanConvertUtils;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Iterables;
import com.taobao.api.internal.util.NamedThreadFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ImClient {
// 10线程 + 190的队列
private final ScheduledExecutorService scheduleExecutor =
Executors.newScheduledThreadPool(10, new NamedThreadFactory("ImClient-send"));
// 强行做成有界队列, 避免内存爆了
private final Semaphore semaphore = new Semaphore(200);
private final MessageApi messageApi;
@Value("${msg-v3.im.batch-size:1000}")
private int batchSize;
@Value("${msg-v3.im.retry-count:3}")
private int retryCount;
@Value("${msg-v3.im.retry-wait-seconds:2}")
private long retryWaitSeconds;
@Value("${msg-v3.im.enqueue-wait-seconds:10}")
private long enqueueWaitSeconds;
public void asyncSend(MessageInfo req, BiConsumer<Exception, BatchSendResult> callback) {
AssertUtil.notEmpty(req.getToPersonIdList(), "receiver person ids can't be empty");
for (List<String> personIds : Iterables.partition(req.getToPersonIdList(), batchSize)) {
MessageInfo copy = BeanConvertUtils.copyBean(req, MessageInfo.class);
copy.setToPersonIdList(new HashSet<>(personIds));
enqueue(copy, callback, 0);
}
}
private void enqueue(
MessageInfo req, BiConsumer<Exception, BatchSendResult> callback, int retryCount) {
try {
if (semaphore.tryAcquire(enqueueWaitSeconds, TimeUnit.SECONDS)) {
scheduleExecutor.execute(() -> {
try {
doSend(req, callback, retryCount);
} finally {
semaphore.release();
}
});
} else {
String error = String.format("sending im messages overload. templateId=%s, header=%s, receiverPersonIds=%s",
req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()));
callback.accept(new RuntimeException(error), null);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* 本地简单重试
*/
private void doSend(
MessageInfo req, BiConsumer<Exception, BatchSendResult> callback, int retryCount) {
if (retryCount > 0) {
log.warn("retry sending im message. templateId={}, header={}, receiverPersonIds={}, retryCount={}",
req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount);
}
BatchSendResult failBatchSendResult = new BatchSendResult(req.getToPersonIdList());
try {
ApiResult<List<MessageDispatchResp>> sendResult = messageApi.sendMessage(req);
log.info("sending im message result, req={}, resp={}", JSON.toJSONString(req), JSON.toJSONString(sendResult));
if (!sendResult.isSuccess()) {
log.warn("sending im message error. respCode={}, respMsg={}, templateId={}," +
" header={}, receiverPersonIds={}, retryCount={}, resp={}",
sendResult.getCode(), sendResult.getMsg(), req.getMsgTemplateId(),
req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount, sendResult);
callback.accept(new RuntimeException(sendResult.getMsg()), failBatchSendResult);
} else if (sendResult.getData() == null || sendResult.getData().isEmpty()) {
log.warn("sending im message error, empty response data. respCode={}, respMsg={}," +
" templateId={}, header={}, receiverPersonIds={}, retryCount={}, resp={}",
sendResult.getCode(), sendResult.getMsg(), req.getMsgTemplateId(),
req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount, sendResult);
callback.accept(new RuntimeException("im response with empty data"), failBatchSendResult);
} else {
callback.accept(null, new BatchSendResult(req.getToPersonIdList(), sendResult.getData()));
}
} catch (Exception e) {
if (this.retryCount > 0 && retryCount <= this.retryCount) {
log.warn("error invoking im MessageApi#sendMessage, schedule retry." +
" retriedCount={} imReq={}", retryCount, req, e);
// 尽量在消息端进行重试, 因为是按template的纬度调用IM进行批量发送的
// 如果直接让上游调用端进行重试的话, 有重复发送的风险
// TODO(yl): 加入幂等控制
scheduleExecutor.schedule(
() -> enqueue(req, callback, retryCount + 1),
retryWaitSeconds * (retryCount + 1), TimeUnit.SECONDS);
} else {
log.error("error invoking im MessageApi#sendMessage. imReq={}", req, e);
callback.accept(e, failBatchSendResult);
}
}
}
}

View File

@ -1,32 +1,24 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3.msg;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.resp.MessageTaskResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.api.response.v3.MessageSendResultV3;
import cn.axzo.msg.center.api.response.v3.TemplateSendResultV3;
import cn.axzo.msg.center.dal.MessageRecordV3Dao;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.inside.notices.service.impl.v3.EventMappingProcessor;
import cn.axzo.msg.center.inside.notices.service.impl.v3.im.BatchSendResult;
import cn.axzo.msg.center.inside.notices.service.impl.v3.im.ImClient;
import cn.axzo.msg.center.inside.notices.utils.FunctionalTransactionTemplate;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
@ -38,10 +30,9 @@ import static java.util.stream.Collectors.toMap;
public class MessageMappingProcessor implements EventMappingProcessor {
private final MessageRecordV3Dao messageRecordV3Dao;
private final FunctionalTransactionTemplate transactionTemplate;
private final TerminalAppMapping terminalAppMapping;
private final MessageTemplateParser templateParser;
private final ImClient imClient;
private final MessageApi messageApi;
/**
* Scope("prototype") -> we're good
@ -58,16 +49,18 @@ public class MessageMappingProcessor implements EventMappingProcessor {
public void maybeAsyncSend() {
List<AppTypeEnum> appTypes = terminalAppMapping
.toImTypes(template.getTemplate().getPushTerminals());
MessageInfo imRequest = template.buildImRequest(templateParser, appTypes);
imClient.asyncSend(imRequest, (e, batchResult) -> {
if (e != null) {
List<Long> messageIds = template.getMessageIdOfReceivers(batchResult.getRequestPersonIds());
messageRecordV3Dao.batchSetSendFailed(messageIds, e.getMessage());
} else {
// 不使用@Transactional不然还要把代码挪到其它类中
transactionTemplate.exec(() -> maybeSetSendSuccess(template, batchResult));
}
});
SendTemplateMessageParam request = template.buildImRequest(templateParser, appTypes);
ApiResult<MessageTaskResp> apiResult = messageApi.sendTemplateMessageAsync(request);
log.info("sending im message result, req={}, resp={}",
JSON.toJSONString(request), JSON.toJSONString(apiResult));
if (apiResult.isSuccess()) {
MessageTaskResp resp = apiResult.getData();
messageRecordV3Dao.setSendSuccess(template.getMessageIds(), resp.getId() + "");
} else {
messageRecordV3Dao.batchSetSendFailed(template.getMessageIds(), apiResult.getMsg());
log.warn("sending im message fail, req={}, resp={}",
JSON.toJSONString(request), JSON.toJSONString(apiResult));
}
}
@Override
@ -81,36 +74,4 @@ public class MessageMappingProcessor implements EventMappingProcessor {
return templateResult;
}
private void maybeSetSendSuccess(TemplateMessage template, BatchSendResult batchResult) {
Map<Long, MessageRecordV3> receiverId2Message = template.getMessageRecords().stream()
.collect(toMap(MessageRecordV3::getReceiverPersonId, identity()));
for (Long personId : batchResult.getRequestPersonIds()) {
MessageRecordV3 message = receiverId2Message.get(personId);
if (message == null) {
log.warn("can't find message with im person id? should never happen.... personId={}", personId);
continue;
}
List<MessageDispatchResp> respList = batchResult
.findSendResult(personId)
.orElse(Collections.emptyList());
String imMessageId = respList.stream()
.map(MessageDispatchResp::getMsgid)
.filter(Objects::nonNull)
.collect(joining(","));
String sendFailCause = respList.stream()
.map(MessageDispatchResp::getSendFailCause)
.filter(Objects::nonNull)
.collect(joining(","));
List<Long> messageIds = Collections.singletonList(message.getId());
if (StringUtils.isNotBlank(imMessageId)) {
// 把im端的id也存起来
messageRecordV3Dao.setSendSuccess(message.getId(), imMessageId);
} else if (StringUtils.isNotBlank(sendFailCause)) {
messageRecordV3Dao.batchSetSendFailed(messageIds, sendFailCause);
} else {
messageRecordV3Dao.batchSetSendFailed(messageIds, "云信没有返回message_id");
}
}
}
}

View File

@ -1,14 +1,14 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3.msg;
import cn.axzo.framework.jackson.utility.JSON;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam.ReceivePerson;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.api.enums.MsgStateV3Enum;
import cn.axzo.msg.center.api.request.v3.MessageSendReqV3;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.service.bizevent.request.ReachDto;
@ -24,9 +24,6 @@ import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
@ -63,18 +60,6 @@ public class TemplateMessage {
.collect(toList());
}
List<Long> getMessageIdOfReceivers(Set<Long> receiverPersonIds) {
return getMessageOfReceivers(receiverPersonIds).stream()
.map(BaseEntityExt::getId)
.collect(Collectors.toList());
}
List<MessageRecordV3> getMessageOfReceivers(Set<Long> receiverPersonIds) {
return getMessageRecords().stream()
.filter(m -> receiverPersonIds.contains(m.getReceiverPersonId()))
.collect(toList());
}
List<MessageRecordV3> getMessageRecords() {
if (records != null) {
return records;
@ -116,19 +101,24 @@ public class TemplateMessage {
return records;
}
MessageInfo buildImRequest(MessageTemplateParser templateParser, List<AppTypeEnum> apps) {
SendTemplateMessageParam buildImRequest(MessageTemplateParser templateParser, List<AppTypeEnum> apps) {
MessageRecordV3 sample = getMessageRecords().get(0);
GeneralMessagePushVO sendVo = templateParser.parse(sample, template);
MessageInfo imReq = new MessageInfo();
imReq.setAppTypeList(apps);
imReq.setToPersonIdList(req.stringReceiverIds());
SendTemplateMessageParam imReq = new SendTemplateMessageParam();
imReq.setMsgHeader(parseTitle());
imReq.setMsgContent(parseContent());
imReq.setMsgTemplateId(template.getCode());
imReq.setMsgTemplateContent(JSON.toJSONString(sendVo));
// 接收人
ArrayList<ReceivePerson> receivers = new ArrayList<>();
for (String personId : req.stringReceiverIds()) {
for (AppTypeEnum app : apps)
receivers.add(new ReceivePerson(personId, req.getReceiversOuId(), app));
}
imReq.setReceivePersons(receivers);
// 扩展信息
Map<String, String> ext = new HashMap<>();
JSONObject ext = new JSONObject();
ext.put("minAppVersion", template.getMinAppVersion());
if (sample.getReceiverWorkspaceId() != null) {
ext.put("workspaceId", String.valueOf(sample.getReceiverWorkspaceId()));
@ -136,7 +126,7 @@ public class TemplateMessage {
if (sample.getReceiverOuId() != null) {
ext.put("ouId", String.valueOf(sample.getReceiverOuId()));
}
imReq.setExtendsInfo(ext);
imReq.setExt(ext);
return imReq;
}

View File

@ -624,7 +624,7 @@ public class PendingMessageNewServiceImpl implements PendingMessageNewService {
log.info("the [{}] record is personId.", personId);
if (personId != null) {
CustomMessageInfo messageInfo = CustomMessageInfo.builder()
.appTypeList(Lists.newArrayList(AppTypeEnum.CM))
.appTypeList(Lists.newArrayList(AppTypeEnum.CM, AppTypeEnum.CMP))
.toPersonId(String.valueOf(personId))
.bizType(BizTypeEnum.PENDING)
.build();

View File

@ -20,10 +20,23 @@ import java.util.Date;
@Component
public class MessageRecordV3Dao extends ServiceImpl<MessageRecordV3Mapper, MessageRecordV3> {
public void setSendSuccess(Long msgId, @Nullable String imMsgId) {
public void setSendSuccess(Collection<Long> messageIds, @Nullable String imMsgId) {
if (CollectionUtils.isEmpty(messageIds))
return;
lambdaUpdate()
.eq(MessageRecordV3::getState, MsgStateV3Enum.UNSENT)
.eq(MessageRecordV3::getId, msgId)
.in(MessageRecordV3::getId, messageIds)
.set(MessageRecordV3::getState, MsgStateV3Enum.SEND_SUCCESS)
.set(MessageRecordV3::getUpdateAt, new Date())
.set(MessageRecordV3::getSendTime, new Date())
.set(StringUtils.isNotBlank(imMsgId), MessageRecordV3::getImMsgId, imMsgId)
.update();
}
public void setSendSuccess(Long messageId, @Nullable String imMsgId) {
lambdaUpdate()
.eq(MessageRecordV3::getState, MsgStateV3Enum.UNSENT)
.eq(MessageRecordV3::getId, messageId)
.set(MessageRecordV3::getState, MsgStateV3Enum.SEND_SUCCESS)
.set(MessageRecordV3::getUpdateAt, new Date())
.set(MessageRecordV3::getSendTime, new Date())