Merge branch 'refs/heads/REQ-2723'

This commit is contained in:
yanglin 2024-09-18 19:15:35 +08:00
commit 97d25e45b0
30 changed files with 491 additions and 786 deletions

View File

@ -21,6 +21,7 @@ public class MessageAPIV3Controller implements MessageAPIV3 {
@Override
public CommonResponse<MessageSendRespV3> send(MessageSendReqV3 req) {
log.info("MessageAPIV3Controller.send req={}", req);
return CommonResponse.success(messageServiceV3.send(req));
}

View File

@ -0,0 +1,29 @@
package cn.axzo.msg.center.inside.notices.controller.msg;
import cn.axzo.msg.center.api.MessageAPIV4;
import cn.axzo.msg.center.api.request.v4.MessageSendRequestV4;
import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
import cn.axzo.msg.center.inside.notices.service.MessageServiceV4;
import cn.axzo.trade.web.annotation.EnableResponseAdvice;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yanglin
*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class MessageAPIV4Controller implements MessageAPIV4 {
private final MessageServiceV4 messageServiceV4;
@Override
@EnableResponseAdvice(enable = false)
public MessageSendRespV3 send(MessageSendRequestV4 request) {
log.info("MessageAPIV4Controller.send req={}", request);
return messageServiceV4.send(request);
}
}

View File

@ -7,5 +7,5 @@ import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
* @author yanglin
*/
public interface MessageServiceV3 {
MessageSendRespV3 send(MessageSendReqV3 req);
MessageSendRespV3 send(MessageSendReqV3 request);
}

View File

@ -0,0 +1,13 @@
package cn.axzo.msg.center.inside.notices.service;
import cn.axzo.msg.center.api.request.v4.MessageSendRequestV4;
import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
/**
* @author yanglin
*/
public interface MessageServiceV4 {
MessageSendRespV3 send(MessageSendRequestV4 request);
}

View File

@ -3,11 +3,8 @@ package cn.axzo.msg.center.inside.notices.service.impl;
import cn.axzo.msg.center.api.GeneralMessageApi;
import cn.axzo.msg.center.api.request.GeneralMessagePushReq;
import cn.axzo.msg.center.api.response.ReachResp;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
import cn.axzo.msg.center.message.service.CommonMessageRecordService;
import cn.azxo.framework.common.model.CommonResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;
/**
@ -18,12 +15,8 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class GeneralMessageApiImpl implements GeneralMessageApi {
@Autowired
private CommonMessageRecordService commonMessageRecordService;
@Override
public CommonResponse<ReachResp> reachGeneralMsg(GeneralMessagePushReq param) {
// // TODO: 2024/1/10
ReachResp result = commonMessageRecordService.reach(GeneralMessagePushParam.from(param));
return CommonResponse.success(result);
return CommonResponse.success(new ReachResp());
}
}

View File

@ -4,15 +4,30 @@ import cn.axzo.core.domain.PageResult;
import cn.axzo.core.service.ServiceException;
import cn.axzo.msg.center.api.InsideNoticesApi;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.request.*;
import cn.axzo.msg.center.api.response.*;
import cn.axzo.msg.center.api.request.ChangeMessageReq;
import cn.axzo.msg.center.api.request.ChangeMessageStateReq;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.api.request.MessageTotalReq;
import cn.axzo.msg.center.api.request.MsgReturnParamRes;
import cn.axzo.msg.center.api.request.StatisticsReq;
import cn.axzo.msg.center.api.response.InsideMessageModuleRes;
import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.api.response.MessageTotalRes;
import cn.axzo.msg.center.api.response.RelationRes;
import cn.axzo.msg.center.api.response.TemplateRes;
import cn.axzo.msg.center.common.utils.CustomBeanUtils;
import cn.axzo.msg.center.domain.entity.MessageModule;
import cn.axzo.msg.center.domain.entity.MessageRelation;
import cn.axzo.msg.center.domain.entity.MessageTemplate;
import cn.axzo.msg.center.domain.enums.ModuleBizTypeEnum;
import cn.axzo.msg.center.domain.enums.UserTypeEnum;
import cn.axzo.msg.center.inside.notices.service.*;
import cn.axzo.msg.center.inside.notices.service.MessageCoreService;
import cn.axzo.msg.center.inside.notices.service.MessageModuleService;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.inside.notices.service.MessageRelationService;
import cn.axzo.msg.center.inside.notices.service.MessageTemplateService;
import cn.azxo.framework.common.model.CommonResponse;
import cn.hutool.json.JSONUtil;
import com.google.common.collect.Lists;
@ -26,6 +41,7 @@ import javax.validation.Valid;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -56,8 +72,7 @@ public class InsideNoticesApiImpl implements InsideNoticesApi {
@Override
/*@RepeatSubCheck(value = 3000, unique = true, containParams = false)*///pudge做了前置校验
public CommonResponse<List<MsgReturnParamRes>> pushMsg(@RequestBody GeneralMessageReq message) {
List<MsgReturnParamRes> result = messageRecordService.pushMsg(message);
return CommonResponse.success(result);
return CommonResponse.success(Collections.emptyList());
}
@Override

View File

@ -49,7 +49,6 @@ import cn.axzo.msg.center.domain.enums.YesNoEnum;
import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq;
import cn.axzo.msg.center.inside.notices.event.SendMessageEvent;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType;
@ -124,9 +123,6 @@ public class MessageRecordServiceImpl implements MessageRecordService {
/*@Resource
private IdentityProfileService identityProfileService;*/
@Resource
private GeneralMessageMapperService generalMessageMapperService;
/**
* 新增推送消息接口
*
@ -178,7 +174,7 @@ public class MessageRecordServiceImpl implements MessageRecordService {
Map<Long, MessageRecord> toIdRecordMap = Maps.newHashMap();
List<MessageRecord> messageRecords = saveBatch(basic, toIds, message.getToldIdPersonIdMap(), toIdRecordMap);
try {
generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap);
//generalMessageMapperService.asyncBatchSendMessage(message, toIdRecordMap);
} catch (Exception e) {
log.warn("asyncBatchSendMessage push error, param= [{}]", JSON.toJSONString(message), e);
}

View File

@ -0,0 +1,24 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3;
import com.alibaba.fastjson.JSONObject;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import javax.validation.Valid;
/**
* 过度接口
*
* @author yanglin
*/
@Component
@FeignClient(value = "event-hub", url = "http://event-hub:8080")
public interface ActionClient {
@RequestMapping(value = "action/performAction", method = RequestMethod.POST)
JSONObject performAction(@RequestBody @Valid JSONObject request);
}

View File

@ -1,30 +1,15 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.msg.center.api.request.v3.MessageSendReqV3;
import cn.axzo.msg.center.api.request.v3.PendingSendInfo;
import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.domain.entity.BizEventMapping;
import cn.axzo.msg.center.domain.enums.Channels;
import cn.axzo.msg.center.inside.notices.service.MessageServiceV3;
import cn.axzo.msg.center.inside.notices.service.impl.v3.msg.MessageMappingProcessor;
import cn.axzo.msg.center.inside.notices.service.impl.v3.msg.TemplateMessage;
import cn.axzo.msg.center.inside.notices.service.impl.v3.todo.TodoMappingProcessor;
import cn.axzo.msg.center.inside.notices.utils.FunctionalTransactionTemplate;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
import cn.axzo.msg.center.service.bizevent.request.ReachDto;
import cn.axzo.msg.center.utils.UUIDUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @author yanglin
*/
@ -33,51 +18,32 @@ import java.util.List;
@RequiredArgsConstructor
public class MessageServiceV3Impl implements MessageServiceV3 {
private final BizEventMappingService bizEventMappingService;
private final MessageTemplateNewService messageTemplateNewService;
private final FunctionalTransactionTemplate transactionTemplate;
private final ApplicationContext beanFactory;
private final ActionClient actionClient;
/**
* 过渡期. 有一个逻辑上的循环依赖. 等业务方都切换到event-hub的接口后将这个接口下线
*/
@Override
public MessageSendRespV3 send(MessageSendReqV3 req) {
log.info("received sending message request. req={}", req);
req.validate();
BizEventMapping mapping = bizEventMappingService
.getByBizCode(req.getBizEventMappingCode())
.orElse(null);
BizAssertions.assertNotNull(mapping, String.format(
"找不到对应的事件映射: %s", req.getBizEventMappingCode()));
//noinspection DataFlowIssue
BizAssertions.assertNotEmpty(mapping.getReachConfig(), String.format(
"业务事件映射%s无业务动作配置", req.getBizEventMappingCode()));
String sendRequestNo = UUIDUtil.uuidString();
List<EventMappingProcessor> processors = new ArrayList<>();
for (ReachDto cfg : mapping.getReachConfig()) {
MessageTemplateDTO template = messageTemplateNewService
.queryEnableTemplateByCode(cfg.getTemplateCode())
.orElseThrow(() -> new ServiceException(String.format(
"未查询到对应的模板, templateCode=%s", cfg.getTemplateCode())));
if (Channels.NOTIFICATION.is(cfg.getCategory())) {
// @Scope("prototype") -> we're good
MessageMappingProcessor imProcessor = beanFactory.getBean(MessageMappingProcessor.class);
imProcessor.setTemplate(new TemplateMessage(req, sendRequestNo, cfg, template));
processors.add(imProcessor);
} else if (Channels.PENDING.is(cfg.getCategory())) {
// @Scope("prototype") -> we're good
TodoMappingProcessor todoProcessor = beanFactory.getBean(TodoMappingProcessor.class);
todoProcessor.setSendRequestNo(sendRequestNo);
todoProcessor.setSendRequestV3(req);
todoProcessor.setTemplate(template);
todoProcessor.setReachConfig(cfg);
processors.add(todoProcessor);
}
}
transactionTemplate.exec(() -> processors.forEach(EventMappingProcessor::saveRecords));
processors.forEach(EventMappingProcessor::maybeAsyncSend);
MessageSendRespV3 resp = new MessageSendRespV3(sendRequestNo, new HashMap<>());
processors.forEach(p -> resp.addResult(p.buildTemplateSendResult()));
return resp;
public MessageSendRespV3 send(MessageSendReqV3 request) {
// 1. 发送消息/待办的请求
JSONObject messageSendRequest = new JSONObject();
// 2. 设置基本的发送消息的属性
messageSendRequest.put("sendBasicInfo", request);
// 3. 设置发送待办的信息
PendingSendInfo pendingInfo = request.getChannelParam(
MessageSendReqV3.CHANNEL_PENDING, PendingSendInfo.class);
messageSendRequest.put("pendingSendInfo", pendingInfo);
// 4. 构建event-hub的请求
JSONObject performActionRequest = new JSONObject();
performActionRequest.put("eventCode", request.getBizEventMappingCode());
performActionRequest.put("messageSendRequest", messageSendRequest);
// 5. 调用event-hub的接口
JSONObject response = actionClient.performAction(performActionRequest);
log.info("actionClient.performAction, request={}, resp={}", performActionRequest, response);
BizAssertions.assertTrue(response.getIntValue("code") == 200, response.getString("msg"));
return response
.getJSONObject("data")
.getObject("messageSendResp", MessageSendRespV3.class);
}
}

View File

@ -68,7 +68,7 @@ public class MessageMappingProcessor implements EventMappingProcessor {
@Override
public TemplateSendResultV3 buildTemplateSendResult() {
TemplateSendResultV3 templateResult = new TemplateSendResultV3(
template.getTemplateCode(), template.getConfig().getCategory());
template.getTemplateCode(), template.getChannel().name());
for (MessageRecordV3 message : template.getMessageRecords()) {
templateResult.addResult(new MessageSendResultV3(
message.getReceiverPersonId(), message.getId()));

View File

@ -5,14 +5,15 @@ import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam;
import cn.axzo.im.center.api.vo.req.SendTemplateMessageParam.ReceivePerson;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.api.enums.MsgStateV3Enum;
import cn.axzo.msg.center.api.request.v3.MessageSendReqV3;
import cn.axzo.msg.center.api.request.v4.MessageSendBasicInfoV4;
import cn.axzo.msg.center.api.request.v4.MessageSendRequestV4;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.service.bizevent.request.ReachDto;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import cn.axzo.msg.center.service.enums.Channel;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.utils.UUIDUtil;
import com.alibaba.fastjson.JSONObject;
@ -39,10 +40,10 @@ public class TemplateMessage {
public static final String BIZ_ID_PREFIX = "msg-center";
private final MessageSendReqV3 req;
private final MessageSendRequestV4 req;
private final String batchNo;
@Getter
private final ReachDto config;
private final Channel channel;
@Getter
private final MessageTemplateDTO template;
@ -71,32 +72,33 @@ public class TemplateMessage {
return records;
}
records = new ArrayList<>();
for (PersonV3DTO receiver : req.distinctReceivers()) {
PersonV3DTO sender = req.getSender();
MessageSendBasicInfoV4 sendBasicInfo = req.getSendBasicInfo();
for (PersonV3DTO receiver : sendBasicInfo.distinctReceivers()) {
PersonV3DTO sender = sendBasicInfo.getSender();
MessageRecordV3 message = new MessageRecordV3();
records.add(message);
message.setBatchNo(batchNo);
message.setIdentityCode(UUIDUtil.uuidString());
message.setSenderPersonId(sender == null ? null : sender.getId());
message.setSenderOuId(req.getSenderOuId());
message.setSenderWorkspaceId(req.getSenderWorkspaceId());
message.setSenderOuId(sendBasicInfo.getSenderOuId());
message.setSenderWorkspaceId(sendBasicInfo.getSenderWorkspaceId());
message.setReceiverPersonId(receiver.getId());
message.setReceiverOuId(req.determineReceiversOuId());
message.setReceiverWorkspaceId(req.determineReceiversWorkspaceId());
message.setBizEventMappingCode(req.getBizEventMappingCode());
message.setReceiverOuId(sendBasicInfo.determineReceiversOuId());
message.setReceiverWorkspaceId(sendBasicInfo.determineReceiversWorkspaceId());
message.setBizEventMappingCode(req.getEventMappingCode());
message.setTemplateCode(template.getCode());
message.setTitle(parseTitle());
message.setContent(parseContent());
OrganizationTypeEnum orgType = req.getReceiversOrgType();
OrganizationTypeEnum orgType = sendBasicInfo.getReceiversOrgType();
if (orgType == null) {
orgType = OrganizationTypeEnum.UNKNOWN;
}
message.setReceiverOrgType(orgType.stringCode());
message.setSubtitle(req.getSubtitle());
message.setSubtitle(sendBasicInfo.getSubtitle());
message.setState(MsgStateV3Enum.UNSENT);
message.setBizCode(req.determineBizCode());
message.setRouterParams(req.getRouterParams());
message.setBizExtParams(req.getBizExtParams());
message.setBizCode(sendBasicInfo.determineBizCode());
message.setRouterParams(sendBasicInfo.getRouterParams());
message.setBizExtParams(sendBasicInfo.getBizExtParams());
message.setMsgExtInfo(getMsgExtInfo());
message.setFailCause(null);
message.setCreateAt(new Date());
@ -115,8 +117,9 @@ public class TemplateMessage {
MessageRecordV3 sample = getMessageRecords().get(0);
GeneralMessagePushVO sendVo = templateParser.parse(sample, template);
MessageSendBasicInfoV4 sendBasicInfo = req.getSendBasicInfo();
SendTemplateMessageParam imReq = new SendTemplateMessageParam();
imReq.setBizId(String.format("%s:%s", getBizIdPrefix(template.getCode()), req.determineBizCode()));
imReq.setBizId(String.format("%s:%s", getBizIdPrefix(template.getCode()), sendBasicInfo.determineBizCode()));
imReq.setSendPriority(template.determineImSendPriority());
imReq.setMsgHeader(parseTitle());
imReq.setMsgContent(parseContent());
@ -126,9 +129,9 @@ public class TemplateMessage {
ArrayList<ReceivePerson> receivers = new ArrayList<>();
Set<Long> cmUnique = new HashSet<>();
Set<OuAndPerson> cmpUnique = new HashSet<>();
for (PersonV3DTO receiver : req.receivers()) {
for (PersonV3DTO receiver : sendBasicInfo.receivers()) {
PersonV3DTO.ReceiveModel imReceiveModel = receiver.getImReceiveModel();
Long ouId = imReceiveModel == null ? req.determineReceiversOuId() : imReceiveModel.getOuId();
Long ouId = imReceiveModel == null ? sendBasicInfo.determineReceiversOuId() : imReceiveModel.getOuId();
for (AppTypeEnum app : apps) {
if (app == AppTypeEnum.CM && !cmUnique.add(receiver.getId())) {
continue;
@ -157,14 +160,16 @@ public class TemplateMessage {
String parseTitle() {
if (title == null) {
title = PlaceholderResolver.tryResolve(template.getTitle(), req.getBizExtParams());
title = PlaceholderResolver.tryResolve(
template.getTitle(), req.getSendBasicInfo().getBizExtParams());
}
return title;
}
String parseContent() {
if (content == null) {
content = PlaceholderResolver.tryResolve(template.getContent(), req.getBizExtParams());
content = PlaceholderResolver.tryResolve(
template.getContent(), req.getSendBasicInfo().getBizExtParams());
}
return content;
}
@ -186,8 +191,8 @@ public class TemplateMessage {
public String toString() {
HashMap<String, Object> values = new HashMap<>();
values.put("batchNo", batchNo);
values.put("bizEventMappingCode", req.getBizEventMappingCode());
values.put("bizCode", req.getBizCode());
values.put("bizEventMappingCode", req.getEventMappingCode());
values.put("bizCode", req.getSendBasicInfo().getBizCode());
values.put("templateCode", template.getCode());
values.put("messageIds", getMessageIds());
values.put("receiverPersonIds", getReceiverPersonIds());

View File

@ -1,6 +1,7 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3.todo;
import cn.axzo.msg.center.api.request.v3.MessageSendReqV3;
import cn.axzo.msg.center.api.request.v4.MessageSendBasicInfoV4;
import cn.axzo.msg.center.api.request.v4.MessageSendRequestV4;
import cn.axzo.msg.center.api.request.v3.PendingSendInfo;
import cn.axzo.msg.center.api.response.v3.MessageSendResultV3;
import cn.axzo.msg.center.api.response.v3.TemplateSendResultV3;
@ -9,9 +10,9 @@ import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
import cn.axzo.msg.center.message.service.todo.manage.TodoManager;
import cn.axzo.msg.center.message.service.todo.manage.TodoRequestContext;
import cn.axzo.msg.center.service.bizevent.request.ReachDto;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import cn.axzo.msg.center.service.enums.Channel;
import cn.axzo.msg.center.service.pending.response.PushPendingMessageDTO;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@ -35,35 +36,35 @@ import java.util.stream.Collectors;
public class TodoMappingProcessor implements EventMappingProcessor {
private final TodoManager todoManager;
private MessageSendReqV3 sendRequestV3;
private MessageSendRequestV4 req;
private String sendRequestNo;
private MessageTemplateDTO template;
private ReachDto reachConfig;
private Channel channel;
private List<PushPendingMessageDTO> sendResults;
@Override
public void saveRecords() {
PendingSendInfo pendingInfo = sendRequestV3.getChannelParam(
MessageSendReqV3.CHANNEL_PENDING, PendingSendInfo.class);
PendingSendInfo pendingInfo = req.getPendingSendInfo();
PendingMessagePushParam request = new PendingMessagePushParam();
if (pendingInfo != null)
BeanUtils.copyProperties(pendingInfo, request);
request.setPromoter(sendRequestV3.getSender() == null ? null : sendRequestV3.getSender().asV1());
request.setExecutor(convertReceivers(sendRequestV3.getReceivers()));
MessageSendBasicInfoV4 sendBasicInfo = req.getSendBasicInfo();
request.setPromoter(sendBasicInfo.getSender() == null ? null : sendBasicInfo.getSender().asV1());
request.setExecutor(convertReceivers(sendBasicInfo.getReceivers()));
request.setTemplateCode(template.getCode());
request.setOuId(sendRequestV3.getReceiversOuId());
request.setWorkspaceId(sendRequestV3.getReceiversWorkspaceId());
request.setPromoterOuId(sendRequestV3.getSenderOuId());
request.setPromoterWorkspaceId(sendRequestV3.getSenderWorkspaceId());
request.setBizCode(sendRequestV3.getBizCode());
request.setOrgType(sendRequestV3.getReceiversOrgType());
request.setBizExtParams(sendRequestV3.getBizExtParams() == null
? null : sendRequestV3.getBizExtParams().toJSONString());
request.setRouterParams(sendRequestV3.getRouterParams() == null
? null : sendRequestV3.getRouterParams().toJSONString());
request.setOuId(sendBasicInfo.getReceiversOuId());
request.setWorkspaceId(sendBasicInfo.getReceiversWorkspaceId());
request.setPromoterOuId(sendBasicInfo.getSenderOuId());
request.setPromoterWorkspaceId(sendBasicInfo.getSenderWorkspaceId());
request.setBizCode(sendBasicInfo.getBizCode());
request.setOrgType(sendBasicInfo.getReceiversOrgType());
request.setBizExtParams(sendBasicInfo.getBizExtParams() == null
? null : sendBasicInfo.getBizExtParams().toJSONString());
request.setRouterParams(sendBasicInfo.getRouterParams() == null
? null : sendBasicInfo.getRouterParams().toJSONString());
TodoRequestContext ctx = TodoRequestContext.create(
"MessageAPIV3#send", sendRequestNo, sendRequestV3);
"MessageAPIV3#send", sendRequestNo, sendBasicInfo);
sendResults = todoManager.send(ctx, request);
}
@ -84,7 +85,7 @@ public class TodoMappingProcessor implements EventMappingProcessor {
@Override
public TemplateSendResultV3 buildTemplateSendResult() {
TemplateSendResultV3 resultV3 = new TemplateSendResultV3(
template.getCode(), reachConfig.getCategory());
template.getCode(), channel.name());
for (PushPendingMessageDTO todoResult : sendResults)
resultV3.addResult(new MessageSendResultV3(
todoResult.getExecutorPersonId(), todoResult.getId()));

View File

@ -0,0 +1,80 @@
package cn.axzo.msg.center.inside.notices.service.impl.v4;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.msg.center.api.request.v4.MessageSendBasicInfoV4;
import cn.axzo.msg.center.api.request.v4.MessageSendRequestV4;
import cn.axzo.msg.center.api.request.v4.TemplateInfo;
import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.inside.notices.service.MessageServiceV4;
import cn.axzo.msg.center.inside.notices.service.impl.v3.EventMappingProcessor;
import cn.axzo.msg.center.inside.notices.service.impl.v3.msg.MessageMappingProcessor;
import cn.axzo.msg.center.inside.notices.service.impl.v3.msg.TemplateMessage;
import cn.axzo.msg.center.inside.notices.service.impl.v3.todo.TodoMappingProcessor;
import cn.axzo.msg.center.inside.notices.utils.FunctionalTransactionTemplate;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
import cn.axzo.msg.center.service.enums.Channel;
import cn.axzo.msg.center.utils.UUIDUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @author yanglin
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageServiceV4Impl implements MessageServiceV4 {
private final MessageTemplateNewService messageTemplateNewService;
private final FunctionalTransactionTemplate transactionTemplate;
private final ApplicationContext beanFactory;
@Override
public MessageSendRespV3 send(MessageSendRequestV4 request) {
log.info("received sending message request. req={}", request);
BizAssertions.assertNotNull(request.getTemplates(), "模版信息不能为空");
MessageSendBasicInfoV4 sendBasicInfo = request.getSendBasicInfo();
BizAssertions.assertNotNull(sendBasicInfo, "发送IM和待办的基本信息不能为空");
sendBasicInfo.validate();
String sendRequestNo = UUIDUtil.uuidString();
List<EventMappingProcessor> processors = new ArrayList<>();
for (TemplateInfo info : request.getTemplates()) {
BizAssertions.assertNotEmpty(info.getTemplateCodes(), "模版编码不能为空");
for (String templateCode : info.getTemplateCodes()) {
MessageTemplateDTO template = messageTemplateNewService
.queryEnableTemplateByCode(templateCode)
.orElseThrow(() -> new ServiceException(String.format(
"未查询到对应的模板, templateCode=%s", templateCode)));
if (info.getChannel() == Channel.NOTIFICATION) {
// @Scope("prototype") -> we're good
MessageMappingProcessor imProcessor = beanFactory.getBean(MessageMappingProcessor.class);
imProcessor.setTemplate(new TemplateMessage(request, sendRequestNo, info.getChannel(), template));
processors.add(imProcessor);
} else if (info.getChannel() == Channel.PENDING) {
// @Scope("prototype") -> we're good
TodoMappingProcessor todoProcessor = beanFactory.getBean(TodoMappingProcessor.class);
todoProcessor.setSendRequestNo(sendRequestNo);
todoProcessor.setReq(request);
todoProcessor.setTemplate(template);
todoProcessor.setChannel(info.getChannel());
processors.add(todoProcessor);
}
}
}
transactionTemplate.exec(() -> processors.forEach(EventMappingProcessor::saveRecords));
processors.forEach(EventMappingProcessor::maybeAsyncSend);
MessageSendRespV3 resp = new MessageSendRespV3(sendRequestNo, new HashMap<>());
processors.forEach(p -> resp.addResult(p.buildTemplateSendResult()));
return resp;
}
}

View File

@ -33,11 +33,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;
import javax.validation.Valid;
import java.util.Map;

View File

@ -1,34 +0,0 @@
package cn.axzo.msg.center.message.send.processor;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import com.alibaba.fastjson.JSONObject;
import java.util.Objects;
/**
* @author syl
* @date 2023/12/18
*/
public interface ISendProcessor {
default void check(SendContext context) {
}
default void send(SendContext context) {
}
default void checkReach(ReachContext context) {
}
default void reach(ReachContext context) {
}
default void save(ReachContext context) {
}
default String parseString(String string, JSONObject params) {
if (Objects.isNull(params)) {
return string;
}
return PlaceholderResolver.getDefaultResolver().resolveByMap(string, params);
}
}

View File

@ -1,209 +0,0 @@
package cn.axzo.msg.center.message.send.processor;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.dal.CommonMessageRecordDao;
import cn.axzo.msg.center.domain.entity.CommonMessageRecord;
import cn.axzo.msg.center.domain.entity.MessageBaseTemplate;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.GeneralMessageStateEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.pending.response.PushMessageReturnDTO;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.axzo.msg.center.utils.UUIDUtil;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author syl
* @date 2023/12/18
*/
@Slf4j
@Component
public class NotificationProcessor implements ISendProcessor {
@Autowired
private GeneralMessageService generalMessageService;
@Autowired
private MessageSendTwiceRecordService messageSendTwiceRecordService;
@Autowired
private BizEventMappingService bizEventMappingService;
@Autowired
private CommonMessageRecordDao commonMessageRecordDao;
@Override
public void check(SendContext context) {
AssertUtil.notNull(context.getMessageParam(), "通知消息的参数错误,请检查");
AssertUtil.isTrue(CollUtil.isNotEmpty(context.getToIdMessageRecordMap()), "通知消息的人员信息不存在");
AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空");
AssertUtil.isTrue(BizActionCategory.NOTIFICATION.getCode().equals(context.getReachConfig()
.getCategory()), "消息动作不匹配");
bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig());
}
@Override
public void send(SendContext context) {
// 检查
this.check(context);
log.info("NotificationProcessor start to async send im message. bizCode= [{}]"
+ ", templateCode= [{}]", context.getBizEvent().getBizCode(),
context.getReachConfig().getTemplateCode());
// 发送
this.doBatchSendMessage(context);
}
public void doBatchSendMessage(SendContext context) {
GeneralMessageReq request = context.getMessageParam();
Map<Long, MessageRecord> toIdMessageRecordMap = context.getToIdMessageRecordMap();
String templateCode = context.getReachConfig().getTemplateCode();
try {
// 发送IM消息
GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode,
toIdMessageRecordMap.keySet());
// IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端)
List<SendImMessageDTO> result = generalMessageService
.batchSendMessage(sendImMsgRequest);
if (CollectionUtils.isEmpty(result)) {
log.info("bizEvent there is not any person successfully send im message. bizCode:[{}]",
request.getRelationId());
return;
}
// 记录发送了IM消息的旧消息
recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap());
} catch (Exception e) {
log.warn("bizEvent broke out some exception while sending im message. bizCode:[{}]",
request.getRelationId(), e);
}
}
private void recordSendImMessage(Map<Long, MessageRecord> toIdMessageRecordMap,
List<SendImMessageDTO> sendImResult,
Map<Long, Long> toIdPersonIdMap) {
// 成功发送了IM消息的personId集合
Set<Long> sucSendImMsgPersonIds = sendImResult.stream()
.map(SendImMessageDTO::getPersonId).collect(Collectors.toSet());
// 从入参中的toIdPersonIdMap中筛选发送成功的entry
Map<Long, Long> subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream()
// 过滤掉personId维度IM消息发送失败的entry
.filter(e -> sucSendImMsgPersonIds.contains(e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<Long, Long> msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream()
.filter(e -> subToIdPersonIdMap.containsKey(e.getKey()))
.collect(Collectors
.toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey())));
log.info("bizEvent record message that has been send im message. msgIds:{}",
msgRecordIdPersonIdMap.keySet());
// 双发记录
messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap);
}
private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request,
String templateCode,
Collection<Long> subReceiverIds) {
IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType());
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))
.collect(Collectors.toList());
OrganizationTypeEnum orgType =
Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN :
OrganizationTypeEnum.valueOf(request.getTerminalType().name());
return GeneralMessageSendRequest.builder()
.templateCode(templateCode)
.receiver(receivers)
.orgType(orgType)
.orgName(request.getTerminalName())
.orgId(request.getTerminalId())
.bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse(""))
.routerParams(Optional.ofNullable(request.getRouterParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.bizExtParams(Optional.ofNullable(request.getMsgParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.build();
}
@Override
public void checkReach(ReachContext context) {
AssertUtil.notNull(context.getMessageParam(), "通知消息的参数错误,请检查");
AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空");
AssertUtil.isTrue(BizActionCategory.NOTIFICATION.getCode().equals(context.getReachConfig()
.getCategory()), "消息动作不匹配");
}
@Override
public void reach(ReachContext context) {
// 检查
this.checkReach(context);
log.info("NotificationProcessor new send im message. bizCode= [{}]"
+ ", templateCode= [{}]", context.getBizEvent().getBizCode(),
context.getReachConfig().getTemplateCode());
}
@Override
public void save(ReachContext context) {
this.checkReach(context);
MessageBaseTemplate template = bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig());
context.setTemplate(template);
List<CommonMessageRecord> records = convertRecords(context);
commonMessageRecordDao.saveBatch(records);
List<PushMessageReturnDTO> msgReturnList = records.stream()
.map(r -> PushMessageReturnDTO.builder().identityCode(r.getIdentityCode()).build())
.collect(Collectors.toList());
context.setMsgReturnList(msgReturnList);
}
private List<CommonMessageRecord> convertRecords(ReachContext context) {
MessageBaseTemplate template = context.getTemplate();
GeneralMessagePushParam messageParam = context.getMessageParam();
return messageParam.getReceivers().stream().map(r -> {
CommonMessageRecord.CommonMessageRecordBuilder builder = CommonMessageRecord.builder()
.identityCode(UUIDUtil.uuidString());
if (Objects.nonNull(messageParam.getInitiator())) {
builder.senderPersonId(messageParam.getInitiator().getId());
}
builder.receiverPersonId(r.getId());
builder.bizEventCode(messageParam.getBizEventCode());
builder.templateCode(template.getCode());
builder.title(parseString(template.getTitle(), messageParam.getBizExtParams()));
builder.content(parseString(template.getContent(), messageParam.getBizExtParams()));
builder.orgType(messageParam.getOrgType());
builder.workspaceId(messageParam.getWorkspaceId());
builder.ouId(messageParam.getOuId());
builder.state(GeneralMessageStateEnum.HAS_BEEN_SENT);
builder.bizCode(messageParam.getBizCode());
builder.routerParams(messageParam.getRouterParams());
builder.bizExtParams(messageParam.getBizExtParams());
return builder.build();
}).collect(Collectors.toList());
}
}

View File

@ -1,103 +0,0 @@
package cn.axzo.msg.center.message.send.processor;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.PendingMessageNewService;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.pending.request.PendingMessagePushRequest;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.hutool.core.collection.CollUtil;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author syl
* @date 2023/12/18
*/
@Slf4j
@Component
public class PendingProcessor implements ISendProcessor {
@Autowired
private PendingMessageNewService pendingMessageNewService;
@Autowired
private BizEventMappingService bizEventMappingService;
@Override
public void check(SendContext context) {
AssertUtil.notNull(context.getMessageParam(), "待办消息的参数错误,请检查");
AssertUtil.isTrue(CollUtil.isNotEmpty(context.getToIdMessageRecordMap()), "待办消息的人员信息不存在");
AssertUtil.notNull(context.getReachConfig(), "触达配置不能为空");
AssertUtil.isTrue(BizActionCategory.PENDING.getCode().equals(context.getReachConfig()
.getCategory()), "待办业务动作不匹配");
bizEventMappingService.checkTemplateCodeByBizEvent(context.getReachConfig());
}
@Override
public void send(SendContext context) {
// 检查
this.check(context);
log.info("PendingProcessor start to async send pending message. bizCode= [{}]"
+ ", templateCode= [{}]", context.getBizEvent().getBizCode(),
context.getReachConfig().getTemplateCode());
// 由于之前业务方这期未做改造而之前对外提供的参数只满足消息待办这块的发送需要定义待办相关的业务属性
// 迁移到新接口
// PendingMessagePushRequest request = buildSendRequest(context);
// pendingMessageNewService.push(PendingMessagePushParam.from(request));
}
private PendingMessagePushRequest buildSendRequest(SendContext context) {
GeneralMessageReq request = context.getMessageParam();
String templateCode = context.getReachConfig().getTemplateCode();
Collection<Long> subReceiverIds = context.getToIdMessageRecordMap().keySet();
PersonDTO promoter = PersonDTO.from(null, request.getFromId(), null);
IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType());
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))
.collect(Collectors.toList());
OrganizationTypeEnum orgType = Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN :
OrganizationTypeEnum.valueOf(request.getTerminalType().name());
return PendingMessagePushRequest.builder()
.templateCode(templateCode)
.promoter(promoter)
.executor(receivers)
.orgType(orgType)
.workspaceId(request.getTerminalId())
.ouId(null)
.bizCategory(BizCategoryEnum.OTHER)
.bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse(""))
.routerParams(null)
.bizExtParams(null)
.build();
}
@Override
public void checkReach(ReachContext context) {
}
@Override
public void reach(ReachContext context) {
// 等后续im具有待办消息的能力再接入目前统一走待办中心
}
@Override
public void save(ReachContext context) {
}
}

View File

@ -1,13 +0,0 @@
package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.api.response.ReachResp;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
/**
* @author syl
* @date 2024/1/8
*/
public interface CommonMessageRecordService {
ReachResp reach(GeneralMessagePushParam param);
}

View File

@ -1,25 +0,0 @@
package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import java.util.List;
import java.util.Map;
/**
* 新老版本消息映射的相关接口
*
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
public interface GeneralMessageMapperService {
/**
* 异步批量发送消息
*
* @param request 发送消息时的请求参数
* @param toIdMessageRecordMap 接收者身份id与旧消息记录的映射关系
*/
void asyncBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap);
}

View File

@ -1,80 +0,0 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.response.ReachResp;
import cn.axzo.msg.center.domain.entity.BizEventMapping;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.domain.param.GeneralMessagePushParam;
import cn.axzo.msg.center.message.send.processor.ISendProcessor;
import cn.axzo.msg.center.message.send.processor.ReachContext;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.CommonMessageRecordService;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
* @author syl
* @date 2024/1/8
*/
@Slf4j
@Service
public class CommonMessageRecordServiceImpl implements CommonMessageRecordService {
@Value("${msg-im-server.partition-size:500}")
private Integer partitionSize;
private final ThreadFactory asyncReachThreadFactory = ThreadFactoryBuilder.create()
.setDaemon(true).setNameFormat("REACH_IM_MESSAGE_%d").get();
private final ExecutorService reachExecutorService = new ThreadPoolExecutor(5,
10, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024), asyncReachThreadFactory);
@Autowired
private BizEventMappingService bizEventMappingService;
@Autowired
private ApplicationContext applicationContext;
@Override
public ReachResp reach(GeneralMessagePushParam param) {
String bizEventCode = param.getBizEventCode();
Optional<BizEventMapping> bizEventOpt = bizEventMappingService.getByBizCode(bizEventCode);
AssertUtil.isTrue(bizEventOpt.isPresent(), String.format("业务事件映射%s不存在", bizEventCode));
BizEventMapping bizEvent = bizEventOpt.get();
AssertUtil.notEmpty(bizEvent.getReachConfig(), String.format("业务事件映射%s无业务动作配置", bizEventCode));
log.info("start to send reach message. bizCode:[{}]", bizEventCode);
ReachResp resp = new ReachResp();
bizEvent.getReachConfig().forEach(r -> {
BizActionCategory category = BizActionCategory.getCategory(r.getCategory());
ISendProcessor processor = applicationContext.getBean(category.getProcessor(), ISendProcessor.class);
ReachContext reachContext = ReachContext.builder()
.bizEvent(bizEvent).reachConfig(r)
.messageParam(param)
.build();
processor.save(reachContext);
switch (category) {
case NOTIFICATION:
// resp.setMsgReturnList(reachContext.getMsgReturnList());
break;
case PENDING:
resp.setPendingList(reachContext.getPendingList());
break;
default:
}
CompletableFuture.runAsync(() -> processor.reach(reachContext), reachExecutorService);
});
return resp;
}
}

View File

@ -1,180 +0,0 @@
package cn.axzo.msg.center.message.service.impl;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.domain.entity.BizEventMapping;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.message.send.processor.ISendProcessor;
import cn.axzo.msg.center.message.send.processor.SendContext;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.RelationTemplateMapService;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
* @author cold_blade
* @date 2023/10/23
* @version 1.0
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class GeneralMessageMapperServiceImpl implements GeneralMessageMapperService {
private final ThreadFactory asyncSendMsgThreadFactory = ThreadFactoryBuilder.create()
.setDaemon(true).setNameFormat("ASYNC_SEND_IM_MESSAGE_%d").get();
private final ExecutorService asyncSendMsgExecutorService = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024), asyncSendMsgThreadFactory);
private final GeneralMessageService generalMessageService;
private final RelationTemplateMapService relationTemplateMapService;
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
private final BizEventMappingService bizEventMappingService;
private final ApplicationContext applicationContext;
@Override
public void asyncBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap) {
log.info("do some check before send im message. relationId:[{}]", request.getRelationId());
if (MsgTypeEnum.PENDING_MESSAGE.equals(request.getType())) {
// 待办消息暂时不支持
log.info("pending message is not supported.");
return;
}
if (Objects.isNull(request.getRelationId())) {
log.info("relation id is null.");
return;
}
log.info("request relationId= [{}], toldIdPersonIdMap= [{}]", request.getRelationId(),
JSON.toJSONString(request.getToldIdPersonIdMap()));
// IM 需要personId过滤掉 personId = 0 的数据
Map<Long,Long> imToldIdPersonIdMap = request.getToldIdPersonIdMap().entrySet()
.stream()
.filter(entry -> entry.getValue() > 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
log.info("request relationId= [{}], final toldIdPersonIdMap= [{}]", request.getRelationId(),
JSON.toJSONString(imToldIdPersonIdMap));
request.setToldIdPersonIdMap(imToldIdPersonIdMap);
if (MapUtil.isEmpty(request.getToldIdPersonIdMap())) {
// 由于IM那边是根据personId来创建账户的,所以强依赖personId
log.info("toIdPersonIdMap is empty.");
return;
}
String bizCode = String.valueOf(request.getRelationId());
Optional<BizEventMapping> bizEventOpt = bizEventMappingService.getByBizCode(bizCode);
AssertUtil.isTrue(bizEventOpt.isPresent(), String.format("业务事件映射%s不存在", bizCode));
BizEventMapping bizEvent = bizEventOpt.get();
AssertUtil.notEmpty(bizEvent.getReachConfig(), String.format("业务事件映射%s无业务动作配置", bizCode));
log.info("start to send reach message. bizCode:[{}]", bizCode);
bizEvent.getReachConfig().forEach(r -> {
BizActionCategory category = BizActionCategory.getCategory(r.getCategory());
ISendProcessor processor = applicationContext.getBean(category.getProcessor(), ISendProcessor.class);
SendContext sendContext = SendContext.builder()
.bizEvent(bizEvent).reachConfig(r)
.messageParam(request)
.toIdMessageRecordMap(toIdMessageRecordMap)
.build();
CompletableFuture.runAsync(() -> processor.send(sendContext), asyncSendMsgExecutorService);
});
}
private void doBatchSendMessage(GeneralMessageReq request, Map<Long, MessageRecord> toIdMessageRecordMap) {
String templateCode = relationTemplateMapService.queryByRelationId(request.getRelationId()).orElse(null);
if (StringUtils.isBlank(templateCode)) {
log.info("the relationId([{}]) is not map any new message template. ", request.getRelationId());
return;
}
log.info("start to send im message. relationId:[{}], templateCode:[{}]", request.getRelationId(), templateCode);
try {
// 发送IM消息
GeneralMessageSendRequest sendImMsgRequest = buildSendRequest(request, templateCode, toIdMessageRecordMap.keySet());
// IM消息的发送是基于personId+应用终端的(eg: 工人端/企业端)
List<SendImMessageDTO> result = generalMessageService.batchSendMessage(sendImMsgRequest);
if (CollectionUtils.isEmpty(result)) {
log.info("there is not any person successfully send im message. relationId:[{}]", request.getRelationId());
return;
}
// 记录发送了IM消息的旧消息
recordSendImMessage(toIdMessageRecordMap, result, request.getToldIdPersonIdMap());
} catch (Exception e) {
log.warn("broke out some exception while sending im message. relationId:[{}]", request.getRelationId(), e);
}
}
private void recordSendImMessage(Map<Long, MessageRecord> toIdMessageRecordMap, List<SendImMessageDTO> sendImResult,
Map<Long, Long> toIdPersonIdMap) {
// 成功发送了IM消息的personId集合
Set<Long> sucSendImMsgPersonIds = sendImResult.stream()
.map(SendImMessageDTO::getPersonId).collect(Collectors.toSet());
// 从入参中的toIdPersonIdMap中筛选发送成功的entry
Map<Long, Long> subToIdPersonIdMap = toIdPersonIdMap.entrySet().stream()
// 过滤掉personId维度IM消息发送失败的entry
.filter(e -> sucSendImMsgPersonIds.contains(e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<Long, Long> msgRecordIdPersonIdMap = toIdMessageRecordMap.entrySet().stream()
.filter(e -> subToIdPersonIdMap.containsKey(e.getKey()))
.collect(Collectors.toMap(e -> e.getValue().getId(), e -> subToIdPersonIdMap.get(e.getKey())));
log.info("record message that has been send im message. msgIds:{}", msgRecordIdPersonIdMap.keySet());
// 双发记录
messageSendTwiceRecordService.batchSave(msgRecordIdPersonIdMap);
}
private GeneralMessageSendRequest buildSendRequest(GeneralMessageReq request, String templateCode,
Collection<Long> subReceiverIds) {
IdentityTypeEnum identityType = PersonIdentityUtil.toIdentityType(request.getReceiveType());
List<PersonDTO> receivers = subReceiverIds.stream()
.filter(request.getToldIdPersonIdMap()::containsKey)
.map(e -> PersonDTO.from(request.getToldIdPersonIdMap().get(e), e, identityType))
.collect(Collectors.toList());
OrganizationTypeEnum orgType = Objects.isNull(request.getTerminalType()) ? OrganizationTypeEnum.UNKNOWN :
OrganizationTypeEnum.valueOf(request.getTerminalType().name());
return GeneralMessageSendRequest.builder()
.templateCode(templateCode)
.receiver(receivers)
.orgType(orgType)
.orgName(request.getTerminalName())
.orgId(request.getTerminalId())
.bizCode(Optional.ofNullable(request.getBizId()).map(String::valueOf).orElse(""))
.routerParams(Optional.ofNullable(request.getRouterParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.bizExtParams(Optional.ofNullable(request.getMsgParams())
.map(v -> JSONObject.parseObject(JSON.toJSONString(v)))
.orElseGet(JSONObject::new))
.build();
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.msg.center.api;
import cn.axzo.msg.center.api.request.v4.MessageSendRequestV4;
import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import javax.validation.Valid;
/**
* @author yanglin
*/
@Component
@FeignClient(
value = "msg-center",
url = "${server.serviceUrl:http://msg-center:8080}")
public interface MessageAPIV4 {
@RequestMapping(value = "api/message/v4/send", method = RequestMethod.POST)
MessageSendRespV3 send(@RequestBody @Valid MessageSendRequestV4 request);
}

View File

@ -0,0 +1,133 @@
package cn.axzo.msg.center.api.request.v4;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
@Slf4j
public class MessageSendBasicInfoV4 implements Serializable {
/**
* 发起者
* <p>如果是平台则发起人为空
*/
private PersonV3DTO sender;
/**
* 接收者列表
*/
@NotNull(message = "接收者列表不能为空")
private List<PersonV3DTO> receivers;
/**
* 关联业务唯一标识
* 例如: 请假申请的编号
*/
private String bizCode;
/**
* 业务扩展参数-JSON字符串格式
*/
private JSONObject bizExtParams;
/**
* 路由参数-JSON字符串格式
*/
private JSONObject routerParams;
/**
* 发送者项目部ID
*/
private Long senderWorkspaceId;
/**
* 发送者企业ID
*/
private Long senderOuId;
/**
* 接收者项目部ID
*/
private Long receiversWorkspaceId;
/**
* 接收者企业ID
* <p>如果是工人则所在企业可以为空其它均必传
*/
private Long receiversOuId;
/**
* 接收者(消息)所属组织类型
*/
private OrganizationTypeEnum receiversOrgType;
/**
* 副标题
*/
private String subtitle;
public Collection<PersonV3DTO> distinctReceivers() {
if (receivers == null) {
return Collections.emptySet();
}
HashMap<Long, PersonV3DTO> personId2Person = new HashMap<>();
for (PersonV3DTO receiver : receivers) {
if (!personId2Person.containsKey(receiver.getId())) {
personId2Person.put(receiver.getId(), receiver);
}
}
return personId2Person.values();
}
public List<PersonV3DTO> receivers() {
if (receivers == null) {
return Collections.emptyList();
}
return receivers;
}
public Long determineReceiversOuId() {
return receiversOuId == null ? 0L : receiversOuId;
}
public Long determineReceiversWorkspaceId() {
return receiversWorkspaceId == null ? 0L : receiversWorkspaceId;
}
public String determineBizCode() {
return bizCode == null ? "" : bizCode;
}
public void validate() {
AssertUtil.notEmpty(receivers, "receivers不能为空");
for (PersonV3DTO receiver : receivers) {
AssertUtil.notNull(receiver.getId(), "接收者ID不能为空");
if (receiver.getId() <= 0)
log.warn("接收者ID <= 0, request={}", this);
// 去掉这个校验
//AssertUtil.isFalse(receiver.getId() <= 0, "接收者ID必须>=0");
}
}
@Override
public String toString() {
return JSONObject.toJSONString(this);
}
}

View File

@ -0,0 +1,56 @@
package cn.axzo.msg.center.api.request.v4;
import cn.axzo.msg.center.api.request.v3.PendingSendInfo;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* @author yanglin
*/
@Setter
@Getter
@Slf4j
public class MessageSendRequestV4 implements Serializable {
// !! biz fields
/**
* 发送IM和待办的基本信息, 必传
*/
@NotNull
private MessageSendBasicInfoV4 sendBasicInfo;
/**
* 发送待办时必传
*/
private PendingSendInfo pendingSendInfo;
// !! internal fields
/**
* 事件映射code
*/
private String eventMappingCode;
/**
* 业务方不用传这个字段, 不用关心这个字段
*/
private List<TemplateInfo> templates = new ArrayList<>();
public void addTemplate(TemplateInfo templateInfo) {
templates.add(templateInfo);
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.msg.center.api.request.v4;
import cn.axzo.msg.center.service.enums.Channel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author yanglin
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TemplateInfo {
/**
* 模板类型
*/
private Channel channel;
/**
* 模版编码
*/
private List<String> templateCodes;
}

View File

@ -7,7 +7,6 @@ import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;

View File

@ -1,6 +1,5 @@
package cn.axzo.msg.center.domain.enums;
package cn.axzo.msg.center.service.enums;
import cn.axzo.msg.center.service.enums.CodeDefinition;
import lombok.AllArgsConstructor;
import lombok.Getter;
@ -11,7 +10,7 @@ import lombok.Getter;
*/
@Getter
@AllArgsConstructor
public enum Channels implements CodeDefinition<String> {
public enum Channel implements CodeDefinition<String> {
/**
* 通知

View File

@ -17,6 +17,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.Collections;
import java.util.List;
/**
@ -45,7 +46,7 @@ public class MessageController {
*/
@RequestMapping(value = "api/message/push", method = RequestMethod.POST)
CommonResponse<List<MsgReturnParamRes>> pushMsg(@RequestBody GeneralMessageReq message){
return CommonResponse.success(messageRecordService.pushMsg(message));
return CommonResponse.success(Collections.emptyList());
}
/**

View File

@ -4,11 +4,9 @@ import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* @author cn
@ -26,7 +24,8 @@ import org.springframework.scheduling.annotation.EnableAsync;
"cn.axzo.log",
"cn.axzo.pluto.api",
"cn.axzo.basics.profiles.api",
"cn.axzo.apollo.api"
"cn.axzo.apollo.api",
"cn.axzo.msg.center.inside.notices.service.impl"
})
/*@EnableAsync*/
public class MsgCenterApplication {

View File

@ -3,17 +3,11 @@ package cn.axzo.msg.center.inside.notices.service;
import cn.axzo.msg.center.MsgCenterApplication;
import cn.axzo.msg.center.api.request.v3.MessageSendReqV3;
import cn.axzo.msg.center.api.response.v3.MessageSendRespV3;
import cn.axzo.msg.center.common.utils.MiscUtils;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* @author yanglin
*/