From 31f413f7e61d72fff096deccb2aae0cb21aa18b1 Mon Sep 17 00:00:00 2001 From: yanglin Date: Fri, 12 Jan 2024 19:14:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E5=AD=98=E4=B8=80=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../msg/MessageAPIV3Controller.java | 29 ++++ .../notices/service/MessageServiceV3.java | 13 ++ .../component/TerminalAppTypeMapping.java | 39 +++++ .../notices/service/impl/v3/ImClient.java | 98 ++++++++++++ .../service/impl/v3/MessageServiceV3Impl.java | 94 +++++++++++ .../service/impl/v3/TemplateBatch.java | 148 ++++++++++++++++++ .../cn/axzo/msg/center/api/MessageAPIV3.java | 34 ++++ .../msg/center/api/enums/MsgStateV3Enum.java | 33 ++++ .../api/fallback/LoggingFallbackFactory.java | 77 +++++++++ .../center/api/request/MessagePushReqV3.java | 110 +++++++++++++ .../center/api/response/MessageRespV3.java | 12 ++ .../msg/center/service/dto/PersonV3DTO.java | 38 +++++ .../common/utils/PlaceholderResolver.java | 4 + .../msg/center/dal/MessageRecordV3Dao.java | 39 +++++ .../dal/mapper/MessageRecordV3Mapper.java | 12 ++ .../center/domain/entity/MessageRecordV3.java | 144 +++++++++++++++++ .../domain/enums/BizActionCategory.java | 9 +- 17 files changed, 931 insertions(+), 2 deletions(-) create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/controller/msg/MessageAPIV3Controller.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/MessageServiceV3.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/component/TerminalAppTypeMapping.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/ImClient.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/MessageServiceV3Impl.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/TemplateBatch.java create mode 100644 msg-center-api/src/main/java/cn/axzo/msg/center/api/MessageAPIV3.java create mode 100644 msg-center-api/src/main/java/cn/axzo/msg/center/api/enums/MsgStateV3Enum.java create mode 100644 msg-center-api/src/main/java/cn/axzo/msg/center/api/fallback/LoggingFallbackFactory.java create mode 100644 msg-center-api/src/main/java/cn/axzo/msg/center/api/request/MessagePushReqV3.java create mode 100644 msg-center-api/src/main/java/cn/axzo/msg/center/api/response/MessageRespV3.java create mode 100644 msg-center-api/src/main/java/cn/axzo/msg/center/service/dto/PersonV3DTO.java create mode 100644 msg-center-dal/src/main/java/cn/axzo/msg/center/dal/MessageRecordV3Dao.java create mode 100644 msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageRecordV3Mapper.java create mode 100644 msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/MessageRecordV3.java diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/controller/msg/MessageAPIV3Controller.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/controller/msg/MessageAPIV3Controller.java new file mode 100644 index 00000000..2cced683 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/controller/msg/MessageAPIV3Controller.java @@ -0,0 +1,29 @@ +package cn.axzo.msg.center.inside.notices.controller.msg; + +import cn.axzo.msg.center.api.MessageAPIV3; +import cn.axzo.msg.center.api.request.MessagePushReqV3; +import cn.axzo.msg.center.api.response.MessageRespV3; +import cn.axzo.msg.center.inside.notices.service.MessageServiceV3; +import cn.azxo.framework.common.model.CommonResponse; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * @author yanglin + */ +@Slf4j +@RestController +@RequiredArgsConstructor +public class MessageAPIV3Controller implements MessageAPIV3 { + + private final MessageServiceV3 messageServiceV3; + + @Override + public CommonResponse> send(MessagePushReqV3 req) { + return CommonResponse.success(messageServiceV3.send(req)); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/MessageServiceV3.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/MessageServiceV3.java new file mode 100644 index 00000000..a83cc3ed --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/MessageServiceV3.java @@ -0,0 +1,13 @@ +package cn.axzo.msg.center.inside.notices.service; + +import cn.axzo.msg.center.api.request.MessagePushReqV3; +import cn.axzo.msg.center.api.response.MessageRespV3; + +import java.util.List; + +/** + * @author yanglin + */ +public interface MessageServiceV3 { + List send(MessagePushReqV3 req); +} diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/component/TerminalAppTypeMapping.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/component/TerminalAppTypeMapping.java new file mode 100644 index 00000000..96f75156 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/component/TerminalAppTypeMapping.java @@ -0,0 +1,39 @@ +package cn.axzo.msg.center.inside.notices.service.component; + +import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.msg.center.service.enums.PushTerminalEnum; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ +@Component +public class TerminalAppTypeMapping { + + private final ImmutableMap terminal2AppType = ImmutableMap.of( + PushTerminalEnum.B_ENTERPRISE_APP, AppTypeEnum.CMP, + PushTerminalEnum.C_WORKER_APP, AppTypeEnum.CM + ); + + public List toImTypes(List terminals) { + if (CollectionUtils.isEmpty(terminals)) { + return Collections.emptyList(); + } + return terminals.stream() + .map(this::toImType) + .distinct() + .collect(toList()); + } + + public AppTypeEnum toImType(PushTerminalEnum terminal) { + return terminal2AppType.get(terminal); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/ImClient.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/ImClient.java new file mode 100644 index 00000000..e415b3aa --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/ImClient.java @@ -0,0 +1,98 @@ +package cn.axzo.msg.center.inside.notices.service.impl.v3; + +import cn.axzo.framework.domain.web.result.ApiResult; +import cn.axzo.im.center.api.feign.MessageApi; +import cn.axzo.im.center.api.vo.req.MessageInfo; +import cn.axzo.im.center.api.vo.resp.MessageDispatchResp; +import com.alibaba.fastjson.JSON; +import com.taobao.api.internal.util.NamedThreadFactory; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +/** + * @author yanglin + */ +@Slf4j +@Component +@RequiredArgsConstructor +class ImClient { + private final MessageApi messageApi; + private final ScheduledExecutorService scheduleExecutor = + Executors.newScheduledThreadPool(10, new NamedThreadFactory("MessageServiceV3-send")); + // 强行做成有界队列, 避免内存爆了 + private final Semaphore semaphore = new Semaphore(200); + + void send(MessageInfo req, BiConsumer> callback) { + scheduleExecutor.execute(() -> { + acquireAndSend(req, callback, 0); + }); + } + + private void acquireAndSend( + MessageInfo req, BiConsumer> callback, int retryCount) { + boolean acquired = false; + try { + if (semaphore.tryAcquire(20, TimeUnit.SECONDS)) { + acquired = true; + doSend(req, callback, retryCount); + } else { + String error = String.format("发送消息超载了. templateId=%s, header=%s, receiverPersonIds=%s", + req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList())); + callback.accept(new RuntimeException(error), null); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + if (acquired) { + semaphore.release(); + } + } + } + + /** + * 本地简单重试 + */ + private void doSend( + MessageInfo req, BiConsumer> callback, int retryCount) { + if (retryCount > 0) { + log.warn("重试发送消息 templateId={}, header={}, receiverPersonIds={}", + req.getMsgTemplateId(), req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList())); + } + Consumer retryFun = e -> { + if (retryCount > 3) { + callback.accept(e, null); + } else { + // 尽量在消息端进行重试, 因为是按template的纬度调用IM进行批量发送的 + // 如果直接让上游调用端进行重试的话, 有重复发送的风险 + scheduleExecutor.schedule( + () -> acquireAndSend(req, callback, retryCount + 1), + 2L * (retryCount + 1), TimeUnit.SECONDS); + } + }; + try { + ApiResult> apiResult = messageApi.sendMessage(req); + if (apiResult.isSuccess()) { + callback.accept(null, apiResult.getData()); + } else { + String error = String.format("发送消息失败. respCode=%s, respMsg=%s, templateId=%s, header=%s, receiverPersonIds=%s, retryCount=%s", + apiResult.getCode(), apiResult.getMsg(), req.getMsgTemplateId(), + req.getMsgHeader(), JSON.toJSONString(req.getToPersonIdList()), retryCount); + log.error(error); + // FIXME(yl): 区分业务异常和IM系统异常? + retryFun.accept(new RuntimeException(error)); + } + } catch (Exception e) { + retryFun.accept(e); + } + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/MessageServiceV3Impl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/MessageServiceV3Impl.java new file mode 100644 index 00000000..2973f4aa --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/MessageServiceV3Impl.java @@ -0,0 +1,94 @@ +package cn.axzo.msg.center.inside.notices.service.impl.v3; + +import cn.axzo.basics.common.exception.ServiceException; +import cn.axzo.basics.common.util.AssertUtil; +import cn.axzo.im.center.api.vo.resp.MessageDispatchResp; +import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.msg.center.api.request.MessagePushReqV3; +import cn.axzo.msg.center.api.response.MessageRespV3; +import cn.axzo.msg.center.dal.MessageRecordV3Dao; +import cn.axzo.msg.center.domain.entity.BizEventMapping; +import cn.axzo.msg.center.domain.entity.MessageRecordV3; +import cn.axzo.msg.center.domain.enums.BizActionCategory; +import cn.axzo.msg.center.inside.notices.service.MessageServiceV3; +import cn.axzo.msg.center.inside.notices.service.component.TerminalAppTypeMapping; +import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO; +import cn.axzo.msg.center.message.service.BizEventMappingService; +import cn.axzo.msg.center.message.service.MessageTemplateNewService; +import cn.axzo.msg.center.service.bizevent.request.ReachDto; +import cn.axzo.msg.center.utils.UUIDUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * @author yanglin + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class MessageServiceV3Impl implements MessageServiceV3 { + + private final BizEventMappingService bizEventMappingService; + private final MessageTemplateNewService messageTemplateNewService; + private final TerminalAppTypeMapping terminalAppTypeMapping; + private final ImClient imClient; + private final MessageRecordV3Dao messageRecordV3Dao; + + @Override + @Transactional(rollbackFor = Exception.class) + public List send(MessagePushReqV3 req) { + req.validate(); + BizEventMapping mapping = bizEventMappingService + .getByBizCode(req.getBizEventMappingCode()) + .orElse(null); + AssertUtil.notNull(mapping, String.format("找不到对应的事件映射: %s", req.getBizEventMappingCode())); + //noinspection DataFlowIssue + AssertUtil.notEmpty(mapping.getReachConfig(), String.format( + "业务事件映射%s无业务动作配置", req.getBizEventMappingCode())); + String batchNo = UUIDUtil.uuidString(); + List batches = new ArrayList<>(mapping.getReachConfig().size()); + for (ReachDto cfg : mapping.getReachConfig()) { + AssertUtil.isTrue(BizActionCategory.NOTIFICATION.is(cfg.getCategory()), "目前只支持通知"); + MessageTemplateDTO template = messageTemplateNewService + .queryEnableTemplateByCode(cfg.getTemplateCode()) + .orElseThrow(() -> new ServiceException(String.format( + "未查询到对应的模板, templateCode=%s", cfg.getTemplateCode()))); + batches.add(new TemplateBatch(req, batchNo, template)); + } + List records = batches.stream() + .flatMap(b -> b.getMessageRecords().stream()) + .collect(toList()); + messageRecordV3Dao.saveBatch(records); + for (TemplateBatch batch : batches) { + sendBatch(batch); + } + // TODO(yl): WHAT + return Collections.emptyList(); + } + + private void sendBatch(TemplateBatch batch) { + List appTypes = terminalAppTypeMapping + .toImTypes(batch.getTemplate().getPushTerminals()); + imClient.send(batch.buildIMRequest(appTypes), (e, respList) -> { + if (e != null) { + messageRecordV3Dao.setSendFailed(batch.getRecordIds(), e.getMessage()); + } else { + for (int i = 0; i < respList.size(); i++) { + MessageDispatchResp resp = respList.get(i); + MessageRecordV3 record = batch.getMessageRecords().get(i); + // 把im端的id也存起来 + messageRecordV3Dao.setSendSuccess(record.getId(), resp.getMsgid()); + } + } + }); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/TemplateBatch.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/TemplateBatch.java new file mode 100644 index 00000000..75508ba0 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/v3/TemplateBatch.java @@ -0,0 +1,148 @@ +package cn.axzo.msg.center.inside.notices.service.impl.v3; + +import cn.axzo.framework.jackson.utility.JSON; +import cn.axzo.im.center.api.vo.req.MessageInfo; +import cn.axzo.im.center.common.enums.AppTypeEnum; +import cn.axzo.msg.center.api.enums.MsgStateV3Enum; +import cn.axzo.msg.center.api.request.MessagePushReqV3; +import cn.axzo.msg.center.common.enums.TableIsDeleteEnum; +import cn.axzo.msg.center.common.utils.PlaceholderResolver; +import cn.axzo.msg.center.domain.entity.MessageRecordV3; +import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO; +import cn.axzo.msg.center.service.dto.PersonV3DTO; +import cn.axzo.msg.center.utils.UUIDUtil; +import com.google.common.collect.ImmutableMap; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + +/** + * @author yanglin + */ +@RequiredArgsConstructor +class TemplateBatch { + private final MessagePushReqV3 req; + private final String batchNo; + @Getter + private final MessageTemplateDTO template; + + private List records; + private String title; + private String content; + private Map msgExtInfo; + + Collection getRecordIds() { + return getMessageRecords().stream() + .map(MessageRecordV3::getId) + .collect(toSet()); + } + + List getMessageRecords() { + if (records != null) { + return records; + } + records = new ArrayList<>(req.getReceivers().size()); + for (PersonV3DTO receiver : req.getReceivers()) { + MessageRecordV3 record = new MessageRecordV3(); + record.setBatchNo(batchNo); + record.setIdentityCode(UUIDUtil.uuidString()); + record.setBizCategory(req.getBizCategory()); + record.setSenderPersonId(req.getSender().getId()); + record.setSenderOuId(req.getSender().getId()); + record.setSenderWorkspaceId(req.getSender().getWorkspaceId()); + record.setReceiverPersonId(receiver.getId()); + record.setReceiverOuId(receiver.getOuId()); + record.setReceiverWorkspaceId(receiver.getWorkspaceId()); + record.setBizEventMappingCode(req.getBizEventMappingCode()); + record.setTemplateCode(template.getCode()); + record.setTitle(getTitle()); + record.setContent(getContent()); + record.setOrgType(req.getOrgType()); + record.setState(MsgStateV3Enum.UNSENT); + record.setBizCode(req.getBizCode()); + record.setRouterParams(JSON.toJSONString(req.getRouterParams())); + record.setBizExtParams(record.getBizExtParams()); + record.setMsgExtInfo(JSON.toJSONString(getMsgExtInfo())); + record.setFailCause(null); + record.setSendTime(new Date()); + record.setCreateAt(new Date()); + record.setUpdateAt(new Date()); + record.setIsDelete(TableIsDeleteEnum.NORMAL.value); + } + return records; + } + + MessageInfo buildIMRequest(List apps) { + MessageInfo imReq = new MessageInfo(); + imReq.setAppTypeList(apps); + imReq.setToPersonIdList(req.stringReceiverIds()); + imReq.setMsgHeader(getTitle()); + imReq.setMsgContent(getContent()); + imReq.setMsgTemplateId(template.getCode()); + imReq.setMsgTemplateContent(JSON.toJSONString(getMsgExtInfo())); + // 扩展信息 + Map ext = new HashMap<>(); + ext.put("minAppVersion", template.getMinAppVersion()); + ext.put("workspaceId", req.stringWorkspaceId()); + imReq.setExtendsInfo(ext); + return imReq; + } + + // ------------------------------- 辅助方法 + + String getTitle() { + if (title != null) { + return title; + } + title = req.getBizExtParams() == null + ? template.getTitle() + : PlaceholderResolver.resolve(template.getTitle(), req.getBizExtParams()); + return title; + } + + String getContent() { + if (content != null) { + return content; + } + content = req.getBizExtParams() == null + ? template.getContent() + : PlaceholderResolver.resolve(template.getContent(), req.getBizExtParams()); + return content; + } + + Map getMsgExtInfo() { + if (msgExtInfo == null) { + msgExtInfo = ImmutableMap.of( + "bizExtParams", req.getBizExtParams(), + "routerParams", req.getRouterParams()); + } + return msgExtInfo; + } + + Collection getReceiverPersonIds() { + return req.getReceivers().stream() + .map(PersonV3DTO::getId) + .collect(toList()); + } + + @Override + public String toString() { + HashMap values = new HashMap<>(); + values.put("batchNo", batchNo); + values.put("bizCode", req.getBizCode()); + values.put("templateCode", template.getCode()); + values.put("bizEventMappingCode", req.getBizEventMappingCode()); + values.put("receiverIds", getRecordIds()); + values.put("receiverPersonIds", getReceiverPersonIds()); + return JSON.toJSONString(values); + } +} \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/api/MessageAPIV3.java b/msg-center-api/src/main/java/cn/axzo/msg/center/api/MessageAPIV3.java new file mode 100644 index 00000000..0667bd6c --- /dev/null +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/api/MessageAPIV3.java @@ -0,0 +1,34 @@ +package cn.axzo.msg.center.api; + +import cn.axzo.msg.center.api.fallback.LoggingFallbackFactory; +import cn.axzo.msg.center.api.request.MessagePushReqV3; +import cn.axzo.msg.center.api.response.MessageRespV3; +import cn.azxo.framework.common.model.CommonResponse; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; + +import java.util.List; + +/** + * @author yanglin + */ +@Component +@FeignClient( + value = "msg-center", + url = "${server.serviceUrl:http://msg-center:8080}", + fallbackFactory = MessageAPIV3.FallbackFactory.class) +public interface MessageAPIV3 { + + @RequestMapping(value = "api/message/v3/send", method = RequestMethod.POST) + CommonResponse> send(MessagePushReqV3 req); + + class FallbackFactory extends LoggingFallbackFactory { + + public FallbackFactory() { + super(MessageAPIV3.class); + } + + } +} \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/api/enums/MsgStateV3Enum.java b/msg-center-api/src/main/java/cn/axzo/msg/center/api/enums/MsgStateV3Enum.java new file mode 100644 index 00000000..8044de8a --- /dev/null +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/api/enums/MsgStateV3Enum.java @@ -0,0 +1,33 @@ +package cn.axzo.msg.center.api.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; +import lombok.Getter; + +/** + * @author: yanglin + */ +@Getter +public enum MsgStateV3Enum { + + UNSENT("UNSENT", "未发送"), + SEND_FAILED("SEND_FAILED", "发送失败"), + SEND_SUCCESS("SEND_SUCCESS", "已发送"); + + @EnumValue + private final String code; + private final String message; + + MsgStateV3Enum(String code, String message) { + this.code = code; + this.message = message; + } + + public String code() { + return this.code; + } + + public String message() { + return this.message; + } + +} \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/api/fallback/LoggingFallbackFactory.java b/msg-center-api/src/main/java/cn/axzo/msg/center/api/fallback/LoggingFallbackFactory.java new file mode 100644 index 00000000..5e7c6bbe --- /dev/null +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/api/fallback/LoggingFallbackFactory.java @@ -0,0 +1,77 @@ +package cn.axzo.msg.center.api.fallback; + +import cn.axzo.basics.common.exception.ServiceException; +import cn.azxo.framework.common.model.CommonResponse; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.openfeign.FallbackFactory; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +/** + * @author yanglin + */ +@Slf4j +public abstract class LoggingFallbackFactory implements FallbackFactory { + + private final Class type; + + public LoggingFallbackFactory(Class apiInterfaceType) { + if (!apiInterfaceType.isInterface()) { + throw new RuntimeException(String.format("目标类型必须是接口, 实际上是: %s", apiInterfaceType.getName())); + } + this.type = apiInterfaceType; + } + + @Override + public Object create(Throwable cause) { + return Proxy.newProxyInstance( + getClass().getClassLoader(), new Class[]{type}, new Logging(type, cause)); + } + + @RequiredArgsConstructor + private static class Logging implements InvocationHandler { + + private final Class type; + private final Throwable cause; + + @Override + @SuppressWarnings({"SuspiciousInvocationHandlerImplementation", "代码逻辑做了检查"}) + public Object invoke(Object proxy, Method method, Object[] args) { + if (ReflectionUtils.isObjectMethod(method)) { + log.error("method invoking error {}#{}", type.getName(), method.getName(), cause); + return CommonResponse.error(String.format("调用异常: %s#%s", type.getName(), method.getName())); + } + if (method.getReturnType() != CommonResponse.class) { + throw new RuntimeException(String.format( + "调用类型不匹配, 期望: CommonResponse, 实际上是:%s", method.getReturnType().getName())); + } + if (cause instanceof ServiceException) { + Object errorData = getErrorData((ServiceException) cause); + // 大概率是业务约束不满足, 不打印成error + log.warn("method invoking exception {}#{}, message={}, errorData={}", + type.getName(), method.getName(), cause.getMessage(), errorData, cause); + return CommonResponse.error(cause.getMessage() == null ? "调用异常" : cause.getMessage()); + } else { + log.error("method invoking error {}#{}", type.getName(), method.getName(), cause); + return CommonResponse.error(String.format("调用异常: %s#%s", type.getName(), method.getName())); + } + } + + private Object getErrorData(ServiceException e) { + try { + Field field = ServiceException.class.getDeclaredField("dataObject"); + field.setAccessible(true); + return field.get(e); + } catch (Exception ex) { + log.error("获取ServiceException中的dataObject出错", ex); + return null; + } + } + + } +} \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/api/request/MessagePushReqV3.java b/msg-center-api/src/main/java/cn/axzo/msg/center/api/request/MessagePushReqV3.java new file mode 100644 index 00000000..182a5f26 --- /dev/null +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/api/request/MessagePushReqV3.java @@ -0,0 +1,110 @@ +package cn.axzo.msg.center.api.request; + +import cn.axzo.basics.common.util.AssertUtil; +import cn.axzo.msg.center.service.dto.PersonV3DTO; +import cn.axzo.msg.center.service.enums.BizCategoryEnum; +import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; +import com.alibaba.fastjson.JSONObject; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static java.util.stream.Collectors.toSet; + +/** + * @author yanglin + */ +@Data +public class MessagePushReqV3 implements Serializable { + /** + * 发起者 + */ + @NotNull(message = "发起者不能为空") + private PersonV3DTO sender; + + /** + * 接收者列表 + */ + @NotNull(message = "接收者列表不能为空") + private List receivers; + + /** + * 业务事件code + * oms->消息模板 配置 + */ + @NotBlank(message = "业务事件code不能为空") + private String bizEventMappingCode; + + /** + * 关联业务唯一标识 + * 例如: 请假申请的编号 + */ + @NotBlank(message = "关联业务唯一标识不能为空") + private String bizCode; + + /** + * 消息所属组织类型 + */ + @NotNull(message = "工作台类型不能为空") + private OrganizationTypeEnum orgType; + + /** + * 消息所属工作台ID + */ + @NotNull(message = "消息所属工作台ID不能为空") + private Long workspaceId; + + /** + * 消息所属企业ID + * 【备注:如果是工人,则所在企业可以为空;其它均必传】 + */ + private Long ouId; + + /** + * 业务扩展参数-JSON字符串格式 + */ + private JSONObject bizExtParams; + + /** + * 路由参数-JSON字符串格式 + */ + private JSONObject routerParams; + + /** + * 业务类型- 待办使用 + */ + private BizCategoryEnum bizCategory; + + // ------------------------------- 辅助方法 + + public String stringWorkspaceId() { + if (workspaceId == null) { + return null; + } + return String.valueOf(workspaceId); + } + + public Set stringReceiverIds() { + if (receivers == null) { + return Collections.emptySet(); + } + return receivers.stream() + .map(String::valueOf) + .collect(toSet()); + } + + public void validate() { + // TODO(yl): 确认如此是必传 + AssertUtil.notNull(sender, "sender不能为空"); + AssertUtil.notEmpty(receivers, "receivers不能为空"); + AssertUtil.notEmpty(bizEventMappingCode, "bizEventCode不能为空"); + AssertUtil.notEmpty(bizCode, "bizCode不能为空"); + AssertUtil.notNull(orgType, "orgType不能为空"); + AssertUtil.notNull(workspaceId, "workspaceId不能为空"); + } +} \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/api/response/MessageRespV3.java b/msg-center-api/src/main/java/cn/axzo/msg/center/api/response/MessageRespV3.java new file mode 100644 index 00000000..b58714ec --- /dev/null +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/api/response/MessageRespV3.java @@ -0,0 +1,12 @@ +package cn.axzo.msg.center.api.response; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author yanglin + */ +@Data +public class MessageRespV3 implements Serializable { +} diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/service/dto/PersonV3DTO.java b/msg-center-api/src/main/java/cn/axzo/msg/center/service/dto/PersonV3DTO.java new file mode 100644 index 00000000..f40eccae --- /dev/null +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/service/dto/PersonV3DTO.java @@ -0,0 +1,38 @@ +package cn.axzo.msg.center.service.dto; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author yanglin + */ +@Data +public class PersonV3DTO implements Serializable { + + private static final long serialVersionUID = 1231840051925115741L; + + /** + * 自然人id + */ + private Long id; + /** + * 身份信息 + */ + private IdentityDTO identity; + /** + * 姓名 + */ + private String name; + + /** + * 发送者项目部ID + */ + private Long workspaceId; + + /** + * 发送者企业ID + */ + private Long ouId; + +} \ No newline at end of file diff --git a/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/PlaceholderResolver.java b/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/PlaceholderResolver.java index 3e4d6a2f..8f5af3c2 100644 --- a/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/PlaceholderResolver.java +++ b/msg-center-common/src/main/java/cn/axzo/msg/center/common/utils/PlaceholderResolver.java @@ -46,6 +46,10 @@ public class PlaceholderResolver { this.placeholderSuffix = placeholderSuffix; } + public static String resolve(String template, Map values) { + return getDefaultResolver().resolveByMap(template, values); + } + /** * 获取默认的占位符解析器,即占位符前缀为"${", 后缀为"}" * diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/MessageRecordV3Dao.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/MessageRecordV3Dao.java new file mode 100644 index 00000000..6a64d747 --- /dev/null +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/MessageRecordV3Dao.java @@ -0,0 +1,39 @@ +package cn.axzo.msg.center.dal; + +import cn.axzo.msg.center.api.enums.MsgStateV3Enum; +import cn.axzo.msg.center.dal.mapper.MessageRecordV3Mapper; +import cn.axzo.msg.center.domain.entity.MessageRecordV3; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Date; + +/** + * @author yanglin + */ +@Slf4j +@Component +public class MessageRecordV3Dao extends ServiceImpl { + + public void setSendSuccess(Long msgId, String imMsgId) { + lambdaUpdate() + .eq(MessageRecordV3::getState, MsgStateV3Enum.UNSENT) + .eq(MessageRecordV3::getId, msgId) + .set(MessageRecordV3::getState, MsgStateV3Enum.SEND_SUCCESS) + .set(MessageRecordV3::getUpdateAt, new Date()) + .set(MessageRecordV3::getImMsgId, imMsgId) + .update(); + } + + public void setSendFailed(Collection messageIds, String cause) { + lambdaUpdate() + .eq(MessageRecordV3::getState, MsgStateV3Enum.UNSENT) + .in(MessageRecordV3::getId, messageIds) + .set(MessageRecordV3::getState, MsgStateV3Enum.SEND_FAILED) + .set(MessageRecordV3::getUpdateAt, new Date()) + .set(MessageRecordV3::getFailCause, cause) + .update(); + } +} \ No newline at end of file diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageRecordV3Mapper.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageRecordV3Mapper.java new file mode 100644 index 00000000..bbac5ee3 --- /dev/null +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageRecordV3Mapper.java @@ -0,0 +1,12 @@ +package cn.axzo.msg.center.dal.mapper; + +import cn.axzo.msg.center.domain.entity.MessageRecordV3; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** + * @author yanglin + */ +@Mapper +public interface MessageRecordV3Mapper extends BaseMapper { +} diff --git a/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/MessageRecordV3.java b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/MessageRecordV3.java new file mode 100644 index 00000000..1d344119 --- /dev/null +++ b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/entity/MessageRecordV3.java @@ -0,0 +1,144 @@ +package cn.axzo.msg.center.domain.entity; + +import cn.axzo.msg.center.api.enums.MsgStateV3Enum; +import cn.axzo.msg.center.domain.persistence.BaseEntityExt; +import cn.axzo.msg.center.service.enums.BizCategoryEnum; +import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; +import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author cold_blade + * @version 1.0 + * @description 消息基础模板数据模型 + * @date 2023/9/20 + */ +@Setter +@Getter +@TableName("message_record_v3") +public class MessageRecordV3 extends BaseEntityExt implements Serializable { + + private static final long serialVersionUID = -880409106378455813L; + + /** + * 批次号 + */ + private String batchNo; + + /** + * IM端的消息ID + */ + private String imMsgId; + + /** + * 消息唯一标识 + */ + private String identityCode; + + /** + * 业务类型 + *

FLOW:流程,OTHER:其它 + */ + private BizCategoryEnum bizCategory; + + /** + * 发送自然人ID + */ + private Long senderPersonId; + + /** + * 接收自然人ID + */ + private Long receiverPersonId; + + /** + * 业务事件映射code + */ + private String bizEventMappingCode; + + /** + * 模版code + */ + private String templateCode; + + /** + * 消息标题 + */ + private String title; + + /** + * 消息内容 + */ + private String content; + + /** + * 消息所属组织类型 PROJECT:项目,ENT:企业,UNKNOWN:未知 + */ + private OrganizationTypeEnum orgType; + + /** + * 发送者项目部ID + */ + private Long senderWorkspaceId; + + /** + * 发送者企业ID + */ + private Long senderOuId; + + /** + * 接收者项目部ID + */ + private Long receiverWorkspaceId; + + /** + * 接收者企业ID + */ + private Long receiverOuId; + + /** + * 消息状态 UNSENT:未发送,SEND_SUCCESS:已发送,SEND_FAILED:发送失败 + */ + private MsgStateV3Enum state; + + /** + * 关联业务编码 + *

业务上的唯一标识, 用于向上回述 + */ + private String bizCode; + + /** + * 路由参数 + */ + private String routerParams; + + /** + * 业务扩展参数 + */ + private String bizExtParams; + + /** + * 消息的其它信息, 备查 + */ + private String msgExtInfo; + + /** + * 失败原因 + */ + private String failCause; + + /** + * 发送时间 + */ + private Date sendTime; + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} \ No newline at end of file diff --git a/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/BizActionCategory.java b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/BizActionCategory.java index 39028733..bdc6c639 100644 --- a/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/BizActionCategory.java +++ b/msg-center-domain/src/main/java/cn/axzo/msg/center/domain/enums/BizActionCategory.java @@ -2,11 +2,12 @@ package cn.axzo.msg.center.domain.enums; import cn.axzo.basics.common.exception.ServiceException; import cn.axzo.msg.center.service.enums.MessageCategoryEnum; +import lombok.AllArgsConstructor; +import lombok.Getter; + import java.util.Arrays; import java.util.Collections; import java.util.List; -import lombok.AllArgsConstructor; -import lombok.Getter; /** * 业务动作分类 @@ -41,4 +42,8 @@ public enum BizActionCategory { .findFirst() .orElseThrow(() -> new ServiceException("BizEvent Action is invalid")); } + + public boolean is(String code) { + return this.code.equals(code); + } }