diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java index 41bc3273..478c10ab 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/impl/PendingMessageDataInitServiceImpl.java @@ -25,13 +25,13 @@ import cn.axzo.msg.center.utils.UUIDUtil; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import jodd.util.concurrent.ThreadFactoryBuilder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.Collections; @@ -39,6 +39,12 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -51,6 +57,11 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class PendingMessageDataInitServiceImpl implements PendingMessageDataInitService { + private final ThreadFactory asyncSendMsgThreadFactory = ThreadFactoryBuilder.create() + .setDaemon(true).setNameFormat("ASYNC_TRANSFORM_OLD_DATA_%d").get(); + private final ExecutorService asyncSendMsgExecutorService = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), asyncSendMsgThreadFactory); + private final Environment environment; private final OrganizationalNodePractitionerWideApi organizationalNodePractitionerWideApi; @@ -64,8 +75,12 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit private final PendingMessageRecordDao pendingMessageRecordDao; @Override - @Transactional(rollbackFor = Exception.class) public void transformPendingMessageRecord(Integer diffDays, Long minRecordId) { + CompletableFuture.runAsync(() -> doTransformPendingMessageRecord(diffDays, minRecordId), + asyncSendMsgExecutorService); + } + + private void doTransformPendingMessageRecord(Integer diffDays, Long minRecordId) { LocalDateTime queryFrom = LocalDateTime.now().minusDays(diffDays); Long scrollId = Long.MAX_VALUE; List records = messageRecordDao.lambdaQuery() @@ -87,7 +102,6 @@ public class PendingMessageDataInitServiceImpl implements PendingMessageDataInit } scrollId = records.stream().min(Comparator.comparing(MessageRecord::getId)) .map(MessageRecord::getId).orElse(0L); - log.info("[cold_blade] scrollId:[{}]", scrollId); records = messageRecordDao.lambdaQuery() .eq(MessageRecord::getType, MsgTypeEnum.PENDING_MESSAGE) .eq(MessageRecord::getIsDelete, YesNoEnum.NO.getCode())