Merge branch 'feature/REQ-1465' of axzsource.com:universal/infrastructure/backend/msg-center-plat into dev

This commit is contained in:
luofu 2023-10-24 09:26:02 +08:00
commit d767e6b0d4
27 changed files with 735 additions and 51 deletions

View File

@ -28,6 +28,7 @@ import cn.axzo.msg.center.domain.enums.*;
import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq; import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq;
import cn.axzo.msg.center.inside.notices.event.SendMessageEvent; import cn.axzo.msg.center.inside.notices.event.SendMessageEvent;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService; import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.azxo.framework.common.utils.LogUtil; import cn.azxo.framework.common.utils.LogUtil;
import cn.azxo.framework.common.utils.LogUtil.ErrorLevel; import cn.azxo.framework.common.utils.LogUtil.ErrorLevel;
import cn.azxo.framework.common.utils.LogUtil.ErrorType; import cn.azxo.framework.common.utils.LogUtil.ErrorType;
@ -40,6 +41,7 @@ import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.text.StrSubstitutor; import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
@ -89,6 +91,9 @@ public class MessageRecordServiceImpl implements MessageRecordService {
/*@Resource /*@Resource
private IdentityProfileService identityProfileService;*/ private IdentityProfileService identityProfileService;*/
@Resource
private GeneralMessageMapperService generalMessageMapperService;
/** /**
* 新增推送消息接口 * 新增推送消息接口
* *
@ -137,10 +142,11 @@ public class MessageRecordServiceImpl implements MessageRecordService {
List<MessageRecord> pushMessages = new ArrayList<>(); List<MessageRecord> pushMessages = new ArrayList<>();
Lists.partition(Lists.newArrayList(message.getToId()), partitionSize).forEach(toIds -> { Lists.partition(Lists.newArrayList(message.getToId()), partitionSize).forEach(toIds -> {
List<MessageRecord> messageRecords = saveBatch(basic, toIds, message.getToldIdPersonIdMap()); Map<Long, MessageRecord> toIdRecordMap = Maps.newHashMap();
List<MessageRecord> messageRecords = saveBatch(basic, toIds, message.getToldIdPersonIdMap(), toIdRecordMap);
generalMessageMapperService.asyncBatchSendMessage(message, toIds, toIdRecordMap);
pushMessages.addAll(messageRecords); pushMessages.addAll(messageRecords);
}); });
if(pushAthena) { if(pushAthena) {
asyncPushAthena(message, messageTemplate.getAudioFileName(), messageModule.getModuleName(), pushMessages); asyncPushAthena(message, messageTemplate.getAudioFileName(), messageModule.getModuleName(), pushMessages);
} }
@ -163,7 +169,8 @@ public class MessageRecordServiceImpl implements MessageRecordService {
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW) @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public List<MessageRecord> saveBatch(MessageRecord basic, List<Long> toIds, Map<Long, Long> toldIdPersonIdMap) { public List<MessageRecord> saveBatch(MessageRecord basic, List<Long> toIds, Map<Long, Long> toldIdPersonIdMap,
Map<Long, MessageRecord> toIdRecordMap) {
if (CollectionUtils.isEmpty(toIds)) { if (CollectionUtils.isEmpty(toIds)) {
return Collections.emptyList(); return Collections.emptyList();
} }
@ -177,6 +184,7 @@ public class MessageRecordServiceImpl implements MessageRecordService {
messageRecord.setToId(0L); messageRecord.setToId(0L);
messageRecord.setPersonId(i); messageRecord.setPersonId(i);
pushMessages.add(messageRecord); pushMessages.add(messageRecord);
toIdRecordMap.put(i, messageRecord);
}); });
} else { } else {
toIds.forEach(i -> { toIds.forEach(i -> {
@ -188,6 +196,7 @@ public class MessageRecordServiceImpl implements MessageRecordService {
messageRecord.setPersonId(toldIdPersonIdMap.get(i)); messageRecord.setPersonId(toldIdPersonIdMap.get(i));
} }
pushMessages.add(messageRecord); pushMessages.add(messageRecord);
toIdRecordMap.put(i, messageRecord);
}); });
} }
messageRecordDao.saveBatch(pushMessages); messageRecordDao.saveBatch(pushMessages);

View File

@ -45,7 +45,6 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* @description xxx
* @author cold_blade * @author cold_blade
* @date 2023/9/13 * @date 2023/9/13
* @version 1.0 * @version 1.0

View File

@ -2,7 +2,9 @@ package cn.axzo.msg.center.message.controller;
import cn.axzo.msg.center.message.service.GeneralMessageService; import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.service.general.client.GeneralMessageClient; import cn.axzo.msg.center.service.general.client.GeneralMessageClient;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.azxo.framework.common.model.CommonResponse; import cn.azxo.framework.common.model.CommonResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -21,7 +23,13 @@ public class GeneralMessageController implements GeneralMessageClient {
private final GeneralMessageService generalMessageService; private final GeneralMessageService generalMessageService;
@Override @Override
public CommonResponse<String> sendMessage(GeneralMessageSendRequest request) { public CommonResponse<String> statisticOldData(GeneralMessageSendRequest request) {
return CommonResponse.success(generalMessageService.sendMessage(request)); return CommonResponse.success(generalMessageService.batchSendMessage(request));
}
@Override
public CommonResponse<GeneralMessageOldDataStatisticResponse> statisticOldData(
GeneralMessageOldDataStatisticRequest request) {
return CommonResponse.success(generalMessageService.statisticOldData(request));
} }
} }

View File

@ -0,0 +1,29 @@
package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.service.dto.PersonDTO;
import java.util.List;
import java.util.Map;
/**
* 新老版本消息映射的相关接口
*
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
public interface GeneralMessageMapperService {
/**
* 异步批量发送消息
*
* @param request 发送消息时的请求参数
* @param subReceiverIds 接收者id列表
* @param toIdMessageRecordMap 接收者身份id与旧消息记录的映射关系
* @return 批量发送消息请求的requestId
*/
String asyncBatchSendMessage(GeneralMessageReq request, List<Long> subReceiverIds,
Map<Long, MessageRecord> toIdMessageRecordMap);
}

