Merge branch 'feature/REQ-1507' of axzsource.com:universal/infrastructure/backend/msg-center-plat into dev

This commit is contained in:
luofu 2023-11-20 12:20:38 +08:00
commit 1f36df1234
2 changed files with 26 additions and 31 deletions

View File

@ -31,9 +31,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -66,41 +63,40 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit
.eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE)
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())
.ge(MessageRecord::getCreateAt, DateFormatUtil.toDate(queryFrom))
.between(MessageRecord::getId, minRecordId, scrollId)
.lt(MessageRecord::getId, scrollId)
.gt(MessageRecord::getId, minRecordId)
.orderByDesc(MessageRecord::getId)
.last("LIMIT 1000")
.list();
// while (!records.isEmpty()) {
String profile = environment.getProperty("spring.profiles.active");
if ("test".equals(profile) || "master".equals(profile)) {
batchSavePendingMessage(records);
} else {
log.warn("目前只有测试test环境和生产master环境支持该操作");
while (!records.isEmpty()) {
String profile = environment.getProperty("spring.profiles.active");
if ("test".equals(profile) || "master".equals(profile)) {
batchSavePendingMessage(records);
} else {
log.warn("目前只有测试test环境和生产master环境支持该操作");
}
scrollId = records.stream().min(Comparator.comparing(MessageRecord::getId))
.map(MessageRecord::getId).orElse(0L);
log.info("[cold_blade] scrollId:[{}]", scrollId);
records = messageRecordDao.lambdaQuery()
.eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE)
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())
.ge(MessageRecord::getCreateAt, DateFormatUtil.toDate(queryFrom))
.lt(MessageRecord::getId, scrollId)
.gt(MessageRecord::getId, minRecordId)
.orderByDesc(MessageRecord::getId)
.last("LIMIT 1000")
.list();
}
// scrollId = records.stream().min(Comparator.comparing(MessageRecord::getId))
// .map(MessageRecord::getId).orElse(0L);
// records = messageRecordDao.lambdaQuery()
// .eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE)
// .eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())
// .ge(MessageRecord::getCreateAt, DateFormatUtil.toDate(queryFrom))
// //.between(MessageRecord::getId, minRecordId, scrollId)
// //modify by zuoqinbo
// .between(MessageRecord::getId, scrollId, Long.MAX_VALUE)
// .orderByDesc(MessageRecord::getId)
// .list();
// }
}
private void batchSavePendingMessage(List<MessageRecord> records) {
// TODO: [zuoqinbo]
if (CollectionUtils.isEmpty(records)) {
return;
}
List<PendingMessageRecord> pendingMessageRecords = records.stream().map(new Function<MessageRecord, PendingMessageRecord>() {
@Override
public PendingMessageRecord apply(MessageRecord messageRecord) {
return convert(messageRecord);
}
}).collect(Collectors.toList());
List<PendingMessageRecord> pendingMessageRecords = records.stream()
.map(this::convert)
.collect(Collectors.toList());
//批量插入数据库
pendingMessageRecordDao.saveOrUpdateBatch(pendingMessageRecords);
}
@ -129,7 +125,6 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit
pendingMsgRecord.setOuId(record.getTenantId());
pendingMsgRecord.setBizCategory(BizCategoryEnum.OTHER);
pendingMsgRecord.setFailCause(String.valueOf(record.getId()));
//
pendingMsgRecord.setIdentityCode(UUIDUtil.uuidString());
pendingMsgRecord.setCreateAt(record.getCreateAt());
pendingMsgRecord.setUpdateAt(record.getUpdateAt());

View File

@ -507,7 +507,7 @@ public class PendingMessageNewServiceImpl implements PendingMessageNewService {
Collection<QueryOrderByDTO> orderFields) {
if (CollectionUtils.isEmpty(orderFields)) {
// 默认时间降序
query.orderByDesc(PendingMessageRecord::getId);
query.orderByDesc(PendingMessageRecord::getCreateAt);
return;
}
orderFields.stream()