REQ-2448: 迁移老的消息

This commit is contained in:
yanglin 2024-05-08 11:41:40 +08:00
parent 72c93d96fd
commit dd1ecdf37b

View File

@ -2,11 +2,13 @@ package cn.axzo.msg.center.message.xxl;
import cn.axzo.msg.center.dal.mapper.MessageRecordMapper;
import cn.axzo.msg.center.domain.entity.MessageRecord;
import cn.axzo.msg.center.domain.persistence.BaseEntity;
import cn.axzo.msg.center.inside.notices.utils.FunctionalTransactionTemplate;
import cn.axzo.msg.center.message.service.todo.mybatis.ReplaceInterceptor;
import cn.axzo.msg.center.message.service.todo.mybatis.Replacement;
import cn.axzo.msg.center.utils.DateFormatUtil;
import cn.axzo.msg.center.utils.JSONObjectUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import com.xxl.job.core.biz.model.ReturnT;
@ -62,7 +64,7 @@ public class MigrateOldMsgHotDataJob extends IJobHandler {
for (Page<MessageRecord> page : pages) {
++currentPage;
// 由于会删除数据, 总页数会变化, 所以只取第一次的总页数才是对的
if (totalPage == 0) totalPage = (int)page.getPages();
if (totalPage == 0) totalPage = (int) page.getPages();
log.info("migrating page={}, total page={}", currentPage, totalPage);
for (List<MessageRecord> batch : Lists.partition(page.getRecords(), param.saveBatch)) {
totalRecordCount += batch.size();
@ -77,6 +79,12 @@ public class MigrateOldMsgHotDataJob extends IJobHandler {
ReplaceInterceptor.enable(Replacement.TO_MESSAGE_RECORD_COLD);
try {
messageRecordMapper.batchInsertWithId(records);
} catch (Exception e) {
List<Long> ids = records.stream()
.map(BaseEntity::getId).collect(toList());
log.warn("migrate records failed, recordIds={}, records={}",
JSON.toJSONString(ids), JSON.toJSONString(records), e);
throw e;
} finally {
ReplaceInterceptor.disable();
}