Merge branch 'feature/REQ-1507' into 'dev'

feat(REQ-1507): 异步同步数据

See merge request universal/infrastructure/backend/msg-center-plat!73
This commit is contained in:
罗福 2023-11-24 01:07:08 +00:00
commit 6ca6a85911

View File

@ -25,13 +25,13 @@ import cn.axzo.msg.center.utils.UUIDUtil;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Collections;
@ -39,6 +39,12 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -51,6 +57,11 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public class PendingMessageDataInitServiceImpl implements PendingMessageDataInitService {
private final ThreadFactory asyncSendMsgThreadFactory = ThreadFactoryBuilder.create()
.setDaemon(true).setNameFormat("ASYNC_TRANSFORM_OLD_DATA_%d").get();
private final ExecutorService asyncSendMsgExecutorService = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024), asyncSendMsgThreadFactory);
private final Environment environment;
private final OrganizationalNodePractitionerWideApi organizationalNodePractitionerWideApi;
@ -64,8 +75,12 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit
private final PendingMessageRecordDao pendingMessageRecordDao;
@Override
@Transactional(rollbackFor = Exception.class)
public void transformPendingMessageRecord(Integer diffDays, Long minRecordId) {
CompletableFuture.runAsync(() -> doTransformPendingMessageRecord(diffDays, minRecordId),
asyncSendMsgExecutorService);
}
private void doTransformPendingMessageRecord(Integer diffDays, Long minRecordId) {
LocalDateTime queryFrom = LocalDateTime.now().minusDays(diffDays);
Long scrollId = Long.MAX_VALUE;
List<MessageRecord> records = messageRecordDao.lambdaQuery()
@ -87,7 +102,6 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit
}
scrollId = records.stream().min(Comparator.comparing(MessageRecord::getId))
.map(MessageRecord::getId).orElse(0L);
log.info("[cold_blade] scrollId:[{}]", scrollId);
records = messageRecordDao.lambdaQuery()
.eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE)
.eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())