View File

@ -1,6 +1,8 @@
package cn.axzo.msg.center.message.service; package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
/** /**
* @author cold_blade * @author cold_blade
@ -13,7 +15,15 @@ public interface GeneralMessageService {
* 发送消息 * 发送消息
* *
* @param request 消息所需参数 * @param request 消息所需参数
* @return 消息的唯一标识 * @return 请求的唯一标识
*/ */
String sendMessage(GeneralMessageSendRequest request); String batchSendMessage(GeneralMessageSendRequest request);
/**
* 统计旧数据
*
* @param request 统计参数
* @return 旧数据的未读数以及最新一条消息
*/
GeneralMessageOldDataStatisticResponse statisticOldData(GeneralMessageOldDataStatisticRequest request);
} }

View File

@ -0,0 +1,21 @@
package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.service.dto.PersonDTO;
import java.util.List;
import java.util.Map;
/**
* 双发消息记录service
*
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
public interface MessageSendTwiceRecordService {
List<Long> listByPerson(Long personId);
void batchSave(Map<Long, Long> msgRecordIdPersonIdMap);
}

View File

@ -0,0 +1,18 @@
package cn.axzo.msg.center.message.service;
import java.util.Map;
import java.util.Optional;
/**
* 新老模板的关联关系service
*
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
public interface RelationTemplateMapService {
Optional<String> queryByRelationId(Long relationId);
void mapRelationAndTemplate(Map<Long, String> map);
}

View File

@ -0,0 +1,123 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.RelationTemplateMapService;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.axzo.msg.center.utils.UUIDUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class GeneralMessageMapperServiceImpl implements GeneralMessageMapperService {
private final ThreadFactory asyncSendMsgThreadFactory = ThreadFactoryBuilder.create()
.setDaemon(true).setNameFormat("ASYNC_SEND_IM_MESSAGE_%d").get();
private final ExecutorService asyncSendMsgExecutorService = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024), asyncSendMsgThreadFactory);
private final GeneralMessageService generalMessageService;
private final RelationTemplateMapService relationTemplateMapService;
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
@Override
@Transactional(rollbackFor = Exception.class)
public String asyncBatchSendMessage(GeneralMessageReq request, List<Long> subReceiverIds,
Map<Long, MessageRecord> toIdMessageRecordMap) {
if (MsgTypeEnum.PENDING_MESSAGE.equals(request.getType())) {
log.info("pending message is not supported.");
return UUIDUtil.uuidRawString();
}
if (Objects.isNull(request.getRelationId()) || MapUtil.isEmpty(request.getToldIdPersonIdMap())) {
log.info("param is invalid. relationId:[{}], toIdPersonIdMap:{}", request.getRelationId(),
request.getToldIdPersonIdMap());
return UUIDUtil.uuidRawString();
}
Optional<IdentityTypeEnum> identityTypeOp = PersonIdentityUtil.toIdentityType(request.getReceiveType());
if (!identityTypeOp.isPresent()) {
log.info("identity is invalid. relationId:[{}], receiveType:{}", request.getRelationId(),
request.getReceiveType());
return UUIDUtil.uuidRawString();
}
// 异步发送IM消息
CompletableFuture.runAsync(() -> doBatchSendMessage(request, subReceiverIds, toIdMessageRecordMap,
identityTypeOp.get()), asyncSendMsgExecutorService);
return UUIDUtil.uuidRawString();
}
private void doBatchSendMessage(GeneralMessageReq request, List<Long> subReceiverIds,
Map<Long, MessageRecord> toIdMessageRecordMap, IdentityTypeEnum identityType) {
String templateCode = relationTemplateMapService.queryByRelationId(request.getRelationId()).orElse(null);
if (StringUtils.isBlank(templateCode)) {
log.info("the relationId([{}]) is not map any new message template. ", request.getRelationId());
return;
}
log.info("start to send im message. relationId:[{}]", request.getRelationId());
Map<Long, Long> msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream()
.filter(e -> request.getToldIdPersonIdMap().containsKey(e.getKey()))
.collect(Collectors.toMap(e -> e.getValue().getId(), e -> request.getToldIdPersonIdMap().get(e.getKey())));
log.info("message send twice. relationId:[{}], msgIds:{}", request.getRelationId(), msgRecordIdPersonIdMap.values());
// 双发记录
messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap);
// 发送IM消息
generalMessageService.batchSendMessage(convert(request, templateCode, subReceiverIds, identityType));
}
private GeneralMessageSendRequest convert(GeneralMessageReq request, String templateCode,
List<Long> subReceiverIds, IdentityTypeEnum identityType) {
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))
.collect(Collectors.toList());
OrganizationTypeEnum orgType = Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN :
OrganizationTypeEnum.valueOf(request.getTerminalType().name());
return GeneralMessageSendRequest.builder()
.templateCode(templateCode)
.receiver(receivers)
.orgType(orgType)
.orgName(request.getTerminalName())
.orgId(request.getTerminalId())
.bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse(""))
.routerParams(Optional.ofNullable(request.getRouterParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.bizExtParams(Optional.ofNullable(request.getMsgParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.build();
}
}

View File

@ -1,11 +1,18 @@
package cn.axzo.msg.center.message.service.impl; package cn.axzo.msg.center.message.service.impl;
import cn.axzo.core.domain.PageResult;
import cn.axzo.im.center.api.feign.MessageApi; import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.MessageInfo; import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.api.response.MessageTotalRes;
import cn.axzo.msg.center.common.exception.ServiceException; import cn.axzo.msg.center.common.exception.ServiceException;
import cn.axzo.msg.center.common.utils.PlaceholderResolver; import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.dal.GeneralMessageRecordDao; import cn.axzo.msg.center.dal.GeneralMessageRecordDao;
import cn.axzo.msg.center.domain.entity.GeneralMessageRecord; import cn.axzo.msg.center.domain.entity.GeneralMessageRecord;
import cn.axzo.msg.center.domain.enums.UserTypeEnum;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO; import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.dto.RawMessageRouterDTO; import cn.axzo.msg.center.message.domain.dto.RawMessageRouterDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO; import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
@ -13,13 +20,19 @@ import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageTemplateNewService; import cn.axzo.msg.center.message.service.MessageTemplateNewService;
import cn.axzo.msg.center.service.dto.MessageCardContentItemDTO; import cn.axzo.msg.center.service.dto.MessageCardContentItemDTO;
import cn.axzo.msg.center.service.dto.MessageRouterButtonDTO; import cn.axzo.msg.center.service.dto.MessageRouterButtonDTO;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.GeneralMessageStateEnum; import cn.axzo.msg.center.service.enums.GeneralMessageStateEnum;
import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
import cn.axzo.msg.center.service.enums.PushTerminalEnum; import cn.axzo.msg.center.service.enums.PushTerminalEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.axzo.msg.center.utils.MessageRouterUtil; import cn.axzo.msg.center.utils.MessageRouterUtil;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.axzo.msg.center.utils.UUIDUtil; import cn.axzo.msg.center.utils.UUIDUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -30,6 +43,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -42,35 +56,72 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor @RequiredArgsConstructor
public class GeneralMessageServiceImpl implements GeneralMessageService { public class GeneralMessageServiceImpl implements GeneralMessageService {
private static final ImmutableMap<PushTerminalEnum, AppTypeEnum> PUSH_TERMINAL_APP_MAP = ImmutableMap.of(
PushTerminalEnum.B_ENTERPRISE_APP, AppTypeEnum.CMP,
PushTerminalEnum.C_WORKER_APP, AppTypeEnum.CM
);
// TODO:[cold_blade] [P2] 图片icon的最好配置在nacos上 // TODO:[cold_blade] [P2] 图片icon的最好配置在nacos上
private final String orgIcon = "https://axzo-pro.oss-cn-hangzhou.aliyuncs.com/rs_app/ic_org_icon.png"; private final String orgIcon = "https://axzo-pro.oss-cn-hangzhou.aliyuncs.com/rs_app/ic_org_icon.png";
private final MessageApi messageApi; private final MessageApi messageApi;
private final MessageTemplateNewService messageTemplateNewService; private final MessageRecordService messageRecordService;
private final GeneralMessageRecordDao generalMessageRecordDao; private final GeneralMessageRecordDao generalMessageRecordDao;
private final MessageTemplateNewService messageTemplateNewService;
@Override @Override
public String sendMessage(GeneralMessageSendRequest request) { public String batchSendMessage(GeneralMessageSendRequest request) {
// 查询模板基础信息 // 查询模板基础信息
MessageTemplateDTO template = messageTemplateNewService.queryByTemplateCode(request.getTemplateCode()) MessageTemplateDTO template = messageTemplateNewService.queryByTemplateCode(request.getTemplateCode())
.orElseThrow(() -> new ServiceException("未查询到对应的模板")); .orElseThrow(() -> new ServiceException("未查询到对应的模板"));
// 构建消息记录并存储 // 构建消息记录并存储
GeneralMessageRecord messageRecord = buildMessageRecord(request, template); List<GeneralMessageRecord> messageRecords = buildMessageRecord(request, template);
generalMessageRecordDao.save(messageRecord); generalMessageRecordDao.saveBatch(messageRecords);
// 异步推送 // 异步推送
pushMessage(messageRecord, template); pushMessage(messageRecords, template);
return messageRecord.getIdentityCode(); return UUIDUtil.uuidRawString();
} }
private GeneralMessageRecord buildMessageRecord(GeneralMessageSendRequest request, MessageTemplateDTO template) { @Override
public GeneralMessageOldDataStatisticResponse statisticOldData(GeneralMessageOldDataStatisticRequest request) {
// TODO: [cold_blade] [P0] 统计旧数据接口实现
UserTypeEnum userType = PersonIdentityUtil.toUserType(request.getIdentityType());
CmsMsgQueryReq req = new CmsMsgQueryReq();
req.setMsgType(MessageCategoryEnum.GENERAL_MESSAGE.getCode());
// 这里查询消息中心全部状态的数据
req.setMsgStatus(0);
req.setPage(1L);
req.setPageSize(1L);
// TODO: [cold_blade] [P0] 需要排除IM中的消息
PageResult<MessageNewRes> result = messageRecordService.pageMsgInfo(req, request.getPersonId(),
request.getIdentityId(), userType);
MessageTotalRes res = messageRecordService.statisticsMsg4Total(request.getPersonId(),
request.getIdentityId(), userType);
MessageNewRes msg = CollectionUtils.isNotEmpty(result.getData()) ? result.getData().get(0) : null;
return GeneralMessageOldDataStatisticResponse.builder()
.unreadCount(res.getMsgTotal())
.latestMsgSendTimestamp(Optional.ofNullable(msg).map(v -> v.getCreateAt().getTime()).orElse(null))
.latestMsgContent(Optional.ofNullable(msg).map(MessageNewRes::getContent).orElse(null))
.build();
}
private List<GeneralMessageRecord> buildMessageRecord(GeneralMessageSendRequest request, MessageTemplateDTO template) {
return request.getReceiver().stream()
.map(e -> buildMessageRecord(request, e, template))
.collect(Collectors.toList());
}
private GeneralMessageRecord buildMessageRecord(GeneralMessageSendRequest request, PersonDTO receiver,
MessageTemplateDTO template) {
return GeneralMessageRecord.builder() return GeneralMessageRecord.builder()
.identityCode(UUIDUtil.uuidString()) .identityCode(UUIDUtil.uuidString())
.senderPersonId(request.getSenderPersonId()) .senderPersonId(request.getSender().getId())
.senderId(request.getSenderIdentity().getId()) .senderId(request.getSender().getIdentity().getId())
.senderType(request.getSenderIdentity().getType()) .senderType(request.getSender().getIdentity().getType())
.receiverPersonId(request.getReceiverPersonId()) .receiverPersonId(receiver.getId())
.receiverId(request.getReceiverIdentity().getId()) .receiverId(receiver.getIdentity().getId())
.receiverType(request.getReceiverIdentity().getType()) .receiverType(receiver.getIdentity().getType())
.templateCode(template.getCode()) .templateCode(template.getCode())
.title(parseString(template.getTitle(), request.getBizExtParams())) .title(parseString(template.getTitle(), request.getBizExtParams()))
.content(parseString(template.getContent(), request.getBizExtParams())) .content(parseString(template.getContent(), request.getBizExtParams()))
@ -84,18 +135,21 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
.build(); .build();
} }
private void pushMessage(GeneralMessageRecord record, MessageTemplateDTO template) { private void pushMessage(List<GeneralMessageRecord> messageRecords, MessageTemplateDTO template) {
if (CollectionUtils.isEmpty(template.getPushTerminals())) { if (CollectionUtils.isEmpty(template.getPushTerminals())) {
// 模板未配置任何推送终端 // 模板未配置任何推送终端
return; return;
} }
List<String> appTypes = template.getPushTerminals().stream() GeneralMessageRecord record = messageRecords.get(0);
.map(PushTerminalEnum::getImTerminalFlag).collect(Collectors.toList()); List<AppTypeEnum> appTypes = template.getPushTerminals().stream()
.map(PUSH_TERMINAL_APP_MAP::get).collect(Collectors.toList());
GeneralMessagePushVO message = convert(record, template); GeneralMessagePushVO message = convert(record, template);
MessageInfo msgInfo = new MessageInfo(); MessageInfo msgInfo = new MessageInfo();
msgInfo.setAppTypeList(appTypes); msgInfo.setAppTypeList(appTypes);
// TODO: [cold_blade] [P2] 第一期只支持发送机器人相关的消息 // TODO: [cold_blade] [P2] 第一期只支持发送机器人相关的消息
msgInfo.setToPersonIdList(Lists.newArrayList(String.valueOf(record.getReceiverPersonId()))); msgInfo.setToPersonIdList(Lists.newArrayList(messageRecords.stream()
.map(e -> String.valueOf(e.getReceiverPersonId()))
.collect(Collectors.toSet())));
msgInfo.setMsgHeader(record.getTitle()); msgInfo.setMsgHeader(record.getTitle());
msgInfo.setMsgContent(record.getContent()); msgInfo.setMsgContent(record.getContent());
msgInfo.setMsgTemplateId(record.getTemplateCode()); msgInfo.setMsgTemplateId(record.getTemplateCode());

View File

@ -0,0 +1,61 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.dal.MessageSendTwiceRecordDao;
import cn.axzo.msg.center.domain.entity.MessageSendTwiceRecord;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.hutool.core.map.MapUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @description xxx
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageSendTwiceRecordServiceImpl implements MessageSendTwiceRecordService {
private final MessageSendTwiceRecordDao messageSendTwiceRecordDao;
@Override
public List<Long> listByPerson(Long personId) {
if (Objects.isNull(personId)) {
log.info("personId is null.");
return Collections.emptyList();
}
// TODO:[cold_blade] [P2] 此处的代码仅做一个双发兜底的策略后期应当全走新的消息发送流程
return messageSendTwiceRecordDao.lambdaQuery()
.eq(MessageSendTwiceRecord::getReceiverPersonId, personId)
.eq(MessageSendTwiceRecord::getIsDelete, TableIsDeleteEnum.NORMAL.value)
.select(MessageSendTwiceRecord::getOriginalMsgId)
.list().stream()
.map(MessageSendTwiceRecord::getOriginalMsgId)
.collect(Collectors.toList());
}
@Override
public void batchSave(Map<Long, Long> msgRecordIdPersonIdMap) {
if (MapUtil.isEmpty(msgRecordIdPersonIdMap)) {
return;
}
List<MessageSendTwiceRecord> records = msgRecordIdPersonIdMap.entrySet().stream()
.map(e -> {
MessageSendTwiceRecord record = new MessageSendTwiceRecord();
record.setReceiverPersonId(e.getValue());
record.setOriginalMsgId(e.getKey());
return record;
}).collect(Collectors.toList());
messageSendTwiceRecordDao.saveBatch(records);
}
}

View File

@ -0,0 +1,64 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.dal.RelationTemplateMapDao;
import cn.axzo.msg.center.domain.entity.RelationTemplateMap;
import cn.axzo.msg.center.message.service.RelationTemplateMapService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 新老模板映射关系service
*
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class RelationTemplateMapServiceImpl implements RelationTemplateMapService {
private final RelationTemplateMapDao relationTemplateMapDao;
@Override
public Optional<String> queryByRelationId(Long relationId) {
if (Objects.isNull(relationId)) {
log.info("relationId is null");
return Optional.empty();
}
return Optional.ofNullable(relationTemplateMapDao.lambdaQuery()
.eq(RelationTemplateMap::getOriginalRelationId, relationId)
.eq(RelationTemplateMap::getIsDelete, TableIsDeleteEnum.NORMAL.value)
.one()
).map(RelationTemplateMap::getTemplateCode);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void mapRelationAndTemplate(Map<Long, String> map) {
if (Objects.isNull(map) || map.isEmpty()) {
return;
}
relationTemplateMapDao.lambdaUpdate()
.in(RelationTemplateMap::getOriginalRelationId, map.keySet())
.eq(RelationTemplateMap::getIsDelete, TableIsDeleteEnum.NORMAL.value)
.remove();
List<RelationTemplateMap> rows = map.entrySet().stream()
.map(e -> {
RelationTemplateMap row = new RelationTemplateMap();
row.setOriginalRelationId(e.getKey());
row.setTemplateCode(e.getValue());
return row;
}).collect(Collectors.toList());
relationTemplateMapDao.saveBatch(rows);
}
}

View File

@ -0,0 +1,46 @@
package cn.axzo.msg.center.utils;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import cn.axzo.msg.center.domain.enums.UserTypeEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import java.util.Objects;
import java.util.Optional;
/**
* @description xxx
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
public class PersonIdentityUtil {
public static UserTypeEnum toUserType(IdentityTypeEnum identityType) {
switch (identityType) {
case WORKER:
return UserTypeEnum.CM;
case WORKER_LEADER:
return UserTypeEnum.TEAM;
case PRACTITIONER:
return UserTypeEnum.CMP;
default:
return UserTypeEnum.NOT_IDENTITY;
}
}
public static Optional<IdentityTypeEnum> toIdentityType(ReceiveTypeEnum receiveType) {
if (Objects.isNull(receiveType)) {
return Optional.empty();
}
switch (receiveType) {
case CM_WORKER:
return Optional.of(IdentityTypeEnum.WORKER);
case CM_LEADER:
return Optional.of(IdentityTypeEnum.WORKER_LEADER);
case CMP_USER:
return Optional.of(IdentityTypeEnum.PRACTITIONER);
default:
return Optional.empty();
}
}
}

View File

@ -92,5 +92,8 @@ public class MessageNewRes {
*/ */
private Integer oldTypeId; private Integer oldTypeId;
/**
* 创建时间
*/
private Date createAt;
} }

