diff --git a/im-center-server/src/main/java/cn/axzo/im/config/MybatisPlusConfig.java b/im-center-server/src/main/java/cn/axzo/im/config/MybatisPlusConfig.java new file mode 100644 index 0000000..437a4be --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/config/MybatisPlusConfig.java @@ -0,0 +1,15 @@ +package cn.axzo.im.config; + +import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MybatisPlusConfig { + + //@Bean + //public PaginationInterceptor paginationInterceptor() { + // return new PaginationInterceptor(); + //} + +} diff --git a/im-center-server/src/main/java/cn/axzo/im/job/RevokeAllMessagesJob.java b/im-center-server/src/main/java/cn/axzo/im/job/RevokeAllMessagesJob.java new file mode 100644 index 0000000..086bcad --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/job/RevokeAllMessagesJob.java @@ -0,0 +1,111 @@ +package cn.axzo.im.job; + +import cn.axzo.basics.common.constant.enums.TableIsDeleteEnum; +import cn.axzo.basics.common.page.PageRequest; +import cn.axzo.im.channel.netease.client.NimClient; +import cn.axzo.im.channel.netease.dto.RevokeMessageRequest; +import cn.axzo.im.dao.mapper.MessageHistoryMapper; +import cn.axzo.im.entity.MessageHistory; +import cn.axzo.im.utils.Queries; +import cn.hutool.core.thread.NamedThreadFactory; +import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.google.common.collect.Lists; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +/** + * @author yanglin + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RevokeAllMessagesJob extends IJobHandler { + + private final Executor executor = Executors.newCachedThreadPool( + new NamedThreadFactory(RevokeAllMessagesJob.class.getSimpleName(), false)); + private final MessageHistoryMapper messageHistoryMapper; + private final NimClient nimClient; + @Value("${im-center.revoke-all.token:83bn432}") + private final String token; + + @Override + @XxlJob("revokeAllMessagesJob") + public ReturnT execute(String jsonStr) { + if (StringUtils.isBlank(jsonStr)) { + log.info("无效的参数={}", jsonStr); + return ReturnT.FAIL; + } + Param param = JSON.parseObject(jsonStr, Param.class); + if (param.pageSize <= 0) { + param.pageSize = 500; + } + if (param.batchSize <= 0) + param.batchSize = 100; + if (!token.equals(param.token)) { + log.info("无效的token={}", param.token); + return ReturnT.FAIL; + } + PageRequest request = new PageRequest(); + request.setPageSize(param.pageSize); + request.setPage(1L); + IPage page = nextPage(request, param.templateCode); + int count = 0; + while (page.getRecords().size() >= param.pageSize) { + request.setPage(request.getPage() + 1); + concurrentRevoke(page.getRecords(), param); + count += page.getRecords().size(); + log.info("revoked count={}", count); + page = nextPage(request, param.templateCode); + } + return ReturnT.SUCCESS; + } + + private IPage nextPage(PageRequest request, String templateCode) { + return messageHistoryMapper.selectPage(request.toPage(), + Queries.query(MessageHistory.class) + .eq(MessageHistory::getIsDelete, TableIsDeleteEnum.NORMAL.value) + .like(StringUtils.isNotBlank(templateCode), MessageHistory::getMessageBody, templateCode) + .ne(MessageHistory::getMessageId, "") + .orderByDesc(MessageHistory::getId)); + } + + private void concurrentRevoke(List messages, Param param) { + ArrayList> futures = new ArrayList<>(); + for (List batch : Lists.partition(messages, param.batchSize)) + futures.add(CompletableFuture.runAsync(() -> revokeImpl(batch), executor)); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + private void revokeImpl(List messages) { + for (MessageHistory message : messages) { + RevokeMessageRequest request = new RevokeMessageRequest(); + request.setMessageId(message.getMessageId()); + request.setFrom(message.getFromAccount()); + request.setTo(message.getToAccount()); + NimClient.CodeResponse resp = nimClient.revoke(request); + log.info("revokeImpl, req={}, resp={}", JSON.toJSONString(request), JSON.toJSONString(resp)); + } + } + + @Data + public static class Param { + private String token; + private long pageSize; + private int batchSize; + private String templateCode; + } +} \ No newline at end of file diff --git a/im-center-server/src/main/java/cn/axzo/im/job/RevokeMessagesJob.java b/im-center-server/src/main/java/cn/axzo/im/job/RevokeMessagesJob.java deleted file mode 100644 index 2594c0f..0000000 --- a/im-center-server/src/main/java/cn/axzo/im/job/RevokeMessagesJob.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.axzo.im.job; - -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.handler.annotation.XxlJob; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * @author yanglin - */ -@Slf4j -@Component -public class RevokeMessagesJob extends IJobHandler { - - @Override - @XxlJob("revokeMessagesJob") - public ReturnT execute(String param) throws Exception { - log.info("execute...."); - return ReturnT.SUCCESS; - } - -} diff --git a/im-center-server/src/main/java/cn/axzo/im/utils/Queries.java b/im-center-server/src/main/java/cn/axzo/im/utils/Queries.java new file mode 100644 index 0000000..5247b97 --- /dev/null +++ b/im-center-server/src/main/java/cn/axzo/im/utils/Queries.java @@ -0,0 +1,18 @@ +package cn.axzo.im.utils; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; + +/** + * @author yanglin + */ +public class Queries { + + /** + * @param domainType help compiler + */ + public static LambdaQueryWrapper query( + @SuppressWarnings({"unused", "help compiler"}) Class domainType) { + return new LambdaQueryWrapper<>(); + } + +} \ No newline at end of file diff --git a/im-center-server/src/test/java/cn/axzo/im/job/RevokeAllMessagesJobTest.java b/im-center-server/src/test/java/cn/axzo/im/job/RevokeAllMessagesJobTest.java new file mode 100644 index 0000000..e6fac76 --- /dev/null +++ b/im-center-server/src/test/java/cn/axzo/im/job/RevokeAllMessagesJobTest.java @@ -0,0 +1,24 @@ +package cn.axzo.im.job; + +import cn.axzo.im.Application; +import lombok.RequiredArgsConstructor; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +/** + * @author yanglin + */ +@SpringBootTest(classes = Application.class) +@RequiredArgsConstructor(onConstructor_ = @Autowired) +class RevokeAllMessagesJobTest { + + private final RevokeAllMessagesJob revokeAllMessagesJob; + + @Test + void foo() { + String jsonStr = "{\"token\":\"83bn432\"}"; + revokeAllMessagesJob.execute(jsonStr); + } + +} \ No newline at end of file diff --git a/lombok.config b/lombok.config new file mode 100644 index 0000000..4595b5f --- /dev/null +++ b/lombok.config @@ -0,0 +1 @@ +lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Value \ No newline at end of file