REQ-2488: 删除老消息相关代码

This commit is contained in:
yanglin 2024-07-17 17:42:10 +08:00
parent 1bc9bb0e67
commit 79315acbf3
11 changed files with 24 additions and 663 deletions

View File

@ -3,11 +3,8 @@ package cn.axzo.msg.center.inside.notices.service.impl;
import cn.axzo.msg.center.api.GeneralMessageApi;
import cn.axzo.msg.center.api.request.GeneralMessagePushReq;
import cn.axzo.msg.center.api.response.ReachResp;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
import cn.axzo.msg.center.message.service.CommonMessageRecordService;
import cn.azxo.framework.common.model.CommonResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;
/**
@ -18,12 +15,8 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class GeneralMessageApiImpl implements GeneralMessageApi {
@Autowired
private CommonMessageRecordService commonMessageRecordService;
@Override
public CommonResponse<ReachResp> reachGeneralMsg(GeneralMessagePushReq param) {
// // TODO: 2024/1/10
ReachResp result = commonMessageRecordService.reach(GeneralMessagePushParam.from(param));
return CommonResponse.success(result);
return CommonResponse.success(new ReachResp());
}
}

View File

@ -4,15 +4,30 @@ import cn.axzo.core.domain.PageResult;
import cn.axzo.core.service.ServiceException;
import cn.axzo.msg.center.api.InsideNoticesApi;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.request.*;
import cn.axzo.msg.center.api.response.*;
import cn.axzo.msg.center.api.request.ChangeMessageReq;
import cn.axzo.msg.center.api.request.ChangeMessageStateReq;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.api.request.MessageTotalReq;
import cn.axzo.msg.center.api.request.MsgReturnParamRes;
import cn.axzo.msg.center.api.request.StatisticsReq;
import cn.axzo.msg.center.api.response.InsideMessageModuleRes;
import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.api.response.MessageTotalRes;
import cn.axzo.msg.center.api.response.RelationRes;
import cn.axzo.msg.center.api.response.TemplateRes;
import cn.axzo.msg.center.common.utils.CustomBeanUtils;
import cn.axzo.msg.center.domain.entity.MessageModule;
import cn.axzo.msg.center.domain.entity.MessageRelation;
import cn.axzo.msg.center.domain.entity.MessageTemplate;
import cn.axzo.msg.center.domain.enums.ModuleBizTypeEnum;
import cn.axzo.msg.center.domain.enums.UserTypeEnum;
import cn.axzo.msg.center.inside.notices.service.*;
import cn.axzo.msg.center.inside.notices.service.MessageCoreService;
import cn.axzo.msg.center.inside.notices.service.MessageModuleService;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.inside.notices.service.MessageRelationService;
import cn.axzo.msg.center.inside.notices.service.MessageTemplateService;
import cn.azxo.framework.common.model.CommonResponse;
import cn.hutool.json.JSONUtil;
import com.google.common.collect.Lists;
@ -26,6 +41,7 @@ import javax.validation.Valid;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -56,8 +72,7 @@ public class InsideNoticesApiImpl implements InsideNoticesApi {
@Override
/*@RepeatSubCheck(value = 3000, unique = true, containParams = false)*///pudge做了前置校验
public CommonResponse<List<MsgReturnParamRes>> pushMsg(@RequestBody GeneralMessageReq message) {
List<MsgReturnParamRes> result = messageRecordService.pushMsg(message);
return CommonResponse.success(result);
return CommonResponse.success(Collections.emptyList());
}
@Override

View File

