REQ-2010: 撤回所有消息

This commit is contained in:
yanglin 2024-02-29 11:18:16 +08:00
parent a7a52c6f31
commit ba1b0a80b5
6 changed files with 169 additions and 23 deletions

View File

@ -0,0 +1,15 @@
package cn.axzo.im.config;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MybatisPlusConfig {
//@Bean
//public PaginationInterceptor paginationInterceptor() {
// return new PaginationInterceptor();
//}
}

View File

@ -0,0 +1,111 @@
package cn.axzo.im.job;
import cn.axzo.basics.common.constant.enums.TableIsDeleteEnum;
import cn.axzo.basics.common.page.PageRequest;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.channel.netease.dto.RevokeMessageRequest;
import cn.axzo.im.dao.mapper.MessageHistoryMapper;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.utils.Queries;
import cn.hutool.core.thread.NamedThreadFactory;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.google.common.collect.Lists;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RevokeAllMessagesJob extends IJobHandler {
private final Executor executor = Executors.newCachedThreadPool(
new NamedThreadFactory(RevokeAllMessagesJob.class.getSimpleName(), false));
private final MessageHistoryMapper messageHistoryMapper;
private final NimClient nimClient;
@Value("${im-center.revoke-all.token:83bn432}")
private final String token;
@Override
@XxlJob("revokeAllMessagesJob")
public ReturnT<String> execute(String jsonStr) {
if (StringUtils.isBlank(jsonStr)) {
log.info("无效的参数={}", jsonStr);
return ReturnT.FAIL;
}
Param param = JSON.parseObject(jsonStr, Param.class);
if (param.pageSize <= 0) {
param.pageSize = 500;
}
if (param.batchSize <= 0)
param.batchSize = 100;
if (!token.equals(param.token)) {
log.info("无效的token={}", param.token);
return ReturnT.FAIL;
}
PageRequest request = new PageRequest();
request.setPageSize(param.pageSize);
request.setPage(1L);
IPage<MessageHistory> page = nextPage(request, param.templateCode);
int count = 0;
while (page.getRecords().size() >= param.pageSize) {
request.setPage(request.getPage() + 1);
concurrentRevoke(page.getRecords(), param);
count += page.getRecords().size();
log.info("revoked count={}", count);
page = nextPage(request, param.templateCode);
}
return ReturnT.SUCCESS;
}
private IPage<MessageHistory> nextPage(PageRequest request, String templateCode) {
return messageHistoryMapper.selectPage(request.toPage(),
Queries.query(MessageHistory.class)
.eq(MessageHistory::getIsDelete, TableIsDeleteEnum.NORMAL.value)
.like(StringUtils.isNotBlank(templateCode), MessageHistory::getMessageBody, templateCode)
.ne(MessageHistory::getMessageId, "")
.orderByDesc(MessageHistory::getId));
}
private void concurrentRevoke(List<MessageHistory> messages, Param param) {
ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
for (List<MessageHistory> batch : Lists.partition(messages, param.batchSize))
futures.add(CompletableFuture.runAsync(() -> revokeImpl(batch), executor));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
private void revokeImpl(List<MessageHistory> messages) {
for (MessageHistory message : messages) {
RevokeMessageRequest request = new RevokeMessageRequest();
request.setMessageId(message.getMessageId());
request.setFrom(message.getFromAccount());
request.setTo(message.getToAccount());
NimClient.CodeResponse resp = nimClient.revoke(request);
log.info("revokeImpl, req={}, resp={}", JSON.toJSONString(request), JSON.toJSONString(resp));
}
}
@Data
public static class Param {
private String token;
private long pageSize;
private int batchSize;
private String templateCode;
}
}

View File

@ -1,23 +0,0 @@
package cn.axzo.im.job;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author yanglin
*/
@Slf4j
@Component
public class RevokeMessagesJob extends IJobHandler {
@Override
@XxlJob("revokeMessagesJob")
public ReturnT<String> execute(String param) throws Exception {
log.info("execute....");
return ReturnT.SUCCESS;
}
}

View File

@ -0,0 +1,18 @@
package cn.axzo.im.utils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
/**
* @author yanglin
*/
public class Queries {
/**
* @param domainType help compiler
*/
public static <T> LambdaQueryWrapper<T> query(
@SuppressWarnings({"unused", "help compiler"}) Class<T> domainType) {
return new LambdaQueryWrapper<>();
}
}

View File

@ -0,0 +1,24 @@
package cn.axzo.im.job;
import cn.axzo.im.Application;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author yanglin
*/
@SpringBootTest(classes = Application.class)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class RevokeAllMessagesJobTest {
private final RevokeAllMessagesJob revokeAllMessagesJob;
@Test
void foo() {
String jsonStr = "{\"token\":\"83bn432\"}";
revokeAllMessagesJob.execute(jsonStr);
}
}

1
lombok.config Normal file
View File

@ -0,0 +1 @@
lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Value