diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java index b0912365..99eb9d74 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/config/PendingMessageBizConfig.java @@ -101,7 +101,7 @@ public class PendingMessageBizConfig { private boolean formatExecSQL; @Getter - private int oldMsgStatDataExpireHours = 6; + private int oldMsgStatCacheDataExpireHours = 8; @Getter private int oldMsgStateDataBackendUpdateBatchSize = 100; @@ -109,6 +109,12 @@ public class PendingMessageBizConfig { @Getter private long msgCenterMqSelfConsumeMaxExecMs = 20000L; + @Getter + private int nodeStatCacheMaxRequestNodeCodeSize = 10; + + @Getter + private int nodeStatCacheDataExpireHours = 8; + public boolean hasMessageDetailStyle(String code) { return name2DetailStyle != null && name2DetailStyle.containsKey(code); } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java index 51e75c4a..edfdef5f 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageCoreServiceImpl.java @@ -3,6 +3,7 @@ package cn.axzo.msg.center.inside.notices.service.impl; import cn.axzo.core.domain.PageResult; import cn.axzo.msg.center.api.enums.MsgStateEnum; import cn.axzo.msg.center.api.enums.MsgTypeEnum; +import cn.axzo.msg.center.api.mq.SendMessageRecordMessage; import cn.axzo.msg.center.api.request.CmsMsgQueryReq; import cn.axzo.msg.center.api.request.MessageModuleStatisticReq; import cn.axzo.msg.center.api.request.MessagePageQueryReq; @@ -34,6 +35,9 @@ import cn.axzo.msg.center.inside.notices.service.MessageRouterService; import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService; import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam; import cn.axzo.msg.center.message.service.PendingMessageNewService; +import cn.axzo.msg.center.mq.MqMessageRecord; +import cn.axzo.msg.center.mq.MqMessageType; +import cn.axzo.msg.center.mq.MqProducer; import cn.axzo.msg.center.service.dto.PersonDTO; import cn.axzo.msg.center.service.enums.AppTerminalTypeEnum; import cn.azxo.framework.common.model.Page; @@ -99,7 +103,7 @@ public class MessageCoreServiceImpl implements MessageCoreService { @Resource private RawMessageRecordService rawMessageRecordService; @Resource - private PendingMessageNewService pendingMessageNewService; + private MqProducer mqProducer; public MsgRouteTypeEnum getSystemType(String systemType) { /*String systemType = ContextInfoHolder.get().getSystemAndDeviceInfo().getSystemType();*/ @@ -355,13 +359,18 @@ public class MessageCoreServiceImpl implements MessageCoreService { // 阅读单条数据 rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ, request.getMsgId()); - return; + } else { + List moduleIds = messageModuleService.listByModuleName(request.getModuleName()).stream() + .map(MessageModule::getId) + .collect(Collectors.toList()); + rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ, + MsgTypeEnum.GENERAL_MESSAGE, null, moduleIds); } - List moduleIds = messageModuleService.listByModuleName(request.getModuleName()).stream() - .map(MessageModule::getId) - .collect(Collectors.toList()); - rawMessageRecordService.updatePersonMessageState(request.getPersonId(), srcStates, MsgStateEnum.HAVE_READ, - MsgTypeEnum.GENERAL_MESSAGE, null, moduleIds); + SendMessageRecordMessage mqMessage = new SendMessageRecordMessage(); + mqMessage.setReadRequest(request); + mqProducer.send(MqMessageRecord + .builder(MqMessageType.OLD_MSG_SEND, mqMessage) + .build()); } @Override diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java index 7e2996ae..aad42973 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/inside/notices/service/impl/MessageRecordServiceImpl.java @@ -186,7 +186,7 @@ public class MessageRecordServiceImpl implements MessageRecordService { pushMessages.addAll(messageRecords); }); SendMessageRecordMessage mqMessage = new SendMessageRecordMessage(); - mqMessage.setRequest(message); + mqMessage.setSendRequest(message); mqProducer.send(MqMessageRecord .builder(MqMessageType.OLD_MSG_SEND, mqMessage) .build()); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/domain/param/MessageGroupNodeStatisticParam.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/domain/param/MessageGroupNodeStatisticParam.java index b758ee96..17d2be37 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/domain/param/MessageGroupNodeStatisticParam.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/domain/param/MessageGroupNodeStatisticParam.java @@ -49,6 +49,10 @@ public class MessageGroupNodeStatisticParam implements Serializable { */ private Boolean withIdentify; + public Long getPersonId() { + return getOperator().getId(); + } + public static MessageGroupNodeStatisticParam from(PendingMessageStatisticRequest request) { MessageGroupNodeStatisticParam param = BeanConverter.convert(request, MessageGroupNodeStatisticParam.class); IdentityDTO identity = IdentityDTO.builder() diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgCacheReloadJobParam.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgCacheReloadJobParam.java index af5ac67f..0f49006c 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgCacheReloadJobParam.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgCacheReloadJobParam.java @@ -9,8 +9,7 @@ import lombok.Data; @Data public class OldMsgCacheReloadJobParam { - private static final int DEFAULT_SYNC_DAY = 60; - private static final int DEFAULT_BATCH_SIZE = 1000; + private static final int DEFAULT_SYNC_DAY = 30; private int syncDay; private int batchSize; @@ -18,7 +17,6 @@ public class OldMsgCacheReloadJobParam { static OldMsgCacheReloadJobParam defaultParam() { OldMsgCacheReloadJobParam defaultParam = new OldMsgCacheReloadJobParam(); defaultParam.setSyncDay(DEFAULT_SYNC_DAY); - defaultParam.setBatchSize(DEFAULT_BATCH_SIZE); return defaultParam; } @@ -26,10 +24,6 @@ public class OldMsgCacheReloadJobParam { return syncDay <= 0 ? DEFAULT_SYNC_DAY : syncDay; } - int determineBatchSize() { - return batchSize <= 0 ? DEFAULT_BATCH_SIZE : batchSize; - } - @Override public String toString() { return JSON.toJSONString(this); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java index 604dff9b..0a83bd5b 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCache.java @@ -6,11 +6,12 @@ import cn.axzo.framework.rocketmq.EventHandler; import cn.axzo.msg.center.api.mq.SendMessageRecordMessage; import cn.axzo.msg.center.api.request.CmsMsgQueryReq; import cn.axzo.msg.center.api.request.GeneralMessageReq; +import cn.axzo.msg.center.api.request.MessageReadAllReq; import cn.axzo.msg.center.api.response.MessageNewRes; import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; import cn.axzo.msg.center.message.service.MessageSendTwiceRecordService; import cn.axzo.msg.center.message.service.impl.GeneralMessageOldServiceImpl; -import cn.axzo.msg.center.message.service.impl.oldmsg.CacheValue.IdentityResponse; +import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse; import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo; import cn.axzo.msg.center.message.service.impl.person.PersonService; import cn.axzo.msg.center.mq.MqMessageType; @@ -24,6 +25,7 @@ import cn.azxo.framework.common.model.Page; import cn.hutool.core.date.StopWatch; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -62,7 +64,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { private final PendingMessageBizConfig cfg; private final EventConsumer eventConsumer; private final PersonService personService; - private final ForkJoinPool foregroundExecutor = new ForkJoinPool(20); + private final ForkJoinPool foregroundExecutor = new ForkJoinPool(15); private final ForkJoinPool backgroundExecutor = new ForkJoinPool(10); public GeneralMessageOldDataStatisticResponse getCacheResponseOrReload(GeneralMessageOldDataStatisticRequest request) { @@ -71,7 +73,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { } private GeneralMessageOldDataStatisticResponse getCacheResponse(GeneralMessageOldDataStatisticRequest request) { - CacheValue cacheValue = getCacheValue(request.getPersonId()); + OldMsgStatCacheValue cacheValue = getCacheValue(request.getPersonId()); if (cacheValue == null) { log.info("oldMsgStat: cache miss, need to reload in foreground. request={}", request); return null; @@ -91,7 +93,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { StopWatch stopWatch = new StopWatch(String.format( "%s-reloadForeground", getClass().getSimpleName())); stopWatch.start("initPersonCacheValueForeground"); - CacheValue cacheValue = initPersonCacheValueForeground(request.getPersonId()); + OldMsgStatCacheValue cacheValue = initPersonCacheValueForeground(request.getPersonId()); stopWatch.stop(); GeneralMessageOldDataStatisticResponse response = cacheValue .findResponse(request.getIdentities()) @@ -112,13 +114,13 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { .orElse(null); } - public Map reloadBackground(Set personIds) throws Exception { + public Map reloadBackground(Set personIds) throws Exception { log.info("oldMsgStat: reload cache for person in background. personIds={}", JSON.toJSONString(personIds)); if (CollectionUtils.isEmpty(personIds)) { log.info("oldMsgStat: personIds is empty"); return Collections.emptyMap(); } - HashMap personId2CacheValue = new HashMap<>(); + HashMap personId2CacheValue = new HashMap<>(); // 温柔点, 别一次性全丢到线程池 int bachSize = cfg.getOldMsgStateDataBackendUpdateBatchSize(); List> batches = Lists.partition(new ArrayList<>(personIds), bachSize); @@ -126,7 +128,7 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { ArrayList> futures = new ArrayList<>(); for (Long personId : batch) { futures.add(CompletableFuture.supplyAsync(() -> { - CacheValue cacheValue = initPersonCacheValueBackground(personId); + OldMsgStatCacheValue cacheValue = initPersonCacheValueBackground(personId); setCacheValue(personId, cacheValue); return new PersonAndCacheValue(personId, cacheValue); }, backgroundExecutor)); @@ -140,13 +142,13 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { return personId2CacheValue; } - private CacheValue initPersonCacheValueForeground(Long personId) { + private OldMsgStatCacheValue initPersonCacheValueForeground(Long personId) { log.info("initPersonCacheValueForeground. personId={}", personId); Function buildResponseFun = asManager -> { OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager); return getIdentityResponse(personId, identifyInfo.getIdentities()); }; - CacheValue cacheValue = new CacheValue(); + OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue(); CompletableFuture asManagerFuture = CompletableFuture .supplyAsync(() -> buildResponseFun.apply(true), foregroundExecutor); CompletableFuture asWorkerFuture = CompletableFuture @@ -162,13 +164,13 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { return cacheValue; } - private CacheValue initPersonCacheValueBackground(Long personId) { + private OldMsgStatCacheValue initPersonCacheValueBackground(Long personId) { log.info("initPersonCacheValueBackground. personId={}", personId); Function buildResponseFun = asManager -> { OldMsgIdentifyInfo identifyInfo = oldMsgIdentifyFilter.getIdentifyInfo(personId, asManager); return getIdentityResponse(personId, identifyInfo.getIdentities()); }; - CacheValue cacheValue = new CacheValue(); + OldMsgStatCacheValue cacheValue = new OldMsgStatCacheValue(); cacheValue.addIdentityResponse(buildResponseFun.apply(true)); cacheValue.addIdentityResponse(buildResponseFun.apply(false)); return cacheValue; @@ -187,10 +189,10 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { // !! cache helper - private void setCacheValue(Long personId, CacheValue cacheValue) { + private void setCacheValue(Long personId, OldMsgStatCacheValue cacheValue) { log.info("set cache value for person. personId={}", personId); - long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getOldMsgStatDataExpireHours()); - // with random seconds delta, 0-600 seconds + long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getOldMsgStatCacheDataExpireHours()); + // add some random delta seconds, range: 0-600 seconds secondsDelta += new SecureRandom().nextInt(10 * 60); String redisKey = buildCacheKey(personId); @@ -198,10 +200,10 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS); } - private CacheValue getCacheValue(Long personId) { + private OldMsgStatCacheValue getCacheValue(Long personId) { String redisKey = buildCacheKey(personId); String json = stringRedisTemplate.opsForValue().get(redisKey); - return StringUtils.isBlank(json) ? null : JSON.parseObject(json, CacheValue.class); + return StringUtils.isBlank(json) ? null : JSON.parseObject(json, OldMsgStatCacheValue.class); } private static String buildCacheKey(Long personId) { @@ -273,12 +275,23 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { private void handleMqMessage(Event event) throws Exception { SendMessageRecordMessage mqMessage = event.normalizedData(SendMessageRecordMessage.class); - GeneralMessageReq request = mqMessage.getRequest(); - Map knownIdentityId2PersonId = request.getToldIdPersonIdMap(); + GeneralMessageReq sendRequest = mqMessage.getSendRequest(); + MessageReadAllReq readRequest = mqMessage.getReadRequest(); + if (sendRequest != null) { + log.info("handle mq event - driven by sendRequest={}", sendRequest); + handleSendRequestEvent(sendRequest); + } else if (readRequest != null) { + log.info("handle mq event - driven by readRequest={}", readRequest); + handleReadRequest(readRequest); + } + } + + private void handleSendRequestEvent(GeneralMessageReq sendRequest) throws Exception { + Map knownIdentityId2PersonId = sendRequest.getToldIdPersonIdMap(); if (knownIdentityId2PersonId == null) knownIdentityId2PersonId = new HashMap<>(); HashSet personIds = new HashSet<>(knownIdentityId2PersonId.values()); - Set receiverIdentityIds = request.getToId(); + Set receiverIdentityIds = sendRequest.getToId(); if (receiverIdentityIds == null) receiverIdentityIds = Collections.emptySet(); HashSet toQueryPersonIdIdentityIds = new HashSet<>(); @@ -290,11 +303,19 @@ public class OldMsgStatCache implements EventHandler, InitializingBean { PersonIdInfo personIdInfo = personService.getPersonIdsByIdentities(toQueryPersonIdIdentityIds); personIds.addAll(personIdInfo.getPersonIds()); } - StopWatch stopWatch = new StopWatch("reloadBackground driven by mq event"); - stopWatch.start("reloadBackground"); + StopWatch stopWatch = new StopWatch("reloadBackground"); + stopWatch.start("handleSendRequestEvent"); reloadBackground(personIds); stopWatch.stop(); log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS)); } + private void handleReadRequest(MessageReadAllReq readRequest) throws Exception { + StopWatch stopWatch = new StopWatch("reloadBackground"); + stopWatch.start("handleReadRequest"); + reloadBackground(Sets.newHashSet(readRequest.getPersonId())); + stopWatch.stop(); + log.info(stopWatch.prettyPrint(TimeUnit.MILLISECONDS)); + } + } \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheReloadJob.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheReloadJob.java index 15c5a788..2c05989e 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheReloadJob.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheReloadJob.java @@ -57,10 +57,10 @@ public class OldMsgStatCacheReloadJob extends IJobHandler { Date sinceDate = DateTime.now().minusDays(jobParam.determineSyncDay()).toDate(); StopWatch stopWatch = new StopWatch("reloadBackground driven by job"); Set reloadedPersonIds = new HashSet<>(); - Iterable> pages = messageRecordMapper.iterateBath( - jobParam.determineBatchSize(), query(MessageRecord.class) + Iterable> pages = messageRecordMapper + .iterateBath(query(MessageRecord.class) .ge(MessageRecord::getCreateAt, sinceDate) - // 选刷最近的 + // 先刷最近的 .orderByDesc(MessageRecord::getId)); for (Page page : pages) { stopWatch.start("reload stat, records size=" + page.getRecords().size()); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/CacheValue.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheValue.java similarity index 97% rename from inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/CacheValue.java rename to inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheValue.java index 0894aab3..5900173d 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/CacheValue.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheValue.java @@ -16,7 +16,7 @@ import java.util.Set; * @author yanglin */ @Data -public class CacheValue { +public class OldMsgStatCacheValue { private List identityResponses; diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/PersonAndCacheValue.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/PersonAndCacheValue.java index 58a37244..5868a3d5 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/PersonAndCacheValue.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/oldmsg/PersonAndCacheValue.java @@ -11,7 +11,7 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor public class PersonAndCacheValue { private final Long personId; - private final CacheValue cacheValue; + private final OldMsgStatCacheValue cacheValue; @Override public String toString() { diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/TodoRangeQueryService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/TodoRangeQueryService.java index 30ef31f4..5ff148bf 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/TodoRangeQueryService.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/TodoRangeQueryService.java @@ -1,5 +1,6 @@ package cn.axzo.msg.center.message.service.todo; +import cn.axzo.basics.common.exception.ServiceException; import cn.axzo.maokai.api.util.Ref; import cn.axzo.maokai.api.vo.response.tree.ValueNode; import cn.axzo.msg.center.common.enums.TableIsDeleteEnum; @@ -14,6 +15,7 @@ import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam; import cn.axzo.msg.center.message.service.group.GroupTemplateService; import cn.axzo.msg.center.message.service.group.NodeWrapper; import cn.axzo.msg.center.message.service.impl.PendingMessageNewServiceImpl; +import cn.axzo.msg.center.message.service.todo.cache.NodeStatCache; import cn.axzo.msg.center.message.service.todo.mybatis.CollectSQLInterceptor; import cn.axzo.msg.center.message.service.todo.pagequery.PageQuerySort; import cn.axzo.msg.center.message.service.todo.queryanalyze.SimpleAnalyzer; @@ -44,6 +46,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.ForkJoinPool; import java.util.function.Function; import java.util.function.Supplier; @@ -67,6 +70,8 @@ public class TodoRangeQueryService { private final TodoRespBuilder todoRespBuilder; private final AnalysisConfig analysisConfig; private final GroupTemplateService groupTemplateService; + private final NodeStatCache nodeStatCache; + private final ForkJoinPool statExecutor = new ForkJoinPool(20); // !! page query @@ -224,6 +229,10 @@ public class TodoRangeQueryService { // !! stat public PendingMessageStatisticResponseV2 countStatGrouped(MessageGroupNodeStatisticParam request) { + return nodeStatCache.getCacheResponseOrReload(request, () -> countStatGroupedImpl(request)); + } + + private PendingMessageStatisticResponseV2 countStatGroupedImpl(MessageGroupNodeStatisticParam request) { List> nodes = determineStatNodes(request); Function, GroupStat> nodeStatFun = valueNode -> { int executableCount = countStatByNode(request, valueNode, TodoType.EXECUTABLE); @@ -236,20 +245,30 @@ public class TodoRangeQueryService { groupStat.setStat(new Stat(executableCount, copiedToMeCount)); return groupStat; }; - PendingMessageStatisticResponseV2 resp = new PendingMessageStatisticResponseV2(); - List stats = nodes.parallelStream().map(nodeStatFun).collect(toList()); - for (GroupStat stat : stats) - resp.addGroupStat(stat); - resp.setTotalStat(); - return resp; + try { + PendingMessageStatisticResponseV2 resp = new PendingMessageStatisticResponseV2(); + // 用一个偏大的线程池, IO密集型任务, commonPool不够用 + List stats = statExecutor + .submit(() -> nodes.parallelStream().map(nodeStatFun).collect(toList())) + .get(); + for (GroupStat stat : stats) + resp.addGroupStat(stat); + resp.setTotalStat(); + return resp; + } catch (Exception e) { + log.warn("分类统计异常, request={}", request, e); + throw new ServiceException("分类统计, 请稍后重试"); + } } /** * app首页 */ public Integer countStat(MessageGroupNodeStatisticParam request) { - List> nodes = determineStatNodes(request); - return countStatByNodes(request, nodes, TodoType.EXECUTABLE); + PendingMessageStatisticResponseV2 nodeStat = nodeStatCache + .getCacheResponseOrReload(request, () -> countStatGroupedImpl(request)); + Stat totalStat = nodeStat.getTotalStat(); + return totalStat == null ? 0 : totalStat.getPendingCount(); } private int countStatByNode(MessageGroupNodeStatisticParam request, @@ -265,7 +284,7 @@ public class TodoRangeQueryService { OuInfo ouInfo = OuInfo.create(request.getOuId(), request.getTerminalType(), todoType); LambdaQueryWrapper query = todoQuery(ouInfo) .in(Todo::getTemplateCode, templateCodes) - .eq(Todo::getExecutorPersonId, request.getOperator().getId()) + .eq(Todo::getExecutorPersonId, request.getPersonId()) .and(todoType == TodoType.EXECUTABLE, nested -> nested .eq(Todo::getType, TodoType.EXECUTABLE) .eq(Todo::getState, PendingMessageStateEnum.HAS_BEEN_SENT)) @@ -276,8 +295,15 @@ public class TodoRangeQueryService { } public List> determineStatNodes(MessageGroupNodeStatisticParam request) { - List> nodes = CollectionUtils.isNotEmpty(request.getGroupNodeCodes()) - ? groupTemplateService.getGroupRoot().getNodes(request.getGroupNodeCodes()) + Collection nodeCodes = request.getGroupNodeCodes(); + if (nodeCodes == null) { + nodeCodes = Collections.emptyList(); + } + nodeCodes = nodeCodes.stream() + .filter(StringUtils::isNotBlank) + .collect(toList()); + List> nodes = CollectionUtils.isNotEmpty(nodeCodes) + ? groupTemplateService.getGroupRoot().getNodes(nodeCodes) : groupTemplateService.getTodoGroups(request.getTerminalType()); // 只返回给前端叶子节点的信息, 前端不需要层级信息 return groupTemplateService.collectLeafNodes(nodes); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCache.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCache.java new file mode 100644 index 00000000..94ef1ced --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCache.java @@ -0,0 +1,99 @@ +package cn.axzo.msg.center.message.service.todo.cache; + +import cn.axzo.msg.center.dal.mapper.MessageBaseTemplateMapper; +import cn.axzo.msg.center.dal.mapper.TodoMapper; +import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig; +import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam; +import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2; +import com.alibaba.fastjson.JSON; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.security.SecureRandom; +import java.util.Collection; +import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * @author yanglin + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class NodeStatCache { + + private final MessageBaseTemplateMapper messageBaseTemplateMapper; + private final TodoMapper todoMapper; + private final StringRedisTemplate stringRedisTemplate; + private final PendingMessageBizConfig cfg; + + public PendingMessageStatisticResponseV2 getCacheResponseOrReload( + MessageGroupNodeStatisticParam request, Supplier loader) { + Collection nodeCodes = request.getGroupNodeCodes(); + if (CollectionUtils.isNotEmpty(nodeCodes) && nodeCodes.size() > cfg.getNodeStatCacheMaxRequestNodeCodeSize()) { + log.info("超过了允许缓存的最大请求节点编码数量,不使用缓存, 直接从数据库获取. request={}", request); + return loader.get(); + } + Date templateLatestUpdateTime = messageBaseTemplateMapper.getTemplateLatestUpdateTime(); + Date todoLatestUpdateTime = todoMapper.getTodoLatestUpdateTime(request.getPersonId()); + NodeStatCacheValue cacheValue = getCacheValue(request); + if (cacheValue == null || cacheValue.shouldReload(templateLatestUpdateTime, todoLatestUpdateTime)) { + log.info("reloading node stat data, request={}", request); + PendingMessageStatisticResponseV2 response = loader.get(); + cacheValue = new NodeStatCacheValue(); + cacheValue.setResponse(response); + cacheValue.setTemplateLatestUpdateTime(templateLatestUpdateTime); + cacheValue.setTodoLatestUpdateTime(todoLatestUpdateTime); + setCacheValue(request, cacheValue); + } + return cacheValue.getResponse(); + } + + // !! cache helper + + private NodeStatCacheValue getCacheValue(MessageGroupNodeStatisticParam request) { + String redisKey = buildCacheKey(request); + String json = stringRedisTemplate.opsForValue().get(redisKey); + return StringUtils.isBlank(json) ? null : JSON.parseObject(json, NodeStatCacheValue.class); + } + + private void setCacheValue(MessageGroupNodeStatisticParam request, NodeStatCacheValue cacheValue) { + log.info("set cache value for request. request={}", request); + long secondsDelta = TimeUnit.HOURS.toSeconds(cfg.getNodeStatCacheDataExpireHours()); + // add some random delta seconds, range: 0-600 seconds + secondsDelta += new SecureRandom().nextInt(10 * 60); + + String redisKey = buildCacheKey(request); + String redisValue = JSON.toJSONString(cacheValue); + stringRedisTemplate.opsForValue().set(redisKey, redisValue, secondsDelta, TimeUnit.SECONDS); + } + + private static String buildCacheKey(MessageGroupNodeStatisticParam request) { + StringBuffer buf = new StringBuffer(); + buf.append("msg_center:node_stat:v1"); + Consumer appender = value -> { + if (value != null) + buf.append(":").append(value); + }; + appender.accept(request.getPersonId()); + appender.accept(request.getOuId()); + appender.accept(request.getTerminalType()); + Collection nodeCodes = request.getGroupNodeCodes(); + if (CollectionUtils.isNotEmpty(nodeCodes)) { + // 排序避免编码顺序和重复编码的影响 + nodeCodes.stream() + .filter(StringUtils::isNotBlank) + .distinct() + .sorted() + .forEach(appender); + } + return buf.toString(); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCacheValue.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCacheValue.java new file mode 100644 index 00000000..8d72daa2 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/todo/cache/NodeStatCacheValue.java @@ -0,0 +1,24 @@ +package cn.axzo.msg.center.message.service.todo.cache; + +import cn.axzo.msg.center.service.pending.response.PendingMessageStatisticResponseV2; +import lombok.Data; + +import java.util.Date; +import java.util.Objects; + +/** + * @author yanglin + */ +@Data +class NodeStatCacheValue { + + private Date templateLatestUpdateTime; + private Date todoLatestUpdateTime; + private PendingMessageStatisticResponseV2 response; + + boolean shouldReload(Date templateLatestUpdateTime, Date todoLatestUpdateTime) { + return !Objects.equals(this.templateLatestUpdateTime, templateLatestUpdateTime) + || !Objects.equals(this.todoLatestUpdateTime, todoLatestUpdateTime); + } + +} \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/api/mq/SendMessageRecordMessage.java b/msg-center-api/src/main/java/cn/axzo/msg/center/api/mq/SendMessageRecordMessage.java index 6b94c4a4..c507168d 100644 --- a/msg-center-api/src/main/java/cn/axzo/msg/center/api/mq/SendMessageRecordMessage.java +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/api/mq/SendMessageRecordMessage.java @@ -1,6 +1,7 @@ package cn.axzo.msg.center.api.mq; import cn.axzo.msg.center.api.request.GeneralMessageReq; +import cn.axzo.msg.center.api.request.MessageReadAllReq; import lombok.Data; import lombok.EqualsAndHashCode; @@ -12,5 +13,6 @@ import java.io.Serializable; @Data @EqualsAndHashCode(callSuper = true) public class SendMessageRecordMessage extends MqMessage implements Serializable { - private GeneralMessageReq request; + private GeneralMessageReq sendRequest; + private MessageReadAllReq readRequest; } \ No newline at end of file diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageBaseTemplateMapper.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageBaseTemplateMapper.java index 5937e5bb..34ce72d0 100644 --- a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageBaseTemplateMapper.java +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/MessageBaseTemplateMapper.java @@ -4,6 +4,9 @@ import cn.axzo.msg.center.domain.entity.MessageBaseTemplate; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper; +import javax.annotation.Nullable; +import java.util.Date; + /** * @description * 消息基础模板Mapper @@ -13,4 +16,8 @@ import org.apache.ibatis.annotations.Mapper; */ @Mapper public interface MessageBaseTemplateMapper extends BaseMapper { + + @Nullable + Date getTemplateLatestUpdateTime(); + } diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/TodoMapper.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/TodoMapper.java index 37dfe89f..5b2bc7eb 100644 --- a/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/TodoMapper.java +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/dal/mapper/TodoMapper.java @@ -5,6 +5,7 @@ import cn.axzo.msg.center.domain.entity.Todo; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Param; +import javax.annotation.Nullable; import java.util.Date; import java.util.List; @@ -21,4 +22,7 @@ public interface TodoMapper extends BaseMapper { List getMigratedBusinessId(); -} + @Nullable + Date getTodoLatestUpdateTime(@Param("executorPersonId") Long executorPersonId); + +} \ No newline at end of file diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/util/IterableMapper.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/util/IterableMapper.java index 60519c09..d9f7c043 100644 --- a/msg-center-dal/src/main/java/cn/axzo/msg/center/util/IterableMapper.java +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/util/IterableMapper.java @@ -9,55 +9,121 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; */ public interface IterableMapper extends BaseMapper { + int DEFAULT_BATCH_SIZE = 2000; + /** - * 遍历数据 - *

对查询出的数据的操作肯定不会更新query中查询条件包含的字段 + * foreach遍历数据. 默认批量大小 + * + *

对查询出的数据的操作肯定不会更新query查询条件包含的字段 + */ + default Iterable> iterateBath(LambdaQueryWrapper query) { + return RecordIterable.iterateBath(this, DEFAULT_BATCH_SIZE, query); + } + + /** + * foreach遍历数据 + * + *

对查询出的数据的操作肯定不会更新query查询条件包含的字段 */ default Iterable> iterateBath(long pageSize, LambdaQueryWrapper query) { return RecordIterable.iterateBath(this, pageSize, query); } /** - * 遍历数据 - *

对查询出的数据的操作肯定不会更新query中查询条件包含的字段 + * foreach遍历数据. 默认批量大小 + *

对查询出的数据的操作肯定不会更新query查询条件包含的字段 + */ + default Iterable iterateElement(LambdaQueryWrapper query) { + return RecordIterable.iterateElement(this, DEFAULT_BATCH_SIZE, query); + } + + /** + * foreach遍历数据 + *

对查询出的数据的操作肯定不会更新query查询条件包含的字段 */ default Iterable iterateElement(long pageSize, LambdaQueryWrapper query) { return RecordIterable.iterateElement(this, pageSize, query); } /** - * 遍历数据 - *

对查询出的数据的操作肯定会更新query中查询条件包含的字段 - *

+ * foreach遍历数据. 默认批量大小 + * + *

对查询出的数据的操作肯定会更新query查询条件包含的字段 + *

      * LambdaQueryWrapper query = Queries.query(MessageRecord.class)
      *        .eq(MessageRecord::getWorkspaceId, workspaceId)
      *        // 通过状态查询
      *        .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
-     * for(Page page : iterateBathDoUpdateQueryFields(1000, query)) {
-     *     for(T record : page.getRecords()) {
+     * for(Page page : iterateBathDoUpdateQueryFields(1000, query)) {
+     *     for(MessageRecord record : page.getRecords()) {
      *         // 更新状态
      *         record.setState(MsgStateEnum.HAS_BEEN_SENT);
      *         messageRecordMapper.updateById(record);
      *     }
      * }
+     * 
+ */ + default Iterable> iterateBathDoUpdateQueryFields(LambdaQueryWrapper query) { + return RecordIterable.iterateBathDoUpdateQueryFields(this, DEFAULT_BATCH_SIZE, query); + } + + /** + * foreach遍历数据 + * + *

对查询出的数据的操作肯定会更新query查询条件包含的字段 + *

+     * LambdaQueryWrapper query = Queries.query(MessageRecord.class)
+     *        .eq(MessageRecord::getWorkspaceId, workspaceId)
+     *        // 通过状态查询
+     *        .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
+     * for(Page page : iterateBathDoUpdateQueryFields(1000, query)) {
+     *     for(MessageRecord record : page.getRecords()) {
+     *         // 更新状态
+     *         record.setState(MsgStateEnum.HAS_BEEN_SENT);
+     *         messageRecordMapper.updateById(record);
+     *     }
+     * }
+     * 
*/ default Iterable> iterateBathDoUpdateQueryFields(long pageSize, LambdaQueryWrapper query) { return RecordIterable.iterateBathDoUpdateQueryFields(this, pageSize, query); } /** - * 遍历数据 - *

对查询出的数据的操作肯定会更新query中查询条件包含的字段 - *

+ * foreach遍历数据. 默认批量大小 + * + *

对查询出的数据的操作肯定会更新query查询条件包含的字段 + *

      * LambdaQueryWrapper query = Queries.query(MessageRecord.class)
      *        .eq(MessageRecord::getWorkspaceId, workspaceId)
      *        // 通过状态查询
      *        .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
-     * for(T record : iterateElementDoUpdateQueryFields(1000, query)) {
+     * for(MessageRecord record : iterateElementDoUpdateQueryFields(1000, query)) {
      *     // 更新状态
      *     record.setState(MsgStateEnum.HAS_BEEN_SENT);
      *     messageRecordMapper.updateById(record);
      * }
+     * 
+ */ + default Iterable iterateElementDoUpdateQueryFields(LambdaQueryWrapper query) { + return RecordIterable.iterateElementDoUpdateQueryFields(this, DEFAULT_BATCH_SIZE, query); + } + + /** + * foreach遍历数据 + * + *

对查询出的数据的操作肯定会更新query查询条件包含的字段 + *

+     * LambdaQueryWrapper query = Queries.query(MessageRecord.class)
+     *        .eq(MessageRecord::getWorkspaceId, workspaceId)
+     *        // 通过状态查询
+     *        .eq(MessageRecord::getState, MsgStateEnum.UNSENT);
+     * for(MessageRecord record : iterateElementDoUpdateQueryFields(1000, query)) {
+     *     // 更新状态
+     *     record.setState(MsgStateEnum.HAS_BEEN_SENT);
+     *     messageRecordMapper.updateById(record);
+     * }
+     * 
*/ default Iterable iterateElementDoUpdateQueryFields(long pageSize, LambdaQueryWrapper query) { return RecordIterable.iterateElementDoUpdateQueryFields(this, pageSize, query); diff --git a/msg-center-dal/src/main/java/cn/axzo/msg/center/util/RecordIterable.java b/msg-center-dal/src/main/java/cn/axzo/msg/center/util/RecordIterable.java index a72f5954..ddb95e60 100644 --- a/msg-center-dal/src/main/java/cn/axzo/msg/center/util/RecordIterable.java +++ b/msg-center-dal/src/main/java/cn/axzo/msg/center/util/RecordIterable.java @@ -3,12 +3,15 @@ package cn.axzo.msg.center.util; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.springframework.beans.BeanUtils; import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; /** + * 不要直接使用这个类, 用 {@link IterableMapper} + * * @author yanglin */ class RecordIterable { @@ -26,13 +29,13 @@ class RecordIterable { static > Iterable> iterateBathDoUpdateQueryFields( M mapper, long pageSize, LambdaQueryWrapper query) { - return () -> new BatchIterator<>(mapper, pageSize, query, PageStrategy.WILL_UPDATE_QUERY_FIELDS); + return () -> new BatchIterator<>(mapper, pageSize, query, PageStrategy.DO_UPDATE_QUERY_FIELDS); } static > Iterable iterateElementDoUpdateQueryFields( M mapper, long pageSize, LambdaQueryWrapper query) { return () -> new ElementIterator<>( - new BatchIterator<>(mapper, pageSize, query, PageStrategy.WILL_UPDATE_QUERY_FIELDS)); + new BatchIterator<>(mapper, pageSize, query, PageStrategy.DO_UPDATE_QUERY_FIELDS)); } private static class ElementIterator> implements Iterator { @@ -58,10 +61,11 @@ class RecordIterable { } private void maybeAdvance() { - if (current == null || !current.hasNext() && batch.hasNext()) + if (current == null || !current.hasNext() && batch.hasNext()) { current = batch.hasNext() ? batch.next().getRecords().iterator() : Collections.emptyIterator(); + } } } @@ -87,7 +91,7 @@ class RecordIterable { public boolean hasNext() { maybeFetchNextPage(); // 1. 很明显的还有下一页 - // 2. 看当前页的数据消费情况 (第一页和最后一页) + // 2. 看当前页的数据以及当前页的消费情况 (第一页和最后一页) return page.getCurrent() < page.getPages() || (!pageConsumed && !page.getRecords().isEmpty()); } @@ -97,11 +101,8 @@ class RecordIterable { throw new NoSuchElementException(); pageConsumed = true; Page copy = new Page<>(); - copy.setRecords(page.getRecords()); - copy.setTotal(page.getTotal()); - copy.setSize(page.getSize()); - copy.setCurrent(page.getCurrent()); - copy.setOrders(page.orders()); + // 不能暴露内部状态. 如果外部改动了内部的page, 会影响到下次迭代 + BeanUtils.copyProperties(page, copy); return copy; } @@ -119,6 +120,6 @@ class RecordIterable { } private enum PageStrategy { - ADVANCE_CURRENT_PAGE, WILL_UPDATE_QUERY_FIELDS + ADVANCE_CURRENT_PAGE, DO_UPDATE_QUERY_FIELDS } } \ No newline at end of file diff --git a/msg-center-dal/src/main/resources/mapper/MessageBaseTemplateMapper.xml b/msg-center-dal/src/main/resources/mapper/MessageBaseTemplateMapper.xml new file mode 100644 index 00000000..c1fec104 --- /dev/null +++ b/msg-center-dal/src/main/resources/mapper/MessageBaseTemplateMapper.xml @@ -0,0 +1,9 @@ + + + + + + + \ No newline at end of file diff --git a/msg-center-dal/src/main/resources/mapper/Todo.xml b/msg-center-dal/src/main/resources/mapper/Todo.xml index 923bfd6e..15f69d7e 100644 --- a/msg-center-dal/src/main/resources/mapper/Todo.xml +++ b/msg-center-dal/src/main/resources/mapper/Todo.xml @@ -16,5 +16,9 @@ AND record_ext -> '$.isMigratedFromPendingMessage' = true + + diff --git a/start/src/test/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheReloadJobTest.java b/start/src/test/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheReloadJobTest.java new file mode 100644 index 00000000..9914197d --- /dev/null +++ b/start/src/test/java/cn/axzo/msg/center/message/service/impl/oldmsg/OldMsgStatCacheReloadJobTest.java @@ -0,0 +1,24 @@ +package cn.axzo.msg.center.message.service.impl.oldmsg; + +import cn.axzo.msg.center.MsgCenterApplication; +import lombok.RequiredArgsConstructor; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author yanglin + */ +@SpringBootTest(classes = MsgCenterApplication.class) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +class OldMsgStatCacheReloadJobTest { + private final OldMsgStatCacheReloadJob oldMsgStatCacheReloadJob; + + @Test + void foo() throws Exception { + oldMsgStatCacheReloadJob.execute(null); + } + +} \ No newline at end of file diff --git a/start/src/test/java/cn/axzo/msg/center/util/RecordIterableTest.java b/start/src/test/java/cn/axzo/msg/center/util/RecordIterableTest.java new file mode 100644 index 00000000..3b5423e4 --- /dev/null +++ b/start/src/test/java/cn/axzo/msg/center/util/RecordIterableTest.java @@ -0,0 +1,34 @@ +package cn.axzo.msg.center.util; + +import cn.axzo.msg.center.MsgCenterApplication; +import cn.axzo.msg.center.dal.mapper.MessageRecordMapper; +import cn.axzo.msg.center.domain.entity.MessageRecord; +import cn.axzo.msg.center.inside.notices.utils.Queries; +import lombok.RequiredArgsConstructor; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import static cn.axzo.msg.center.inside.notices.utils.Queries.query; +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author yanglin + */ +@SpringBootTest(classes = MsgCenterApplication.class) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +class RecordIterableTest { + + private final MessageRecordMapper messageRecordMapper; + + @Test + void foo() { + Iterable records = messageRecordMapper + .iterateElement(1, query(MessageRecord.class) + .eq(MessageRecord::getId, 3)); + for (MessageRecord record : records) { + System.out.println("record=" + record); + } + } + +} \ No newline at end of file