REQ-1898: 通过新接口发送消息

This commit is contained in:
yanglin 2024-01-14 15:49:23 +08:00
parent 31f413f7e6
commit 7d7dde85af
19 changed files with 526 additions and 178 deletions

View File

@ -2,15 +2,13 @@ package cn.axzo.msg.center.inside.notices.controller.msg;
import cn.axzo.msg.center.api.MessageAPIV3;
import cn.axzo.msg.center.api.request.MessagePushReqV3;
import cn.axzo.msg.center.api.response.MessageRespV3;
import cn.axzo.msg.center.api.response.MessageSendRespV3;
import cn.axzo.msg.center.inside.notices.service.MessageServiceV3;
import cn.azxo.framework.common.model.CommonResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* @author yanglin
*/
@ -22,7 +20,7 @@ public class MessageAPIV3Controller implements MessageAPIV3 {
private final MessageServiceV3 messageServiceV3;
@Override
public CommonResponse<List<MessageRespV3>> send(MessagePushReqV3 req) {
public CommonResponse<MessageSendRespV3> send(MessagePushReqV3 req) {
return CommonResponse.success(messageServiceV3.send(req));
}

View File

@ -1,13 +1,11 @@
package cn.axzo.msg.center.inside.notices.service;
import cn.axzo.msg.center.api.request.MessagePushReqV3;
import cn.axzo.msg.center.api.response.MessageRespV3;
import java.util.List;
import cn.axzo.msg.center.api.response.MessageSendRespV3;
/**
* @author yanglin
*/
public interface MessageServiceV3 {
List<MessageRespV3> send(MessagePushReqV3 req);
MessageSendRespV3 send(MessagePushReqV3 req);
}

View File

@ -25,25 +25,27 @@ import java.util.function.Consumer;
@Component
@RequiredArgsConstructor
class ImClient {
private final MessageApi messageApi;
private final ScheduledExecutorService scheduleExecutor =
Executors.newScheduledThreadPool(10, new NamedThreadFactory("MessageServiceV3-send"));
Executors.newScheduledThreadPool(10, new NamedThreadFactory("ImClient-send"));
// 强行做成有界队列, 避免内存爆了
private final Semaphore semaphore = new Semaphore(200);
private final MessageApi messageApi;
void send(MessageInfo req, BiConsumer<Exception, List<MessageDispatchResp>> callback) {
scheduleExecutor.execute(() -> {
acquireAndSend(req, callback, 0);
});
void asyncSend(MessageInfo req, BiConsumer<Exception, List<MessageDispatchResp>> callback) {
enqueue(req, callback, 0);
}
private void acquireAndSend(
private void enqueue(
MessageInfo req, BiConsumer<Exception, List<MessageDispatchResp>> callback, int retryCount) {
boolean acquired = false;
try {
if (semaphore.tryAcquire(20, TimeUnit.SECONDS)) {
acquired = true;
doSend(req, callback, retryCount);
if (semaphore.tryAcquire(30, TimeUnit.SECONDS)) {
scheduleExecutor.execute(() -> {
try {
doSend(req, callback, retryCount);
} finally {
semaphore.release();
}
});
} else {
String error = String.format("发送消息超载了. templateId=%s, header=%s, receiverPersonIds=%s",
req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()));
@ -51,10 +53,6 @@ class ImClient {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (acquired) {
semaphore.release();
}
}
}
@ -64,8 +62,8 @@ class ImClient {
private void doSend(
MessageInfo req, BiConsumer<Exception, List<MessageDispatchResp>> callback, int retryCount) {
if (retryCount > 0) {
log.warn("重试发送消息 templateId={}, header={}, receiverPersonIds={}",
req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()));
log.warn("重试发送消息. templateId={}, header={}, receiverPersonIds={}, retryCount={}",
req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount);
}
Consumer<Exception> retryFun = e -> {
if (retryCount > 3) {
@ -74,20 +72,28 @@ class ImClient {
// 尽量在消息端进行重试, 因为是按template的纬度调用IM进行批量发送的
// 如果直接让上游调用端进行重试的话, 有重复发送的风险
scheduleExecutor.schedule(
() -> acquireAndSend(req, callback, retryCount + 1),
() -> enqueue(req, callback, retryCount + 1),
2L * (retryCount + 1), TimeUnit.SECONDS);
}
};
try {
ApiResult<List<MessageDispatchResp>> apiResult = messageApi.sendMessage(req);
if (apiResult.isSuccess()) {
callback.accept(null, apiResult.getData());
} else {
String error = String.format("发送消息失败. respCode=%s, respMsg=%s, templateId=%s, header=%s, receiverPersonIds=%s, retryCount=%s",
apiResult.getCode(), apiResult.getMsg(), req.getMsgTemplateId(),
ApiResult<List<MessageDispatchResp>> sendResult = messageApi.sendMessage(req);
log.info("IM消息发送调用结果, req={}, resp={}", JSON.toJSONString(req), JSON.toJSONString(sendResult));
if (sendResult.isSuccess()) {
callback.accept(null, sendResult.getData());
} else if (sendResult.getData() == null || sendResult.getData().isEmpty()) {
String error = String.format("发送消息失败, IM返回为空." +
" respCode=%s, respMsg=%s, templateId=%s, header=%s, receiverPersonIds=%s, retryCount=%s",
sendResult.getCode(), sendResult.getMsg(), req.getMsgTemplateId(),
req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount);
log.error(error);
retryFun.accept(new RuntimeException(error));
} else {
String error = String.format("发送消息失败." +
" respCode=%s, respMsg=%s, templateId=%s, header=%s, receiverPersonIds=%s, retryCount=%s",
sendResult.getCode(), sendResult.getMsg(), req.getMsgTemplateId(),
req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount);
log.error(error);
// FIXME(yl): 区分业务异常和IM系统异常?
retryFun.accept(new RuntimeException(error));
}
} catch (Exception e) {

View File

@ -2,16 +2,17 @@ package cn.axzo.msg.center.inside.notices.service.impl.v3;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.api.request.MessagePushReqV3;
import cn.axzo.msg.center.api.response.MessageRespV3;
import cn.axzo.msg.center.api.response.MessageSendRespV3;
import cn.axzo.msg.center.api.response.MessageSendResultV3;
import cn.axzo.msg.center.dal.MessageRecordV3Dao;
import cn.axzo.msg.center.domain.entity.BizEventMapping;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.inside.notices.service.MessageServiceV3;
import cn.axzo.msg.center.inside.notices.service.component.TerminalAppTypeMapping;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.service.BizEventMappingService;
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
@ -23,10 +24,12 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
/**
* @author yanglin
@ -38,13 +41,14 @@ public class MessageServiceV3Impl implements MessageServiceV3 {
private final BizEventMappingService bizEventMappingService;
private final MessageTemplateNewService messageTemplateNewService;
private final TerminalAppTypeMapping terminalAppTypeMapping;
private final TerminalAppMapping terminalAppMapping;
private final ImClient imClient;
private final MessageRecordV3Dao messageRecordV3Dao;
private final TemplateParser templateParser;
@Override
@Transactional(rollbackFor = Exception.class)
public List<MessageRespV3> send(MessagePushReqV3 req) {
public MessageSendRespV3 send(MessagePushReqV3 req) {
req.validate();
BizEventMapping mapping = bizEventMappingService
.getByBizCode(req.getBizEventMappingCode())
@ -54,38 +58,48 @@ public class MessageServiceV3Impl implements MessageServiceV3 {
AssertUtil.notEmpty(mapping.getReachConfig(), String.format(
"业务事件映射%s无业务动作配置", req.getBizEventMappingCode()));
String batchNo = UUIDUtil.uuidString();
List<TemplateBatch> batches = new ArrayList<>(mapping.getReachConfig().size());
List<TemplateMessage> batches = new ArrayList<>(mapping.getReachConfig().size());
for (ReachDto cfg : mapping.getReachConfig()) {
AssertUtil.isTrue(BizActionCategory.NOTIFICATION.is(cfg.getCategory()), "目前只支持通知");
MessageTemplateDTO template = messageTemplateNewService
.queryEnableTemplateByCode(cfg.getTemplateCode())
.orElseThrow(() -> new ServiceException(String.format(
"未查询到对应的模板, templateCode=%s", cfg.getTemplateCode())));
batches.add(new TemplateBatch(req, batchNo, template));
batches.add(new TemplateMessage(req, batchNo, template));
}
List<MessageRecordV3> records = batches.stream()
List<MessageRecordV3> messages = batches.stream()
.flatMap(b -> b.getMessageRecords().stream())
.collect(toList());
messageRecordV3Dao.saveBatch(records);
for (TemplateBatch batch : batches) {
sendBatch(batch);
messageRecordV3Dao.saveBatch(messages);
batches.forEach(this::sendTemplateBatch);
MessageSendRespV3 resp = new MessageSendRespV3();
for (MessageRecordV3 message : messages) {
resp.addResult(new MessageSendResultV3(
message.getReceiverPersonId(), message.getId()));
}
// TODO(yl): WHAT
return Collections.emptyList();
return resp;
}
private void sendBatch(TemplateBatch batch) {
List<AppTypeEnum> appTypes = terminalAppTypeMapping
private void sendTemplateBatch(TemplateMessage batch) {
List<AppTypeEnum> appTypes = terminalAppMapping
.toImTypes(batch.getTemplate().getPushTerminals());
imClient.send(batch.buildIMRequest(appTypes), (e, respList) -> {
MessageInfo imRequest = batch.buildImRequest(templateParser, appTypes);
imClient.asyncSend(imRequest, (e, respList) -> {
if (e != null) {
messageRecordV3Dao.setSendFailed(batch.getRecordIds(), e.getMessage());
} else {
for (int i = 0; i < respList.size(); i++) {
MessageDispatchResp resp = respList.get(i);
MessageRecordV3 record = batch.getMessageRecords().get(i);
messageRecordV3Dao.batchSetSendFailed(batch.getMessageIds(), e.getMessage());
return;
}
Map<String, MessageDispatchResp> receiverId2SendResult = respList.stream()
.collect(toMap(MessageDispatchResp::getPersonId, Function.identity()));
for (MessageRecordV3 message : batch.getMessageRecords()) {
MessageDispatchResp sendResult = receiverId2SendResult.get(
String.valueOf(message.getReceiverPersonId()));
if (sendResult != null) {
// 把im端的id也存起来
messageRecordV3Dao.setSendSuccess(record.getId(), resp.getMsgid());
messageRecordV3Dao.setSendSuccess(message.getId(), sendResult.getMsgid());
} else {
messageRecordV3Dao.setSendFailed(message.getId(), "无法找到IM对应的消息ID");
}
}
});

View File

@ -9,9 +9,10 @@ import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import cn.axzo.msg.center.utils.UUIDUtil;
import com.google.common.collect.ImmutableMap;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@ -29,7 +30,7 @@ import static java.util.stream.Collectors.toSet;
* @author yanglin
*/
@RequiredArgsConstructor
class TemplateBatch {
class TemplateMessage {
private final MessagePushReqV3 req;
private final String batchNo;
@Getter
@ -38,68 +39,82 @@ class TemplateBatch {
private List<MessageRecordV3> records;
private String title;
private String content;
private Map<String, Object> msgExtInfo;
Collection<Long> getRecordIds() {
Collection<Long> getMessageIds() {
return getMessageRecords().stream()
.map(MessageRecordV3::getId)
.collect(toSet());
}
Collection<Long> getReceiverPersonIds() {
return req.getReceivers().stream()
.map(PersonV3DTO::getId)
.collect(toList());
}
List<MessageRecordV3> getMessageRecords() {
if (records != null) {
return records;
}
records = new ArrayList<>(req.getReceivers().size());
for (PersonV3DTO receiver : req.getReceivers()) {
MessageRecordV3 record = new MessageRecordV3();
record.setBatchNo(batchNo);
record.setIdentityCode(UUIDUtil.uuidString());
record.setBizCategory(req.getBizCategory());
record.setSenderPersonId(req.getSender().getId());
record.setSenderOuId(req.getSender().getId());
record.setSenderWorkspaceId(req.getSender().getWorkspaceId());
record.setReceiverPersonId(receiver.getId());
record.setReceiverOuId(receiver.getOuId());
record.setReceiverWorkspaceId(receiver.getWorkspaceId());
record.setBizEventMappingCode(req.getBizEventMappingCode());
record.setTemplateCode(template.getCode());
record.setTitle(getTitle());
record.setContent(getContent());
record.setOrgType(req.getOrgType());
record.setState(MsgStateV3Enum.UNSENT);
record.setBizCode(req.getBizCode());
record.setRouterParams(JSON.toJSONString(req.getRouterParams()));
record.setBizExtParams(record.getBizExtParams());
record.setMsgExtInfo(JSON.toJSONString(getMsgExtInfo()));
record.setFailCause(null);
record.setSendTime(new Date());
record.setCreateAt(new Date());
record.setUpdateAt(new Date());
record.setIsDelete(TableIsDeleteEnum.NORMAL.value);
records = new ArrayList<>();
for (PersonV3DTO receiver : req.distinctReceivers()) {
PersonV3DTO sender = req.getSender();
MessageRecordV3 message = new MessageRecordV3();
records.add(message);
message.setBatchNo(batchNo);
message.setIdentityCode(UUIDUtil.uuidString());
message.setBizCategory(req.getBizCategory());
message.setSenderPersonId(sender == null ? null : sender.getId());
message.setSenderOuId(req.getSenderOuId());
message.setSenderWorkspaceId(req.getSenderWorkspaceId());
message.setReceiverPersonId(receiver.getId());
message.setReceiverOuId(req.getReceiversOuId());
message.setReceiverWorkspaceId(req.getReceiversWorkspaceId());
message.setBizEventMappingCode(req.getBizEventMappingCode());
message.setTemplateCode(template.getCode());
message.setTitle(parseTitle());
message.setContent(parseContent());
message.setReceiverOrgType(req.getReceiversOrgType());
message.setReceiverOrgName(req.getReceiversOrgName());
message.setState(MsgStateV3Enum.UNSENT);
message.setBizCategory(req.getBizCategory());
message.setBizCode(req.getBizCode());
message.setRouterParams(req.getRouterParams());
message.setBizExtParams(req.getBizExtParams());
message.setMsgExtInfo(getMsgExtInfo());
message.setFailCause(null);
message.setCreateAt(new Date());
message.setUpdateAt(new Date());
message.setIsDelete(TableIsDeleteEnum.NORMAL.value);
// 这里不设置发送时间, 有发送结果的时候再设置, 比较准确
}
return records;
}
MessageInfo buildIMRequest(List<AppTypeEnum> apps) {
MessageInfo buildImRequest(TemplateParser templateParser, List<AppTypeEnum> apps) {
MessageRecordV3 sample = getMessageRecords().get(0);
GeneralMessagePushVO sendVo = templateParser.parse(sample, template);
MessageInfo imReq = new MessageInfo();
imReq.setAppTypeList(apps);
imReq.setToPersonIdList(req.stringReceiverIds());
imReq.setMsgHeader(getTitle());
imReq.setMsgContent(getContent());
imReq.setMsgHeader(parseTitle());
imReq.setMsgContent(parseContent());
imReq.setMsgTemplateId(template.getCode());
imReq.setMsgTemplateContent(JSON.toJSONString(getMsgExtInfo()));
imReq.setMsgTemplateContent(JSON.toJSONString(sendVo));
// 扩展信息
Map<String, String> ext = new HashMap<>();
ext.put("minAppVersion", template.getMinAppVersion());
ext.put("workspaceId", req.stringWorkspaceId());
if (sample.getReceiverWorkspaceId() != null) {
ext.put("workspaceId", String.valueOf(sample.getReceiverWorkspaceId()));
}
imReq.setExtendsInfo(ext);
return imReq;
}
// ------------------------------- 辅助方法
String getTitle() {
String parseTitle() {
if (title != null) {
return title;
}
@ -109,7 +124,7 @@ class TemplateBatch {
return title;
}
String getContent() {
String parseContent() {
if (content != null) {
return content;
}
@ -119,19 +134,11 @@ class TemplateBatch {
return content;
}
Map<String, Object> getMsgExtInfo() {
if (msgExtInfo == null) {
msgExtInfo = ImmutableMap.of(
"bizExtParams", req.getBizExtParams(),
"routerParams", req.getRouterParams());
}
return msgExtInfo;
}
Collection<Long> getReceiverPersonIds() {
return req.getReceivers().stream()
.map(PersonV3DTO::getId)
.collect(toList());
/**
* 预留
*/
JSONObject getMsgExtInfo() {
return new JSONObject();
}
@Override
@ -141,7 +148,7 @@ class TemplateBatch {
values.put("bizCode", req.getBizCode());
values.put("templateCode", template.getCode());
values.put("bizEventMappingCode", req.getBizEventMappingCode());
values.put("receiverIds", getRecordIds());
values.put("messageIds", getMessageIds());
values.put("receiverPersonIds", getReceiverPersonIds());
return JSON.toJSONString(values);
}

View File

@ -0,0 +1,96 @@
package cn.axzo.msg.center.inside.notices.service.impl.v3;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.domain.entity.MessageRecordV3;
import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateRouterDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.service.dto.MessageCardContentItemDTO;
import cn.axzo.msg.center.utils.MessageRouterUtil;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* {@link cn.axzo.msg.center.message.service.impl.GeneralMessageServiceImpl} 拷贝进行更改
*
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class TemplateParser {
private final MessageSystemConfig messageSystemConfig;
GeneralMessagePushVO parse(MessageRecordV3 message, MessageTemplateDTO template) {
// 对应模板的路由列表
MessageTemplateRouterDTO msgTemplateRouter = template.getMsgTemplateRouter();
// 解析路由地址
msgTemplateRouter = MessageRouterUtil.parseAndConcatRouteUrl(msgTemplateRouter, message.getRouterParams());
// 获取模板卡片信息
List<MessageCardContentItemDTO> rawCardContentItems = template.getMsgCardContentItems();
List<MessageCardContentItemDTO> cardContentItems = rawCardContentItems;
if (CollectionUtils.isNotEmpty(rawCardContentItems) && Objects.nonNull(message.getBizExtParams())
&& !message.getBizExtParams().isEmpty()) {
// 克隆避免修改入参
cardContentItems = cardContentItems.stream()
.map(MessageCardContentItemDTO::deepClone)
.collect(Collectors.toList());
cardContentItems.forEach(e -> {
String value = PlaceholderResolver.getDefaultResolver()
.resolveByMap(e.getValue(), message.getBizExtParams());
e.setValue(value);
});
}
return parseRouterAndCard(message, template.getIcon(), messageSystemConfig.getOrgIcon(), msgTemplateRouter, cardContentItems);
}
private GeneralMessagePushVO parseRouterAndCard(MessageRecordV3 message, String templateIcon, String orgIcon,
MessageTemplateRouterDTO msgTemplateRouter,
List<MessageCardContentItemDTO> cardContentItems) {
GeneralMessagePushVO.CardButton cardDetailButton = GeneralMessagePushVO.parseDetailButton(msgTemplateRouter);
List<GeneralMessagePushVO.CardButton> cardButtons = GeneralMessagePushVO.parseCardButtons(msgTemplateRouter);
List<GeneralMessagePushVO.CardExtensionItem> cardExtension = CollectionUtils.isEmpty(cardContentItems) ? Collections.emptyList() :
cardContentItems.stream()
.map(GeneralMessagePushVO.CardExtensionItem::from)
.collect(Collectors.toList());
List<GeneralMessagePushVO.Subtitle> subtitles = Collections.emptyList();
GeneralMessagePushVO.Subtitle subtitleOp = parseSubtitle(message, orgIcon);
if (subtitleOp != null) {
subtitles = Lists.newArrayList(subtitleOp);
}
return GeneralMessagePushVO.builder()
// 这个identityCode应该意义不大
.identityCode(message.getIdentityCode())
.templateCode(message.getTemplateCode())
.cardBannerUrl(templateIcon)
.cardTitle(message.getTitle())
.cardDetailButton(cardDetailButton)
.subtitles(subtitles)
.cardContent(message.getContent())
.cardExtension(cardExtension)
.cardButtons(cardButtons)
.bizCode(message.getBizCode())
// 因为是异常发送, send time不太准确
.sendTimestamp(message.getCreateAt().getTime())
.build();
}
private GeneralMessagePushVO.Subtitle parseSubtitle(MessageRecordV3 message, String orgIcon) {
if (StringUtils.isBlank(message.getReceiverOrgName())) {
return null;
}
return GeneralMessagePushVO.Subtitle.builder()
.title(message.getReceiverOrgName())
.iconUrl(orgIcon)
.build();
}
}

View File

@ -1,4 +1,4 @@
package cn.axzo.msg.center.inside.notices.service.component;
package cn.axzo.msg.center.inside.notices.service.impl.v3;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.service.enums.PushTerminalEnum;
@ -15,7 +15,7 @@ import static java.util.stream.Collectors.toList;
* @author yanglin
*/
@Component
public class TerminalAppTypeMapping {
public class TerminalAppMapping {
private final ImmutableMap<PushTerminalEnum, AppTypeEnum> terminal2AppType = ImmutableMap.of(
PushTerminalEnum.B_ENTERPRISE_APP, AppTypeEnum.CMP,

View File

@ -111,14 +111,14 @@ public class GeneralMessagePushVO implements Serializable {
.build();
}
private static CardButton parseDetailButton(MessageTemplateRouterDTO msgTemplateRouter) {
public static CardButton parseDetailButton(MessageTemplateRouterDTO msgTemplateRouter) {
if (Objects.isNull(msgTemplateRouter) || Objects.isNull(msgTemplateRouter.getRouteDetail())) {
return null;
}
return CardButton.from(msgTemplateRouter.getRouteDetail());
}
private static List<CardButton> parseCardButtons(MessageTemplateRouterDTO msgTemplateRouter) {
public static List<CardButton> parseCardButtons(MessageTemplateRouterDTO msgTemplateRouter) {
if (Objects.isNull(msgTemplateRouter)
|| CollectionUtils.isEmpty(msgTemplateRouter.getRouteButtons())) {
return Collections.emptyList();
@ -138,7 +138,7 @@ public class GeneralMessagePushVO implements Serializable {
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class Subtitle {
public static class Subtitle {
/**
* 图标 - 对应消息所属组织的图标
@ -149,7 +149,7 @@ public class GeneralMessagePushVO implements Serializable {
*/
private String title;
static Optional<Subtitle> from(GeneralMessageRecord record, String orgIcon) {
public static Optional<Subtitle> from(GeneralMessageRecord record, String orgIcon) {
if (StringUtils.isBlank(record.getOrgName())) {
return Optional.empty();
}
@ -170,7 +170,7 @@ public class GeneralMessagePushVO implements Serializable {
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class CardButton {
public static class CardButton {
/**
* 按钮标题
@ -242,13 +242,13 @@ public class GeneralMessagePushVO implements Serializable {
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class CardExtensionItem {
public static class CardExtensionItem {
private String title;
private String detail;
static CardExtensionItem from(MessageCardContentItemDTO cardContentItem) {
public static CardExtensionItem from(MessageCardContentItemDTO cardContentItem) {
return CardExtensionItem.builder()
.title(cardContentItem.getLabel())
.detail(cardContentItem.getValue())

View File

@ -2,15 +2,13 @@ package cn.axzo.msg.center.api;
import cn.axzo.msg.center.api.fallback.LoggingFallbackFactory;
import cn.axzo.msg.center.api.request.MessagePushReqV3;
import cn.axzo.msg.center.api.response.MessageRespV3;
import cn.axzo.msg.center.api.response.MessageSendRespV3;
import cn.azxo.framework.common.model.CommonResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import java.util.List;
/**
* @author yanglin
*/
@ -22,7 +20,7 @@ import java.util.List;
public interface MessageAPIV3 {
@RequestMapping(value = "api/message/v3/send", method = RequestMethod.POST)
CommonResponse<List<MessageRespV3>> send(MessagePushReqV3 req);
CommonResponse<MessageSendRespV3> send(MessagePushReqV3 req);
class FallbackFactory extends LoggingFallbackFactory {

View File

@ -3,6 +3,7 @@ package cn.axzo.msg.center.api.request;
import cn.axzo.basics.common.util.AssertUtil;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
@ -10,7 +11,9 @@ import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
@ -24,7 +27,6 @@ public class MessagePushReqV3 implements Serializable {
/**
* 发起者
*/
@NotNull(message = "发起者不能为空")
private PersonV3DTO sender;
/**
@ -47,24 +49,6 @@ public class MessagePushReqV3 implements Serializable {
@NotBlank(message = "关联业务唯一标识不能为空")
private String bizCode;
/**
* 消息所属组织类型
*/
@NotNull(message = "工作台类型不能为空")
private OrganizationTypeEnum orgType;
/**
* 消息所属工作台ID
*/
@NotNull(message = "消息所属工作台ID不能为空")
private Long workspaceId;
/**
* 消息所属企业ID
* 备注如果是工人则所在企业可以为空其它均必传
*/
private Long ouId;
/**
* 业务扩展参数-JSON字符串格式
*/
@ -80,13 +64,48 @@ public class MessagePushReqV3 implements Serializable {
*/
private BizCategoryEnum bizCategory;
// ------------------------------- 辅助方法
/**
* 发送者项目部ID
*/
private Long senderWorkspaceId;
public String stringWorkspaceId() {
if (workspaceId == null) {
return null;
/**
* 发送者企业ID
*/
private Long senderOuId;
/**
* 接收者项目部ID
*/
private Long receiversWorkspaceId;
/**
* 接收者企业ID
*/
private Long receiversOuId;
/**
* 接收者(消息)所属组织类型
*/
@NotNull(message = "工作台类型不能为空")
private OrganizationTypeEnum receiversOrgType;
/**
* 接收都组织名称
*/
private String receiversOrgName;
public Collection<PersonV3DTO> distinctReceivers() {
if (receivers == null) {
return Collections.emptySet();
}
return String.valueOf(workspaceId);
HashMap<Long, PersonV3DTO> personId2Person = new HashMap<>();
for (PersonV3DTO receiver : receivers) {
if (!personId2Person.containsKey(receiver.getId())) {
personId2Person.put(receiver.getId(), receiver);
}
}
return personId2Person.values();
}
public Set<String> stringReceiverIds() {
@ -94,17 +113,36 @@ public class MessagePushReqV3 implements Serializable {
return Collections.emptySet();
}
return receivers.stream()
.map(PersonV3DTO::getId)
.map(String::valueOf)
.collect(toSet());
}
public void validate() {
// TODO(yl): 确认如此是必传
AssertUtil.notNull(sender, "sender不能为空");
AssertUtil.notNull(bizEventMappingCode, "bizEventCode不能为空");
AssertUtil.notNull(bizCode, "bizCode不能为空");
AssertUtil.notNull(bizCategory, "bizCategory不能为空");
AssertUtil.notEmpty(receivers, "receivers不能为空");
AssertUtil.notEmpty(bizEventMappingCode, "bizEventCode不能为空");
AssertUtil.notEmpty(bizCode, "bizCode不能为空");
AssertUtil.notNull(orgType, "orgType不能为空");
AssertUtil.notNull(workspaceId, "workspaceId不能为空");
for (PersonV3DTO receiver : receivers) {
AssertUtil.notNull(receiver.getId(), "接收者ID不能为空");
AssertUtil.notNull(receiver.getIdentity(), "接收者身份信息不能为空");
AssertUtil.notNull(receiver.getIdentity().getType(), "接收者身份信息类型不能为空");
}
validateReceiverTypes();
}
private void validateReceiverTypes() {
if (receiversOuId != null) {
return;
}
boolean includeNotSupport = receivers.stream()
.anyMatch(r -> r.getIdentity().getType() == IdentityTypeEnum.NOT_SUPPORT);
AssertUtil.isFalse(includeNotSupport, "不能传<NOT_SUPPORT>身份类型");
boolean includeNonWorker = receivers.stream()
.anyMatch(r -> r.getIdentity().getType() != IdentityTypeEnum.WORKER);
AssertUtil.isFalse(includeNonWorker, "<非工人>接收者必须要传接收者ouId. 在没有提供<接收者ouId>的情况下,<工人>和<非工人>分别调用接口");
}
}

View File

@ -1,12 +0,0 @@
package cn.axzo.msg.center.api.response;
import lombok.Data;
import java.io.Serializable;
/**
* @author yanglin
*/
@Data
public class MessageRespV3 implements Serializable {
}

View File

@ -0,0 +1,47 @@
package cn.axzo.msg.center.api.response;
import com.alibaba.fastjson.JSON;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* @author yanglin
*/
public class MessageSendRespV3 implements Serializable {
/**
* 接收者 -> 发送结果
*/
private final Map<Long, MessageSendResultV3> receiverPersonId2Result = new HashMap<>();
public void addResult(MessageSendResultV3 result) {
receiverPersonId2Result.put(result.getReceiverPersonId(), result);
}
public Optional<MessageSendResultV3> findResultByReceiver(Long receiverPersonId) {
if (receiverPersonId == null) {
return Optional.empty();
}
return Optional.ofNullable(receiverPersonId2Result.get(receiverPersonId));
}
/**
* 返回的数量不一定和发送请求中的receivers数量一致, 发送的时候根据receiver的personId去了重
* <p>推荐使用 {@link #findResultByReceiver}, 根据接收者获取发送结果
*
* @return 所有的发送结果
*/
public Collection<MessageSendResultV3> getSendResults() {
return new ArrayList<>(receiverPersonId2Result.values());
}
@Override
public String toString() {
return JSON.toJSONString(receiverPersonId2Result);
}
}

View File

@ -0,0 +1,30 @@
package cn.axzo.msg.center.api.response;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author yanglin
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class MessageSendResultV3 implements Serializable {
/**
* 接收者ID
*/
private Long receiverPersonId;
/**
* msg-center中的消息ID
*/
private Long messageId;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.msg.center.service.dto;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
@ -8,6 +9,7 @@ import java.io.Serializable;
* @author yanglin
*/
@Data
@Builder
public class PersonV3DTO implements Serializable {
private static final long serialVersionUID = 1231840051925115741L;
@ -25,14 +27,4 @@ public class PersonV3DTO implements Serializable {
*/
private String name;
/**
* 发送者项目部ID
*/
private Long workspaceId;
/**
* 发送者企业ID
*/
private Long ouId;
}

View File

@ -0,0 +1,27 @@
package cn.axzo.msg.center.common.utils;
import java.util.concurrent.TimeUnit;
/**
* @author yanglin
*/
public class MiscUtils {
public static void sleepQuietly(long sleepTime, TimeUnit unit) {
long deadline = System.currentTimeMillis() + unit.toMillis(sleepTime);
long waitMs = deadline - System.currentTimeMillis();
boolean isInterrupted = false;
while (waitMs > 0) {
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
isInterrupted = true;
}
waitMs = deadline - System.currentTimeMillis();
}
if (isInterrupted) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -23,16 +23,29 @@ public class MessageRecordV3Dao extends ServiceImpl<MessageRecordV3Mapper, Messa
.eq(MessageRecordV3::getId, msgId)
.set(MessageRecordV3::getState, MsgStateV3Enum.SEND_SUCCESS)
.set(MessageRecordV3::getUpdateAt, new Date())
.set(MessageRecordV3::getSendTime, new Date())
.set(MessageRecordV3::getImMsgId, imMsgId)
.update();
}
public void setSendFailed(Collection<Long> messageIds, String cause) {
public void setSendFailed(Long msgId, String cause) {
lambdaUpdate()
.eq(MessageRecordV3::getState, MsgStateV3Enum.UNSENT)
.eq(MessageRecordV3::getId, msgId)
.set(MessageRecordV3::getState, MsgStateV3Enum.SEND_FAILED)
.set(MessageRecordV3::getUpdateAt, new Date())
.set(MessageRecordV3::getSendTime, new Date())
.set(MessageRecordV3::getFailCause, cause)
.update();
}
public void batchSetSendFailed(Collection<Long> messageIds, String cause) {
lambdaUpdate()
.eq(MessageRecordV3::getState, MsgStateV3Enum.UNSENT)
.in(MessageRecordV3::getId, messageIds)
.set(MessageRecordV3::getState, MsgStateV3Enum.SEND_FAILED)
.set(MessageRecordV3::getUpdateAt, new Date())
.set(MessageRecordV3::getSendTime, new Date())
.set(MessageRecordV3::getFailCause, cause)
.update();
}

View File

@ -5,7 +5,10 @@ import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.Getter;
import lombok.Setter;
@ -79,7 +82,12 @@ public class MessageRecordV3 extends BaseEntityExt<MessageRecordV3> implements S
/**
* 消息所属组织类型 PROJECT:项目,ENT:企业,UNKNOWN:未知
*/
private OrganizationTypeEnum orgType;
private OrganizationTypeEnum receiverOrgType;
/**
* 消息所属名称
*/
private String receiverOrgName;
/**
* 发送者项目部ID
@ -115,17 +123,20 @@ public class MessageRecordV3 extends BaseEntityExt<MessageRecordV3> implements S
/**
* 路由参数
*/
private String routerParams;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject routerParams;
/**
* 业务扩展参数
*/
private String bizExtParams;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject bizExtParams;
/**
* 消息的其它信息, 备查
*/
private String msgExtInfo;
@TableField(typeHandler = FastjsonTypeHandler.class)
private JSONObject msgExtInfo;
/**
* 失败原因

View File

@ -1,7 +1,15 @@
package cn.axzo.msg.center;
import com.taobao.api.internal.util.NamedThreadFactory;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.axzo.msg.center.MsgCenterConfig.IM_CENTER;
import static cn.axzo.msg.center.MsgCenterConfig.WORKSPACE;
@ -18,4 +26,13 @@ public class MsgCenterConfig {
public static final String IM_CENTER = "cn.axzo.im.center.api.feign";
public static final String WORKSPACE = "cn.axzo.apollo.workspace";
@Bean @Primary
public ExecutorService defaultExecutor() {
return new ThreadPoolExecutor(5, 30,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new NamedThreadFactory("defaultAsyncExecutor"));
}
}

View File

@ -0,0 +1,68 @@
package cn.axzo.msg.center.inside.notices.service;
import cn.axzo.msg.center.MsgCenterApplication;
import cn.axzo.msg.center.api.request.MessagePushReqV3;
import cn.axzo.msg.center.api.response.MessageSendRespV3;
import cn.axzo.msg.center.common.utils.MiscUtils;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.dto.PersonV3DTO;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* @author yanglin
*/
@SpringBootTest(classes = MsgCenterApplication.class)
class MessageServiceV3Test {
@Autowired
private MessageServiceV3 messageServiceV3;
@Test
void send() {
JSONObject bizExtParams = new JSONObject();
bizExtParams.put("name", "杨林");
JSONObject routerExtParams = new JSONObject();
routerExtParams.put("routerName", "杨林");
MessagePushReqV3 req = new MessagePushReqV3();
req.setBizEventMappingCode("652");
req.setBizCode("001");
req.setBizEventMappingCode("yl-test-mapping-01");
req.setBizExtParams(bizExtParams);
req.setRouterParams(routerExtParams);
req.setBizCategory(BizCategoryEnum.OTHER);
req.setSender(
PersonV3DTO.builder()
.id(84982L)
.identity(IdentityDTO
.builder()
.build())
.build());
req.setReceiversOuId(333L);
req.setReceivers(Collections.singletonList(
PersonV3DTO.builder()
.id(85026L)
.identity(IdentityDTO
.builder()
.type(IdentityTypeEnum.OPERATOR)
.build())
.build()));
MessageSendRespV3 resp = messageServiceV3.send(req);
System.out.println(JSON.toJSONString(resp));
// 使用异步发送, 避免测试结束
MiscUtils.sleepQuietly(10, TimeUnit.SECONDS);
}
}