Merge branch 'refs/heads/master' into REQ-2723

This commit is contained in:
yanglin 2024-08-19 09:36:20 +08:00
commit c4ef37f291
12 changed files with 56 additions and 161 deletions

View File

@ -130,6 +130,9 @@ public class PendingMessageBizConfig {
@Getter @Getter
private int oldMsgStateDataBackendUpdateBatchSize = 100; private int oldMsgStateDataBackendUpdateBatchSize = 100;
@Getter
private boolean oldMsgOffline = true;
// !! 待办分类统计缓存 // !! 待办分类统计缓存
/** /**

View File

@ -33,13 +33,9 @@ import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.inside.notices.service.MessageRelationService; import cn.axzo.msg.center.inside.notices.service.MessageRelationService;
import cn.axzo.msg.center.inside.notices.service.MessageRouterService; import cn.axzo.msg.center.inside.notices.service.MessageRouterService;
import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService; import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService;
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
import cn.axzo.msg.center.message.service.PendingMessageNewService;
import cn.axzo.msg.center.mq.MqMessageRecord; import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer; import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.dto.PersonDTO; import cn.axzo.msg.center.service.enums.MqMessageType;
import cn.axzo.msg.center.service.enums.AppTerminalTypeEnum;
import cn.azxo.framework.common.model.Page; import cn.azxo.framework.common.model.Page;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;

View File

@ -50,10 +50,10 @@ import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq;
import cn.axzo.msg.center.inside.notices.event.SendMessageEvent; import cn.axzo.msg.center.inside.notices.event.SendMessageEvent;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService; import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.mq.MqMessageRecord; import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer; import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType; import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType;
import cn.axzo.msg.center.service.dto.IdentityDTO; import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.enums.MqMessageType;
import cn.axzo.msg.center.utils.PersonIdentityUtil; import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.azxo.framework.common.utils.LogUtil; import cn.azxo.framework.common.utils.LogUtil;
import cn.azxo.framework.common.utils.LogUtil.ErrorLevel; import cn.azxo.framework.common.utils.LogUtil.ErrorLevel;

View File

@ -2,6 +2,7 @@ package cn.axzo.msg.center.message.controller;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq; import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.response.MessageNewRes; import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.service.GeneralMessageOldService; import cn.axzo.msg.center.message.service.GeneralMessageOldService;
import cn.axzo.msg.center.message.service.GeneralMessageService; import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.service.dto.PersonDTO; import cn.axzo.msg.center.service.dto.PersonDTO;
@ -26,8 +27,11 @@ import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor @RequiredArgsConstructor
public class GeneralMessageController implements GeneralMessageClient { public class GeneralMessageController implements GeneralMessageClient {
private static final Integer ZERO = 0;
private final GeneralMessageService generalMessageService; private final GeneralMessageService generalMessageService;
private final GeneralMessageOldService generalMessageOldService; private final GeneralMessageOldService generalMessageOldService;
private final PendingMessageBizConfig cfg;
@Override @Override
public CommonResponse<Void> batchSend(GeneralMessageSendRequest request) { public CommonResponse<Void> batchSend(GeneralMessageSendRequest request) {
@ -44,12 +48,18 @@ public class GeneralMessageController implements GeneralMessageClient {
@Override @Override
public CommonResponse<Integer> countOldMsgUnreadWithIdentities( public CommonResponse<Integer> countOldMsgUnreadWithIdentities(
OldMsgStatWithMultiIdentifiesRequest request) { OldMsgStatWithMultiIdentifiesRequest request) {
if (cfg.isOldMsgOffline()) {
return CommonResponse.success(ZERO);
}
return CommonResponse.success(generalMessageOldService return CommonResponse.success(generalMessageOldService
.countUnreadWithIdentities(request.getPersonId(), request.getIdentities())); .countUnreadWithIdentities(request.getPersonId(), request.getIdentities()));
} }
@Override @Override
public CommonResponse<Page<MessageNewRes>> pageQueryOldMessage(CmsMsgQueryReq request) { public CommonResponse<Page<MessageNewRes>> pageQueryOldMessage(CmsMsgQueryReq request) {
if (cfg.isOldMsgOffline()) {
return CommonResponse.success(Page.zero());
}
request.setLogRequest(true); request.setLogRequest(true);
return CommonResponse.success(generalMessageOldService.pageMsgInfo(request)); return CommonResponse.success(generalMessageOldService.pageMsgInfo(request));
} }

View File

@ -10,12 +10,12 @@ import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.dal.GeneralMessageRecordDao; import cn.axzo.msg.center.dal.GeneralMessageRecordDao;
import cn.axzo.msg.center.domain.entity.GeneralMessageRecord; import cn.axzo.msg.center.domain.entity.GeneralMessageRecord;
import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig; import cn.axzo.msg.center.inside.notices.config.MessageSystemConfig;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO; import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
import cn.axzo.msg.center.message.domain.dto.MessageTemplateRouterDTO; import cn.axzo.msg.center.message.domain.dto.MessageTemplateRouterDTO;
import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO; import cn.axzo.msg.center.message.domain.dto.SendImMessageDTO;
import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO; import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.message.service.GeneralMessageService; import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.MessageTemplateNewService; import cn.axzo.msg.center.message.service.MessageTemplateNewService;
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache; import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache;
import cn.axzo.msg.center.service.dto.IdentityDTO; import cn.axzo.msg.center.service.dto.IdentityDTO;
@ -57,6 +57,8 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor @RequiredArgsConstructor
public class GeneralMessageServiceImpl implements GeneralMessageService { public class GeneralMessageServiceImpl implements GeneralMessageService {
private static final Integer ZERO = 0;
private static final PersonDTO SYSTEM_SENDER = PersonDTO.builder() private static final PersonDTO SYSTEM_SENDER = PersonDTO.builder()
.id(0L) .id(0L)
.identity(IdentityDTO.builder().id(0L).type(IdentityTypeEnum.NOT_SUPPORT).build()) .identity(IdentityDTO.builder().id(0L).type(IdentityTypeEnum.NOT_SUPPORT).build())
@ -71,9 +73,9 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
private final MessageSystemConfig messageSystemConfig; private final MessageSystemConfig messageSystemConfig;
private final GeneralMessageRecordDao generalMessageRecordDao; private final GeneralMessageRecordDao generalMessageRecordDao;
private final MessageTemplateNewService messageTemplateNewService; private final MessageTemplateNewService messageTemplateNewService;
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
private final MessageRouterUtil messageRouterUtil; private final MessageRouterUtil messageRouterUtil;
private final OldMsgStatCache oldMsgStatCache; private final OldMsgStatCache oldMsgStatCache;
private final PendingMessageBizConfig cfg;
@Override @Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW) @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@ -90,6 +92,13 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
@Override @Override
public GeneralMessageOldDataStatisticResponse statisticOldData(GeneralMessageOldDataStatisticRequest request) { public GeneralMessageOldDataStatisticResponse statisticOldData(GeneralMessageOldDataStatisticRequest request) {
if (cfg.isOldMsgOffline()) {
return GeneralMessageOldDataStatisticResponse.builder()
.unreadCount(ZERO)
.latestMsgSendTimestamp(null)
.latestMsgContent(null)
.build();
}
return oldMsgStatCache.getCacheResponseOrReload(request); return oldMsgStatCache.getCacheResponseOrReload(request);
} }

View File

@ -15,11 +15,11 @@ import cn.axzo.msg.center.message.service.impl.GeneralMessageOldServiceImpl;
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse; import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse;
import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo; import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo;
import cn.axzo.msg.center.message.service.impl.person.PersonService; import cn.axzo.msg.center.message.service.impl.person.PersonService;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.notices.common.enums.ReturnCodeEnum; import cn.axzo.msg.center.notices.common.enums.ReturnCodeEnum;
import cn.axzo.msg.center.notices.common.exception.BizException; import cn.axzo.msg.center.notices.common.exception.BizException;
import cn.axzo.msg.center.service.dto.IdentityDTO; import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.enums.MessageCategoryEnum; import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
import cn.axzo.msg.center.service.enums.MqMessageType;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest; import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse; import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.azxo.framework.common.model.Page; import cn.azxo.framework.common.model.Page;

View File

@ -22,11 +22,11 @@ import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaste
import cn.axzo.msg.center.message.service.todo.manage.event.HandoverEvent; import cn.axzo.msg.center.message.service.todo.manage.event.HandoverEvent;
import cn.axzo.msg.center.message.service.todo.manage.event.NewTodoEvent; import cn.axzo.msg.center.message.service.todo.manage.event.NewTodoEvent;
import cn.axzo.msg.center.mq.MqMessageRecord; import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer; import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.enums.BizCategoryEnum; import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum; import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.MessageCategoryEnum; import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
import cn.axzo.msg.center.service.enums.MqMessageType;
import cn.axzo.msg.center.service.enums.PendingMessageStateEnum; import cn.axzo.msg.center.service.enums.PendingMessageStateEnum;
import cn.axzo.msg.center.service.enums.TodoType; import cn.axzo.msg.center.service.enums.TodoType;
import cn.axzo.msg.center.service.enums.YesOrNo; import cn.axzo.msg.center.service.enums.YesOrNo;
@ -49,6 +49,7 @@ import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapp
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -60,6 +61,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -67,6 +69,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
@ -90,19 +93,32 @@ public class TodoManager {
private final PendingMessageBizConfig cfg; private final PendingMessageBizConfig cfg;
private final ApplicationContext applicationContext; private final ApplicationContext applicationContext;
private final TodoBroadcaster todoBroadcaster; private final TodoBroadcaster todoBroadcaster;
private final TransactionTemplate transactionTemplate;
@Transactional(rollbackFor = Exception.class)
public List<PushPendingMessageDTO> send(PendingMessagePushParam request) { public List<PushPendingMessageDTO> send(PendingMessagePushParam request) {
TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize()); TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize());
// 因为有外层事务, 所以这里可以直接调用 // 因为有外层事务, 所以这里可以直接调用
return send(ctx, request); return send(ctx, request);
} }
public List<PushPendingMessageDTO> send(TodoRequestContext ctx, PendingMessagePushParam request) {
// 10 seconds at most
for (int i = 0; i < 100; i++) {
try {
return transactionTemplate.execute(unused -> sendImpl(ctx, request));
} catch (DuplicateKeyException e) {
log.info("Try to save todo but todo business already exists, request={}. " +
"retryCount={}", request, i, e);
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
}
throw new ServiceException("服务器内部错误");
}
/** /**
* 发送待办或抄送 * 发送待办或抄送
*/ */
@Transactional(rollbackFor = Exception.class) private List<PushPendingMessageDTO> sendImpl(TodoRequestContext ctx, PendingMessagePushParam request) {
public List<PushPendingMessageDTO> send(TodoRequestContext ctx, PendingMessagePushParam request) {
// 可能会存在重新查询模版的情况, 成本不高 // 可能会存在重新查询模版的情况, 成本不高
MessageTemplateDTO template = messageTemplateNewService MessageTemplateDTO template = messageTemplateNewService
.queryEnableTemplateByCode(request.getTemplateCode()) .queryEnableTemplateByCode(request.getTemplateCode())
@ -120,25 +136,18 @@ public class TodoManager {
request.setBizCode(UUIDUtil.uuidString()); request.setBizCode(UUIDUtil.uuidString());
else if (StringUtils.isBlank(request.getBizCode())) else if (StringUtils.isBlank(request.getBizCode()))
request.setBizCode(""); request.setBizCode("");
Supplier<TodoBusiness> todoBusinessQueryFun = () -> todoBusinessDao
.findByBiz(request.getTemplateCode(), request.getBizCode(), true)
.orElse(null);
// 如果已经存在对应的待办业务, 就把待办追加到对应的待办业务上 // 如果已经存在对应的待办业务, 就把待办追加到对应的待办业务上
// 流程会并发, 这里对业务进行加锁 // 流程会并发, 这里对业务进行加锁
TodoBusiness business = todoBusinessQueryFun.get(); TodoBusiness business = todoBusinessDao
.findByBiz(request.getTemplateCode(), request.getBizCode(), true)
.orElse(null);
boolean businessCreated = false; boolean businessCreated = false;
TodoExt ext = new TodoExt(request, template); TodoExt ext = new TodoExt(request, template);
if (business == null) { if (business == null) {
business = todoRecordBuilder.buildBusiness(request, ext); business = todoRecordBuilder.buildBusiness(request, ext);
business.getRecordExt().setGenBizCode(genBizCode); business.getRecordExt().setGenBizCode(genBizCode);
try { todoBusinessDao.save(business);
todoBusinessDao.save(business); businessCreated = true;
businessCreated = true;
} catch (DuplicateKeyException e) {
log.warn("Found duplicated business. Try to find saved one, request={}", request, e);
business = todoBusinessQueryFun.get();
log.warn("Found duplicated business. Found business={}, request={}", business, request, e);
}
} }
//批量默认为false //批量默认为false
YesOrNo supportBatchProcess = request.getSupportBatchProcess() == null ? YesOrNo.NO : request.getSupportBatchProcess(); YesOrNo supportBatchProcess = request.getSupportBatchProcess() == null ? YesOrNo.NO : request.getSupportBatchProcess();

View File

@ -9,8 +9,8 @@ import cn.axzo.msg.center.domain.entity.Todo;
import cn.axzo.msg.center.domain.entity.TodoBusiness; import cn.axzo.msg.center.domain.entity.TodoBusiness;
import cn.axzo.msg.center.message.service.todo.manage.TodoExt; import cn.axzo.msg.center.message.service.todo.manage.TodoExt;
import cn.axzo.msg.center.mq.MqMessageRecord; import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer; import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.enums.MqMessageType;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -40,7 +40,6 @@ public class TodoMqBroadcaster {
.builder(MqMessageType.TODO_STATE_UPDATE, message) .builder(MqMessageType.TODO_STATE_UPDATE, message)
.messageKey(todo.getId()) .messageKey(todo.getId())
.shardingKey(todo.getTemplateCode()) .shardingKey(todo.getTemplateCode())
.tagInfo(todo.getTemplateCode())
.build()); .build());
} }

View File

@ -1,6 +1,7 @@
package cn.axzo.msg.center.mq; package cn.axzo.msg.center.mq;
import cn.axzo.msg.center.api.mq.MqMessage; import cn.axzo.msg.center.api.mq.MqMessage;
import cn.axzo.msg.center.service.enums.MqMessageType;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.Getter; import lombok.Getter;

View File

@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventProducer; import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.msg.center.api.mq.MqMessage; import cn.axzo.msg.center.api.mq.MqMessage;
import cn.axzo.msg.center.common.utils.BizAssertions; import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.service.enums.MqMessageType;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

View File

@ -1,4 +1,4 @@
package cn.axzo.msg.center.mq; package cn.axzo.msg.center.service.enums;
import cn.axzo.framework.rocketmq.Event; import cn.axzo.framework.rocketmq.Event;
import lombok.AccessLevel; import lombok.AccessLevel;

View File

@ -1,133 +0,0 @@
package cn.axzo.msg.center.common.model;
import cn.axzo.msg.center.common.manager.AcctPrincipalRes;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
/**
* 用户环境
*
* 这个类不建议加@Data annotation有一些字段可以不暴露出去
*/
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class UserContext implements Serializable {
public static final String MOCK = "Mock";
private String mock;
///---注释开始----
///以下几个字段不准备在这个context中提供所以没有加gettersetter
/// @Getter(AccessLevel.NONE) 去掉生成getter方法
/// @Setter(AccessLevel.NONE) 去掉生成setter方法
//region need be deleted
/**
* 访问的端 若为proj则说明是OMS跳转来项目端访问此功能已废弃
*/
@Deprecated
@ToString.Exclude
@EqualsAndHashCode.Exclude
private String visitTo;
/**
* 当前工人状态 1.工人 2.班组长
*/
@Deprecated
@ToString.Exclude
@EqualsAndHashCode.Exclude
private Integer workerIdentity;
/**
* 分包机构
*/
@Deprecated
@ToString.Exclude
@EqualsAndHashCode.Exclude
private Long agencyId;
@Deprecated
@ToString.Exclude
@EqualsAndHashCode.Exclude
private Boolean isResRoleAdmin;
//endregion
///---注释结束---
private AcctPrincipalRes acctPrincipalRes;
/**
* 账号ID
*/
private Long acctId;
/**
* 用户ID
*/
private Long userId;
/**
* 当前终端类型
*
*/
private String terminal;
/**
* 租户ID 不同端 - 租户ID不一样
*/
private Long tenantId;
/**
* 工作台ID如果没有则传projectId
*/
private Long workspaceId;
/**
* 用户级别
*
*/
private String level;
/**
* 系统类型
*/
private String systemType;
/**
* 系统版本
*/
private String systemVersion;
/**
* 设备唯一标识
*/
private String deviceNo;
/**
* 设备型号
*/
private String deviceKind;
/**
* 客户端软件版本
*/
private String appVersion;
/**
* 当前请求ip地址
*/
private String ipAddress;
public boolean isMock() {
return MOCK.equals(getMock());
}
}