diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java index 6610f765..ca93d2d0 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java @@ -31,9 +31,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.UUID; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -66,41 +63,40 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit .eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE) .eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode()) .ge(MessageRecord::getCreateAt, DateFormatUtil.toDate(queryFrom)) - .between(MessageRecord::getId, minRecordId, scrollId) + .lt(MessageRecord::getId, scrollId) + .gt(MessageRecord::getId, minRecordId) .orderByDesc(MessageRecord::getId) + .last("LIMIT 1000") .list(); -// while (!records.isEmpty()) { - String profile = environment.getProperty("spring.profiles.active"); - if ("test".equals(profile) || "master".equals(profile)) { - batchSavePendingMessage(records); - } else { - log.warn("目前只有测试test环境和生产master环境支持该操作!"); + while (!records.isEmpty()) { + String profile = environment.getProperty("spring.profiles.active"); + if ("test".equals(profile) || "master".equals(profile)) { + batchSavePendingMessage(records); + } else { + log.warn("目前只有测试test环境和生产master环境支持该操作!"); + } + scrollId = records.stream().min(Comparator.comparing(MessageRecord::getId)) + .map(MessageRecord::getId).orElse(0L); + log.info("[cold_blade] scrollId:[{}]", scrollId); + records = messageRecordDao.lambdaQuery() + .eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE) + .eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode()) + .ge(MessageRecord::getCreateAt, DateFormatUtil.toDate(queryFrom)) + .lt(MessageRecord::getId, scrollId) + .gt(MessageRecord::getId, minRecordId) + .orderByDesc(MessageRecord::getId) + .last("LIMIT 1000") + .list(); } -// scrollId = records.stream().min(Comparator.comparing(MessageRecord::getId)) -// .map(MessageRecord::getId).orElse(0L); -// records = messageRecordDao.lambdaQuery() -// .eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE) -// .eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode()) -// .ge(MessageRecord::getCreateAt, DateFormatUtil.toDate(queryFrom)) -// //.between(MessageRecord::getId, minRecordId, scrollId) -// //modify by zuoqinbo -// .between(MessageRecord::getId, scrollId, Long.MAX_VALUE) -// .orderByDesc(MessageRecord::getId) -// .list(); -// } } private void batchSavePendingMessage(List records) { - // TODO: [zuoqinbo] if (CollectionUtils.isEmpty(records)) { return; } - List pendingMessageRecords = records.stream().map(new Function() { - @Override - public PendingMessageRecord apply(MessageRecord messageRecord) { - return convert(messageRecord); - } - }).collect(Collectors.toList()); + List pendingMessageRecords = records.stream() + .map(this::convert) + .collect(Collectors.toList()); //批量插入数据库 pendingMessageRecordDao.saveOrUpdateBatch(pendingMessageRecords); } @@ -129,7 +125,6 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit pendingMsgRecord.setOuId(record.getTenantId()); pendingMsgRecord.setBizCategory(BizCategoryEnum.OTHER); pendingMsgRecord.setFailCause(String.valueOf(record.getId())); - // pendingMsgRecord.setIdentityCode(UUIDUtil.uuidString()); pendingMsgRecord.setCreateAt(record.getCreateAt()); pendingMsgRecord.setUpdateAt(record.getUpdateAt()); diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageNewServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageNewServiceImpl.java index d1f52a1e..d1203be5 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageNewServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageNewServiceImpl.java @@ -507,7 +507,7 @@ public class PendingMessageNewServiceImpl implements PendingMessageNewService { Collection orderFields) { if (CollectionUtils.isEmpty(orderFields)) { // 默认时间降序 - query.orderByDesc(PendingMessageRecord::getId); + query.orderByDesc(PendingMessageRecord::getCreateAt); return; } orderFields.stream()