Merge remote-tracking branch 'refs/remotes/origin/master' into REQ-2324

# Conflicts:
#	inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/TodoRangeQueryService.java
This commit is contained in:
yangqicheng 2024-05-22 16:45:08 +08:00
commit ce69cb1f99
54 changed files with 1899 additions and 88 deletions

View File

@ -21,6 +21,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.6</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>

View File

@ -110,6 +110,43 @@ public class PendingMessageBizConfig {
return dontConcatRouterParamsTemplateCodes == null || !dontConcatRouterParamsTemplateCodes.contains(templateCode);
}
// !! msg-center MQ消息自产自消
@Getter
private long msgCenterMqSelfConsumeMaxExecMs = 20000L;
// !! 老消息缓存
/**
* 是否开启老消息走否走缓存
*/
@Getter
private boolean oldMsgStatCacheOn = true;
@Getter
private int oldMsgStatCacheDataExpireHours = 16;
@Getter
private int oldMsgStateDataBackendUpdateBatchSize = 100;
// !! 待办分类统计缓存
/**
* 待办分类统计是否走缓存
*/
@Getter
private boolean nodeStatCacheOn = true;
@Getter
private int nodeStatCacheMaxRequestNodeCodeSize = 10;
@Getter
private int nodeStatCacheDataExpireHours = 8;
public boolean determineOldMsgStatCacheOn() {
return isOldMsgStatCacheOn();
}
public boolean hasMessageDetailStyle(String code) {
return name2DetailStyle != null && name2DetailStyle.containsKey(code);
}

View File