@ -49,7 +49,6 @@ import cn.axzo.msg.center.domain.enums.YesNoEnum;
import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq;
import cn.axzo.msg.center.inside.notices.event.SendMessageEvent;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer;
@ -124,9 +123,6 @@ public class MessageRecordServiceImpl implements MessageRecordService {
/*@Resource
private IdentityProfileService identityProfileService;*/
@Resource
private GeneralMessageMapperService generalMessageMapperService;
/**
* 新增推送消息接口
*
@ -178,7 +174,7 @@ public class MessageRecordServiceImpl implements MessageRecordService {
Map<Long, MessageRecord> toIdRecordMap = Maps.newHashMap();
List<MessageRecord> messageRecords = saveBatch(basic, toIds, message.getToldIdPersonIdMap(), toIdRecordMap);
try {
generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap);
//generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap);
} catch (Exception e) {
log.warn("asyncBatchSendMessage push error, param= [{}]", JSON.toJSONString(message), e);
}

View File

@ -1,34 +0,0 @@
package cn.axzo.msg.center.message.send.processor;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import com.alibaba.fastjson.JSONObject;
import java.util.Objects;
/**
* @author syl
* @date 2023/12/18
*/
public interface ISendProcessor {
default void check(SendContext context) {
}
default void send(SendContext context) {
}
default void checkReach(ReachContext context) {
}
default void reach(ReachContext context) {
}
default void save(ReachContext context) {
}
default String parseString(String string, JSONObject params) {
if (Objects.isNull(params)) {
return string;
}
return PlaceholderResolver.getDefaultResolver().resolveByMap(string, params);
}
}

View File

@ -1,209 +0,0 @@
package cn.axzo.msg.center.message.send.processor;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.dal.CommonMessageRecordDao;
import cn.axzo.msg.center.domain.entity.CommonMessageRecord;
import cn.axzo.msg.center.domain.entity.MessageBaseTemplate;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.GeneralMessageStateEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.pending.response.PushMessageReturnDTO;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.axzo.msg.center.utils.UUIDUtil;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author syl
* @date 2023/12/18
*/
@Slf4j
@Component
public class NotificationProcessor implements ISendProcessor {
@Autowired
private GeneralMessageService generalMessageService;
@Autowired
private MessageSendTwiceRecordService messageSendTwiceRecordService;
@Autowired
private BizEventMappingService bizEventMappingService;
@Autowired
private CommonMessageRecordDao commonMessageRecordDao;
@Override
public void check(SendContext context) {
AssertUtil.notNull(context.getMessageParam(), "通知消息的参数错误,请检查");
AssertUtil.isTrue(CollUtil.isNotEmpty(context.getToIdMessageRecordMap()), "通知消息的人员信息不存在");
AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空");
AssertUtil.isTrue(BizActionCategory.NOTIFICATION.getCode().equals(context.getReachConfig()
.getCategory()), "消息动作不匹配");
bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig());
}
@Override
public void send(SendContext context) {
// 检查
this.check(context);
log.info("NotificationProcessor start to async send im message. bizCode= [{}]"
+ ", templateCode= [{}]", context.getBizEvent().getBizCode(),
context.getReachConfig().getTemplateCode());
// 发送
this.doBatchSendMessage(context);
}
public void doBatchSendMessage(SendContext context) {
GeneralMessageReq request = context.getMessageParam();
Map<Long, MessageRecord> toIdMessageRecordMap = context.getToIdMessageRecordMap();
String templateCode = context.getReachConfig().getTemplateCode();
try {
// 发送IM消息
GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode,
toIdMessageRecordMap.keySet());
// IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端)
List<SendImMessageDTO> result = generalMessageService
.batchSendMessage(sendImMsgRequest);
if (CollectionUtils.isEmpty(result)) {
log.info("bizEvent there is not any person successfully send im message. bizCode:[{}]",
request.getRelationId());
return;
}
// 记录发送了IM消息的旧消息
recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap());
} catch (Exception e) {
log.warn("bizEvent broke out some exception while sending im message. bizCode:[{}]",
request.getRelationId(), e);
}
}
private void recordSendImMessage(Map<Long, MessageRecord> toIdMessageRecordMap,
List<SendImMessageDTO> sendImResult,
Map<Long, Long> toIdPersonIdMap) {
// 成功发送了IM消息的personId集合
Set<Long> sucSendImMsgPersonIds = sendImResult.stream()
.map(SendImMessageDTO::getPersonId).collect(Collectors.toSet());
// 从入参中的toIdPersonIdMap中筛选发送成功的entry
Map<Long, Long> subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream()
// 过滤掉personId维度IM消息发送失败的entry
.filter(e -> sucSendImMsgPersonIds.contains(e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<Long, Long> msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream()
.filter(e -> subToIdPersonIdMap.containsKey(e.getKey()))
.collect(Collectors
.toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey())));
log.info("bizEvent record message that has been send im message. msgIds:{}",
msgRecordIdPersonIdMap.keySet());
// 双发记录
messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap);
}
private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request,
String templateCode,
Collection<Long> subReceiverIds) {
IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType());
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))
.collect(Collectors.toList());
OrganizationTypeEnum orgType =
Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN :
OrganizationTypeEnum.valueOf(request.getTerminalType().name());
return GeneralMessageSendRequest.builder()
.templateCode(templateCode)
.receiver(receivers)
.orgType(orgType)
.orgName(request.getTerminalName())
.orgId(request.getTerminalId())
.bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse(""))
.routerParams(Optional.ofNullable(request.getRouterParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.bizExtParams(Optional.ofNullable(request.getMsgParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.build();
}
@Override
public void checkReach(ReachContext context) {
AssertUtil.notNull(context.getMessageParam(), "通知消息的参数错误,请检查");
AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空");
AssertUtil.isTrue(BizActionCategory.NOTIFICATION.getCode().equals(context.getReachConfig()
.getCategory()), "消息动作不匹配");
}
@Override
public void reach(ReachContext context) {
// 检查
this.checkReach(context);
log.info("NotificationProcessor new send im message. bizCode= [{}]"
+ ", templateCode= [{}]", context.getBizEvent().getBizCode(),
context.getReachConfig().getTemplateCode());
}
@Override
public void save(ReachContext context) {
this.checkReach(context);
MessageBaseTemplate template = bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig());
context.setTemplate(template);
List<CommonMessageRecord> records = convertRecords(context);
commonMessageRecordDao.saveBatch(records);
List<PushMessageReturnDTO> msgReturnList = records.stream()
.map(r -> PushMessageReturnDTO.builder().identityCode(r.getIdentityCode()).build())
.collect(Collectors.toList());
context.setMsgReturnList(msgReturnList);
}
private List<CommonMessageRecord> convertRecords(ReachContext context) {
MessageBaseTemplate template = context.getTemplate();
GeneralMessagePushParam messageParam = context.getMessageParam();
return messageParam.getReceivers().stream().map(r -> {
CommonMessageRecord.CommonMessageRecordBuilder builder = CommonMessageRecord.builder()
.identityCode(UUIDUtil.uuidString());
if (Objects.nonNull(messageParam.getInitiator())) {
builder.senderPersonId(messageParam.getInitiator().getId());
}
builder.receiverPersonId(r.getId());
builder.bizEventCode(messageParam.getBizEventCode());
builder.templateCode(template.getCode());
builder.title(parseString(template.getTitle(), messageParam.getBizExtParams()));
builder.content(parseString(template.getContent(), messageParam.getBizExtParams()));
builder.orgType(messageParam.getOrgType());
builder.workspaceId(messageParam.getWorkspaceId());
builder.ouId(messageParam.getOuId());
builder.state(GeneralMessageStateEnum.HAS_BEEN_SENT);
builder.bizCode(messageParam.getBizCode());
builder.routerParams(messageParam.getRouterParams());
builder.bizExtParams(messageParam.getBizExtParams());
return builder.build();
}).collect(Collectors.toList());
}
}

View File

@ -1,103 +0,0 @@
package cn.axzo.msg.center.message.send.processor;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.PendingMessageNewService;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.pending.request.PendingMessagePushRequest;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.hutool.core.collection.CollUtil;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author syl
* @date 2023/12/18
*/
@Slf4j
@Component
public class PendingProcessor implements ISendProcessor {
@Autowired
private PendingMessageNewService pendingMessageNewService;
@Autowired
private BizEventMappingService bizEventMappingService;
@Override
public void check(SendContext context) {
AssertUtil.notNull(context.getMessageParam(), "待办消息的参数错误,请检查");
AssertUtil.isTrue(CollUtil.isNotEmpty(context.getToIdMessageRecordMap()), "待办消息的人员信息不存在");
AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空");
AssertUtil.isTrue(BizActionCategory.PENDING.getCode().equals(context.getReachConfig()
.getCategory()), "待办业务动作不匹配");
bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig());
}
@Override
public void send(SendContext context) {
// 检查
this.check(context);
log.info("PendingProcessor start to async send pending message. bizCode= [{}]"
+ ", templateCode= [{}]", context.getBizEvent().getBizCode(),
context.getReachConfig().getTemplateCode());
// 由于之前业务方这期未做改造而之前对外提供的参数只满足消息待办这块的发送需要定义待办相关的业务属性
// 迁移到新接口
// PendingMessagePushRequest request = buildSendRequest(context);
// pendingMessageNewService.push(PendingMessagePushParam.from(request));
}
private PendingMessagePushRequest buildSendRequest(SendContext context) {
GeneralMessageReq request = context.getMessageParam();
String templateCode = context.getReachConfig().getTemplateCode();
Collection<Long> subReceiverIds = context.getToIdMessageRecordMap().keySet();
PersonDTO promoter = PersonDTO.from(null, request.getFromId(), null);
IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType());
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))
.collect(Collectors.toList());
OrganizationTypeEnum orgType = Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN :
OrganizationTypeEnum.valueOf(request.getTerminalType().name());
return PendingMessagePushRequest.builder()
.templateCode(templateCode)
.promoter(promoter)
.executor(receivers)
.orgType(orgType)
.workspaceId(request.getTerminalId())
.ouId(null)
.bizCategory(BizCategoryEnum.OTHER)
.bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse(""))
.routerParams(null)
.bizExtParams(null)
.build();
}
@Override
public void checkReach(ReachContext context) {
}
@Override
public void reach(ReachContext context) {
// 等后续im具有待办消息的能力再接入目前统一走待办中心
}
@Override
public void save(ReachContext context) {
}
}

