diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/GeneralMessageApiImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/GeneralMessageApiImpl.java index f471bba0..9f28f720 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/GeneralMessageApiImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/GeneralMessageApiImpl.java @@ -3,11 +3,8 @@ package cn.axzo.msg.center.inside.notices.service.impl; import cn.axzo.msg.center.api.GeneralMessageApi; import cn.axzo.msg.center.api.request.GeneralMessagePushReq; import cn.axzo.msg.center.api.response.ReachResp; -import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam; -import cn.axzo.msg.center.message.service.CommonMessageRecordService; import cn.azxo.framework.common.model.CommonResponse; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RestController; /** @@ -18,12 +15,8 @@ import org.springframework.web.bind.annotation.RestController; @RestController public class GeneralMessageApiImpl implements GeneralMessageApi { - @Autowired - private CommonMessageRecordService commonMessageRecordService; @Override public CommonResponse reachGeneralMsg(GeneralMessagePushReq param) { - // // TODO: 2024/1/10 - ReachResp result = commonMessageRecordService.reach(GeneralMessagePushParam.from(param)); - return CommonResponse.success(result); + return CommonResponse.success(new ReachResp()); } } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/InsideNoticesApiImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/InsideNoticesApiImpl.java index 0c0a3cec..9feffb17 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/InsideNoticesApiImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/InsideNoticesApiImpl.java @@ -4,15 +4,30 @@ import cn.axzo.core.domain.PageResult; import cn.axzo.core.service.ServiceException; import cn.axzo.msg.center.api.InsideNoticesApi; import cn.axzo.msg.center.api.enums.MsgStateEnum; -import cn.axzo.msg.center.api.request.*; -import cn.axzo.msg.center.api.response.*; +import cn.axzo.msg.center.api.request.ChangeMessageReq; +import cn.axzo.msg.center.api.request.ChangeMessageStateReq; +import cn.axzo.msg.center.api.request.CmsMsgQueryReq; +import cn.axzo.msg.center.api.request.CmsReadMsgReq; +import cn.axzo.msg.center.api.request.GeneralMessageReq; +import cn.axzo.msg.center.api.request.MessageTotalReq; +import cn.axzo.msg.center.api.request.MsgReturnParamRes; +import cn.axzo.msg.center.api.request.StatisticsReq; +import cn.axzo.msg.center.api.response.InsideMessageModuleRes; +import cn.axzo.msg.center.api.response.MessageNewRes; +import cn.axzo.msg.center.api.response.MessageTotalRes; +import cn.axzo.msg.center.api.response.RelationRes; +import cn.axzo.msg.center.api.response.TemplateRes; import cn.axzo.msg.center.common.utils.CustomBeanUtils; import cn.axzo.msg.center.domain.entity.MessageModule; import cn.axzo.msg.center.domain.entity.MessageRelation; import cn.axzo.msg.center.domain.entity.MessageTemplate; import cn.axzo.msg.center.domain.enums.ModuleBizTypeEnum; import cn.axzo.msg.center.domain.enums.UserTypeEnum; -import cn.axzo.msg.center.inside.notices.service.*; +import cn.axzo.msg.center.inside.notices.service.MessageCoreService; +import cn.axzo.msg.center.inside.notices.service.MessageModuleService; +import cn.axzo.msg.center.inside.notices.service.MessageRecordService; +import cn.axzo.msg.center.inside.notices.service.MessageRelationService; +import cn.axzo.msg.center.inside.notices.service.MessageTemplateService; import cn.azxo.framework.common.model.CommonResponse; import cn.hutool.json.JSONUtil; import com.google.common.collect.Lists; @@ -26,6 +41,7 @@ import javax.validation.Valid; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -56,8 +72,7 @@ public class InsideNoticesApiImpl implements InsideNoticesApi { @Override /*@RepeatSubCheck(value = 3000, unique = true, containParams = false)*///pudge做了前置校验 public CommonResponse> pushMsg(@RequestBody GeneralMessageReq message) { - List result = messageRecordService.pushMsg(message); - return CommonResponse.success(result); + return CommonResponse.success(Collections.emptyList()); } @Override diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java index 1a558bd6..3b6a9209 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java @@ -49,7 +49,6 @@ import cn.axzo.msg.center.domain.enums.YesNoEnum; import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq; import cn.axzo.msg.center.inside.notices.event.SendMessageEvent; import cn.axzo.msg.center.inside.notices.service.MessageRecordService; -import cn.axzo.msg.center.message.service.GeneralMessageMapperService; import cn.axzo.msg.center.mq.MqMessageRecord; import cn.axzo.msg.center.mq.MqMessageType; import cn.axzo.msg.center.mq.MqProducer; @@ -124,9 +123,6 @@ public class MessageRecordServiceImpl implements MessageRecordService { /*@Resource private IdentityProfileService identityProfileService;*/ - @Resource - private GeneralMessageMapperService generalMessageMapperService; - /** * 新增推送消息接口 * @@ -178,7 +174,7 @@ public class MessageRecordServiceImpl implements MessageRecordService { Map toIdRecordMap = Maps.newHashMap(); List messageRecords = saveBatch(basic, toIds, message.getToldIdPersonIdMap(), toIdRecordMap); try { - generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap); + //generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap); } catch (Exception e) { log.warn("asyncBatchSendMessage push error, param= [{}]", JSON.toJSONString(message), e); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/ISendProcessor.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/ISendProcessor.java deleted file mode 100644 index 35b79c20..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/ISendProcessor.java +++ /dev/null @@ -1,34 +0,0 @@ -package cn.axzo.msg.center.message.send.processor; - -import cn.axzo.msg.center.common.utils.PlaceholderResolver; -import com.alibaba.fastjson.JSONObject; -import java.util.Objects; - -/** - * @author syl - * @date 2023/12/18 - */ -public interface ISendProcessor { - - default void check(SendContext context) { - } - - default void send(SendContext context) { - } - - default void checkReach(ReachContext context) { - } - - default void reach(ReachContext context) { - } - - default void save(ReachContext context) { - } - - default String parseString(String string, JSONObject params) { - if (Objects.isNull(params)) { - return string; - } - return PlaceholderResolver.getDefaultResolver().resolveByMap(string, params); - } -} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/NotificationProcessor.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/NotificationProcessor.java deleted file mode 100644 index 2b0d51a2..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/NotificationProcessor.java +++ /dev/null @@ -1,209 +0,0 @@ -package cn.axzo.msg.center.message.send.processor; - -import cn.axzo.basics.common.util.AssertUtil; -import cn.axzo.msg.center.api.request.GeneralMessageReq; -import cn.axzo.msg.center.dal.CommonMessageRecordDao; -import cn.axzo.msg.center.domain.entity.CommonMessageRecord; -import cn.axzo.msg.center.domain.entity.MessageBaseTemplate; -import cn.axzo.msg.center.domain.entity.MessageRecord; -import cn.axzo.msg.center.domain.enums.BizActionCategory; -import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO; -import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam; -import cn.axzo.msg.center.message.service.BizEventMappingService; -import cn.axzo.msg.center.message.service.GeneralMessageService; -import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService; -import cn.axzo.msg.center.service.dto.PersonDTO; -import cn.axzo.msg.center.service.enums.GeneralMessageStateEnum; -import cn.axzo.msg.center.service.enums.IdentityTypeEnum; -import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; -import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; -import cn.axzo.msg.center.service.pending.response.PushMessageReturnDTO; -import cn.axzo.msg.center.utils.PersonIdentityUtil; -import cn.axzo.msg.center.utils.UUIDUtil; -import cn.hutool.core.collection.CollUtil; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * @author syl - * @date 2023/12/18 - */ -@Slf4j -@Component -public class NotificationProcessor implements ISendProcessor { - - @Autowired - private GeneralMessageService generalMessageService; - - @Autowired - private MessageSendTwiceRecordService messageSendTwiceRecordService; - - @Autowired - private BizEventMappingService bizEventMappingService; - - @Autowired - private CommonMessageRecordDao commonMessageRecordDao; - - @Override - public void check(SendContext context) { - AssertUtil.notNull(context.getMessageParam(), "通知消息的参数错误,请检查"); - AssertUtil.isTrue(CollUtil.isNotEmpty(context.getToIdMessageRecordMap()), "通知消息的人员信息不存在"); - AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空"); - AssertUtil.isTrue(BizActionCategory.NOTIFICATION.getCode().equals(context.getReachConfig() - .getCategory()), "消息动作不匹配"); - bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig()); - } - - @Override - public void send(SendContext context) { - // 检查 - this.check(context); - log.info("NotificationProcessor start to async send im message. bizCode= [{}]" - + ", templateCode= [{}]", context.getBizEvent().getBizCode(), - context.getReachConfig().getTemplateCode()); - // 发送 - this.doBatchSendMessage(context); - } - - public void doBatchSendMessage(SendContext context) { - GeneralMessageReq request = context.getMessageParam(); - Map toIdMessageRecordMap = context.getToIdMessageRecordMap(); - String templateCode = context.getReachConfig().getTemplateCode(); - try { - // 发送IM消息 - GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode, - toIdMessageRecordMap.keySet()); - // IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端) - List result = generalMessageService - .batchSendMessage(sendImMsgRequest); - if (CollectionUtils.isEmpty(result)) { - log.info("bizEvent there is not any person successfully send im message. bizCode:[{}]", - request.getRelationId()); - return; - } - // 记录发送了IM消息的旧消息 - recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap()); - } catch (Exception e) { - log.warn("bizEvent broke out some exception while sending im message. bizCode:[{}]", - request.getRelationId(), e); - } - } - - private void recordSendImMessage(Map toIdMessageRecordMap, - List sendImResult, - Map toIdPersonIdMap) { - // 成功发送了IM消息的personId集合 - Set sucSendImMsgPersonIds = sendImResult.stream() - .map(SendImMessageDTO::getPersonId).collect(Collectors.toSet()); - // 从入参中的toIdPersonIdMap中筛选发送成功的entry - Map subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream() - // 过滤掉personId维度IM消息发送失败的entry - .filter(e -> sucSendImMsgPersonIds.contains(e.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Map msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream() - .filter(e -> subToIdPersonIdMap.containsKey(e.getKey())) - .collect(Collectors - .toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey()))); - log.info("bizEvent record message that has been send im message. msgIds:{}", - msgRecordIdPersonIdMap.keySet()); - // 双发记录 - messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap); - } - - - private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request, - String templateCode, - Collection subReceiverIds) { - IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType()); - List receivers = subReceiverIds.stream() - .filter(request.getToldIdPersonIdMap()::containsKey) - .map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType)) - .collect(Collectors.toList()); - OrganizationTypeEnum orgType = - Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN : - OrganizationTypeEnum.valueOf(request.getTerminalType().name()); - return GeneralMessageSendRequest.builder() - .templateCode(templateCode) - .receiver(receivers) - .orgType(orgType) - .orgName(request.getTerminalName()) - .orgId(request.getTerminalId()) - .bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse("")) - .routerParams(Optional.ofNullable(request.getRouterParams()) - .map(v -> JSONObject.parseObject(JSON.toJSONString(v))) - .orElseGet(JSONObject::new)) - .bizExtParams(Optional.ofNullable(request.getMsgParams()) - .map(v -> JSONObject.parseObject(JSON.toJSONString(v))) - .orElseGet(JSONObject::new)) - .build(); - } - - - @Override - public void checkReach(ReachContext context) { - AssertUtil.notNull(context.getMessageParam(), "通知消息的参数错误,请检查"); - AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空"); - AssertUtil.isTrue(BizActionCategory.NOTIFICATION.getCode().equals(context.getReachConfig() - .getCategory()), "消息动作不匹配"); - } - @Override - public void reach(ReachContext context) { - // 检查 - this.checkReach(context); - log.info("NotificationProcessor new send im message. bizCode= [{}]" - + ", templateCode= [{}]", context.getBizEvent().getBizCode(), - context.getReachConfig().getTemplateCode()); - - - } - - @Override - public void save(ReachContext context) { - this.checkReach(context); - MessageBaseTemplate template = bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig()); - context.setTemplate(template); - List records = convertRecords(context); - commonMessageRecordDao.saveBatch(records); - List msgReturnList = records.stream() - .map(r -> PushMessageReturnDTO.builder().identityCode(r.getIdentityCode()).build()) - .collect(Collectors.toList()); - context.setMsgReturnList(msgReturnList); - } - - private List convertRecords(ReachContext context) { - MessageBaseTemplate template = context.getTemplate(); - GeneralMessagePushParam messageParam = context.getMessageParam(); - return messageParam.getReceivers().stream().map(r -> { - CommonMessageRecord.CommonMessageRecordBuilder builder = CommonMessageRecord.builder() - .identityCode(UUIDUtil.uuidString()); - if (Objects.nonNull(messageParam.getInitiator())) { - builder.senderPersonId(messageParam.getInitiator().getId()); - } - builder.receiverPersonId(r.getId()); - builder.bizEventCode(messageParam.getBizEventCode()); - builder.templateCode(template.getCode()); - builder.title(parseString(template.getTitle(), messageParam.getBizExtParams())); - builder.content(parseString(template.getContent(), messageParam.getBizExtParams())); - builder.orgType(messageParam.getOrgType()); - builder.workspaceId(messageParam.getWorkspaceId()); - builder.ouId(messageParam.getOuId()); - builder.state(GeneralMessageStateEnum.HAS_BEEN_SENT); - builder.bizCode(messageParam.getBizCode()); - builder.routerParams(messageParam.getRouterParams()); - builder.bizExtParams(messageParam.getBizExtParams()); - return builder.build(); - }).collect(Collectors.toList()); - - } -} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/PendingProcessor.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/PendingProcessor.java deleted file mode 100644 index 8270d8cb..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/send/processor/PendingProcessor.java +++ /dev/null @@ -1,103 +0,0 @@ -package cn.axzo.msg.center.message.send.processor; - -import cn.axzo.basics.common.util.AssertUtil; -import cn.axzo.msg.center.api.request.GeneralMessageReq; -import cn.axzo.msg.center.domain.enums.BizActionCategory; -import cn.axzo.msg.center.message.service.BizEventMappingService; -import cn.axzo.msg.center.message.service.PendingMessageNewService; -import cn.axzo.msg.center.service.dto.PersonDTO; -import cn.axzo.msg.center.service.enums.BizCategoryEnum; -import cn.axzo.msg.center.service.enums.IdentityTypeEnum; -import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; -import cn.axzo.msg.center.service.pending.request.PendingMessagePushRequest; -import cn.axzo.msg.center.utils.PersonIdentityUtil; -import cn.hutool.core.collection.CollUtil; -import java.util.Collection; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * @author syl - * @date 2023/12/18 - */ -@Slf4j -@Component -public class PendingProcessor implements ISendProcessor { - - @Autowired - private PendingMessageNewService pendingMessageNewService; - - @Autowired - private BizEventMappingService bizEventMappingService; - - @Override - public void check(SendContext context) { - AssertUtil.notNull(context.getMessageParam(), "待办消息的参数错误,请检查"); - AssertUtil.isTrue(CollUtil.isNotEmpty(context.getToIdMessageRecordMap()), "待办消息的人员信息不存在"); - AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空"); - AssertUtil.isTrue(BizActionCategory.PENDING.getCode().equals(context.getReachConfig() - .getCategory()), "待办业务动作不匹配"); - - bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig()); - } - - @Override - public void send(SendContext context) { - // 检查 - this.check(context); - log.info("PendingProcessor start to async send pending message. bizCode= [{}]" - + ", templateCode= [{}]", context.getBizEvent().getBizCode(), - context.getReachConfig().getTemplateCode()); - - // 由于之前业务方这期未做改造,而之前对外提供的参数只满足消息,待办这块的发送需要定义待办相关的业务属性 - // 迁移到新接口 -// PendingMessagePushRequest request = buildSendRequest(context); -// pendingMessageNewService.push(PendingMessagePushParam.from(request)); - } - - - private PendingMessagePushRequest buildSendRequest(SendContext context) { - GeneralMessageReq request = context.getMessageParam(); - String templateCode = context.getReachConfig().getTemplateCode(); - Collection subReceiverIds = context.getToIdMessageRecordMap().keySet(); - - PersonDTO promoter = PersonDTO.from(null, request.getFromId(), null); - IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType()); - List receivers = subReceiverIds.stream() - .filter(request.getToldIdPersonIdMap()::containsKey) - .map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType)) - .collect(Collectors.toList()); - OrganizationTypeEnum orgType = Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN : - OrganizationTypeEnum.valueOf(request.getTerminalType().name()); - return PendingMessagePushRequest.builder() - .templateCode(templateCode) - .promoter(promoter) - .executor(receivers) - .orgType(orgType) - .workspaceId(request.getTerminalId()) - .ouId(null) - .bizCategory(BizCategoryEnum.OTHER) - .bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse("")) - .routerParams(null) - .bizExtParams(null) - .build(); - } - - @Override - public void checkReach(ReachContext context) { - - } - @Override - public void reach(ReachContext context) { - // 等后续im具有待办消息的能力,再接入,目前统一走待办中心 - } - - @Override - public void save(ReachContext context) { - } -} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/CommonMessageRecordService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/CommonMessageRecordService.java deleted file mode 100644 index fd427b67..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/CommonMessageRecordService.java +++ /dev/null @@ -1,13 +0,0 @@ -package cn.axzo.msg.center.message.service; - -import cn.axzo.msg.center.api.response.ReachResp; -import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam; - -/** - * @author syl - * @date 2024/1/8 - */ -public interface CommonMessageRecordService { - - ReachResp reach(GeneralMessagePushParam param); -} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageMapperService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageMapperService.java deleted file mode 100644 index c30c5165..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/GeneralMessageMapperService.java +++ /dev/null @@ -1,25 +0,0 @@ -package cn.axzo.msg.center.message.service; - -import cn.axzo.msg.center.api.request.GeneralMessageReq; -import cn.axzo.msg.center.domain.entity.MessageRecord; - -import java.util.List; -import java.util.Map; - -/** - * 新老版本消息映射的相关接口 - * - * @author cold_blade - * @date 2023/10/23 - * @version 1.0 - */ -public interface GeneralMessageMapperService { - - /** - * 异步批量发送消息 - * - * @param request 发送消息时的请求参数 - * @param toIdMessageRecordMap 接收者身份id与旧消息记录的映射关系 - */ - void asyncBatchSendMessage(GeneralMessageReq request, Map toIdMessageRecordMap); -} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/CommonMessageRecordServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/CommonMessageRecordServiceImpl.java deleted file mode 100644 index 8e954f3a..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/CommonMessageRecordServiceImpl.java +++ /dev/null @@ -1,80 +0,0 @@ -package cn.axzo.msg.center.message.service.impl; - -import cn.axzo.basics.common.util.AssertUtil; -import cn.axzo.msg.center.api.response.ReachResp; -import cn.axzo.msg.center.domain.entity.BizEventMapping; -import cn.axzo.msg.center.domain.enums.BizActionCategory; -import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam; -import cn.axzo.msg.center.message.send.processor.ISendProcessor; -import cn.axzo.msg.center.message.send.processor.ReachContext; -import cn.axzo.msg.center.message.service.BizEventMappingService; -import cn.axzo.msg.center.message.service.CommonMessageRecordService; -import java.util.Optional; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import jodd.util.concurrent.ThreadFactoryBuilder; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Service; - -/** - * @author syl - * @date 2024/1/8 - */ -@Slf4j -@Service -public class CommonMessageRecordServiceImpl implements CommonMessageRecordService { - - @Value("${msg-im-server.partition-size:500}") - private Integer partitionSize; - - private final ThreadFactory asyncReachThreadFactory = ThreadFactoryBuilder.create() - .setDaemon(true).setNameFormat("REACH_IM_MESSAGE_%d").get(); - private final ExecutorService reachExecutorService = new ThreadPoolExecutor(5, - 10, 5, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1024), asyncReachThreadFactory); - - @Autowired - private BizEventMappingService bizEventMappingService; - @Autowired - private ApplicationContext applicationContext; - - @Override - public ReachResp reach(GeneralMessagePushParam param) { - String bizEventCode = param.getBizEventCode(); - Optional bizEventOpt = bizEventMappingService.getByBizCode(bizEventCode); - AssertUtil.isTrue(bizEventOpt.isPresent(), String.format("业务事件映射%s不存在", bizEventCode)); - BizEventMapping bizEvent = bizEventOpt.get(); - AssertUtil.notEmpty(bizEvent.getReachConfig(), String.format("业务事件映射%s无业务动作配置", bizEventCode)); - - log.info("start to send reach message. bizCode:[{}]", bizEventCode); - ReachResp resp = new ReachResp(); - bizEvent.getReachConfig().forEach(r -> { - BizActionCategory category = BizActionCategory.getCategory(r.getCategory()); - ISendProcessor processor = applicationContext.getBean(category.getProcessor(), ISendProcessor.class); - - ReachContext reachContext = ReachContext.builder() - .bizEvent(bizEvent).reachConfig(r) - .messageParam(param) - .build(); - processor.save(reachContext); - switch (category) { - case NOTIFICATION: - // resp.setMsgReturnList(reachContext.getMsgReturnList()); - break; - case PENDING: - resp.setPendingList(reachContext.getPendingList()); - break; - default: - } - CompletableFuture.runAsync(() -> processor.reach(reachContext), reachExecutorService); - }); - return resp; - } -} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageMapperServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageMapperServiceImpl.java deleted file mode 100644 index dcbed2e7..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageMapperServiceImpl.java +++ /dev/null @@ -1,180 +0,0 @@ -package cn.axzo.msg.center.message.service.impl; - -import cn.axzo.basics.common.util.AssertUtil; -import cn.axzo.msg.center.api.enums.MsgTypeEnum; -import cn.axzo.msg.center.api.request.GeneralMessageReq; -import cn.axzo.msg.center.domain.entity.BizEventMapping; -import cn.axzo.msg.center.domain.entity.MessageRecord; -import cn.axzo.msg.center.domain.enums.BizActionCategory; -import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO; -import cn.axzo.msg.center.message.send.processor.ISendProcessor; -import cn.axzo.msg.center.message.send.processor.SendContext; -import cn.axzo.msg.center.message.service.BizEventMappingService; -import cn.axzo.msg.center.message.service.GeneralMessageMapperService; -import cn.axzo.msg.center.message.service.GeneralMessageService; -import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService; -import cn.axzo.msg.center.message.service.RelationTemplateMapService; -import cn.axzo.msg.center.service.dto.PersonDTO; -import cn.axzo.msg.center.service.enums.IdentityTypeEnum; -import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; -import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest; -import cn.axzo.msg.center.utils.PersonIdentityUtil; -import cn.hutool.core.map.MapUtil; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import jodd.util.concurrent.ThreadFactoryBuilder; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Service; - -/** - * @author cold_blade - * @date 2023/10/23 - * @version 1.0 - */ -@Slf4j -@Service -@RequiredArgsConstructor -public class GeneralMessageMapperServiceImpl implements GeneralMessageMapperService { - - private final ThreadFactory asyncSendMsgThreadFactory = ThreadFactoryBuilder.create() - .setDaemon(true).setNameFormat("ASYNC_SEND_IM_MESSAGE_%d").get(); - private final ExecutorService asyncSendMsgExecutorService = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1024), asyncSendMsgThreadFactory); - - private final GeneralMessageService generalMessageService; - private final RelationTemplateMapService relationTemplateMapService; - private final MessageSendTwiceRecordService messageSendTwiceRecordService; - private final BizEventMappingService bizEventMappingService; - private final ApplicationContext applicationContext; - - @Override - public void asyncBatchSendMessage(GeneralMessageReq request, Map toIdMessageRecordMap) { - log.info("do some check before send im message. relationId:[{}]", request.getRelationId()); - if (MsgTypeEnum.PENDING_MESSAGE.equals(request.getType())) { - // 待办消息暂时不支持 - log.info("pending message is not supported."); - return; - } - if (Objects.isNull(request.getRelationId())) { - log.info("relation id is null."); - return; - } - log.info("request relationId= [{}], toldIdPersonIdMap= [{}]", request.getRelationId(), - JSON.toJSONString(request.getToldIdPersonIdMap())); - // IM 需要personId,过滤掉 personId = 0 的数据 - Map imToldIdPersonIdMap = request.getToldIdPersonIdMap().entrySet() - .stream() - .filter(entry -> entry.getValue() > 0) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - log.info("request relationId= [{}], final toldIdPersonIdMap= [{}]", request.getRelationId(), - JSON.toJSONString(imToldIdPersonIdMap)); - request.setToldIdPersonIdMap(imToldIdPersonIdMap); - if (MapUtil.isEmpty(request.getToldIdPersonIdMap())) { - // 由于IM那边是根据personId来创建账户的,所以强依赖personId - log.info("toIdPersonIdMap is empty."); - return; - } - - String bizCode = String.valueOf(request.getRelationId()); - Optional bizEventOpt = bizEventMappingService.getByBizCode(bizCode); - AssertUtil.isTrue(bizEventOpt.isPresent(), String.format("业务事件映射%s不存在", bizCode)); - BizEventMapping bizEvent = bizEventOpt.get(); - AssertUtil.notEmpty(bizEvent.getReachConfig(), String.format("业务事件映射%s无业务动作配置", bizCode)); - - log.info("start to send reach message. bizCode:[{}]", bizCode); - bizEvent.getReachConfig().forEach(r -> { - BizActionCategory category = BizActionCategory.getCategory(r.getCategory()); - ISendProcessor processor = applicationContext.getBean(category.getProcessor(), ISendProcessor.class); - - SendContext sendContext = SendContext.builder() - .bizEvent(bizEvent).reachConfig(r) - .messageParam(request) - .toIdMessageRecordMap(toIdMessageRecordMap) - .build(); - CompletableFuture.runAsync(() -> processor.send(sendContext), asyncSendMsgExecutorService); - }); - } - - private void doBatchSendMessage(GeneralMessageReq request, Map toIdMessageRecordMap) { - String templateCode = relationTemplateMapService.queryByRelationId(request.getRelationId()).orElse(null); - if (StringUtils.isBlank(templateCode)) { - log.info("the relationId([{}]) is not map any new message template. ", request.getRelationId()); - return; - } - log.info("start to send im message. relationId:[{}], templateCode:[{}]", request.getRelationId(), templateCode); - try { - // 发送IM消息 - GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode, toIdMessageRecordMap.keySet()); - // IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端) - List result = generalMessageService.batchSendMessage(sendImMsgRequest); - if (CollectionUtils.isEmpty(result)) { - log.info("there is not any person successfully send im message. relationId:[{}]", request.getRelationId()); - return; - } - // 记录发送了IM消息的旧消息 - recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap()); - } catch (Exception e) { - log.warn("broke out some exception while sending im message. relationId:[{}]", request.getRelationId(), e); - } - } - - private void recordSendImMessage(Map toIdMessageRecordMap, List sendImResult, - Map toIdPersonIdMap) { - // 成功发送了IM消息的personId集合 - Set sucSendImMsgPersonIds = sendImResult.stream() - .map(SendImMessageDTO::getPersonId).collect(Collectors.toSet()); - // 从入参中的toIdPersonIdMap中筛选发送成功的entry - Map subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream() - // 过滤掉personId维度IM消息发送失败的entry - .filter(e -> sucSendImMsgPersonIds.contains(e.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Map msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream() - .filter(e -> subToIdPersonIdMap.containsKey(e.getKey())) - .collect(Collectors.toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey()))); - log.info("record message that has been send im message. msgIds:{}", msgRecordIdPersonIdMap.keySet()); - // 双发记录 - messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap); - } - - private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request, String templateCode, - Collection subReceiverIds) { - IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType()); - List receivers = subReceiverIds.stream() - .filter(request.getToldIdPersonIdMap()::containsKey) - .map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType)) - .collect(Collectors.toList()); - OrganizationTypeEnum orgType = Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN : - OrganizationTypeEnum.valueOf(request.getTerminalType().name()); - return GeneralMessageSendRequest.builder() - .templateCode(templateCode) - .receiver(receivers) - .orgType(orgType) - .orgName(request.getTerminalName()) - .orgId(request.getTerminalId()) - .bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse("")) - .routerParams(Optional.ofNullable(request.getRouterParams()) - .map(v -> JSONObject.parseObject(JSON.toJSONString(v))) - .orElseGet(JSONObject::new)) - .bizExtParams(Optional.ofNullable(request.getMsgParams()) - .map(v -> JSONObject.parseObject(JSON.toJSONString(v))) - .orElseGet(JSONObject::new)) - .build(); - } -} diff --git a/msg-center-webapi/src/main/java/cn/axzo/msg/center/webapi/MessageController.java b/msg-center-webapi/src/main/java/cn/axzo/msg/center/webapi/MessageController.java index 2d24f888..7d922792 100644 --- a/msg-center-webapi/src/main/java/cn/axzo/msg/center/webapi/MessageController.java +++ b/msg-center-webapi/src/main/java/cn/axzo/msg/center/webapi/MessageController.java @@ -17,6 +17,7 @@ import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import javax.validation.Valid; +import java.util.Collections; import java.util.List; /** @@ -45,7 +46,7 @@ public class MessageController { */ @RequestMapping(value = "api/message/push", method = RequestMethod.POST) CommonResponse> pushMsg(@RequestBody GeneralMessageReq message){ - return CommonResponse.success(messageRecordService.pushMsg(message)); + return CommonResponse.success(Collections.emptyList()); } /**