Merge branch 'feature/REQ-1465' of axzsource.com:universal/infrastructure/backend/msg-center-plat into dev

This commit is contained in:
luofu 2023-10-25 15:35:38 +08:00
commit 828049a707
10 changed files with 145 additions and 63 deletions

View File

@ -144,7 +144,7 @@ public class MessageRecordServiceImpl implements MessageRecordService {
Lists.partition(Lists.newArrayList(message.getToId()), partitionSize).forEach(toIds -> {
Map<Long, MessageRecord> toIdRecordMap = Maps.newHashMap();
List<MessageRecord> messageRecords = saveBatch(basic, toIds, message.getToldIdPersonIdMap(), toIdRecordMap);
generalMessageMapperService.asyncBatchSendMessage(message, toIds, toIdRecordMap);
generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap);
pushMessages.addAll(messageRecords);
});
if(pushAthena) {

View File

@ -29,8 +29,9 @@ public class GeneralMessageController implements GeneralMessageClient {
private final GeneralMessageOldService generalMessageOldService;
@Override
public CommonResponse<String> pageQueryOldMessage(GeneralMessageSendRequest request) {
return CommonResponse.success(generalMessageService.batchSendMessage(request));
public CommonResponse<Void> batchSend(GeneralMessageSendRequest request) {
generalMessageService.batchSendMessage(request);
return CommonResponse.success();
}
@Override

View File

@ -0,0 +1,47 @@
package cn.axzo.msg.center.message.domain.dto;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
/**
* @author cold_blade
* @date 2023/10/25
* @version 1.0
*/
@Setter
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SendImMessageDTO implements Serializable {
private static final long serialVersionUID = 3385679937092148803L;
/**
* 自然人id
*/
private Long personId;
/**
* im消息id
*/
private String imMsgId;
public static SendImMessageDTO from(MessageDispatchResp resp) {
return SendImMessageDTO.builder()
.imMsgId(resp.getMsgid())
.personId(Long.parseLong(resp.getPersonId()))
.build();
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -2,7 +2,6 @@ package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.service.dto.PersonDTO;
import java.util.List;
import java.util.Map;
@ -20,10 +19,7 @@ public interface GeneralMessageMapperService {
* 异步批量发送消息
*
* @param request 发送消息时的请求参数
* @param subReceiverIds 接收者id列表
* @param toIdMessageRecordMap 接收者身份id与旧消息记录的映射关系
* @return 批量发送消息请求的requestId
*/
String asyncBatchSendMessage(GeneralMessageReq request, List<Long> subReceiverIds,
Map<Long, MessageRecord> toIdMessageRecordMap);
void asyncBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap);
}

View File

@ -1,9 +1,12 @@
package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import java.util.List;
/**
* @author cold_blade
* @date 2023/10/19
@ -17,7 +20,7 @@ public interface GeneralMessageService {
* @param request 消息所需参数
* @return 请求的唯一标识
*/
String batchSendMessage(GeneralMessageSendRequest request);
List<SendImMessageDTO> batchSendMessage(GeneralMessageSendRequest request);
/**
* 统计旧数据

View File

@ -3,6 +3,7 @@ package cn.axzo.msg.center.message.service.impl;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
@ -12,21 +13,22 @@ import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.axzo.msg.center.utils.UUIDUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@ -55,50 +57,69 @@ public class GeneralMessageMapperServiceImpl implements GeneralMessageMapperServ
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
@Override
@Transactional(rollbackFor = Exception.class)
public String asyncBatchSendMessage(GeneralMessageReq request, List<Long> subReceiverIds,
Map<Long, MessageRecord> toIdMessageRecordMap) {
public void asyncBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap) {
if (MsgTypeEnum.PENDING_MESSAGE.equals(request.getType())) {
log.info("pending message is not supported.");
return UUIDUtil.uuidRawString();
return;
}
if (Objects.isNull(request.getRelationId()) || MapUtil.isEmpty(request.getToldIdPersonIdMap())) {
log.info("param is invalid. relationId:[{}], toIdPersonIdMap:{}", request.getRelationId(),
request.getToldIdPersonIdMap());
return UUIDUtil.uuidRawString();
if (Objects.isNull(request.getRelationId())) {
log.info("relation id is null.");
return;
}
Optional<IdentityTypeEnum> identityTypeOp = PersonIdentityUtil.toIdentityType(request.getReceiveType());
if (!identityTypeOp.isPresent()) {
log.info("identity is invalid. relationId:[{}], receiveType:{}", request.getRelationId(),
request.getReceiveType());
return UUIDUtil.uuidRawString();
if (MapUtil.isEmpty(request.getToldIdPersonIdMap())) {
// 由于IM那边是根据personId来创建账户的,所以强依赖personId
log.info("toIdPersonIdMap is empty.");
return;
}
// 异步发送IM消息
CompletableFuture.runAsync(() -> doBatchSendMessage(request, subReceiverIds, toIdMessageRecordMap,
identityTypeOp.get()), asyncSendMsgExecutorService);
return UUIDUtil.uuidRawString();
CompletableFuture.runAsync(() -> doBatchSendMessage(request, toIdMessageRecordMap),
asyncSendMsgExecutorService);
}
private void doBatchSendMessage(GeneralMessageReq request, List<Long> subReceiverIds,
Map<Long, MessageRecord> toIdMessageRecordMap, IdentityTypeEnum identityType) {
private void doBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap) {
String templateCode = relationTemplateMapService.queryByRelationId(request.getRelationId()).orElse(null);
if (StringUtils.isBlank(templateCode)) {
log.info("the relationId([{}]) is not map any new message template. ", request.getRelationId());
return;
}
log.info("start to send im message. relationId:[{}]", request.getRelationId());
Map<Long, Long> msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream()
.filter(e -> request.getToldIdPersonIdMap().containsKey(e.getKey()))
.collect(Collectors.toMap(e -> e.getValue().getId(), e -> request.getToldIdPersonIdMap().get(e.getKey())));
log.info("message send twice. relationId:[{}], msgIds:{}", request.getRelationId(), msgRecordIdPersonIdMap.values());
// 双发记录
messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap);
// 发送IM消息
generalMessageService.batchSendMessage(convert(request, templateCode, subReceiverIds, identityType));
log.info("start to send im message. relationId:[{}], templateCode:[{}]", request.getRelationId(), templateCode);
try {
// 发送IM消息
GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode, toIdMessageRecordMap.keySet());
// IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端)
List<SendImMessageDTO> result = generalMessageService.batchSendMessage(sendImMsgRequest);
if (CollectionUtils.isEmpty(result)) {
log.info("there is not any person successfully send im message. relationId:[{}]", request.getRelationId());
return;
}
// 记录发送了IM消息的旧消息
recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap());
} catch (Exception e) {
log.warn("broke out some exception while sending im message. relationId:[{}]", request.getRelationId(), e);
}
}
private GeneralMessageSendRequest convert(GeneralMessageReq request, String templateCode,
List<Long> subReceiverIds, IdentityTypeEnum identityType) {
private void recordSendImMessage(Map<Long, MessageRecord> toIdMessageRecordMap, List<SendImMessageDTO> sendImResult,
Map<Long, Long> toIdPersonIdMap) {
// 成功发送了IM消息的personId集合
Set<Long> sucSendImMsgPersonIds = sendImResult.stream()
.map(SendImMessageDTO::getPersonId).collect(Collectors.toSet());
// 从入参中的toIdPersonIdMap中筛选发送成功的entry
Map<Long, Long> subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream()
// 过滤掉personId维度IM消息发送失败的entry
.filter(e -> sucSendImMsgPersonIds.contains(e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<Long, Long> msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream()
.filter(e -> subToIdPersonIdMap.containsKey(e.getKey()))
.collect(Collectors.toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey())));
log.info("record message that has been send im message. msgIds:{}", msgRecordIdPersonIdMap.keySet());
// 双发记录
messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap);
}
private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request, String templateCode,
Collection<Long> subReceiverIds) {
IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType());
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))

View File

@ -1,7 +1,9 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.response.MessageNewRes;
@ -13,6 +15,7 @@ import cn.axzo.msg.center.domain.enums.UserTypeEnum;
import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.dto.RawMessageRouterDTO;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.message.service.GeneralMessageOldService;
import cn.axzo.msg.center.message.service.GeneralMessageService;
@ -39,7 +42,10 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -70,16 +76,16 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
@Override
public String batchSendMessage(GeneralMessageSendRequest request) {
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public List<SendImMessageDTO> batchSendMessage(GeneralMessageSendRequest request) {
// 查询模板基础信息
MessageTemplateDTO template = messageTemplateNewService.queryByTemplateCode(request.getTemplateCode())
.orElseThrow(() -> new ServiceException("未查询到对应的模板"));
// 构建消息记录并存储
List<GeneralMessageRecord> messageRecords = buildMessageRecord(request, template);
generalMessageRecordDao.saveBatch(messageRecords);
// 异步推送
pushMessage(messageRecords, template);
return UUIDUtil.uuidRawString();
// 发送IM消息
return pushImMessage(messageRecords, template);
}
@Override
@ -129,10 +135,10 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
.build();
}
private void pushMessage(List<GeneralMessageRecord> messageRecords, MessageTemplateDTO template) {
private List<SendImMessageDTO> pushImMessage(List<GeneralMessageRecord> messageRecords, MessageTemplateDTO template) {
if (CollectionUtils.isEmpty(template.getPushTerminals())) {
// 模板未配置任何推送终端
return;
return Collections.emptyList();
}
GeneralMessageRecord record = messageRecords.get(0);
List<AppTypeEnum> appTypes = template.getPushTerminals().stream()
@ -140,7 +146,6 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
GeneralMessagePushVO message = convert(record, template);
MessageInfo msgInfo = new MessageInfo();
msgInfo.setAppTypeList(appTypes);
// TODO: [cold_blade] [P2] 第一期只支持发送机器人相关的消息
msgInfo.setToPersonIdList(Lists.newArrayList(messageRecords.stream()
.map(e -> String.valueOf(e.getReceiverPersonId()))
.collect(Collectors.toSet())));
@ -153,7 +158,14 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
ext.put("minAppVersion", template.getMinAppVersion());
msgInfo.setExtendsInfo(ext);
messageApi.sendMessage(msgInfo);
ApiResult<List<MessageDispatchResp>> result = messageApi.sendMessage(msgInfo);
if (result.isError() || CollectionUtils.isEmpty(result.getData())) {
log.warn("failed to batch send im message. result:{}", result);
return Collections.emptyList();
}
return result.getData().stream()
.map(SendImMessageDTO::from)
.collect(Collectors.toList());
}
private GeneralMessagePushVO convert(GeneralMessageRecord record, MessageTemplateDTO template) {

View File

@ -3,17 +3,18 @@ package cn.axzo.msg.center.utils;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import cn.axzo.msg.center.domain.enums.UserTypeEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Objects;
import java.util.Optional;
/**
* @description xxx
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
public class PersonIdentityUtil {
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PersonIdentityUtil {
public static UserTypeEnum toUserType(IdentityTypeEnum identityType) {
switch (identityType) {
@ -28,19 +29,19 @@ public class PersonIdentityUtil {
}
}
public static Optional<IdentityTypeEnum> toIdentityType(ReceiveTypeEnum receiveType) {
public static IdentityTypeEnum toIdentityType(ReceiveTypeEnum receiveType) {
if (Objects.isNull(receiveType)) {
return Optional.empty();
return IdentityTypeEnum.NOT_SUPPORT;
}
switch (receiveType) {
case CM_WORKER:
return Optional.of(IdentityTypeEnum.WORKER);
return IdentityTypeEnum.WORKER;
case CM_LEADER:
return Optional.of(IdentityTypeEnum.WORKER_LEADER);
return IdentityTypeEnum.WORKER_LEADER;
case CMP_USER:
return Optional.of(IdentityTypeEnum.PRACTITIONER);
return IdentityTypeEnum.PRACTITIONER;
default:
return Optional.empty();
return IdentityTypeEnum.NOT_SUPPORT;
}
}

View File

@ -2,10 +2,10 @@ package cn.axzo.msg.center.service.general.client;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.service.general.client.fallback.GeneralMessageClientFallback;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.axzo.msg.center.service.pending.client.fallback.PendingMessageClientFallback;
import cn.azxo.framework.common.model.CommonResponse;
import cn.azxo.framework.common.model.Page;
import org.springframework.cloud.openfeign.FeignClient;
@ -25,7 +25,7 @@ import javax.validation.Valid;
*/
@Component
@FeignClient(value = "msg-center", url = "${server.serviceUrl:http://msg-center:8080}",
fallback = PendingMessageClientFallback.class)
fallback = GeneralMessageClientFallback.class)
public interface GeneralMessageClient {
/**
@ -34,8 +34,9 @@ public interface GeneralMessageClient {
* @param request 消息所需参数
* @return 消息的唯一标识
*/
@Deprecated
@PostMapping(value = "/general-message/send", produces = {MediaType.APPLICATION_JSON_VALUE})
CommonResponse<String> pageQueryOldMessage(@RequestBody @Valid GeneralMessageSendRequest request);
CommonResponse<Void> batchSend(@RequestBody @Valid GeneralMessageSendRequest request);
/**
* 统计旧消息的未读数以及最新一条消息内容

View File

@ -21,9 +21,9 @@ import org.springframework.stereotype.Component;
public class GeneralMessageClientFallback implements GeneralMessageClient {
@Override
public CommonResponse<String> pageQueryOldMessage(GeneralMessageSendRequest request) {
log.error("fall back while sending message. req:{}", request);
return CommonResponse.error("fall back while sending message");
public CommonResponse<Void> batchSend(GeneralMessageSendRequest request) {
log.error("fall back while batch sending im message. req:{}", request);
return CommonResponse.error("fall back while batch sending im message");
}
@Override