@ -3,6 +3,7 @@ package cn.axzo.msg.center.inside.notices.service.impl;
import cn.axzo.core.domain.PageResult;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.request.MessageModuleStatisticReq;
import cn.axzo.msg.center.api.request.MessagePageQueryReq;
@ -34,6 +35,9 @@ import cn.axzo.msg.center.inside.notices.service.MessageRouterService;
import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService;
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
import cn.axzo.msg.center.message.service.PendingMessageNewService;
import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.AppTerminalTypeEnum;
import cn.azxo.framework.common.model.Page;
@ -99,7 +103,7 @@ public class MessageCoreServiceImpl implements MessageCoreService {
@Resource
private RawMessageRecordService rawMessageRecordService;
@Resource
private PendingMessageNewService pendingMessageNewService;
private MqProducer mqProducer;
public MsgRouteTypeEnum getSystemType(String systemType) {
/*String systemType = ContextInfoHolder.get().getSystemAndDeviceInfo().getSystemType();*/
@ -355,14 +359,19 @@ public class MessageCoreServiceImpl implements MessageCoreService {
// 阅读单条数据
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
request.getMsgId());
return;
}
} else {
List<Long> moduleIds = messageModuleService.listByModuleName(request.getModuleName()).stream()
.map(MessageModule::getId)
.collect(Collectors.toList());
rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ,
MsgTypeEnum.GENERAL_MESSAGE, null, moduleIds);
}
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
mqMessage.setReadRequest(request);
mqProducer.send(MqMessageRecord
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
.build());
}
@Override
public Page<MessageDetailRes> pageQueryMessageRecords(MessagePageQueryReq request) {

View File

@ -7,6 +7,7 @@ import cn.axzo.msg.center.api.enums.MsgRecordTerminalTypeEnum;
import cn.axzo.msg.center.api.enums.MsgStateEnum;
import cn.axzo.msg.center.api.enums.MsgTypeEnum;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
@ -49,6 +50,9 @@ import cn.axzo.msg.center.domain.request.InsideCmsReadMsgReq;
import cn.axzo.msg.center.inside.notices.event.SendMessageEvent;
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
import cn.axzo.msg.center.mq.MqMessageRecord;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.mq.MqProducer;
import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
@ -115,6 +119,8 @@ public class MessageRecordServiceImpl implements MessageRecordService {
private MessageModuleDao messageModuleDao;
@Resource
private MessageRouterDao messageRouterDao;
@Resource
private MqProducer mqProducer;
/*@Resource
private IdentityProfileService identityProfileService;*/
@ -179,6 +185,11 @@ public class MessageRecordServiceImpl implements MessageRecordService {
// 不影响原有消息通道
pushMessages.addAll(messageRecords);
});
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
mqMessage.setSendRequest(message);
mqProducer.send(MqMessageRecord
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
.build());
if(pushAthena) {
asyncPushAthena(message, messageTemplate.getAudioFileName(), messageModule.getModuleName(), pushMessages);
}
@ -432,6 +443,11 @@ public class MessageRecordServiceImpl implements MessageRecordService {
} else {
messageRecordDao.readMsg(request, req.getPersonId(), req.getIdentityId());
}
SendMessageRecordMessage mqMessage = new SendMessageRecordMessage();
mqMessage.setCmsReadRequest(req);
mqProducer.send(MqMessageRecord
.builder(MqMessageType.OLD_MSG_SEND, mqMessage)
.build());
}
/**

View File

@ -69,7 +69,6 @@ public class RawMessageRecordServiceImpl implements RawMessageRecordService {
messageRecordDao.lambdaUpdate()
.set(MessageRecord::getState, tgtState)
.eq(MessageRecord::getPersonId, personId)
.in(MessageRecord::getState, srcStates)
.eq(MessageRecord::getId, msgId)
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())

View File

@ -4,6 +4,9 @@ import cn.hutool.http.HttpUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
/**
* api文档: <a href="https://developer.umeng.com/docs/67966/detail/68343">api文档</a>
*/
@Slf4j
public class PushClient {

View File

@ -50,6 +50,7 @@ public class GeneralMessageController implements GeneralMessageClient {
@Override
public CommonResponse<Page<MessageNewRes>> pageQueryOldMessage(CmsMsgQueryReq request) {
request.setLogRequest(true);
return CommonResponse.success(generalMessageOldService.pageMsgInfo(request));
}

View File

@ -9,10 +9,14 @@ import cn.axzo.msg.center.inside.notices.service.impl.TodoSearchService;
import cn.axzo.msg.center.inside.notices.service.impl.v3.MessageRecordServiceV3;
import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
import cn.axzo.msg.center.message.service.group.GroupTemplateService;
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache;
import cn.axzo.msg.center.message.service.todo.manage.TodoManager;
import cn.axzo.msg.center.message.xxl.MigrateOldMsgHotDataJob;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.cloud.nacos.parser.NacosDataParserHandler;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@ -42,7 +46,9 @@ public class PrivateMessageController {
private final TodoSearchService todoSearchService;
private final TodoManager todoManager;
private final GroupTemplateService groupTemplateService;
private final OldMsgStatCache oldMsgStatCache;
private final NacosConfigManager nacosConfigManager;
private final MigrateOldMsgHotDataJob migrateOldMsgHotDataJob;
@PostMapping("/sendPendingMessage")
public Object sendPendingMessage(@RequestBody @Valid PendingMessagePushParam request) {
@ -74,6 +80,11 @@ public class PrivateMessageController {
return groupTemplateService.collectTemplateCodes(groupNodeCode);
}
@PostMapping("/reloadOldMsgStat")
public Object reloadOldMsgStat(@RequestParam("personId") Long personId) throws Exception {
return oldMsgStatCache.reloadBackground(Sets.newHashSet(personId));
}
@PostMapping("/getPersonIdByPhone")
public Object getPersonIdByPhone(@RequestParam("phones") String phones) {
List<String> phonesStr = Splitter.on(",")
@ -107,4 +118,12 @@ public class PrivateMessageController {
.findFirst()
.orElse(null);
}
@PostMapping("/migrateOldMsgHotData")
public Object migrateOldMsgHotData(
@RequestBody(required = false) MigrateOldMsgHotDataJob.Param param) {
migrateOldMsgHotDataJob.tryExecute(param, true);
return CommonResponse.success("submitted");
}
}

View File

@ -54,6 +54,10 @@ public class MessageGroupNodeStatisticParam implements Serializable {
*/
private List<Long> workspaceIds;
public Long getPersonId() {
return getOperator().getId();
}
public static MessageGroupNodeStatisticParam from(PendingMessageStatisticRequest request) {
MessageGroupNodeStatisticParam param = BeanConverter.convert(request, MessageGroupNodeStatisticParam.class);
IdentityDTO identity = IdentityDTO.builder()

View File

@ -52,10 +52,8 @@ public class GroupTemplateService {
return new RootNodeWrapper(TreeBuilder.build(nodes, true));
}
public List<ValueNode<NodeWrapper>> getTodoGroups(
AppTerminalTypeEnum terminal, boolean leafOnly) {
Set<Long> configuredIds = new HashSet<>(
cfg.fetchMessageGroupTreeNodeIds(terminal));
public List<ValueNode<NodeWrapper>> getTodoGroups(AppTerminalTypeEnum terminal) {
Set<Long> configuredIds = new HashSet<>(cfg.fetchMessageGroupTreeNodeIds(terminal));
ArrayList<ValueNode<NodeWrapper>> configuredNodes = new ArrayList<>();
getGroupRoot().unwrap().walkDown(node -> {
//noinspection unchecked
@ -69,10 +67,12 @@ public class GroupTemplateService {
configuredNodes.add(valueNode);
return true;
});
if (!leafOnly)
return configuredNodes;
}
public List<ValueNode<NodeWrapper>> collectLeafNodes(List<ValueNode<NodeWrapper>> nodes) {
List<ValueNode<NodeWrapper>> leafNodes = new ArrayList<>();
for (ValueNode<NodeWrapper> configuredNode : configuredNodes) {
for (ValueNode<NodeWrapper> configuredNode : nodes) {
configuredNode.walkDown(node -> {
//noinspection unchecked
ValueNode<NodeWrapper> valueNode = node.tryCast(ValueNode.class);

View File

@ -16,8 +16,11 @@ import cn.axzo.msg.center.inside.notices.service.MessageModuleService;
import cn.axzo.msg.center.inside.notices.service.MessageRelationService;
import cn.axzo.msg.center.message.service.GeneralMessageOldService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.axzo.msg.center.utils.PersonIdentityUtil;
import cn.azxo.framework.common.model.Page;
import com.alibaba.fastjson.JSON;
@ -26,7 +29,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@ -45,6 +47,7 @@ public class GeneralMessageOldServiceImpl implements GeneralMessageOldService {
private final MessageModuleService messageModuleService;
private final MessageRelationService messageRelationService;
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
private final OldMsgStatCache oldMsgStatCache;
@Override
public int countUnread(PersonDTO person) {
@ -58,16 +61,11 @@ public class GeneralMessageOldServiceImpl implements GeneralMessageOldService {
public int countUnreadWithIdentities(Long personId, List<IdentityDTO> identities) {
log.info("GeneralMessageOldServiceImpl#countUnreadWithIdentities. personId={}, identifies={}",
personId, JSON.toJSONString(identities));
if (CollectionUtils.isEmpty(identities)) {
return 0;
}
List<Long> sendTwiceMsgIds = messageSendTwiceRecordService.listByPerson(personId);
int count = 0;
for (IdentityDTO identity : identities) {
PersonDTO person = PersonDTO.from(personId, identity.getId(), identity.getType());
count += countUnread(person, sendTwiceMsgIds);
}
return count;
GeneralMessageOldDataStatisticRequest request = new GeneralMessageOldDataStatisticRequest();
request.setPersonId(personId);
request.setIdentities(identities);
GeneralMessageOldDataStatisticResponse statResp = oldMsgStatCache.getCacheResponseOrReload(request);
return statResp == null ? 0 : statResp.getUnreadCount();
}
public int countUnread(Long personId, List<IdentityDTO> identities, List<Long> excludeMsgIds) {
@ -104,6 +102,7 @@ public class GeneralMessageOldServiceImpl implements GeneralMessageOldService {
@Override
public Page<MessageNewRes> pageMsgInfo(CmsMsgQueryReq request) {
if (request.isLogRequest())
log.info("GeneralMessageOldServiceImpl#pageMsgInfo. request:{}", request);
if (CollectionUtils.isEmpty(request.determineIdentities())) {
return Page.zero();

View File

@ -5,8 +5,6 @@ import cn.axzo.im.center.api.feign.MessageApi;
import cn.axzo.im.center.api.vo.req.MessageInfo;
import cn.axzo.im.center.api.vo.resp.MessageDispatchResp;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.common.exception.ServiceException;
import cn.axzo.msg.center.common.utils.PlaceholderResolver;
import cn.axzo.msg.center.dal.GeneralMessageRecordDao;
@ -19,19 +17,18 @@ import cn.axzo.msg.center.message.domain.vo.GeneralMessagePushVO;
import cn.axzo.msg.center.message.service.GeneralMessageService;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCache;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.dto.MessageCardContentItemDTO;
import cn.axzo.msg.center.service.dto.PersonDTO;
import cn.axzo.msg.center.service.enums.GeneralMessageStateEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
import cn.axzo.msg.center.service.enums.PushTerminalEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.request.GeneralMessageSendRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.axzo.msg.center.utils.MessageRouterUtil;
import cn.axzo.msg.center.utils.UUIDUtil;
import cn.azxo.framework.common.model.Page;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
@ -48,7 +45,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@ -74,10 +70,10 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
private final MessageApi messageApi;
private final MessageSystemConfig messageSystemConfig;
private final GeneralMessageRecordDao generalMessageRecordDao;
private final GeneralMessageOldServiceImpl generalMessageOldService;
private final MessageTemplateNewService messageTemplateNewService;
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
private final MessageRouterUtil messageRouterUtil;
private final OldMsgStatCache oldMsgStatCache;
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@ -94,20 +90,7 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
@Override
public GeneralMessageOldDataStatisticResponse statisticOldData(GeneralMessageOldDataStatisticRequest request) {
// 查询双发的消息记录
List<Long> sendTwiceMsgIds = messageSendTwiceRecordService.listByPerson(request.getPersonId());
// 分页查询最新一条数据
Page<MessageNewRes> result = generalMessageOldService.pageMsgInfo(build(request, sendTwiceMsgIds));
// 统计旧的未读普通消息数量
int count = generalMessageOldService.countUnread(
request.getPersonId(), request.determineIdentities(), sendTwiceMsgIds);
// 编排组合成界面展示的数据结构
MessageNewRes msg = CollectionUtils.isNotEmpty(result.getList()) ? result.getList().get(0) : null;
return GeneralMessageOldDataStatisticResponse.builder()
.unreadCount(count)
.latestMsgSendTimestamp(Optional.ofNullable(msg).map(v -> v.getCreateAt().getTime()).orElse(null))
.latestMsgContent(Optional.ofNullable(msg).map(MessageNewRes::getContent).orElse(null))
.build();
return oldMsgStatCache.getCacheResponseOrReload(request);
}
private List<GeneralMessageRecord> buildMessageRecord(GeneralMessageSendRequest request, MessageTemplateDTO template) {
@ -208,16 +191,4 @@ public class GeneralMessageServiceImpl implements GeneralMessageService {
return PlaceholderResolver.getDefaultResolver().resolveByMap(string, params);
}
private CmsMsgQueryReq build(GeneralMessageOldDataStatisticRequest request, List<Long> excludeMsgIds) {
CmsMsgQueryReq req = new CmsMsgQueryReq();
req.setMsgType(MessageCategoryEnum.GENERAL_MESSAGE.getCode());
// 这里查询消息中心全部状态的数据
req.setMsgStatus(0);
req.setPage(1L);
req.setPageSize(1L);
req.setPersonId(request.getPersonId());
req.setIdentities(request.determineIdentities());
req.setExcludeMsgIds(excludeMsgIds);
return req;
}
}

View File

@ -0,0 +1,32 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import com.alibaba.fastjson.JSON;
import lombok.Data;
/**
* @author yanglin
*/
@Data
public class OldMsgCacheReloadJobParam {
private static final int DEFAULT_SYNC_DAY = 30;
private int syncDay;
private int batchSize;
static OldMsgCacheReloadJobParam defaultParam() {
OldMsgCacheReloadJobParam defaultParam = new OldMsgCacheReloadJobParam();
defaultParam.setSyncDay(DEFAULT_SYNC_DAY);
return defaultParam;
}
int determineSyncDay() {
return syncDay <= 0 ? DEFAULT_SYNC_DAY : syncDay;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,52 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.dto.basic.IdentityProfileDto;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import com.google.common.collect.ImmutableSet;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* @author yanglin
*/
@Component
@RequiredArgsConstructor
public class OldMsgIdentifyFilter {
private final UserProfileServiceApi userProfileServiceApi;
public OldMsgIdentifyInfo getIdentifyInfo(Long personId, boolean asManager) {
Set<IdentityTypeEnum> identifiesFilter;
// cms或者管理端
if (asManager) {
identifiesFilter = ImmutableSet.of(IdentityTypeEnum.PRACTITIONER, IdentityTypeEnum.WORKER_LEADER);
}
// 工人端
else {
identifiesFilter = ImmutableSet.of(IdentityTypeEnum.WORKER);
}
List<IdentityProfileDto> profiles = BizAssertions.assertResponse(userProfileServiceApi
.getPersonIdentityProfile(personId), "获取身份信息失败, personId={}", personId);
ArrayList<IdentityDTO> identities = new ArrayList<>();
for (IdentityProfileDto profile : profiles) {
IdentityTypeEnum identityType = IdentityTypeEnum
.codeOf(profile.getIdentityType().getCode())
.orElse(null);
if (identityType == null || !identifiesFilter.contains(identityType)) {
continue;
}
IdentityDTO identify = new IdentityDTO();
identify.setId(profile.getId());
identify.setType(identityType);
identities.add(identify);
}
return new OldMsgIdentifyInfo(personId, identities);
}
}

View File

@ -0,0 +1,23 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.List;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
public class OldMsgIdentifyInfo {
private final Long personId;
private final List<IdentityDTO> identities;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,354 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.EventConsumer;
import cn.axzo.framework.rocketmq.EventHandler;
import cn.axzo.msg.center.api.mq.SendMessageRecordMessage;
import cn.axzo.msg.center.api.request.CmsMsgQueryReq;
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.api.request.MessageReadAllReq;
import cn.axzo.msg.center.api.response.MessageNewRes;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService;
import cn.axzo.msg.center.message.service.impl.GeneralMessageOldServiceImpl;
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse;
import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo;
import cn.axzo.msg.center.message.service.impl.person.PersonService;
import cn.axzo.msg.center.mq.MqMessageType;
import cn.axzo.msg.center.notices.common.enums.ReturnCodeEnum;
import cn.axzo.msg.center.notices.common.exception.BizException;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import cn.azxo.framework.common.model.Page;
import cn.hutool.core.date.StopWatch;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class OldMsgStatCache implements EventHandler, InitializingBean {
private final ApplicationContext applicationContext;
private final MessageSendTwiceRecordService messageSendTwiceRecordService;
private final OldMsgIdentifyFilter oldMsgIdentifyFilter;
private final StringRedisTemplate stringRedisTemplate;
private final PendingMessageBizConfig cfg;
private final EventConsumer eventConsumer;
private final PersonService personService;
private final ForkJoinPool foregroundExecutor = new ForkJoinPool(15);
private final ForkJoinPool backgroundExecutor = new ForkJoinPool(10);
public GeneralMessageOldDataStatisticResponse getCacheResponseOrReload(GeneralMessageOldDataStatisticRequest request) {
if (!cfg.determineOldMsgStatCacheOn()) {
return statisticOldDataImpl(request);
}
GeneralMessageOldDataStatisticResponse resp = getCacheResponse(request);
if (resp != null) {
resp.setReloadForeground(false);
return resp;
}
GeneralMessageOldDataStatisticResponse response = reloadForeground(request);
response.setReloadForeground(true);
return response;
}
private GeneralMessageOldDataStatisticResponse getCacheResponse(GeneralMessageOldDataStatisticRequest request) {
OldMsgStatCacheValue cacheValue = getCacheValue(request.getPersonId());
if (cacheValue == null) {
log.info("oldMsgStat: cache miss, need to reload in foreground. request={}", request);
return null;
}
GeneralMessageOldDataStatisticResponse response = cacheValue
.findResponse(request.getIdentities())
.orElse(null);
if (response != null)
return response;
// 身份发生了变化, 而且异步刷新不及时
log.info("oldMsgStat: identities cache miss, need to reload in foreground. request={}", request);
return null;
}
private GeneralMessageOldDataStatisticResponse reloadForeground(GeneralMessageOldDataStatisticRequest request) {
log.info("oldMsgStat: reload cache for request. request={}", request);
StopWatch stopWatch = new StopWatch(String.format(
"%s-reloadForeground", getClass().getSimpleName()));
stopWatch.start("initPersonCacheValueForeground");
OldMsgStatCacheValue cacheValue = initPersonCacheValueForeground(request.getPersonId());
stopWatch.stop();
GeneralMessageOldDataStatisticResponse response = cacheValue
.findResponse(request.getIdentities())
.orElse(null);
// 边界
if (response == null) {
log.info("oldMsgStat: identities cache missing? reload stat for request identities. request={}", request);
stopWatch.start("initPersonCacheValueForeground - miss again?");
IdentityResponse newIdentityResponse = getIdentityResponse(request.getPersonId(), request.getIdentities());
stopWatch.stop();
cacheValue.addIdentityResponse(newIdentityResponse);
}
setCacheValue(request.getPersonId(), cacheValue);
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
return cacheValue
.findResponse(request.getIdentities())
// should never happen
.orElse(null);
}
public Map<Long, OldMsgStatCacheValue> reloadBackground(Set<Long> personIds) throws Exception {
log.info("oldMsgStat: reload cache for person in background. personIds={}", JSON.toJSONString(personIds));
if (CollectionUtils.isEmpty(personIds)) {
log.info("oldMsgStat: personIds is empty");
return Collections.emptyMap();
}
HashMap<Long, OldMsgStatCacheValue> personId2CacheValue = new HashMap<>();
// 温柔点, 别一次性全丢到线程池
int bachSize = cfg.getOldMsgStateDataBackendUpdateBatchSize();
List<List<Long>> batches = Lists.partition(new ArrayList<>(personIds), bachSize);
for (List<Long> batch : batches) {
ArrayList<CompletableFuture<PersonAndCacheValue>> futures = new ArrayList<>();
for (Long personId : batch) {
futures.add(CompletableFuture.supplyAsync(() -> {
OldMsgStatCacheValue cacheValue = initPersonCacheValueBackground(personId);
setCacheValue(personId, cacheValue);
return new PersonAndCacheValue(personId, cacheValue);
}, backgroundExecutor));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<PersonAndCacheValue> future : futures) {
PersonAndCacheValue tmp = future.get();
personId2CacheValue.put(tmp.getPersonId(), tmp.getCacheValue());
}
}
return personId2CacheValue;
}
private OldMsgStatCacheValue initPersonCacheValueForeground(Long personId) {
log.info("initPersonCacheValueForeground. personId={}", personId);
Function<Boolean, IdentityResponse> buildResponseFun = asManager -> {
OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager);
return getIdentityResponse(personId, identifyInfo.getIdentities());
};
OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue();
CompletableFuture<IdentityResponse> asManagerFuture = CompletableFuture
.supplyAsync(() -> buildResponseFun.apply(true), foregroundExecutor);
CompletableFuture<IdentityResponse> asWorkerFuture = CompletableFuture
.supplyAsync(() -> buildResponseFun.apply(false), foregroundExecutor);
CompletableFuture.allOf(asManagerFuture, asWorkerFuture).join();
try {
cacheValue.addIdentityResponse(asManagerFuture.get());
cacheValue.addIdentityResponse(asWorkerFuture.get());
} catch (Exception e) {
log.warn("oldMsgStat: fail to stat old msg. personId={}", personId, e);
throw new BizException(ReturnCodeEnum.SYSTEM_ERROR.getCode(), e);
}
return cacheValue;
}
private OldMsgStatCacheValue initPersonCacheValueBackground(Long personId) {
log.info("initPersonCacheValueBackground. personId={}", personId);
Function<Boolean, IdentityResponse> buildResponseFun = asManager -> {
OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager);
return getIdentityResponse(personId, identifyInfo.getIdentities());
};
OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue();
cacheValue.addIdentityResponse(buildResponseFun.apply(true));
cacheValue.addIdentityResponse(buildResponseFun.apply(false));
return cacheValue;
}
private IdentityResponse getIdentityResponse(Long personId, List<IdentityDTO> identities) {
GeneralMessageOldDataStatisticRequest request = new GeneralMessageOldDataStatisticRequest();
request.setPersonId(personId);
request.setIdentities(identities);
GeneralMessageOldDataStatisticResponse response = statisticOldDataImpl(request);
IdentityResponse identityResponse = new IdentityResponse();
identityResponse.setResponse(response);
identityResponse.setIdentities(new HashSet<>(identities));
return identityResponse;
}
// !! cache helper
private void setCacheValue(Long personId, OldMsgStatCacheValue cacheValue) {
cacheValue.clear();
log.info("set cache value for person. personId={}", personId);
long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getOldMsgStatCacheDataExpireHours());
// add some random delta seconds, range: 0-600 seconds
secondsDelta += new SecureRandom().nextInt(10 * 60);
String redisKey = buildCacheKey(personId);
String redisValue = JSON.toJSONString(cacheValue);
stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS);
}
private OldMsgStatCacheValue getCacheValue(Long personId) {
String redisKey = buildCacheKey(personId);
String json = stringRedisTemplate.opsForValue().get(redisKey);
if (StringUtils.isBlank(json)) {
return null;
}
try {
return JSON.parseObject(json, OldMsgStatCacheValue.class);
} catch (Exception e) {
// 人为改了缓存, 而且没有改对
log.warn("fail to parse cache value, will reload. personId={}, cacheValue={}", personId, json, e);
return null;
}
}
private static String buildCacheKey(Long personId) {
return String.format("msg-center:old-msg-stat:v1:%d", personId);
}
// !! 以下为原有的统计逻辑
private GeneralMessageOldDataStatisticResponse statisticOldDataImpl(GeneralMessageOldDataStatisticRequest request) {
if (request.determineIdentities().isEmpty()) {
return GeneralMessageOldDataStatisticResponse.builder()
.unreadCount(0)
.latestMsgSendTimestamp(null)
.latestMsgContent(null)
.build();
}
// 查询双发的消息记录
List<Long> sendTwiceMsgIds = messageSendTwiceRecordService.listByPerson(request.getPersonId());
// 打破循环依赖, 避免大量改造 (反正快下线了)
GeneralMessageOldServiceImpl generalMessageOldService = applicationContext
.getBean(GeneralMessageOldServiceImpl.class);
// 分页查询最新一条数据
Page<MessageNewRes> result = generalMessageOldService.pageMsgInfo(build(request, sendTwiceMsgIds));
// 统计旧的未读普通消息数量
int count = generalMessageOldService.countUnread(
request.getPersonId(), request.determineIdentities(), sendTwiceMsgIds);
// 编排组合成界面展示的数据结构
MessageNewRes msg = CollectionUtils.isNotEmpty(result.getList()) ? result.getList().get(0) : null;
return GeneralMessageOldDataStatisticResponse.builder()
.unreadCount(count)
.latestMsgSendTimestamp(Optional.ofNullable(msg).map(v -> v.getCreateAt().getTime()).orElse(null))
.latestMsgContent(Optional.ofNullable(msg).map(MessageNewRes::getContent).orElse(null))
.build();
}
private CmsMsgQueryReq build(GeneralMessageOldDataStatisticRequest request, List<Long> excludeMsgIds) {
CmsMsgQueryReq req = new CmsMsgQueryReq();
req.setMsgType(MessageCategoryEnum.GENERAL_MESSAGE.getCode());
// 这里查询消息中心全部状态的数据
req.setMsgStatus(0);
req.setPage(1L);
req.setPageSize(1L);
req.setPersonId(request.getPersonId());
req.setIdentities(request.determineIdentities());
req.setExcludeMsgIds(excludeMsgIds);
req.setLogRequest(false);
return req;
}
// !! 处理mq消息, 异步更新缓存
@Override
public void afterPropertiesSet() {
eventConsumer.registerHandler(MqMessageType.OLD_MSG_SEND.getEventCode(), this);
}
@Override
public void onEvent(Event event, EventConsumer.Context context) {
log.info("oldMsgStat: start - handle mq event, event={}", JSON.toJSONString(event));
try {
Thread.sleep(500);
long start = System.currentTimeMillis();
handleMqMessage(event);
long end = System.currentTimeMillis();
log.info("oldMsgStat: end - handle mq event, used={}ms, event={}", end - start, JSON.toJSONString(event));
} catch (Exception e) {
log.warn("oldMsgStat: error - handle mq event, event={}", JSON.toJSONString(event));
}
}
private void handleMqMessage(Event event) throws Exception {
SendMessageRecordMessage mqMessage = event.normalizedData(SendMessageRecordMessage.class);
GeneralMessageReq sendRequest = mqMessage.getSendRequest();
MessageReadAllReq readRequest = mqMessage.getReadRequest();
CmsReadMsgReq cmsReadRequest = mqMessage.getCmsReadRequest();
if (sendRequest != null) {
log.info("handle mq event - driven by sendRequest={}", sendRequest);
handleSendRequestEvent(sendRequest);
} else if (readRequest != null) {
log.info("handle mq event - driven by readRequest={}", readRequest);
handleReadRequest(readRequest);
} else if (cmsReadRequest != null) {
log.info("handle mq event - driven by cmsReadRequest={}", cmsReadRequest);
handleCmsReadRequest(cmsReadRequest);
}
}
private void handleSendRequestEvent(GeneralMessageReq sendRequest) throws Exception {
Map<Long, Long> knownIdentityId2PersonId = sendRequest.getToldIdPersonIdMap();
if (knownIdentityId2PersonId == null)
knownIdentityId2PersonId = new HashMap<>();
HashSet<Long> personIds = new HashSet<>(knownIdentityId2PersonId.values());
Set<Long> receiverIdentityIds = sendRequest.getToId();
if (receiverIdentityIds == null)
receiverIdentityIds = Collections.emptySet();
HashSet<Long> toQueryPersonIdIdentityIds = new HashSet<>();
for (Long identityId : receiverIdentityIds) {
if (!knownIdentityId2PersonId.containsKey(identityId))
toQueryPersonIdIdentityIds.add(identityId);
}
if (!toQueryPersonIdIdentityIds.isEmpty()) {
PersonIdInfo personIdInfo = personService.getPersonIdsByIdentities(toQueryPersonIdIdentityIds);
personIds.addAll(personIdInfo.getPersonIds());
}
StopWatch stopWatch = new StopWatch("reloadBackground");
stopWatch.start("handleSendRequestEvent");
reloadBackground(personIds);
stopWatch.stop();
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
private void handleReadRequest(MessageReadAllReq readRequest) throws Exception {
StopWatch stopWatch = new StopWatch("reloadBackground");
stopWatch.start("handleReadRequest");
reloadBackground(Sets.newHashSet(readRequest.getPersonId()));
stopWatch.stop();
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
private void handleCmsReadRequest(CmsReadMsgReq cmsReadRequest) throws Exception {
StopWatch stopWatch = new StopWatch("reloadBackground");
stopWatch.start("handleCmsReadRequest");
reloadBackground(Sets.newHashSet(cmsReadRequest.getPersonId()));
stopWatch.stop();
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
}

View File

@ -0,0 +1,100 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.message.service.impl.person.PersonService;
import cn.hutool.core.date.StopWatch;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
import static java.util.stream.Collectors.toSet;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class OldMsgStatCacheReloadJob extends IJobHandler {
private final MessageRecordMapper messageRecordMapper;
private final PersonService personService;
private final OldMsgStatCache oldMsgStatCache;
@Override
@XxlJob("oldMsgStatCacheReloadJob")
public ReturnT<String> execute(String param) throws Exception {
try {
log.info("start - running job using param: {}", param);
executeImpl(param);
log.info("end - running job using param: {}", param);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.warn("run job error, param={}", param, e);
return ReturnT.FAIL;
}
}
private void executeImpl(String param) throws Exception {
OldMsgCacheReloadJobParam jobParam = StringUtils.isBlank(param)
? OldMsgCacheReloadJobParam.defaultParam()
: JSON.parseObject(param, OldMsgCacheReloadJobParam.class);
Date sinceDate = DateTime.now().minusDays(jobParam.determineSyncDay()).toDate();
StopWatch stopWatch = new StopWatch("reloadBackground driven by job");
Set<Long> reloadedPersonIds = new HashSet<>();
Iterable<Page<MessageRecord>> pages = messageRecordMapper
.iterateBath(query(MessageRecord.class)
.ge(MessageRecord::getCreateAt, sinceDate)
// 先刷最近的
.orderByDesc(MessageRecord::getId));
for (Page<MessageRecord> page : pages) {
stopWatch.start("reload stat, records size=" + page.getRecords().size());
reloadStat(reloadedPersonIds, page);
stopWatch.stop();
}
log.info("end sync old msg stat. reloaded personIds size={}", reloadedPersonIds.size());
log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
private void reloadStat(Set<Long> reloadedPersonIds, Page<MessageRecord> page) throws Exception {
log.info("start sync old msg stat. page size={}, page current={}, total pages={}",
page.getSize(), page.getCurrent(), page.getPages());
List<MessageRecord> records = page.getRecords();
Set<Long> maybeReloadPersonIds = new HashSet<>();
// 传了personId的
records.stream()
.map(MessageRecord::getPersonId)
.filter(personId -> personId != 0L)
.forEach(maybeReloadPersonIds::add);
// 没有传personId的
Set<Long> identityIds = records.stream()
.filter(record -> record.getPersonId() == 0L)
.map(MessageRecord::getToId)
.collect(toSet());
if (!identityIds.isEmpty()) {
List<Long> fetchedPersonIds = personService.getPersonIdsByIdentities(identityIds).getPersonIds();
maybeReloadPersonIds.addAll(fetchedPersonIds);
}
maybeReloadPersonIds.removeAll(reloadedPersonIds);
if (!maybeReloadPersonIds.isEmpty()) {
oldMsgStatCache.reloadBackground(maybeReloadPersonIds);
reloadedPersonIds.addAll(maybeReloadPersonIds);
}
}
}

View File

@ -0,0 +1,56 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
import lombok.Data;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* @author yanglin
*/
@Data
public class OldMsgStatCacheValue {
private List<IdentityResponse> identityResponses;
public Optional<GeneralMessageOldDataStatisticResponse> findResponse(Collection<IdentityDTO> identities) {
if (CollectionUtils.isEmpty(identityResponses))
return Optional.empty();
return identityResponses.stream()
.filter(response -> response.isIdentitiesMatch(identities))
.map(IdentityResponse::getResponse)
.findFirst();
}
public void addIdentityResponse(IdentityResponse identityResponse) {
if (identityResponses == null)
identityResponses = new ArrayList<>();
identityResponses.add(identityResponse);
}
void clear() {
if (identityResponses == null) return;
for (IdentityResponse identityResponse : identityResponses) {
if (identityResponse.response != null)
identityResponse.response.setReloadForeground(null);
}
}
@Data
public static class IdentityResponse {
private GeneralMessageOldDataStatisticResponse response;
private Set<IdentityDTO> identities;
public boolean isIdentitiesMatch(Collection<IdentityDTO> identities) {
// 比较时忽略顺序
return this.identities.equals(new HashSet<>(identities));
}
}
}

View File

@ -0,0 +1,20 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor
public class PersonAndCacheValue {
private final Long personId;
private final OldMsgStatCacheValue cacheValue;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,25 @@
package cn.axzo.msg.center.message.service.impl.person;
import cn.axzo.basics.profiles.dto.basic.IdentityProfileDto;
import lombok.RequiredArgsConstructor;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@RequiredArgsConstructor
public class PersonIdInfo {
private final List<IdentityProfileDto> identityProfiles;
public List<Long> getPersonIds() {
return identityProfiles.stream()
.map(i -> i.getPersonProfile().getId())
.distinct()
.collect(toList());
}
}

View File

@ -0,0 +1,47 @@
package cn.axzo.msg.center.message.service.impl.person;
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
import cn.axzo.basics.profiles.common.enums.IdentityType;
import cn.axzo.basics.profiles.dto.basic.IdentityProfileDto;
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
/**
* @author yanglin
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class PersonService {
private final UserProfileServiceApi userProfileServiceApi;
public PersonIdInfo getPersonIdsByIdentities(Collection<Long> identityIds) {
log.info("start - querying person ids by identity ids: {}", JSON.toJSONString(identityIds));
ArrayList<Long> uniqueIdentityIds = new ArrayList<>(new HashSet<>(identityIds));
CommonResponse<List<IdentityProfileDto>> resp = userProfileServiceApi.getIdentitiesByIdSet(
uniqueIdentityIds, IdentityType.NOT_SUPPORT.getCode());
log.info("end - querying person ids by identity ids, request={}, response={}",
JSON.toJSONString(identityIds), JSON.toJSONString(resp));
BizAssertions.assertResponse(resp);
return new PersonIdInfo(resp.getData());
}
public Long getPersonIdByPhone(String phone) {
CommonResponse<PersonProfileDto> profileResp = userProfileServiceApi
.getUnionPersonProfile(null, phone);
PersonProfileDto personProfile = BizAssertions.assertResponse(profileResp, "未找根据手机找到人员");
BizAssertions.assertNotNull(personProfile, "未找根据手机找到人员");
return personProfile.getId();
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.msg.center.message.service.todo;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.maokai.api.util.Ref;
import cn.axzo.maokai.api.vo.response.tree.ValueNode;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
@ -14,6 +15,7 @@ import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
import cn.axzo.msg.center.message.service.group.GroupTemplateService;
import cn.axzo.msg.center.message.service.group.NodeWrapper;
import cn.axzo.msg.center.message.service.impl.PendingMessageNewServiceImpl;
import cn.axzo.msg.center.message.service.todo.cache.NodeStatCache;
import cn.axzo.msg.center.message.service.todo.mybatis.CollectSQLInterceptor;
import cn.axzo.msg.center.message.service.todo.pagequery.PageQuerySort;
import cn.axzo.msg.center.message.service.todo.queryanalyze.SimpleAnalyzer;
@ -44,6 +46,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.function.Supplier;
@ -67,6 +71,8 @@ public class TodoRangeQueryService {
private final TodoRespBuilder todoRespBuilder;
private final AnalysisConfig analysisConfig;
private final GroupTemplateService groupTemplateService;
private final NodeStatCache nodeStatCache;
private final ForkJoinPool statExecutor = new ForkJoinPool(20);
// !! page query
@ -219,14 +225,17 @@ public class TodoRangeQueryService {
if (!request.determineGroupNodeCodes().isEmpty())
return groupTemplateService.collectTemplateCodes(request.determineGroupNodeCodes());
// 获取可见的模版时, 不用定位到叶子节点
List<ValueNode<NodeWrapper>> nodes = groupTemplateService
.getTodoGroups(request.getAppTerminalType(), false);
List<ValueNode<NodeWrapper>> nodes = groupTemplateService.getTodoGroups(request.getAppTerminalType());
return groupTemplateService.collectTemplateCodes(nodes);
}
// !! stat
public PendingMessageStatisticResponseV2 countStatGrouped(MessageGroupNodeStatisticParam request) {
return nodeStatCache.getCacheResponseOrReload(request, () -> countStatGroupedImpl(request));
}
private PendingMessageStatisticResponseV2 countStatGroupedImpl(MessageGroupNodeStatisticParam request) {
List<ValueNode<NodeWrapper>> nodes = determineStatNodes(request);
Function<ValueNode<NodeWrapper>, GroupStat> nodeStatFun = valueNode -> {
int executableCount = countStatByNode(request, valueNode, TodoType.EXECUTABLE);
@ -239,20 +248,44 @@ public class TodoRangeQueryService {
groupStat.setStat(new Stat(executableCount, copiedToMeCount));
return groupStat;
};
try {
PendingMessageStatisticResponseV2 resp = new PendingMessageStatisticResponseV2();
List<GroupStat> stats = nodes.parallelStream().map(nodeStatFun).collect(toList());
for (GroupStat stat : stats)
resp.addGroupStat(stat);
// 先提交任务
CompletableFuture<Integer> uniqueExecutableCountFuture = CompletableFuture.supplyAsync(
() -> countStatByNodes(request, nodes, TodoType.EXECUTABLE), statExecutor);
CompletableFuture<Integer> uniqueCopiedToMeCountFuture = CompletableFuture.supplyAsync(
() -> countStatByNodes(request, nodes, TodoType.COPIED_TO_ME), statExecutor);
// 用一个偏大的线程池, IO密集型任务, commonPool不够用
// 提交任务并获取结果
List<GroupStat> stats = statExecutor
.submit(() -> nodes
.parallelStream()
.map(nodeStatFun)
.collect(toList()))
.get();
resp.addGroupStats(stats);
// 一个端可能会有重复分类
resp.setTotalStat();
CompletableFuture.allOf(uniqueExecutableCountFuture, uniqueCopiedToMeCountFuture).join();
// 一个端不包含重复分类
int uniqueExecutableCount = uniqueExecutableCountFuture.get();
int uniqueCopiedToMeCount = uniqueCopiedToMeCountFuture.get();
resp.setUniqueTotalStat(new Stat(uniqueExecutableCount, uniqueCopiedToMeCount));
return resp;
} catch (Exception e) {
log.warn("分类统计异常, request={}", request, e);
throw new ServiceException("分类统计, 请稍后重试");
}
}
/**
* app首页
*/
public Integer countStat(MessageGroupNodeStatisticParam request) {
List<ValueNode<NodeWrapper>> nodes = determineStatNodes(request);
return countStatByNodes(request, nodes, TodoType.EXECUTABLE);
PendingMessageStatisticResponseV2 nodeStat = nodeStatCache
.getCacheResponseOrReload(request, () -> countStatGroupedImpl(request));
Stat totalStat = nodeStat.getUniqueTotalStat();
return totalStat == null ? 0 : totalStat.getPendingCount();
}
private int countStatByNode(MessageGroupNodeStatisticParam request,
@ -268,7 +301,7 @@ public class TodoRangeQueryService {
OuInfo ouInfo = OuInfo.create(request.getOuId(), request.getTerminalType(), todoType);
LambdaQueryWrapper<Todo> query = todoQuery(ouInfo)
.in(Todo::getTemplateCode, templateCodes)
.eq(Todo::getExecutorPersonId, request.getOperator().getId())
.eq(Todo::getExecutorPersonId, request.getPersonId())
.in(CollectionUtils.isNotEmpty(request.getWorkspaceIds()), Todo::getOrgId, request.getWorkspaceIds())
.and(todoType == TodoType.EXECUTABLE, nested -> nested
.eq(Todo::getType, TodoType.EXECUTABLE)
@ -280,10 +313,18 @@ public class TodoRangeQueryService {
}
public List<ValueNode<NodeWrapper>> determineStatNodes(MessageGroupNodeStatisticParam request) {
if (CollectionUtils.isNotEmpty(request.getGroupNodeCodes()))
return groupTemplateService.getGroupRoot().getNodes(request.getGroupNodeCodes());
Collection<String> nodeCodes = request.getGroupNodeCodes();
if (nodeCodes == null) {
nodeCodes = Collections.emptyList();
}
nodeCodes = nodeCodes.stream()
.filter(StringUtils::isNotBlank)
.collect(toList());
List<ValueNode<NodeWrapper>> nodes = CollectionUtils.isNotEmpty(nodeCodes)
? groupTemplateService.getGroupRoot().getNodes(nodeCodes)
: groupTemplateService.getTodoGroups(request.getTerminalType());
// 只返回给前端叶子节点的信息, 前端不需要层级信息
return groupTemplateService.getTodoGroups(request.getTerminalType(), true);
return groupTemplateService.collectLeafNodes(nodes);
}
// helper

View File

@ -0,0 +1,118 @@
package cn.axzo.msg.center.message.service.todo.cache;
import cn.axzo.msg.center.dal.mapper.MessageBaseTemplateMapper;
import cn.axzo.msg.center.dal.mapper.TodoMapper;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class NodeStatCache {
private final MessageBaseTemplateMapper messageBaseTemplateMapper;
private final TodoMapper todoMapper;
private final StringRedisTemplate stringRedisTemplate;
private final PendingMessageBizConfig cfg;
public PendingMessageStatisticResponseV2 getCacheResponseOrReload(
MessageGroupNodeStatisticParam request, Supplier<PendingMessageStatisticResponseV2> loader) {
if (!cfg.isNodeStatCacheOn()) {
return loader.get();
}
Collection<String> nodeCodes = request.getGroupNodeCodes();
if (CollectionUtils.isNotEmpty(nodeCodes) && nodeCodes.size() > cfg.getNodeStatCacheMaxRequestNodeCodeSize()) {
log.info("超过了允许缓存的最大请求节点编码数量,不使用缓存, 直接从数据库获取. request={}", request);
return loader.get();
}
Date templateLatestUpdateTime = messageBaseTemplateMapper.getTemplateLatestUpdateTime();
Date todoLatestUpdateTime = todoMapper.getTodoLatestUpdateTime(request.getPersonId());
NodeStatCacheValue cacheValue = getCacheValue(request);
boolean reloaded = false;
if (cacheValue == null || cacheValue.shouldReload(templateLatestUpdateTime, todoLatestUpdateTime)) {
log.info("reloading node stat data, request={}", request);
PendingMessageStatisticResponseV2 response = loader.get();
cacheValue = new NodeStatCacheValue();
cacheValue.setResponse(response);
cacheValue.setTemplateLatestUpdateTime(templateLatestUpdateTime);
cacheValue.setTodoLatestUpdateTime(todoLatestUpdateTime);
setCacheValue(request, cacheValue);
reloaded = true;
}
PendingMessageStatisticResponseV2 copy = new PendingMessageStatisticResponseV2();
BeanUtils.copyProperties(cacheValue.getResponse(), copy);
copy.setCacheReloaded(reloaded);
return copy;
}
// !! cache helper
private NodeStatCacheValue getCacheValue(MessageGroupNodeStatisticParam request) {
String redisKey = buildCacheKey(request);
String json = stringRedisTemplate.opsForValue().get(redisKey);
if (StringUtils.isBlank(json)) {
return null;
}
try {
return JSON.parseObject(json, NodeStatCacheValue.class);
} catch (Exception e) {
// 人为改了缓存, 而且没有改对
log.warn("fail to parse cache value, will reload. request={}, cacheValue={}", request, json, e);
return null;
}
}
private void setCacheValue(MessageGroupNodeStatisticParam request, NodeStatCacheValue cacheValue) {
log.info("set cache value for request. request={}", request);
cacheValue.getResponse().setCacheReloaded(null);
long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getNodeStatCacheDataExpireHours());
// add some random delta seconds, range: 0-600 seconds
secondsDelta += new SecureRandom().nextInt(10 * 60);
String redisKey = buildCacheKey(request);
String redisValue = JSON.toJSONString(cacheValue);
stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS);
}
private static String buildCacheKey(MessageGroupNodeStatisticParam request) {
StringBuffer buf = new StringBuffer();
buf.append("msg-center:node-stat:v2");
Consumer<Object> appender = value -> {
if (value != null)
buf.append(":").append(value);
};
appender.accept(request.getPersonId());
appender.accept(request.getOuId());
appender.accept(request.getTerminalType());
Collection<String> nodeCodes = request.getGroupNodeCodes();
if (CollectionUtils.isNotEmpty(nodeCodes)) {
// 排序避免编码顺序和重复编码的影响
nodeCodes.stream()
.filter(StringUtils::isNotBlank)
.distinct()
.sorted()
.forEach(appender);
}
return buf.toString();
}
}

View File

@ -0,0 +1,37 @@
package cn.axzo.msg.center.message.service.todo.cache;
import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2;
import cn.axzo.msg.center.utils.DateFormatUtil;
import lombok.Data;
import java.util.Date;
import java.util.Objects;
/**
* @author yanglin
*/
@Data
class NodeStatCacheValue {
private Date templateLatestUpdateTime;
private Date todoLatestUpdateTime;
private String templateLatestUpdateTimeStr;
private String todoLatestUpdateTimeStr;
private PendingMessageStatisticResponseV2 response;
boolean shouldReload(Date templateLatestUpdateTime, Date todoLatestUpdateTime) {
return !Objects.equals(this.templateLatestUpdateTime, templateLatestUpdateTime)
|| !Objects.equals(this.todoLatestUpdateTime, todoLatestUpdateTime);
}
public void setTemplateLatestUpdateTime(Date templateLatestUpdateTime) {
this.templateLatestUpdateTime = templateLatestUpdateTime;
this.templateLatestUpdateTimeStr = DateFormatUtil.toReadableString(templateLatestUpdateTime);
}
public void setTodoLatestUpdateTime(Date todoLatestUpdateTime) {
this.todoLatestUpdateTime = todoLatestUpdateTime;
this.todoLatestUpdateTimeStr = DateFormatUtil.toReadableString(todoLatestUpdateTime);
}
}

View File

@ -0,0 +1,183 @@
package cn.axzo.msg.center.message.service.todo.mybatis;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.visitor.SQLASTVisitor;
import lombok.RequiredArgsConstructor;
import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.ParameterMapping;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.mapping.SqlSource;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.List;
/**
* @author yanglin
*/
@Intercepts({
@Signature(type = Executor.class, method = "update",
args = {MappedStatement.class, Object.class}),
@Signature(type = Executor.class, method = "query",
args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
@Signature(type = Executor.class, method = "query",
args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
})
public class ReplaceInterceptor implements Interceptor {
private static final ThreadLocal<Replacement> LOCAL = new ThreadLocal<>();
public static void enable(Replacement replacement) {
LOCAL.set(replacement);
}
public static void disable() {
LOCAL.remove();
}
@Override
public Object intercept(Invocation invocation) throws Throwable {
Replacement replacement = LOCAL.get();
if (replacement == null)
return invocation.proceed();
final Object[] args = invocation.getArgs();
MappedStatement ms = (MappedStatement) args[0];
SqlCommandType commandType = ms.getSqlCommandType();
BoundSql boundSql = ms.getBoundSql(args[1]);
SQLStatement stmt;
MySqlStatementParser parser = new MySqlStatementParser(boundSql.getSql());
if (commandType == SqlCommandType.UPDATE) {
stmt = parser.parseUpdateStatement();
} else if (commandType == SqlCommandType.SELECT) {
stmt = parser.parseSelect();
} else if (commandType == SqlCommandType.INSERT) {
stmt = parser.parseInsert();
} else if (commandType == SqlCommandType.DELETE) {
stmt = parser.parseDeleteStatement();
} else {
return invocation.proceed();
}
if (stmt instanceof MySqlInsertStatement && replacement.isInsertPreserveId()) {
maybeInsertPreserveId((MySqlInsertStatement) stmt, boundSql.getParameterObject());
}
stmt.accept(new ReplaceVisitor(replacement));
String newSql = stmt.toString();
args[0] = copyMappedStatement(ms, parameterObject -> new CustomBoundSql(
ms.getConfiguration(), newSql, boundSql.getParameterMappings(), parameterObject, boundSql));
return invocation.proceed();
}
private void maybeInsertPreserveId(MySqlInsertStatement stmt, Object parameterObject) {
if (stmt.getColumns().isEmpty() || stmt.getValues() == null) {
return;
}
List<SQLExpr> columns = stmt.getColumns();
String idColumn = columns.stream()
.filter(column -> column instanceof SQLIdentifierExpr)
.map(SQLIdentifierExpr.class::cast)
.map(SQLIdentifierExpr::getName)
.filter(name -> name.equals("id"))
.findFirst()
.orElse(null);
if (idColumn != null) return;
Number idValue = getIdValue(parameterObject);
if (idValue == null) return;
columns.add(new SQLIdentifierExpr("id"));
stmt.getValues().addValue(idValue.longValue());
}
private Number getIdValue(Object parameterObject) {
Field field = ReflectionUtils.findField(parameterObject.getClass(), "id");
if (field == null) return null;
field.setAccessible(true);
Object value = ReflectionUtils.getField(field, parameterObject);
return value instanceof Number ? (Number) value : null;
}
@RequiredArgsConstructor
private static class ReplaceVisitor implements SQLASTVisitor {
final Replacement replacement;
@Override
public boolean visit(SQLExprTableSource x) {
SQLExpr expr = x.getExpr();
if (expr instanceof SQLIdentifierExpr) {
String tableName = ((SQLIdentifierExpr) expr).getName();
if (tableName.equals(replacement.getSrcTable())) {
x.replace(expr, new SQLIdentifierExpr(replacement.getDestTable()));
}
}
return false;
}
}
private static class CustomBoundSql extends BoundSql {
final BoundSql delegate;
CustomBoundSql(Configuration configuration, String sql, List<ParameterMapping> parameterMappings,
Object parameterObject, BoundSql delegate) {
super(configuration, sql, parameterMappings, parameterObject);
this.delegate = delegate;
}
@Override
public boolean hasAdditionalParameter(String name) {
if (super.hasAdditionalParameter(name))
return true;
return delegate.hasAdditionalParameter(name);
}
@Override
public Object getAdditionalParameter(String name) {
Object parameter = super.getAdditionalParameter(name);
if (parameter != null)
return parameter;
return delegate.getAdditionalParameter(name);
}
}
private MappedStatement copyMappedStatement(MappedStatement ms, SqlSource sqlSource) {
MappedStatement.Builder builder =
new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), sqlSource, ms.getSqlCommandType());
builder.resource(ms.getResource());
builder.fetchSize(ms.getFetchSize());
builder.statementType(ms.getStatementType());
builder.keyGenerator(ms.getKeyGenerator());
if (ms.getKeyProperties() != null && ms.getKeyProperties().length != 0) {
StringBuilder keyProperties = new StringBuilder();
for (String keyProperty : ms.getKeyProperties()) {
keyProperties.append(keyProperty).append(",");
}
keyProperties.delete(keyProperties.length() - 1, keyProperties.length());
builder.keyProperty(keyProperties.toString());
}
builder.timeout(ms.getTimeout());
builder.parameterMap(ms.getParameterMap());
builder.resultMaps(ms.getResultMaps());
builder.resultSetType(ms.getResultSetType());
builder.cache(ms.getCache());
builder.flushCacheRequired(ms.isFlushCacheRequired());
builder.useCache(ms.isUseCache());
return builder.build();
}
}

View File

@ -0,0 +1,30 @@
package cn.axzo.msg.center.message.service.todo.mybatis;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author yanglin
*/
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum Replacement {
TO_MESSAGE_RECORD_COLD("message_record", "message_record_cold", true)
;
private final String srcTable;
private final String destTable;
private final boolean insertPreserveId;
public void run(Runnable runnable) {
ReplaceInterceptor.enable(this);
try {
runnable.run();
} finally {
ReplaceInterceptor.disable();
}
}
}

View File

@ -12,6 +12,11 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class SqlInterceptorConfig {
@Bean
ReplaceInterceptor replaceTableInterceptor() {
return new ReplaceInterceptor();
}
@Bean
CollectSQLInterceptor sqlCollectInterceptor() {
return new CollectSQLInterceptor();

View File

@ -0,0 +1,130 @@
package cn.axzo.msg.center.message.xxl;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.domain.persistence.BaseEntity;
import cn.axzo.msg.center.inside.notices.utils.FunctionalTransactionTemplate;
import cn.axzo.msg.center.message.service.todo.mybatis.ReplaceInterceptor;
import cn.axzo.msg.center.message.service.todo.mybatis.Replacement;
import cn.axzo.msg.center.utils.DateFormatUtil;
import cn.axzo.msg.center.utils.JSONObjectUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import com.taobao.api.internal.util.NamedThreadFactory;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MigrateOldMsgHotDataJob extends IJobHandler {
private final MessageRecordMapper messageRecordMapper;
private final FunctionalTransactionTemplate transactionTemplate;
private final ExecutorService executor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("migrate-old-msg-hot-data-job"));
private volatile boolean isRunning = false;
@Override
@XxlJob("migrateOldMsgHotDataJob")
public ReturnT<String> execute(String paramStr) throws Exception {
log.info("start - run job with param={}", paramStr);
try {
Param param = JSONObjectUtil.parseObject(paramStr, Param.class);
tryExecute(param, false);
log.info("end - run job with param={}", param);
return ReturnT.SUCCESS;
} catch (Exception e) {
log.warn("job failed. param={}", paramStr, e);
return ReturnT.FAIL;
}
}
public void tryExecute(@Nullable Param param, boolean async) {
synchronized (this) {
if (isRunning) throw new ServiceException("job is running, please wait");
isRunning = true;
}
Runnable task = () -> {
try {
executeImpl(param == null ? new Param() : param);
} finally {
isRunning = false;
}
};
if (async) {
executor.execute(task);
} else {
task.run();
}
}
private void executeImpl(Param param) {
Date until = DateTime.now().minusDays(param.daysAgo).toDate();
Iterable<Page<MessageRecord>> pages = messageRecordMapper
.iterateBathStickyPage(query(MessageRecord.class)
.le(MessageRecord::getCreateAt, until)
.orderByAsc(MessageRecord::getId));
log.info("migrating records before/equal date: {} and it's {} days ago",
DateFormatUtil.toReadableString(until), param.daysAgo);
int currentPage = 0, totalPage = 0, totalRecordCount = 0;
for (Page<MessageRecord> page : pages) {
++currentPage;
// 由于会删除数据, 总页数会变化, 所以只取第一次的总页数才是对的
if (totalPage == 0) totalPage = (int) page.getPages();
log.info("migrating page={}, total page={}", currentPage, totalPage);
for (List<MessageRecord> batch : Lists.partition(page.getRecords(), param.saveBatch)) {
totalRecordCount += batch.size();
transactionTemplate.exec(() -> migrateRecords(batch));
log.info("current total migrated record count={}", totalRecordCount);
}
}
log.info("final total migrated record count={}", totalRecordCount);
}
private void migrateRecords(List<MessageRecord> records) {
ReplaceInterceptor.enable(Replacement.TO_MESSAGE_RECORD_COLD);
try {
messageRecordMapper.batchInsertWithId(records);
} catch (Exception e) {
List<Long> ids = records.stream()
.map(BaseEntity::getId).collect(toList());
log.warn("migrate records failed, recordIds={}, records={}",
JSON.toJSONString(ids), JSON.toJSONString(records), e);
throw e;
} finally {
ReplaceInterceptor.disable();
}
List<Long> ids = records.stream()
.map(MessageRecord::getId)
.collect(toList());
messageRecordMapper.deleteBatchIds(ids);
}
@Data
public static class Param {
private int daysAgo = 180;
private int saveBatch = 200;
}
}

View File

@ -1,5 +1,6 @@
package cn.axzo.msg.center.mq;
import cn.axzo.framework.rocketmq.Event;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@ -12,9 +13,16 @@ import lombok.RequiredArgsConstructor;
public enum MqMessageType {
TODO_PRESET_BUTTON_PRESSED("msg-center-todo", "msg-center-todo-preset-button-pressed", "预设按钮被点击"),
OLD_MSG_SEND("msg-center-old-msg", "old-msg-send", "发送旧消息");
;
private final String model;
private final String tag;
private final String desc;
public Event.EventCode getEventCode() {
return Event.EventCode.builder()
.module(model)
.name(tag)
.build();
}
}

View File

@ -1,7 +1,6 @@
package cn.axzo.msg.center.mq;
import cn.axzo.framework.rocketmq.Event;
import cn.axzo.framework.rocketmq.Event.EventCode;
import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.msg.center.api.mq.MqMessage;
import cn.axzo.msg.center.common.utils.BizAssertions;
@ -25,12 +24,11 @@ public class MqProducer {
log.info("开始 - 发送消息. messageRecord={}", record);
try {
sendImpl(record);
log.info("结束 - 发送消息. messageRecord={}", record);
} catch (Exception e) {
log.warn("异常 - 发送消息. messageRecord={}", record, e);
// 由调用方决定怎么处理异常
throw e;
} finally {
log.info("结束 - 发送消息. messageRecord={}", record);
}
}
@ -40,7 +38,7 @@ public class MqProducer {
.shardingKey(record.getShardingKey())
.targetId(record.getTargetId())
.targetType(messageType.getModel())
.eventCode(new EventCode(messageType.getModel(), messageType.getTag()))
.eventCode(messageType.getEventCode())
.eventModule(messageType.getModel())
.eventName(messageType.getTag())
.operatorId(record.getOperatorId())

View File

@ -8,19 +8,28 @@ import cn.axzo.framework.rocketmq.EventProducer;
import cn.axzo.framework.rocketmq.EventProducer.Context;
import cn.axzo.framework.rocketmq.RocketMQEventProducer;
import cn.axzo.framework.rocketmq.RocketMQEventProducer.RocketMQMessageMeta;
import cn.axzo.framework.rocketmq.utils.TraceUtils;
import cn.axzo.msg.center.api.mq.MqMessage;
import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
/**
@ -77,6 +86,52 @@ public class RocketMQConfig {
}
}
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "topic_msg_center_${spring.profiles.active}",
consumerGroup = "GID_topic_msg_center_self_consume_${spring.profiles.active}",
consumeMode = ConsumeMode.ORDERLY,
nameServer = "${rocketmq.name-server}"
)
public static class MsgCenterListener extends BaseListener implements RocketMQListener<MessageExt> {
private final EventConsumer eventConsumer;
private final PendingMessageBizConfig cfg;
@Override
public void onMessage(MessageExt message) {
onEvent(message, eventConsumer);
}
/**
* 拷贝下来改一下, 设置最大处理时间
*/
public void onEvent(MessageExt message, EventConsumer eventConsumer) {
String topic = message.getTopic();
String value = new String(message.getBody());
Map<String, String> headers = message.getProperties();
MDC.put(TraceUtils.CTX_LOG_ID, headers.get(TraceUtils.TRACE_ID));
MDC.put(TraceUtils.TRACE_ID, headers.get(TraceUtils.TRACE_ID));
MDC.put(TraceUtils.TRACE_ID_IN_MDC, headers.get(TraceUtils.TRACE_ID));
if (log.isDebugEnabled()) {
log.debug("received message, topic={}, headers={}, value={}", topic, headers, value);
}
//当前消息所在分区的lag. 而不是整个topic
Long partitionLag = Optional.ofNullable(message.getProperties().get(MessageConst.PROPERTY_MAX_OFFSET))
.map(e -> Long.parseLong(e) - message.getQueueOffset()).orElse(0L);
eventConsumer.onEvent(value, EventConsumer.Context.builder()
.msgId(message.getMsgId())
.ext(ImmutableMap.of("topic", topic))
.headers(Maps.transformValues(headers, header -> Optional.ofNullable(header).map(String::getBytes).orElse(new byte[] {})))
.lagSupplier(() -> partitionLag)
.maxAllowElapsedMillis(cfg.getMsgCenterMqSelfConsumeMaxExecMs())
.build());
}
}
@Bean
EventHandlerRepository eventHandlerRepository() {
return new EventHandlerRepository((ex, logText) -> {

View File

@ -1,5 +1,6 @@
package cn.axzo.msg.center.utils;
import cn.axzo.msg.center.common.utils.ReflectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.AccessLevel;
@ -35,6 +36,19 @@ public final class JSONObjectUtil {
return JSON.parseObject(str);
}
/**
* 解析JSON字符串若字符串格式不正确抛异常
*
* @param str 待解析的字符串
* @return JSONObject
*/
public static <T> T parseObject(String str, Class<T> type) {
if (StringUtils.isBlank(str)) {
return ReflectionUtils.newInstance(type);
}
return JSON.parseObject(str, type);
}
/**
* 解析JSON字符串若字符串格式不正确抛异常
*

View File

@ -0,0 +1,20 @@
package cn.axzo.msg.center.api.mq;
import cn.axzo.msg.center.api.request.CmsReadMsgReq;
import cn.axzo.msg.center.api.request.GeneralMessageReq;
import cn.axzo.msg.center.api.request.MessageReadAllReq;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* @author yanglin
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class SendMessageRecordMessage extends MqMessage implements Serializable {
private GeneralMessageReq sendRequest;
private MessageReadAllReq readRequest;
private CmsReadMsgReq cmsReadRequest;
}

View File

@ -107,4 +107,5 @@ public class CmsMsgQueryReq extends PageRequest {
@Deprecated
private Long identityId;
private boolean logRequest = true;
}

View File

@ -1,6 +1,7 @@
package cn.axzo.msg.center.api.request;
import cn.axzo.msg.center.api.enums.BizTypeEnum;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import javax.validation.constraints.NotNull;
@ -29,4 +30,8 @@ public class CmsReadMsgReq {
*/
private transient BizTypeEnum bizTypeEnum = BizTypeEnum.CONSTRUCTION;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -2,7 +2,7 @@ package cn.axzo.msg.center.api.request;
import cn.axzo.msg.center.api.enums.MsgRecordTerminalTypeEnum;
import cn.axzo.msg.center.api.enums.ReceiveTypeEnum;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -78,5 +78,10 @@ public class GeneralMessageReq extends AbstractMessage implements Serializable {
* appClient: e.g. cmp cm
*/
public String appClient;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -40,6 +40,19 @@ public class IdentityDTO implements Serializable {
return Objects.nonNull(id) && Objects.nonNull(type);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IdentityDTO that = (IdentityDTO) o;
return Objects.equals(id, that.id) && type == that.type;
}
@Override
public int hashCode() {
return Objects.hash(id, type);
}
@Override
public String toString() {
return JSON.toJSONString(this);

View File

@ -38,6 +38,8 @@ public class GeneralMessageOldDataStatisticResponse implements Serializable {
*/
private String latestMsgContent;
private Boolean reloadForeground;
@Override
public String toString() {
return JSON.toJSONString(this);

View File

@ -28,15 +28,26 @@ public class PendingMessageStatisticResponseV2 implements Serializable {
private static final long serialVersionUID = 972200410809186003L;
/**
* 统计情况总数 (各分类相加)
* 统计总数 (各分类相加)
*/
private Stat totalStat;
/**
* 统计总数 (一个端分类配置重复了也没有关系)
*/
private Stat uniqueTotalStat;
/**
* 分类情况
*/
private List<GroupStat> groupStats = Collections.synchronizedList(new ArrayList<>());
private Boolean cacheReloaded;
public void addGroupStats(List<GroupStat> groupStats) {
this.groupStats.addAll(groupStats);
}
public void addGroupStat(GroupStat groupStat) {
this.groupStats.add(groupStat);
}

View File

@ -1,5 +1,7 @@
package cn.axzo.msg.center.common.utils;
import cn.axzo.basics.common.exception.ServiceException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
@ -15,6 +17,14 @@ import java.util.function.Predicate;
*/
public final class ReflectionUtils {
public static <T> T newInstance(Class<T> type) {
try {
return type.newInstance();
} catch (Exception e) {
throw new ServiceException(String.format("初始化失败. type=%s", type.getName()), e);
}
}
/**
* 获取所有field字段包含父类继承的

View File

@ -4,6 +4,9 @@ import cn.axzo.msg.center.domain.entity.MessageBaseTemplate;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import javax.annotation.Nullable;
import java.util.Date;
/**
* @description
* 消息基础模板Mapper
@ -13,4 +16,8 @@ import org.apache.ibatis.annotations.Mapper;
*/
@Mapper
public interface MessageBaseTemplateMapper extends BaseMapper<MessageBaseTemplate> {
@Nullable
Date getTemplateLatestUpdateTime();
}

View File

@ -9,8 +9,7 @@ import cn.axzo.msg.center.domain.dto.MsgStatisticsDTO;
import cn.axzo.msg.center.domain.dto.UpdateReadDTO;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType;
import cn.axzo.msg.center.service.dto.IdentityDTO;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import cn.axzo.msg.center.util.IterableMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Param;
@ -23,8 +22,11 @@ import java.util.List;
* @since 2022-03-28 14:59:16
*/
/*@Mapper*/
public interface MessageRecordMapper extends BaseMapper<MessageRecord>{
public interface MessageRecordMapper extends IterableMapper<MessageRecord> {
void batchInsertWithId(@Param("records") List<MessageRecord> records);
void deleteRecordsPhysically(@Param("ids") List<Long> ids);
List<MsgStatisticsDTO> statisticsMsg(@Param("bizType") Integer bizType,
@Param("personId") Long personId,

View File

@ -5,6 +5,7 @@ import cn.axzo.msg.center.domain.entity.Todo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.List;
@ -21,4 +22,7 @@ public interface TodoMapper extends BaseMapper<Todo> {
List<Long> getMigratedBusinessId();
@Nullable
Date getTodoLatestUpdateTime(@Param("executorPersonId") Long executorPersonId);
}

View File

@ -0,0 +1,47 @@
package cn.axzo.msg.center.util;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* @author yanglin
*/
public interface IterableMapper<T> extends BaseMapper<T> {
// 注意自己的分页插件有没有限制分页的最大条数
int DEFAULT_BATCH_SIZE = 2000;
default Iterable<Page<T>> iterateBath(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBath(this, DEFAULT_BATCH_SIZE, query);
}
default Iterable<Page<T>> iterateBath(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBath(this, pageSize, query);
}
default Iterable<T> iterateElement(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElement(this, DEFAULT_BATCH_SIZE, query);
}
default Iterable<T> iterateElement(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElement(this, pageSize, query);
}
default Iterable<Page<T>> iterateBathStickyPage(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBathStickyPage(this, DEFAULT_BATCH_SIZE, query);
}
default Iterable<Page<T>> iterateBathStickyPage(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateBathStickyPage(this, pageSize, query);
}
default Iterable<T> iterateElementStickyPage(LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElementStickyPage(this, DEFAULT_BATCH_SIZE, query);
}
default Iterable<T> iterateElementStickyPage(long pageSize, LambdaQueryWrapper<T> query) {
return RecordIterable.iterateElementStickyPage(this, pageSize, query);
}
}

View File

@ -0,0 +1,127 @@
package cn.axzo.msg.center.util;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.beans.BeanUtils;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* 不要直接使用这个类, {@link IterableMapper}
*
* @author yanglin
*/
class RecordIterable {
static <T, M extends BaseMapper<T>> Iterable<Page<T>> iterateBath(
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
return () -> new BatchIterator<>(
mapper, pageSize, query, PageStrategy.ADVANCE_CURRENT_PAGE);
}
static <T, M extends BaseMapper<T>> Iterable<T> iterateElement(
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
return () -> new ElementIterator<>(new BatchIterator<>(
mapper, pageSize, query, PageStrategy.ADVANCE_CURRENT_PAGE));
}
static <T, M extends BaseMapper<T>> Iterable<Page<T>> iterateBathStickyPage(
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
return () -> new BatchIterator<>(
mapper, pageSize, query, PageStrategy.STICKY_PAGE);
}
static <T, M extends BaseMapper<T>> Iterable<T> iterateElementStickyPage(
M mapper, long pageSize, LambdaQueryWrapper<T> query) {
return () -> new ElementIterator<>(new BatchIterator<>(
mapper, pageSize, query, PageStrategy.STICKY_PAGE));
}
private static class ElementIterator<T, M extends BaseMapper<T>> implements Iterator<T> {
private final BatchIterator<T, M> pageIterator;
private Iterator<T> elementIterator;
ElementIterator(BatchIterator<T, M> pageIterator) {
this.pageIterator = pageIterator;
}
@Override
public boolean hasNext() {
maybeAdvance();
return elementIterator.hasNext() || pageIterator.hasNext();
}
@Override
public T next() {
if (!hasNext())
throw new NoSuchElementException();
return elementIterator.next();
}
private void maybeAdvance() {
if (elementIterator == null || !elementIterator.hasNext() && pageIterator.hasNext()) {
elementIterator = pageIterator.hasNext()
? pageIterator.next().getRecords().iterator()
: Collections.emptyIterator();
}
}
}
private static class BatchIterator<T, M extends BaseMapper<T>> implements Iterator<Page<T>> {
private final M mapper;
private final LambdaQueryWrapper<T> query;
private final PageStrategy pageStrategy;
private final Page<T> page;
private boolean pageConsumed = false;
BatchIterator(M mapper, long pageSize, LambdaQueryWrapper<T> query, PageStrategy pageStrategy) {
this.mapper = mapper;
this.query = query;
this.pageStrategy = pageStrategy;
this.page = new Page<>();
this.page.setSize(pageSize);
this.page.setCurrent(0);
}
@Override
public boolean hasNext() {
maybeFetchNextPage();
// 1. 很明显的还有下一页
// 2. 看当前页的数据以及当前页的消费情况 (第一页和最后一页)
return page.getCurrent() < page.getPages() || (!pageConsumed && !page.getRecords().isEmpty());
}
@Override
public Page<T> next() {
if (!hasNext())
throw new NoSuchElementException();
pageConsumed = true;
Page<T> copy = new Page<>();
// 不能暴露内部状态. 如果外部改动了内部的page, 会影响到下次迭代
BeanUtils.copyProperties(page, copy);
return copy;
}
private void maybeFetchNextPage() {
// 还没有获取过数据或者当前页数据已经消费, 就获取新page数据
if (page.getCurrent() == 0 || (pageConsumed && page.hasNext())) {
if (pageStrategy == PageStrategy.ADVANCE_CURRENT_PAGE)
page.setCurrent(page.getCurrent() + 1);
else
page.setCurrent(1);
mapper.selectPage(page, query);
pageConsumed = false;
}
}
}
private enum PageStrategy {
ADVANCE_CURRENT_PAGE, STICKY_PAGE
}
}

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.axzo.msg.center.dal.mapper.MessageBaseTemplateMapper">
<select id="getTemplateLatestUpdateTime" resultType="date">
SELECT max(update_at) FROM message_base_template
</select>
</mapper>

View File

@ -2,6 +2,30 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.axzo.msg.center.dal.mapper.MessageRecordMapper">
<insert id="batchInsertWithId">
INSERT INTO message_record(
id, `from_id`, `to_id`, `person_id`, `type`, `relation_id`, `module_id`, `title`, `content`, `receive_type`, `terminal_type`,
`terminal_id`, `terminal_name`, `state`, `biz_id`, `router_params`, `extra`, `retry_count`, `callback_url`, `fail_cause`,
`event_source`, `tenant_id`, `is_delete`, `create_at`, `update_at`, `old_type_id`
)
VALUES
<foreach collection="records" item="i" separator=",">
(
#{i.id}, #{i.fromId}, #{i.toId}, #{i.personId}, COALESCE(#{i.type}, 0), #{i.relationId}, #{i.moduleId}, #{i.title}, #{i.content}, COALESCE(#{i.receiveType}, 0), COALESCE(#{i.terminalType}, 0),
#{i.terminalId}, #{i.terminalName}, COALESCE(#{i.state}, 0), #{i.bizId}, #{i.routerParams}, #{i.extra}, #{i.retryCount}, #{i.callbackUrl}, #{i.failCause},
#{i.eventSource}, #{i.tenantId}, #{i.isDelete}, #{i.createAt}, #{i.updateAt}, #{i.oldTypeId}
)
</foreach>
</insert>
<delete id="deleteRecordsPhysically">
DELETE FROM message_record
WHERE id IN
<foreach collection="ids" item="id" separator="," open="(" close=")">
#{id}
</foreach>
</delete>
<select id="statisticsMsg"
resultType="cn.axzo.msg.center.domain.dto.MsgStatisticsDTO">
select type,

View File

@ -16,5 +16,9 @@
AND record_ext -> '$.isMigratedFromPendingMessage' = true
</select>
<select id="getTodoLatestUpdateTime" resultType="date">
SELECT max(update_at) FROM todo WHERE executor_person_id = #{executorPersonId}
</select>
</mapper>

View File

@ -5,7 +5,7 @@
<include resource="logback/logback-axzo.xml" />
<!-- 覆盖开发环境日志配置 -->
<springProfile name="local,dev">
<logger name="cn.axzo" level="DEBUG" />
<logger name="cn.axzo" level="INFO" />
</springProfile>
<appender name="XXL-JOB" class="cn.axzo.msg.center.notices.client.util.XxlAppender">

View File

@ -194,9 +194,6 @@ public class SmsManager extends BaseManager implements SmsGateway {
request.setParams(param);
request.setRequestNo(UUID.randomUUID().toString());
request.setInternalObj(MnsType.VERIFY_CODE);
HashMap<String, Object> params = new HashMap<>();
params.put("code", code);
request.setParams(params);
messageService.sendMessage(request);
}
}

View File

@ -0,0 +1,50 @@
package cn.axzo.msg.center.dal.mapper;
import cn.axzo.msg.center.MsgCenterApplication;
import cn.axzo.msg.center.dal.MessageRecordDao;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.message.service.todo.mybatis.Replacement;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Iterator;
import java.util.List;
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@SpringBootTest(classes = MsgCenterApplication.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class MessageRecordMapperTest {
private final MessageRecordMapper messageRecordMapper;
private final MessageRecordDao messageRecordDao;
@Test
void tryMigrate() {
Iterable<Page<MessageRecord>> pages = messageRecordMapper
.iterateBath(10, query(MessageRecord.class)
.orderByAsc(MessageRecord::getId));
Iterator<Page<MessageRecord>> iterator = pages.iterator();
// 试试前5页数据测试一下
for (int i = 0; i < 5 && iterator.hasNext(); i++) {
Page<MessageRecord> page = iterator.next();
Replacement.TO_MESSAGE_RECORD_COLD.run(() -> {
List<MessageRecord> records = page.getRecords();
List<Long> ids = records.stream()
.map(MessageRecord::getId)
.collect(toList());
messageRecordMapper.deleteRecordsPhysically(ids);
messageRecordDao.saveBatch(records);
});
}
}
}

View File

@ -0,0 +1,24 @@
package cn.axzo.msg.center.message.service.impl.oldmsg;
import cn.axzo.msg.center.MsgCenterApplication;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author yanglin
*/
@SpringBootTest(classes = MsgCenterApplication.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class OldMsgStatCacheReloadJobTest {
private final OldMsgStatCacheReloadJob oldMsgStatCacheReloadJob;
@Test
void foo() throws Exception {
oldMsgStatCacheReloadJob.execute(null);
}
}

View File

@ -0,0 +1,24 @@
package cn.axzo.msg.center.message.xxl;
import cn.axzo.msg.center.MsgCenterApplication;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Commit;
/**
* @author yanglin
*/
@SpringBootTest(classes = MsgCenterApplication.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class MigrateOldMsgHotDataJobTest {
private final MigrateOldMsgHotDataJob migrateOldMsgHotDataJob;
@Test @Commit
void foo() throws Exception {
migrateOldMsgHotDataJob.execute("{ \"daysAgo\": 180, \"saveBatch\": 200 }\n");
}
}

View File

@ -0,0 +1,34 @@
package cn.axzo.msg.center.util;
import cn.axzo.msg.center.MsgCenterApplication;
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.inside.notices.utils.Queries;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static cn.axzo.msg.center.inside.notices.utils.Queries.query;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author yanglin
*/
@SpringBootTest(classes = MsgCenterApplication.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class RecordIterableTest {
private final MessageRecordMapper messageRecordMapper;
@Test
void foo() {
Iterable<MessageRecord> records = messageRecordMapper
.iterateElement(1, query(MessageRecord.class)
.eq(MessageRecord::getId, 3));
for (MessageRecord record : records) {
System.out.println("record=" + record);
}
}
}