View File

@ -18,13 +18,12 @@ public enum PushTerminalEnum {
/** /**
* B-安心筑企业版 * B-安心筑企业版
*/ */
B_ENTERPRISE_APP("B-安心筑企业版", "CMP"), B_ENTERPRISE_APP("B-安心筑企业版"),
/** /**
* C-安心筑工人版 * C-安心筑工人版
*/ */
C_WORKER_APP("C-安心筑工人版", "CM"), C_WORKER_APP("C-安心筑工人版"),
; ;
private final String desc; private final String desc;
private final String imTerminalFlag;
} }

View File

@ -1,6 +1,8 @@
package cn.axzo.msg.center.service.general.client; package cn.axzo.msg.center.service.general.client;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.axzo.msg.center.service.pending.client.fallback.PendingMessageClientFallback; import cn.axzo.msg.center.service.pending.client.fallback.PendingMessageClientFallback;
import cn.azxo.framework.common.model.CommonResponse; import cn.azxo.framework.common.model.CommonResponse;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
@ -30,5 +32,15 @@ public interface GeneralMessageClient {
* @return 消息的唯一标识 * @return 消息的唯一标识
*/ */
@PostMapping(value = "/general-message/send", produces = {MediaType.APPLICATION_JSON_VALUE}) @PostMapping(value = "/general-message/send", produces = {MediaType.APPLICATION_JSON_VALUE})
CommonResponse<String> sendMessage(@RequestBody @Valid GeneralMessageSendRequest request); CommonResponse<String> statisticOldData(@RequestBody @Valid GeneralMessageSendRequest request);
/**
* 发送消息
*
* @param request 消息所需参数
* @return 消息的唯一标识
*/
@PostMapping(value = "/general-message/old-data/statistic", produces = {MediaType.APPLICATION_JSON_VALUE})
CommonResponse<GeneralMessageOldDataStatisticResponse> statisticOldData(
@RequestBody @Valid GeneralMessageOldDataStatisticRequest request);
} }