View File

@ -1,13 +0,0 @@
package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.api.response.ReachResp;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
/**
* @author syl
* @date 2024/1/8
*/
public interface CommonMessageRecordService {
ReachResp reach(GeneralMessagePushParam param);
}

View File

@ -1,25 +0,0 @@
package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import java.util.List;
import java.util.Map;
/**
* 新老版本消息映射的相关接口
*
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
public interface GeneralMessageMapperService {
/**
* 异步批量发送消息
*
* @param request 发送消息时的请求参数
* @param toIdMessageRecordMap 接收者身份id与旧消息记录的映射关系
*/
void asyncBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap);
}

View File

@ -1,80 +0,0 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.response.ReachResp;
import cn.axzo.msg.center.domain.entity.BizEventMapping;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
import cn.axzo.msg.center.message.send.processor.ISendProcessor;
import cn.axzo.msg.center.message.send.processor.ReachContext;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.CommonMessageRecordService;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
* @author syl
* @date 2024/1/8
*/
@Slf4j
@Service
public class CommonMessageRecordServiceImpl implements CommonMessageRecordService {
@Value("${msg-im-server.partition-size:500}")
private Integer partitionSize;
private final ThreadFactory asyncReachThreadFactory = ThreadFactoryBuilder.create()
.setDaemon(true).setNameFormat("REACH_IM_MESSAGE_%d").get();
private final ExecutorService reachExecutorService = new ThreadPoolExecutor(5,
10, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024), asyncReachThreadFactory);
@Autowired
private BizEventMappingService bizEventMappingService;
@Autowired
private ApplicationContext applicationContext;
@Override
public ReachResp reach(GeneralMessagePushParam param) {
String bizEventCode = param.getBizEventCode();
Optional<BizEventMapping> bizEventOpt = bizEventMappingService.getByBizCode(bizEventCode);
AssertUtil.isTrue(bizEventOpt.isPresent(), String.format("业务事件映射%s不存在", bizEventCode));
BizEventMapping bizEvent = bizEventOpt.get();
AssertUtil.notEmpty(bizEvent.getReachConfig(), String.format("业务事件映射%s无业务动作配置", bizEventCode));
log.info("start to send reach message. bizCode:[{}]", bizEventCode);
ReachResp resp = new ReachResp();
bizEvent.getReachConfig().forEach(r -> {
BizActionCategory category = BizActionCategory.getCategory(r.getCategory());
ISendProcessor processor = applicationContext.getBean(category.getProcessor(), ISendProcessor.class);
ReachContext reachContext = ReachContext.builder()
.bizEvent(bizEvent).reachConfig(r)
.messageParam(param)
.build();
processor.save(reachContext);
switch (category) {
case NOTIFICATION:
// resp.setMsgReturnList(reachContext.getMsgReturnList());
break;
case PENDING:
resp.setPendingList(reachContext.getPendingList());
break;
default:
}
CompletableFuture.runAsync(() -> processor.reach(reachContext), reachExecutorService);
});
return resp;
}
}

