diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java index 142569b8..e14d7c95 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java @@ -144,7 +144,7 @@ public class MessageRecordServiceImpl implements MessageRecordService { Lists.partition(Lists.newArrayList(message.getToId()), partitionSize).forEach(toIds -> { Map toIdRecordMap = Maps.newHashMap(); List messageRecords = saveBatch(basic, toIds, message.getToldIdPersonIdMap(), toIdRecordMap); - generalMessageMapperService.asyncBatchSendMessage(message, toIds, toIdRecordMap); + generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap); pushMessages.addAll(messageRecords); }); if(pushAthena) { diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java index face93a4..a921596a 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java @@ -29,8 +29,9 @@ public class GeneralMessageController implements GeneralMessageClient { private final GeneralMessageOldService generalMessageOldService; @Override - public CommonResponse pageQueryOldMessage(GeneralMessageSendRequest request) { - return CommonResponse.success(generalMessageService.batchSendMessage(request)); + public CommonResponse batchSend(GeneralMessageSendRequest request) { + generalMessageService.batchSendMessage(request); + return CommonResponse.success(); } @Override diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/domain/dto/SendImMessageDTO.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/domain/dto/SendImMessageDTO.java new file mode 100644 index 00000000..650088e9 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/domain/dto/SendImMessageDTO.java @@ -0,0 +1,47 @@ +package cn.axzo.msg.center.message.domain.dto; + +import cn.axzo.im.center.api.vo.resp.MessageDispatchResp; +import com.alibaba.fastjson.JSON; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.io.Serializable; + +/** + * @author cold_blade + * @date 2023/10/25 + * @version 1.0 + */ +@Setter +@Getter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SendImMessageDTO implements Serializable { + + private static final long serialVersionUID = 3385679937092148803L; + + /** + * 自然人id + */ + private Long personId; + /** + * im消息id + */ + private String imMsgId; + + public static SendImMessageDTO from(MessageDispatchResp resp) { + return SendImMessageDTO.builder() + .imMsgId(resp.getMsgid()) + .personId(Long.parseLong(resp.getPersonId())) + .build(); + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageMapperService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageMapperService.java index 7033f110..c30c5165 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageMapperService.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageMapperService.java @@ -2,7 +2,6 @@ package cn.axzo.msg.center.message.service; import cn.axzo.msg.center.api.request.GeneralMessageReq; import cn.axzo.msg.center.domain.entity.MessageRecord; -import cn.axzo.msg.center.service.dto.PersonDTO; import java.util.List; import java.util.Map; @@ -20,10 +19,7 @@ public interface GeneralMessageMapperService { * 异步批量发送消息 * * @param request 发送消息时的请求参数 - * @param subReceiverIds 接收者id列表 * @param toIdMessageRecordMap 接收者身份id与旧消息记录的映射关系 - * @return 批量发送消息请求的requestId */ - String asyncBatchSendMessage(GeneralMessageReq request, List subReceiverIds, - Map toIdMessageRecordMap); + void asyncBatchSendMessage(GeneralMessageReq request, Map toIdMessageRecordMap); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageService.java index e1f66514..eb2fa03a 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageService.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageService.java @@ -1,9 +1,12 @@ package cn.axzo.msg.center.message.service; +import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO; import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse; +import java.util.List; + /** * @author cold_blade * @date 2023/10/19 @@ -17,7 +20,7 @@ public interface GeneralMessageService { * @param request 消息所需参数 * @return 请求的唯一标识 */ - String batchSendMessage(GeneralMessageSendRequest request); + List batchSendMessage(GeneralMessageSendRequest request); /** * 统计旧数据 diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageMapperServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageMapperServiceImpl.java index cf7e1e67..c56761a4 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageMapperServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageMapperServiceImpl.java @@ -3,6 +3,7 @@ package cn.axzo.msg.center.message.service.impl; import cn.axzo.msg.center.api.enums.MsgTypeEnum; import cn.axzo.msg.center.api.request.GeneralMessageReq; import cn.axzo.msg.center.domain.entity.MessageRecord; +import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO; import cn.axzo.msg.center.message.service.GeneralMessageMapperService; import cn.axzo.msg.center.message.service.GeneralMessageService; import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService; @@ -12,21 +13,22 @@ import cn.axzo.msg.center.service.enums.IdentityTypeEnum; import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.utils.PersonIdentityUtil; -import cn.axzo.msg.center.utils.UUIDUtil; import cn.hutool.core.map.MapUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import jodd.util.concurrent.ThreadFactoryBuilder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -55,50 +57,69 @@ public class GeneralMessageMapperServiceImpl implements GeneralMessageMapperServ private final MessageSendTwiceRecordService messageSendTwiceRecordService; @Override - @Transactional(rollbackFor = Exception.class) - public String asyncBatchSendMessage(GeneralMessageReq request, List subReceiverIds, - Map toIdMessageRecordMap) { + public void asyncBatchSendMessage(GeneralMessageReq request, Map toIdMessageRecordMap) { if (MsgTypeEnum.PENDING_MESSAGE.equals(request.getType())) { log.info("pending message is not supported."); - return UUIDUtil.uuidRawString(); + return; } - if (Objects.isNull(request.getRelationId()) || MapUtil.isEmpty(request.getToldIdPersonIdMap())) { - log.info("param is invalid. relationId:[{}], toIdPersonIdMap:{}", request.getRelationId(), - request.getToldIdPersonIdMap()); - return UUIDUtil.uuidRawString(); + if (Objects.isNull(request.getRelationId())) { + log.info("relation id is null."); + return; } - Optional identityTypeOp = PersonIdentityUtil.toIdentityType(request.getReceiveType()); - if (!identityTypeOp.isPresent()) { - log.info("identity is invalid. relationId:[{}], receiveType:{}", request.getRelationId(), - request.getReceiveType()); - return UUIDUtil.uuidRawString(); + if (MapUtil.isEmpty(request.getToldIdPersonIdMap())) { + // 由于IM那边是根据personId来创建账户的,所以强依赖personId + log.info("toIdPersonIdMap is empty."); + return; } // 异步发送IM消息 - CompletableFuture.runAsync(() -> doBatchSendMessage(request, subReceiverIds, toIdMessageRecordMap, - identityTypeOp.get()), asyncSendMsgExecutorService); - return UUIDUtil.uuidRawString(); + CompletableFuture.runAsync(() -> doBatchSendMessage(request, toIdMessageRecordMap), + asyncSendMsgExecutorService); } - private void doBatchSendMessage(GeneralMessageReq request, List subReceiverIds, - Map toIdMessageRecordMap, IdentityTypeEnum identityType) { + private void doBatchSendMessage(GeneralMessageReq request, Map toIdMessageRecordMap) { String templateCode = relationTemplateMapService.queryByRelationId(request.getRelationId()).orElse(null); if (StringUtils.isBlank(templateCode)) { log.info("the relationId([{}]) is not map any new message template. ", request.getRelationId()); return; } - log.info("start to send im message. relationId:[{}]", request.getRelationId()); - Map msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream() - .filter(e -> request.getToldIdPersonIdMap().containsKey(e.getKey())) - .collect(Collectors.toMap(e -> e.getValue().getId(), e -> request.getToldIdPersonIdMap().get(e.getKey()))); - log.info("message send twice. relationId:[{}], msgIds:{}", request.getRelationId(), msgRecordIdPersonIdMap.values()); - // 双发记录 - messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap); - // 发送IM消息 - generalMessageService.batchSendMessage(convert(request, templateCode, subReceiverIds, identityType)); + log.info("start to send im message. relationId:[{}], templateCode:[{}]", request.getRelationId(), templateCode); + try { + // 发送IM消息 + GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode, toIdMessageRecordMap.keySet()); + // IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端) + List result = generalMessageService.batchSendMessage(sendImMsgRequest); + if (CollectionUtils.isEmpty(result)) { + log.info("there is not any person successfully send im message. relationId:[{}]", request.getRelationId()); + return; + } + // 记录发送了IM消息的旧消息 + recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap()); + } catch (Exception e) { + log.warn("broke out some exception while sending im message. relationId:[{}]", request.getRelationId(), e); + } } - private GeneralMessageSendRequest convert(GeneralMessageReq request, String templateCode, - List subReceiverIds, IdentityTypeEnum identityType) { + private void recordSendImMessage(Map toIdMessageRecordMap, List sendImResult, + Map toIdPersonIdMap) { + // 成功发送了IM消息的personId集合 + Set sucSendImMsgPersonIds = sendImResult.stream() + .map(SendImMessageDTO::getPersonId).collect(Collectors.toSet()); + // 从入参中的toIdPersonIdMap中筛选发送成功的entry + Map subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream() + // 过滤掉personId维度IM消息发送失败的entry + .filter(e -> sucSendImMsgPersonIds.contains(e.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream() + .filter(e -> subToIdPersonIdMap.containsKey(e.getKey())) + .collect(Collectors.toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey()))); + log.info("record message that has been send im message. msgIds:{}", msgRecordIdPersonIdMap.keySet()); + // 双发记录 + messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap); + } + + private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request, String templateCode, + Collection subReceiverIds) { + IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType()); List receivers = subReceiverIds.stream() .filter(request.getToldIdPersonIdMap()::containsKey) .map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType)) diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java index 140f590e..0fab0c6a 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java @@ -1,7 +1,9 @@ package cn.axzo.msg.center.message.service.impl; +import cn.axzo.framework.domain.web.result.ApiResult; import cn.axzo.im.center.api.feign.MessageApi; import cn.axzo.im.center.api.vo.req.MessageInfo; +import cn.axzo.im.center.api.vo.resp.MessageDispatchResp; import cn.axzo.im.center.common.enums.AppTypeEnum; import cn.axzo.msg.center.api.request.CmsMsgQueryReq; import cn.axzo.msg.center.api.response.MessageNewRes; @@ -13,6 +15,7 @@ import cn.axzo.msg.center.domain.enums.UserTypeEnum; import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig; import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO; import cn.axzo.msg.center.message.domain.dto.RawMessageRouterDTO; +import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO; import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO; import cn.axzo.msg.center.message.service.GeneralMessageOldService; import cn.axzo.msg.center.message.service.GeneralMessageService; @@ -39,7 +42,10 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,16 +76,16 @@ public class GeneralMessageServiceImpl implements GeneralMessageService { private final MessageSendTwiceRecordService messageSendTwiceRecordService; @Override - public String batchSendMessage(GeneralMessageSendRequest request) { + @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW) + public List batchSendMessage(GeneralMessageSendRequest request) { // 查询模板基础信息 MessageTemplateDTO template = messageTemplateNewService.queryByTemplateCode(request.getTemplateCode()) .orElseThrow(() -> new ServiceException("未查询到对应的模板")); // 构建消息记录并存储 List messageRecords = buildMessageRecord(request, template); generalMessageRecordDao.saveBatch(messageRecords); - // 异步推送 - pushMessage(messageRecords, template); - return UUIDUtil.uuidRawString(); + // 发送IM消息 + return pushImMessage(messageRecords, template); } @Override @@ -129,10 +135,10 @@ public class GeneralMessageServiceImpl implements GeneralMessageService { .build(); } - private void pushMessage(List messageRecords, MessageTemplateDTO template) { + private List pushImMessage(List messageRecords, MessageTemplateDTO template) { if (CollectionUtils.isEmpty(template.getPushTerminals())) { // 模板未配置任何推送终端 - return; + return Collections.emptyList(); } GeneralMessageRecord record = messageRecords.get(0); List appTypes = template.getPushTerminals().stream() @@ -140,7 +146,6 @@ public class GeneralMessageServiceImpl implements GeneralMessageService { GeneralMessagePushVO message = convert(record, template); MessageInfo msgInfo = new MessageInfo(); msgInfo.setAppTypeList(appTypes); - // TODO: [cold_blade] [P2] 第一期只支持发送机器人相关的消息 msgInfo.setToPersonIdList(Lists.newArrayList(messageRecords.stream() .map(e -> String.valueOf(e.getReceiverPersonId())) .collect(Collectors.toSet()))); @@ -153,7 +158,14 @@ public class GeneralMessageServiceImpl implements GeneralMessageService { ext.put("minAppVersion", template.getMinAppVersion()); msgInfo.setExtendsInfo(ext); - messageApi.sendMessage(msgInfo); + ApiResult> result = messageApi.sendMessage(msgInfo); + if (result.isError() || CollectionUtils.isEmpty(result.getData())) { + log.warn("failed to batch send im message. result:{}", result); + return Collections.emptyList(); + } + return result.getData().stream() + .map(SendImMessageDTO::from) + .collect(Collectors.toList()); } private GeneralMessagePushVO convert(GeneralMessageRecord record, MessageTemplateDTO template) { diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/utils/PersonIdentityUtil.java b/inside-notices/src/main/java/cn/axzo/msg/center/utils/PersonIdentityUtil.java index 62fa1be7..d567fdcd 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/utils/PersonIdentityUtil.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/utils/PersonIdentityUtil.java @@ -3,17 +3,18 @@ package cn.axzo.msg.center.utils; import cn.axzo.msg.center.api.enums.ReceiveTypeEnum; import cn.axzo.msg.center.domain.enums.UserTypeEnum; import cn.axzo.msg.center.service.enums.IdentityTypeEnum; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import java.util.Objects; -import java.util.Optional; /** - * @description xxx * @author cold_blade * @date 2023/10/23 * @version 1.0 */ -public class PersonIdentityUtil { +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class PersonIdentityUtil { public static UserTypeEnum toUserType(IdentityTypeEnum identityType) { switch (identityType) { @@ -28,19 +29,19 @@ public class PersonIdentityUtil { } } - public static Optional toIdentityType(ReceiveTypeEnum receiveType) { + public static IdentityTypeEnum toIdentityType(ReceiveTypeEnum receiveType) { if (Objects.isNull(receiveType)) { - return Optional.empty(); + return IdentityTypeEnum.NOT_SUPPORT; } switch (receiveType) { case CM_WORKER: - return Optional.of(IdentityTypeEnum.WORKER); + return IdentityTypeEnum.WORKER; case CM_LEADER: - return Optional.of(IdentityTypeEnum.WORKER_LEADER); + return IdentityTypeEnum.WORKER_LEADER; case CMP_USER: - return Optional.of(IdentityTypeEnum.PRACTITIONER); + return IdentityTypeEnum.PRACTITIONER; default: - return Optional.empty(); + return IdentityTypeEnum.NOT_SUPPORT; } } diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/GeneralMessageClient.java b/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/GeneralMessageClient.java index e69800b1..581798c6 100644 --- a/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/GeneralMessageClient.java +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/GeneralMessageClient.java @@ -2,10 +2,10 @@ package cn.axzo.msg.center.service.general.client; import cn.axzo.msg.center.api.request.CmsMsgQueryReq; import cn.axzo.msg.center.api.response.MessageNewRes; +import cn.axzo.msg.center.service.general.client.fallback.GeneralMessageClientFallback; import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse; -import cn.axzo.msg.center.service.pending.client.fallback.PendingMessageClientFallback; import cn.azxo.framework.common.model.CommonResponse; import cn.azxo.framework.common.model.Page; import org.springframework.cloud.openfeign.FeignClient; @@ -25,7 +25,7 @@ import javax.validation.Valid; */ @Component @FeignClient(value = "msg-center", url = "${server.serviceUrl:http://msg-center:8080}", - fallback = PendingMessageClientFallback.class) + fallback = GeneralMessageClientFallback.class) public interface GeneralMessageClient { /** @@ -34,8 +34,9 @@ public interface GeneralMessageClient { * @param request 消息所需参数 * @return 消息的唯一标识 */ + @Deprecated @PostMapping(value = "/general-message/send", produces = {MediaType.APPLICATION_JSON_VALUE}) - CommonResponse pageQueryOldMessage(@RequestBody @Valid GeneralMessageSendRequest request); + CommonResponse batchSend(@RequestBody @Valid GeneralMessageSendRequest request); /** * 统计旧消息的未读数以及最新一条消息内容 diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/fallback/GeneralMessageClientFallback.java b/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/fallback/GeneralMessageClientFallback.java index 92bb88ce..ac6d965e 100644 --- a/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/fallback/GeneralMessageClientFallback.java +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/service/general/client/fallback/GeneralMessageClientFallback.java @@ -21,9 +21,9 @@ import org.springframework.stereotype.Component; public class GeneralMessageClientFallback implements GeneralMessageClient { @Override - public CommonResponse pageQueryOldMessage(GeneralMessageSendRequest request) { - log.error("fall back while sending message. req:{}", request); - return CommonResponse.error("fall back while sending message"); + public CommonResponse batchSend(GeneralMessageSendRequest request) { + log.error("fall back while batch sending im message. req:{}", request); + return CommonResponse.error("fall back while batch sending im message"); } @Override