From 050dd7c81b076b03feac6878f6e57ee095159991 Mon Sep 17 00:00:00 2001 From: yanglin Date: Thu, 23 Jan 2025 14:31:53 +0800 Subject: [PATCH] =?UTF-8?q?REQ-3345:=20=E5=90=8C=E6=AD=A5=E6=B6=88?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../axzo/im/controller/PrivateController.java | 2 +- .../im/group/message/GroupMessageSyncJob.java | 22 +--------- .../message/GroupMessageSyncService.java | 41 +++++++++++-------- .../message/GroupMessageSyncServiceTest.java | 23 ----------- 4 files changed, 26 insertions(+), 62 deletions(-) delete mode 100644 im-center-server/src/test/java/cn/axzo/im/group/message/GroupMessageSyncServiceTest.java diff --git a/im-center-server/src/main/java/cn/axzo/im/controller/PrivateController.java b/im-center-server/src/main/java/cn/axzo/im/controller/PrivateController.java index af035aa..9d093a2 100644 --- a/im-center-server/src/main/java/cn/axzo/im/controller/PrivateController.java +++ b/im-center-server/src/main/java/cn/axzo/im/controller/PrivateController.java @@ -113,7 +113,7 @@ public class PrivateController { @PostMapping("/private/group/syncGroupMessages") public Object syncGroupMessages() throws Exception { - groupMessageSyncService.syncAndWait(); + groupMessageSyncService.sync(); return "done"; } diff --git a/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncJob.java b/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncJob.java index 6abe7c1..d4ae12e 100644 --- a/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncJob.java +++ b/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncJob.java @@ -1,14 +1,11 @@ package cn.axzo.im.group.message; -import cn.axzo.basics.common.exception.ServiceException; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.util.concurrent.CountDownLatch; - /** * @author yanglin */ @@ -19,27 +16,10 @@ public class GroupMessageSyncJob { private final GroupMessageSyncService groupMessageSyncService; - private volatile boolean isRunning = false; - @SuppressWarnings("unused") @XxlJob("groupMessageSyncJob") public ReturnT execute(String jsonStr) throws Exception { - if (isRunning) - throw new ServiceException("任务正在执行中,请稍后再试..."); - synchronized (this) { - if (isRunning) - throw new ServiceException("任务正在执行中,请稍后再试..."); - isRunning = true; - } - CountDownLatch latch = new CountDownLatch(1); - groupMessageSyncService.sync(() -> { - synchronized (GroupMessageSyncJob.this) { - isRunning = false; - } - latch.countDown(); - log.info("group message sync job finished"); - }); - latch.await(); + groupMessageSyncService.sync(); return ReturnT.SUCCESS; } diff --git a/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncService.java b/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncService.java index 854310a..e394633 100644 --- a/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncService.java +++ b/im-center-server/src/main/java/cn/axzo/im/group/message/GroupMessageSyncService.java @@ -1,5 +1,6 @@ package cn.axzo.im.group.message; +import cn.axzo.basics.common.exception.ServiceException; import cn.axzo.im.center.common.enums.YesOrNo; import cn.axzo.im.channel.netease.client.NimClient; import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesRequest; @@ -39,28 +40,33 @@ public class GroupMessageSyncService { private final GroupMessageDao groupMessageDao; private final NimClient nimClient; - public void syncAndWait() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - sync(latch::countDown); - latch.await(); - } + private volatile boolean isRunning = false; - void sync(Runnable onComplete) { - AtomicBoolean completed = new AtomicBoolean(false); - Runnable runOnce = () -> { - if (completed.compareAndSet(false, true)) - onComplete.run(); - }; + public void sync() throws Exception { + if (isRunning) + throw new ServiceException("任务正在执行中,请稍后再试..."); + synchronized (this) { + if (isRunning) + throw new ServiceException("任务正在执行中,请稍后再试..."); + isRunning = true; + } try { - syncImpl(runOnce); - } catch (Exception e) { - log.warn("sync group message failed", e); - runOnce.run(); + syncImpl(); + } finally { + synchronized (this) { + isRunning = false; + } } } - private void syncImpl(Runnable onComplete) throws Exception { - MessageSyncController controller = new MessageSyncController(tps, onComplete); + private void syncImpl() throws Exception { + AtomicBoolean completed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + Runnable runOnce = () -> { + if (completed.compareAndSet(false, true)) + latch.countDown(); + }; + MessageSyncController controller = new MessageSyncController(tps, runOnce); Date twoDayAgo = new DateTime(new Date()).minusDays(2).toDate(); RecordCursor cursor = new RecordCursor<>(Group::getId, () -> groupDao.lambdaQuery() @@ -88,6 +94,7 @@ public class GroupMessageSyncService { } } controller.setSubmitFinished(); + latch.await(); } void saveMessagesBatch(Collection messages) { diff --git a/im-center-server/src/test/java/cn/axzo/im/group/message/GroupMessageSyncServiceTest.java b/im-center-server/src/test/java/cn/axzo/im/group/message/GroupMessageSyncServiceTest.java deleted file mode 100644 index 2f7ebe9..0000000 --- a/im-center-server/src/test/java/cn/axzo/im/group/message/GroupMessageSyncServiceTest.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.axzo.im.group.message; - -import cn.axzo.im.Application; -import lombok.RequiredArgsConstructor; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; - -/** - * @author yanglin - */ -@SpringBootTest(classes = Application.class) -@RequiredArgsConstructor(onConstructor_ = @Autowired) -class GroupMessageSyncServiceTest { - - private final GroupMessageSyncService groupMessageSyncService; - - @Test - void sync() throws Exception { - groupMessageSyncService.syncAndWait(); - } - -} \ No newline at end of file