View File

@ -1,180 +0,0 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.BizEventMapping;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.message.send.processor.ISendProcessor;
import cn.axzo.msg.center.message.send.processor.SendContext;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.RelationTemplateMapService;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class GeneralMessageMapperServiceImpl implements GeneralMessageMapperService {
private final ThreadFactory asyncSendMsgThreadFactory = ThreadFactoryBuilder.create()
.setDaemon(true).setNameFormat("ASYNC_SEND_IM_MESSAGE_%d").get();
private final ExecutorService asyncSendMsgExecutorService = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024), asyncSendMsgThreadFactory);
private final GeneralMessageService generalMessageService;
private final RelationTemplateMapService relationTemplateMapService;
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
private final BizEventMappingService bizEventMappingService;
private final ApplicationContext applicationContext;
@Override
public void asyncBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap) {
log.info("do some check before send im message. relationId:[{}]", request.getRelationId());
if (MsgTypeEnum.PENDING_MESSAGE.equals(request.getType())) {
// 待办消息暂时不支持
log.info("pending message is not supported.");
return;
}
if (Objects.isNull(request.getRelationId())) {
log.info("relation id is null.");
return;
}
log.info("request relationId= [{}], toldIdPersonIdMap= [{}]", request.getRelationId(),
JSON.toJSONString(request.getToldIdPersonIdMap()));
// IM 需要personId过滤掉 personId = 0 的数据
Map<Long,Long> imToldIdPersonIdMap = request.getToldIdPersonIdMap().entrySet()
.stream()
.filter(entry -> entry.getValue() > 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
log.info("request relationId= [{}], final toldIdPersonIdMap= [{}]", request.getRelationId(),
JSON.toJSONString(imToldIdPersonIdMap));
request.setToldIdPersonIdMap(imToldIdPersonIdMap);
if (MapUtil.isEmpty(request.getToldIdPersonIdMap())) {
// 由于IM那边是根据personId来创建账户的,所以强依赖personId
log.info("toIdPersonIdMap is empty.");
return;
}
String bizCode = String.valueOf(request.getRelationId());
Optional<BizEventMapping> bizEventOpt = bizEventMappingService.getByBizCode(bizCode);
AssertUtil.isTrue(bizEventOpt.isPresent(), String.format("业务事件映射%s不存在", bizCode));
BizEventMapping bizEvent = bizEventOpt.get();
AssertUtil.notEmpty(bizEvent.getReachConfig(), String.format("业务事件映射%s无业务动作配置", bizCode));
log.info("start to send reach message. bizCode:[{}]", bizCode);
bizEvent.getReachConfig().forEach(r -> {
BizActionCategory category = BizActionCategory.getCategory(r.getCategory());
ISendProcessor processor = applicationContext.getBean(category.getProcessor(), ISendProcessor.class);
SendContext sendContext = SendContext.builder()
.bizEvent(bizEvent).reachConfig(r)
.messageParam(request)
.toIdMessageRecordMap(toIdMessageRecordMap)
.build();
CompletableFuture.runAsync(() -> processor.send(sendContext), asyncSendMsgExecutorService);
});
}
private void doBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap) {
String templateCode = relationTemplateMapService.queryByRelationId(request.getRelationId()).orElse(null);
if (StringUtils.isBlank(templateCode)) {
log.info("the relationId([{}]) is not map any new message template. ", request.getRelationId());
return;
}
log.info("start to send im message. relationId:[{}], templateCode:[{}]", request.getRelationId(), templateCode);
try {
// 发送IM消息
GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode, toIdMessageRecordMap.keySet());
// IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端)
List<SendImMessageDTO> result = generalMessageService.batchSendMessage(sendImMsgRequest);
if (CollectionUtils.isEmpty(result)) {
log.info("there is not any person successfully send im message. relationId:[{}]", request.getRelationId());
return;
}
// 记录发送了IM消息的旧消息
recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap());
} catch (Exception e) {
log.warn("broke out some exception while sending im message. relationId:[{}]", request.getRelationId(), e);
}
}
private void recordSendImMessage(Map<Long, MessageRecord> toIdMessageRecordMap, List<SendImMessageDTO> sendImResult,
Map<Long, Long> toIdPersonIdMap) {
// 成功发送了IM消息的personId集合
Set<Long> sucSendImMsgPersonIds = sendImResult.stream()
.map(SendImMessageDTO::getPersonId).collect(Collectors.toSet());
// 从入参中的toIdPersonIdMap中筛选发送成功的entry
Map<Long, Long> subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream()
// 过滤掉personId维度IM消息发送失败的entry
.filter(e -> sucSendImMsgPersonIds.contains(e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<Long, Long> msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream()
.filter(e -> subToIdPersonIdMap.containsKey(e.getKey()))
.collect(Collectors.toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey())));
log.info("record message that has been send im message. msgIds:{}", msgRecordIdPersonIdMap.keySet());
// 双发记录
messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap);
}
private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request, String templateCode,
Collection<Long> subReceiverIds) {
IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType());
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))
.collect(Collectors.toList());
OrganizationTypeEnum orgType = Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN :
OrganizationTypeEnum.valueOf(request.getTerminalType().name());
return GeneralMessageSendRequest.builder()
.templateCode(templateCode)
.receiver(receivers)
.orgType(orgType)
.orgName(request.getTerminalName())
.orgId(request.getTerminalId())
.bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse(""))
.routerParams(Optional.ofNullable(request.getRouterParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.bizExtParams(Optional.ofNullable(request.getMsgParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.build();
}
}

View File

@ -17,6 +17,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.Collections;
import java.util.List;
/**
@ -45,7 +46,7 @@ public class MessageController {
*/
@RequestMapping(value = "api/message/push", method = RequestMethod.POST)
CommonResponse<List<MsgReturnParamRes>> pushMsg(@RequestBody GeneralMessageReq message){
return CommonResponse.success(messageRecordService.pushMsg(message));
return CommonResponse.success(Collections.emptyList());
}
/**