From 974227cacbd02b6960b1bacaab2bbd587e4e6578 Mon Sep 17 00:00:00 2001 From: luofu Date: Fri, 24 Nov 2023 09:04:47 +0800 Subject: [PATCH] =?UTF-8?q?feat(REQ-1507):=20=E5=BC=82=E6=AD=A5=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 背景: https://jira.axzo.cn/browse/REQ-1507?goToView=1 影响: 无 --- .../PendingMessageDataInitServiceImpl.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java index 41bc3273..478c10ab 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java @@ -25,13 +25,13 @@ import cn.axzo.msg.center.utils.UUIDUtil; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import jodd.util.concurrent.ThreadFactoryBuilder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.Collections; @@ -39,6 +39,12 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -51,6 +57,11 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class PendingMessageDataInitServiceImpl implements PendingMessageDataInitService { + private final ThreadFactory asyncSendMsgThreadFactory = ThreadFactoryBuilder.create() + .setDaemon(true).setNameFormat("ASYNC_TRANSFORM_OLD_DATA_%d").get(); + private final ExecutorService asyncSendMsgExecutorService = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), asyncSendMsgThreadFactory); + private final Environment environment; private final OrganizationalNodePractitionerWideApi organizationalNodePractitionerWideApi; @@ -64,8 +75,12 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit private final PendingMessageRecordDao pendingMessageRecordDao; @Override - @Transactional(rollbackFor = Exception.class) public void transformPendingMessageRecord(Integer diffDays, Long minRecordId) { + CompletableFuture.runAsync(() -> doTransformPendingMessageRecord(diffDays, minRecordId), + asyncSendMsgExecutorService); + } + + private void doTransformPendingMessageRecord(Integer diffDays, Long minRecordId) { LocalDateTime queryFrom = LocalDateTime.now().minusDays(diffDays); Long scrollId = Long.MAX_VALUE; List records = messageRecordDao.lambdaQuery() @@ -87,7 +102,6 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit } scrollId = records.stream().min(Comparator.comparing(MessageRecord::getId)) .map(MessageRecord::getId).orElse(0L); - log.info("[cold_blade] scrollId:[{}]", scrollId); records = messageRecordDao.lambdaQuery() .eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE) .eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())