diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java index a7ec28ee..631d0d6c 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java @@ -130,6 +130,9 @@ public class PendingMessageBizConfig { @Getter private int oldMsgStateDataBackendUpdateBatchSize = 100; + @Getter + private boolean oldMsgOffline = true; + // !! 待办分类统计缓存 /** diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java index edfdef5f..c09c00cf 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java @@ -33,13 +33,9 @@ import cn.axzo.msg.center.inside.notices.service.MessageRecordService; import cn.axzo.msg.center.inside.notices.service.MessageRelationService; import cn.axzo.msg.center.inside.notices.service.MessageRouterService; import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService; -import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam; -import cn.axzo.msg.center.message.service.PendingMessageNewService; import cn.axzo.msg.center.mq.MqMessageRecord; -import cn.axzo.msg.center.mq.MqMessageType; import cn.axzo.msg.center.mq.MqProducer; -import cn.axzo.msg.center.service.dto.PersonDTO; -import cn.axzo.msg.center.service.enums.AppTerminalTypeEnum; +import cn.axzo.msg.center.service.enums.MqMessageType; import cn.azxo.framework.common.model.Page; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java index 3b6a9209..1dd00e2b 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java @@ -50,10 +50,10 @@ import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq; import cn.axzo.msg.center.inside.notices.event.SendMessageEvent; import cn.axzo.msg.center.inside.notices.service.MessageRecordService; import cn.axzo.msg.center.mq.MqMessageRecord; -import cn.axzo.msg.center.mq.MqMessageType; import cn.axzo.msg.center.mq.MqProducer; import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType; import cn.axzo.msg.center.service.dto.IdentityDTO; +import cn.axzo.msg.center.service.enums.MqMessageType; import cn.axzo.msg.center.utils.PersonIdentityUtil; import cn.azxo.framework.common.utils.LogUtil; import cn.azxo.framework.common.utils.LogUtil.ErrorLevel; diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java index 874f6098..1ffebe14 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/GeneralMessageController.java @@ -2,6 +2,7 @@ package cn.axzo.msg.center.message.controller; import cn.axzo.msg.center.api.request.CmsMsgQueryReq; import cn.axzo.msg.center.api.response.MessageNewRes; +import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; import cn.axzo.msg.center.message.service.GeneralMessageOldService; import cn.axzo.msg.center.message.service.GeneralMessageService; import cn.axzo.msg.center.service.dto.PersonDTO; @@ -26,8 +27,11 @@ import org.springframework.web.bind.annotation.RestController; @RequiredArgsConstructor public class GeneralMessageController implements GeneralMessageClient { + private static final Integer ZERO = 0; + private final GeneralMessageService generalMessageService; private final GeneralMessageOldService generalMessageOldService; + private final PendingMessageBizConfig cfg; @Override public CommonResponse batchSend(GeneralMessageSendRequest request) { @@ -44,12 +48,18 @@ public class GeneralMessageController implements GeneralMessageClient { @Override public CommonResponse countOldMsgUnreadWithIdentities( OldMsgStatWithMultiIdentifiesRequest request) { + if (cfg.isOldMsgOffline()) { + return CommonResponse.success(ZERO); + } return CommonResponse.success(generalMessageOldService .countUnreadWithIdentities(request.getPersonId(), request.getIdentities())); } @Override public CommonResponse> pageQueryOldMessage(CmsMsgQueryReq request) { + if (cfg.isOldMsgOffline()) { + return CommonResponse.success(Page.zero()); + } request.setLogRequest(true); return CommonResponse.success(generalMessageOldService.pageMsgInfo(request)); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java index a7bfc431..3ff27505 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/GeneralMessageServiceImpl.java @@ -10,12 +10,12 @@ import cn.axzo.msg.center.common.utils.PlaceholderResolver; import cn.axzo.msg.center.dal.GeneralMessageRecordDao; import cn.axzo.msg.center.domain.entity.GeneralMessageRecord; import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig; +import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO; import cn.axzo.msg.center.message.domain.dto.MessageTemplateRouterDTO; import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO; import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO; import cn.axzo.msg.center.message.service.GeneralMessageService; -import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService; import cn.axzo.msg.center.message.service.MessageTemplateNewService; import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache; import cn.axzo.msg.center.service.dto.IdentityDTO; @@ -57,6 +57,8 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class GeneralMessageServiceImpl implements GeneralMessageService { + private static final Integer ZERO = 0; + private static final PersonDTO SYSTEM_SENDER = PersonDTO.builder() .id(0L) .identity(IdentityDTO.builder().id(0L).type(IdentityTypeEnum.NOT_SUPPORT).build()) @@ -71,9 +73,9 @@ public class GeneralMessageServiceImpl implements GeneralMessageService { private final MessageSystemConfig messageSystemConfig; private final GeneralMessageRecordDao generalMessageRecordDao; private final MessageTemplateNewService messageTemplateNewService; - private final MessageSendTwiceRecordService messageSendTwiceRecordService; private final MessageRouterUtil messageRouterUtil; private final OldMsgStatCache oldMsgStatCache; + private final PendingMessageBizConfig cfg; @Override @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW) @@ -90,6 +92,13 @@ public class GeneralMessageServiceImpl implements GeneralMessageService { @Override public GeneralMessageOldDataStatisticResponse statisticOldData(GeneralMessageOldDataStatisticRequest request) { + if (cfg.isOldMsgOffline()) { + return GeneralMessageOldDataStatisticResponse.builder() + .unreadCount(ZERO) + .latestMsgSendTimestamp(null) + .latestMsgContent(null) + .build(); + } return oldMsgStatCache.getCacheResponseOrReload(request); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java index e5be678d..5a1bd800 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java @@ -15,11 +15,11 @@ import cn.axzo.msg.center.message.service.impl.GeneralMessageOldServiceImpl; import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse; import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo; import cn.axzo.msg.center.message.service.impl.person.PersonService; -import cn.axzo.msg.center.mq.MqMessageType; import cn.axzo.msg.center.notices.common.enums.ReturnCodeEnum; import cn.axzo.msg.center.notices.common.exception.BizException; import cn.axzo.msg.center.service.dto.IdentityDTO; import cn.axzo.msg.center.service.enums.MessageCategoryEnum; +import cn.axzo.msg.center.service.enums.MqMessageType; import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest; import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse; import cn.azxo.framework.common.model.Page; diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java index 63194ff2..a573cbe6 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/TodoManager.java @@ -22,11 +22,11 @@ import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaste import cn.axzo.msg.center.message.service.todo.manage.event.HandoverEvent; import cn.axzo.msg.center.message.service.todo.manage.event.NewTodoEvent; import cn.axzo.msg.center.mq.MqMessageRecord; -import cn.axzo.msg.center.mq.MqMessageType; import cn.axzo.msg.center.mq.MqProducer; import cn.axzo.msg.center.service.enums.BizCategoryEnum; import cn.axzo.msg.center.service.enums.IdentityTypeEnum; import cn.axzo.msg.center.service.enums.MessageCategoryEnum; +import cn.axzo.msg.center.service.enums.MqMessageType; import cn.axzo.msg.center.service.enums.PendingMessageStateEnum; import cn.axzo.msg.center.service.enums.TodoType; import cn.axzo.msg.center.service.enums.YesOrNo; @@ -49,6 +49,7 @@ import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapp import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Uninterruptibles; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -60,6 +61,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import java.util.ArrayList; import java.util.Collections; @@ -67,6 +69,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; @@ -90,19 +93,32 @@ public class TodoManager { private final PendingMessageBizConfig cfg; private final ApplicationContext applicationContext; private final TodoBroadcaster todoBroadcaster; + private final TransactionTemplate transactionTemplate; - @Transactional(rollbackFor = Exception.class) public List send(PendingMessagePushParam request) { TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize()); // 因为有外层事务, 所以这里可以直接调用 return send(ctx, request); } + public List send(TodoRequestContext ctx, PendingMessagePushParam request) { + // 10 seconds at most + for (int i = 0; i < 100; i++) { + try { + return transactionTemplate.execute(unused -> sendImpl(ctx, request)); + } catch (DuplicateKeyException e) { + log.info("Try to save todo but todo business already exists, request={}. " + + "retryCount={}", request, i, e); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + throw new ServiceException("服务器内部错误"); + } + /** * 发送待办或抄送 */ - @Transactional(rollbackFor = Exception.class) - public List send(TodoRequestContext ctx, PendingMessagePushParam request) { + private List sendImpl(TodoRequestContext ctx, PendingMessagePushParam request) { // 可能会存在重新查询模版的情况, 成本不高 MessageTemplateDTO template = messageTemplateNewService .queryEnableTemplateByCode(request.getTemplateCode()) @@ -120,25 +136,18 @@ public class TodoManager { request.setBizCode(UUIDUtil.uuidString()); else if (StringUtils.isBlank(request.getBizCode())) request.setBizCode(""); - Supplier todoBusinessQueryFun = () -> todoBusinessDao - .findByBiz(request.getTemplateCode(), request.getBizCode(), true) - .orElse(null); // 如果已经存在对应的待办业务, 就把待办追加到对应的待办业务上 // 流程会并发, 这里对业务进行加锁 - TodoBusiness business = todoBusinessQueryFun.get(); + TodoBusiness business = todoBusinessDao + .findByBiz(request.getTemplateCode(), request.getBizCode(), true) + .orElse(null); boolean businessCreated = false; TodoExt ext = new TodoExt(request, template); if (business == null) { business = todoRecordBuilder.buildBusiness(request, ext); business.getRecordExt().setGenBizCode(genBizCode); - try { - todoBusinessDao.save(business); - businessCreated = true; - } catch (DuplicateKeyException e) { - log.warn("Found duplicated business. Try to find saved one, request={}", request, e); - business = todoBusinessQueryFun.get(); - log.warn("Found duplicated business. Found business={}, request={}", business, request, e); - } + todoBusinessDao.save(business); + businessCreated = true; } //批量默认为false YesOrNo supportBatchProcess = request.getSupportBatchProcess() == null ? YesOrNo.NO : request.getSupportBatchProcess(); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/broadcast/TodoMqBroadcaster.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/broadcast/TodoMqBroadcaster.java index 9314782b..b2a58dce 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/broadcast/TodoMqBroadcaster.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/manage/broadcast/TodoMqBroadcaster.java @@ -9,8 +9,8 @@ import cn.axzo.msg.center.domain.entity.Todo; import cn.axzo.msg.center.domain.entity.TodoBusiness; import cn.axzo.msg.center.message.service.todo.manage.TodoExt; import cn.axzo.msg.center.mq.MqMessageRecord; -import cn.axzo.msg.center.mq.MqMessageType; import cn.axzo.msg.center.mq.MqProducer; +import cn.axzo.msg.center.service.enums.MqMessageType; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -40,7 +40,6 @@ public class TodoMqBroadcaster { .builder(MqMessageType.TODO_STATE_UPDATE, message) .messageKey(todo.getId()) .shardingKey(todo.getTemplateCode()) - .tagInfo(todo.getTemplateCode()) .build()); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqMessageRecord.java b/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqMessageRecord.java index c9bc8438..64d3ac46 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqMessageRecord.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqMessageRecord.java @@ -1,6 +1,7 @@ package cn.axzo.msg.center.mq; import cn.axzo.msg.center.api.mq.MqMessage; +import cn.axzo.msg.center.service.enums.MqMessageType; import com.alibaba.fastjson.JSON; import lombok.Getter; diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqProducer.java b/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqProducer.java index a64c36b2..2e1e1533 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqProducer.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqProducer.java @@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event; import cn.axzo.framework.rocketmq.EventProducer; import cn.axzo.msg.center.api.mq.MqMessage; import cn.axzo.msg.center.common.utils.BizAssertions; +import cn.axzo.msg.center.service.enums.MqMessageType; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqMessageType.java b/msg-center-api/src/main/java/cn/axzo/msg/center/service/enums/MqMessageType.java similarity index 95% rename from inside-notices/src/main/java/cn/axzo/msg/center/mq/MqMessageType.java rename to msg-center-api/src/main/java/cn/axzo/msg/center/service/enums/MqMessageType.java index caccfaba..50ff64e7 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/mq/MqMessageType.java +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/service/enums/MqMessageType.java @@ -1,4 +1,4 @@ -package cn.axzo.msg.center.mq; +package cn.axzo.msg.center.service.enums; import cn.axzo.framework.rocketmq.Event; import lombok.AccessLevel; diff --git a/msg-center-common/src/main/java/cn/axzo/msg/center/common/model/UserContext.java b/msg-center-common/src/main/java/cn/axzo/msg/center/common/model/UserContext.java deleted file mode 100644 index ddf6c3c6..00000000 --- a/msg-center-common/src/main/java/cn/axzo/msg/center/common/model/UserContext.java +++ /dev/null @@ -1,133 +0,0 @@ -package cn.axzo.msg.center.common.model; - -import cn.axzo.msg.center.common.manager.AcctPrincipalRes; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - -import java.io.Serializable; - -/** - * 用户环境 - * - * 这个类不建议加@Data annotation,有一些字段,可以不暴露出去。 - */ -@Getter -@Setter -@ToString -@EqualsAndHashCode -public class UserContext implements Serializable { - - public static final String MOCK = "Mock"; - private String mock; - - ///---注释开始---- - ///以下几个字段,不准备在这个context中提供,所以没有加getter、setter - /// @Getter(AccessLevel.NONE) 去掉生成getter方法 -/// @Setter(AccessLevel.NONE) 去掉生成setter方法 - //region need be deleted - - - /** - * 访问的端 若为proj,则说明是OMS跳转来项目端访问,此功能已废弃 - */ - @Deprecated - @ToString.Exclude - @EqualsAndHashCode.Exclude - private String visitTo; - - /** - * 当前工人状态 1.工人 2.班组长 - */ - @Deprecated - @ToString.Exclude - @EqualsAndHashCode.Exclude - private Integer workerIdentity; - - /** - * 分包机构 - */ - @Deprecated - @ToString.Exclude - @EqualsAndHashCode.Exclude - private Long agencyId; - - @Deprecated - @ToString.Exclude - @EqualsAndHashCode.Exclude - private Boolean isResRoleAdmin; - - //endregion - ///---注释结束--- - - - private AcctPrincipalRes acctPrincipalRes; - - - /** - * 账号ID - */ - private Long acctId; - - /** - * 用户ID - */ - private Long userId; - - /** - * 当前终端类型 - * - */ - private String terminal; - - /** - * 租户ID 不同端 - 租户ID不一样 - */ - private Long tenantId; - - /** - * 工作台ID,如果没有则传projectId - */ - private Long workspaceId; - - /** - * 用户级别 - * - */ - private String level; - - /** - * 系统类型 - */ - private String systemType; - - /** - * 系统版本 - */ - private String systemVersion; - - /** - * 设备唯一标识 - */ - private String deviceNo; - - /** - * 设备型号 - */ - private String deviceKind; - - /** - * 客户端软件版本 - */ - private String appVersion; - - /** - * 当前请求ip地址 - */ - private String ipAddress; - - public boolean isMock() { - return MOCK.equals(getMock()); - } -}