View File

@ -1,7 +1,9 @@
package cn.axzo.msg.center.service.general.client.fallback; package cn.axzo.msg.center.service.general.client.fallback;
import cn.axzo.msg.center.service.general.client.GeneralMessageClient; import cn.axzo.msg.center.service.general.client.GeneralMessageClient;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.azxo.framework.common.model.CommonResponse; import cn.azxo.framework.common.model.CommonResponse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -16,8 +18,15 @@ import org.springframework.stereotype.Component;
public class GeneralMessageClientFallback implements GeneralMessageClient { public class GeneralMessageClientFallback implements GeneralMessageClient {
@Override @Override
public CommonResponse<String> sendMessage(GeneralMessageSendRequest request) { public CommonResponse<String> statisticOldData(GeneralMessageSendRequest request) {
log.error("fall back while sending message. req:{}", request); log.error("fall back while sending message. req:{}", request);
return CommonResponse.error("fall back while sending message"); return CommonResponse.error("fall back while sending message");
} }
@Override
public CommonResponse<GeneralMessageOldDataStatisticResponse> statisticOldData(
GeneralMessageOldDataStatisticRequest request) {
log.error("fall back while statistic old message. req:{}", request);
return CommonResponse.error("fall back while statistic old message");
}
} }

View File

@ -0,0 +1,48 @@
package cn.axzo.msg.center.service.general.request;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
@Setter
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageOldDataStatisticRequest implements Serializable {
private static final long serialVersionUID = -7739989493953842047L;
/**
* 自然人id
*/
@NotNull(message = "personId is required")
private Long personId;
/**
* 身份id
*/
@NotNull(message = "identityId is required")
private Long identityId;
/**
* 身份类型
*/
@NotNull(message = "identityType is required")
private IdentityTypeEnum identityType;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -1,6 +1,6 @@
package cn.axzo.msg.center.service.general.request; package cn.axzo.msg.center.service.general.request;
import cn.axzo.msg.center.service.dto.IdentityDTO; import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -11,8 +11,9 @@ import lombok.NoArgsConstructor;
import lombok.Setter; import lombok.Setter;
import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotEmpty;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection;
/** /**
* @description * @description
@ -36,25 +37,14 @@ public class GeneralMessageSendRequest implements Serializable {
@NotBlank(message = "templateCode is required") @NotBlank(message = "templateCode is required")
private String templateCode; private String templateCode;
/** /**
* 消息发送者自然人id * 消息发送者信息,可为空,默认模板对应的IM机器人
*/ */
@NotNull(message = "senderPersonId is required") private PersonDTO sender;
private Long senderPersonId;
/** /**
* 消息发送者身份 * 消息接收信息
*/ */
@NotNull(message = "senderIdentity is required") @NotEmpty(message = "receiver is required")
private IdentityDTO senderIdentity; private Collection<PersonDTO> receiver;
/**
* 消息接收者自然人id
*/
@NotNull(message = "receiverPersonId is required")
private Long receiverPersonId;
/**
* 消息接收者身份
*/
@NotNull(message = "receiverIdentity is required")
private IdentityDTO receiverIdentity;
/** /**
* 消息所属组织类型 * 消息所属组织类型
*/ */

View File

@ -0,0 +1,45 @@
package cn.axzo.msg.center.service.general.response;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
/**
* @description
* 普通消息记录统计数模型
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
@Setter
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageOldDataStatisticResponse implements Serializable {
private static final long serialVersionUID = 5740922087866033787L;
/**
* 消息的唯一标识
*/
private Integer unreadCount;
/**
* 最新的一条消息的发送时间戳
*/
private Long latestMsgSendTimestamp;
/**
* 最新的一条消息的消息内容
*/
private String latestMsgContent;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,18 @@
package cn.axzo.msg.center.dal;
import cn.axzo.msg.center.dal.mapper.MessageSendTwiceRecordMapper;
import cn.axzo.msg.center.domain.entity.MessageSendTwiceRecord;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @description
* @author cold_blade
* @date 2023/10/21
* @version 1.0
*/
@Slf4j
@Component
public class MessageSendTwiceRecordDao extends ServiceImpl<MessageSendTwiceRecordMapper, MessageSendTwiceRecord> {
}

View File

@ -0,0 +1,18 @@
package cn.axzo.msg.center.dal;
import cn.axzo.msg.center.dal.mapper.RelationTemplateMapMapper;
import cn.axzo.msg.center.domain.entity.RelationTemplateMap;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @description
* @author cold_blade
* @date 2023/10/21
* @version 1.0
*/
@Slf4j
@Component
public class RelationTemplateMapDao extends ServiceImpl<RelationTemplateMapMapper, RelationTemplateMap> {
}

View File

@ -0,0 +1,12 @@
package cn.axzo.msg.center.dal.mapper;
import cn.axzo.msg.center.domain.entity.MessageSendTwiceRecord;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author cold_blade
* @date 2023/10/21
* @version 1.0
*/
public interface MessageSendTwiceRecordMapper extends BaseMapper<MessageSendTwiceRecord> {
}

View File

@ -0,0 +1,12 @@
package cn.axzo.msg.center.dal.mapper;
import cn.axzo.msg.center.domain.entity.RelationTemplateMap;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author cold_blade
* @date 2023/10/21
* @version 1.0
*/
public interface RelationTemplateMapMapper extends BaseMapper<RelationTemplateMap> {
}

View File

@ -40,7 +40,8 @@
record.extra as ext, record.extra as ext,
record.router_params as routerParam, record.router_params as routerParam,
record.relation_id as relationId, record.relation_id as relationId,
record.old_type_id as oldTypeId record.old_type_id as oldTypeId,
record.create_at as createAt
from message_record record from message_record record
<if test="req.msgType == 2"> <if test="req.msgType == 2">
join message_relation relation join message_relation relation

View File

@ -92,5 +92,8 @@ public class MessageNewResDTO {
*/ */
private Integer oldTypeId; private Integer oldTypeId;
/**
* 创建时间
*/
private Date createAt;
} }

View File

@ -0,0 +1,36 @@
package cn.axzo.msg.center.domain.entity;
import cn.axzo.msg.center.domain.persistence.BaseEntity;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* @author cold_blade
* @date 2023/10/21
* @version 1.0
*/
@Setter
@Getter
@TableName("message_send_twice_record")
public class MessageSendTwiceRecord extends BaseEntity<MessageSendTwiceRecord> implements Serializable {
private static final long serialVersionUID = 3517821492158061709L;
/**
* 原有消息记录id
*/
private Long originalMsgId;
/**
* 接收者自然人 ID
*/
private Long receiverPersonId;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,36 @@
package cn.axzo.msg.center.domain.entity;
import cn.axzo.msg.center.domain.persistence.BaseEntity;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* @author cold_blade
* @date 2023/10/21
* @version 1.0
*/
@Setter
@Getter
@TableName("relation_template_map")
public class RelationTemplateMap extends BaseEntity<RelationTemplateMap> implements Serializable {
private static final long serialVersionUID = 2716916154882729387L;
/**
* 新消息模板编码
*/
private String templateCode;
/**
* 原有的模板关联关系id
*/
private Long originalRelationId;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}