REQ-3345: 同步消息

This commit is contained in:
yanglin 2025-01-23 14:31:53 +08:00
parent c94e781f51
commit 050dd7c81b
4 changed files with 26 additions and 62 deletions

View File

@ -113,7 +113,7 @@ public class PrivateController {
@PostMapping("/private/group/syncGroupMessages")
public Object syncGroupMessages() throws Exception {
groupMessageSyncService.syncAndWait();
groupMessageSyncService.sync();
return "done";
}

View File

@ -1,14 +1,11 @@
package cn.axzo.im.group.message;
import cn.axzo.basics.common.exception.ServiceException;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
/**
* @author yanglin
*/
@ -19,27 +16,10 @@ public class GroupMessageSyncJob {
private final GroupMessageSyncService groupMessageSyncService;
private volatile boolean isRunning = false;
@SuppressWarnings("unused")
@XxlJob("groupMessageSyncJob")
public ReturnT<String> execute(String jsonStr) throws Exception {
if (isRunning)
throw new ServiceException("任务正在执行中,请稍后再试...");
synchronized (this) {
if (isRunning)
throw new ServiceException("任务正在执行中,请稍后再试...");
isRunning = true;
}
CountDownLatch latch = new CountDownLatch(1);
groupMessageSyncService.sync(() -> {
synchronized (GroupMessageSyncJob.this) {
isRunning = false;
}
latch.countDown();
log.info("group message sync job finished");
});
latch.await();
groupMessageSyncService.sync();
return ReturnT.SUCCESS;
}

View File

@ -1,5 +1,6 @@
package cn.axzo.im.group.message;
import cn.axzo.basics.common.exception.ServiceException;
import cn.axzo.im.center.common.enums.YesOrNo;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.channel.netease.dto.NimGroupGetMessagesRequest;
@ -39,28 +40,33 @@ public class GroupMessageSyncService {
private final GroupMessageDao groupMessageDao;
private final NimClient nimClient;
public void syncAndWait() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
sync(latch::countDown);
latch.await();
}
private volatile boolean isRunning = false;
void sync(Runnable onComplete) {
AtomicBoolean completed = new AtomicBoolean(false);
Runnable runOnce = () -> {
if (completed.compareAndSet(false, true))
onComplete.run();
};
public void sync() throws Exception {
if (isRunning)
throw new ServiceException("任务正在执行中,请稍后再试...");
synchronized (this) {
if (isRunning)
throw new ServiceException("任务正在执行中,请稍后再试...");
isRunning = true;
}
try {
syncImpl(runOnce);
} catch (Exception e) {
log.warn("sync group message failed", e);
runOnce.run();
syncImpl();
} finally {
synchronized (this) {
isRunning = false;
}
}
}
private void syncImpl(Runnable onComplete) throws Exception {
MessageSyncController controller = new MessageSyncController(tps, onComplete);
private void syncImpl() throws Exception {
AtomicBoolean completed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
Runnable runOnce = () -> {
if (completed.compareAndSet(false, true))
latch.countDown();
};
MessageSyncController controller = new MessageSyncController(tps, runOnce);
Date twoDayAgo = new DateTime(new Date()).minusDays(2).toDate();
RecordCursor<Group> cursor = new RecordCursor<>(Group::getId, () ->
groupDao.lambdaQuery()
@ -88,6 +94,7 @@ public class GroupMessageSyncService {
}
}
controller.setSubmitFinished();
latch.await();
}
void saveMessagesBatch(Collection<GroupMessage> messages) {

View File

@ -1,23 +0,0 @@
package cn.axzo.im.group.message;
import cn.axzo.im.Application;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author yanglin
*/
@SpringBootTest(classes = Application.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class GroupMessageSyncServiceTest {
private final GroupMessageSyncService groupMessageSyncService;
@Test
void sync() throws Exception {
groupMessageSyncService.syncAndWait();
}
}