REQ-3201: 撤回消息

This commit is contained in:
yanglin 2024-12-17 15:34:35 +08:00
parent f8b48bbc9d
commit b193d38369
10 changed files with 310 additions and 3 deletions

View File

@ -0,0 +1,20 @@
package cn.axzo.im.center.api.feign;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.vo.req.RevokeMessageRequest;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import javax.validation.Valid;
/**
* @author yanglin
*/
@FeignClient(name = "im-center", url = "${axzo.service.im-center:http://im-center:8080}")
public interface MessageTool {
@PostMapping("/api/im/message/tool/revoke")
ApiResult<String> revoke(@RequestBody @Valid RevokeMessageRequest request);
}

View File

@ -0,0 +1,33 @@
package cn.axzo.im.center.api.vo.req;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.Set;
/**
* @author yanglin
*/
@Setter
@Getter
public class RevokeMessageRequest {
private Set<String> bizIds;
private String pattern;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@NotNull(message = "since不能为空")
private Date since;
private int parallelism = 25;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,34 @@
package cn.axzo.im.controller;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.im.center.api.feign.MessageTool;
import cn.axzo.im.center.api.vo.req.RevokeMessageRequest;
import cn.axzo.im.updatable.RevokeService;
import cn.axzo.im.utils.BizAssertions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yanglin
*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class MessageToolController implements MessageTool {
private final RevokeService revokeService;
@Override
public ApiResult<String> revoke(RevokeMessageRequest request) {
log.info("Revoke message request:{}", request);
BizAssertions.assertTrue(
CollectionUtils.isNotEmpty(request.getBizIds())
|| StringUtils.isNotBlank(request.getPattern()),
"bizIds and pattern can't be blank at the same time");
return ApiResult.ok(revokeService.revoke(request));
}
}

View File

@ -32,7 +32,7 @@ import java.util.Optional;
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class MessageHistory implements Serializable {
public class MessageHistory implements Serializable, NimMessageHistory {
private static final long serialVersionUID = 1L;

View File

@ -30,7 +30,7 @@ import java.util.Date;
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class MessageHistoryCold implements Serializable {
public class MessageHistoryCold implements Serializable, NimMessageHistory {
private static final long serialVersionUID = 1L;

View File

@ -0,0 +1,14 @@
package cn.axzo.im.entity;
/**
* @author yanglin
*/
public interface NimMessageHistory {
String getMessageId();
String getFromAccount();
String getToAccount();
}

View File

@ -0,0 +1,123 @@
package cn.axzo.im.updatable;
import cn.axzo.im.center.api.vo.req.RevokeMessageRequest;
import cn.axzo.im.channel.netease.client.NimClient;
import cn.axzo.im.dao.repository.MessageHistoryColdDao;
import cn.axzo.im.dao.repository.MessageHistoryDao;
import cn.axzo.im.entity.MessageHistory;
import cn.axzo.im.entity.MessageHistoryCold;
import cn.axzo.im.entity.NimMessageHistory;
import cn.axzo.im.utils.RecordCursor;
import cn.axzo.im.utils.ThrowableConsumer;
import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author yanglin
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class RevokeService {
private final NimClient nimClient;
private final MessageHistoryDao messageHistoryDao;
private final MessageHistoryColdDao messageHistoryColdDao;
private volatile boolean isRunning = false;
public String revoke(RevokeMessageRequest request) {
if (isRunning)
return "revoke is running...";
synchronized (this) {
if (isRunning)
return "revoke is running...";
isRunning = true;
}
try {
log.info("revoke start...");
revokeImpl(request);
log.info("revoke done normally...");
} catch (Exception e) {
log.error("revoke failed", e);
return "revoke failed: " + e.getMessage();
} finally {
log.info("revoke done...");
synchronized (this) {
isRunning = false;
}
}
return "done...";
}
private void revokeImpl(RevokeMessageRequest request) throws Exception {
int threadSize = request.getParallelism() + 3;
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadSize, threadSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
RateLimiter rateLimiter = RateLimiter.create(request.getParallelism());
ThrowableConsumer<RecordCursor<? extends NimMessageHistory>> runner = cursor -> {
for (List<? extends NimMessageHistory> histories : cursor) {
for (NimMessageHistory history : histories) {
if (StringUtils.isBlank(history.getMessageId()))
continue;
executor.submit(() -> {
try {
rateLimiter.acquire();
revoke(history);
} catch (Exception e) {
log.error("revoke failed", e);
}
});
}
}
};
runner.accept(hotCursor(request));
log.info("hot cursor submitted...");
runner.accept(coldCursor(request));
log.info("cold cursor submitted...");
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
}
private void revoke(NimMessageHistory history) {
cn.axzo.im.channel.netease.dto.RevokeMessageRequest nimRequest =
new cn.axzo.im.channel.netease.dto.RevokeMessageRequest();
nimRequest.setMessageId(history.getMessageId());
nimRequest.setFrom(history.getFromAccount());
nimRequest.setTo(history.getToAccount());
NimClient.CodeResponse resp = nimClient.revoke(nimRequest);
log.info("revoke messageId={}, code={}, desc={}", history.getMessageId(), resp.getCode(), resp.getDesc());
}
private RecordCursor<? extends NimMessageHistory> hotCursor(RevokeMessageRequest request) {
return new RecordCursor<>(MessageHistory::getId, () -> messageHistoryDao.lambdaQuery()
.select(MessageHistory::getMessageId,
MessageHistory::getFromAccount,
MessageHistory::getToAccount)
.in(CollectionUtils.isNotEmpty(request.getBizIds()), MessageHistory::getBizId, request.getBizIds())
.likeRight(StringUtils.isNoneBlank(request.getPattern()), MessageHistory::getBizId, request.getPattern()));
}
private RecordCursor<? extends NimMessageHistory> coldCursor(RevokeMessageRequest request) {
return new RecordCursor<>(MessageHistoryCold::getId, () -> messageHistoryColdDao.lambdaQuery()
.select(MessageHistoryCold::getMessageId,
MessageHistoryCold::getFromAccount,
MessageHistoryCold::getToAccount)
.in(CollectionUtils.isNotEmpty(request.getBizIds()), MessageHistoryCold::getBizId, request.getBizIds())
.likeRight(StringUtils.isNoneBlank(request.getPattern()), MessageHistoryCold::getBizId, request.getPattern()));
}
}

View File

@ -32,7 +32,7 @@ public class ImProperties {
private int messageUpdateAckMaxRetryCount = 3;
private int messageUpdateAckRetryIntervalSeconds = 10;
private int messageUpdateAckRetryIntervalSeconds = 15;
/**
* 用于避免大事务以及死锁

View File

@ -0,0 +1,73 @@
package cn.axzo.im.utils;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import javax.validation.constraints.NotNull;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
/**
* @author yanglin
*/
public class RecordCursor<T> implements Iterable<List<T>> {
private final int batchSize;
private final SFunction<T, Long> idGetter;
private final Supplier<LambdaQueryChainWrapper<T>> query;
public RecordCursor(SFunction<T, Long> idGetter,
Supplier<LambdaQueryChainWrapper<T>> query) {
this(1000, idGetter, query);
}
public RecordCursor(int batchSize,
SFunction<T, Long> idGetter,
Supplier<LambdaQueryChainWrapper<T>> query) {
this.batchSize = batchSize;
this.idGetter = idGetter;
this.query = query;
}
@Override @NotNull
public Iterator<List<T>> iterator() {
return new Iterator<List<T>>() {
long maxId = 0;
List<T> batch;
Boolean batchConsumed;
@Override
public boolean hasNext() {
if (batchConsumed == null || batchConsumed) {
batch = nextBatch();
batchConsumed = false;
}
return !batch.isEmpty();
}
@Override
public List<T> next() {
if (!hasNext())
throw new NoSuchElementException();
batchConsumed = true;
return batch;
}
private List<T> nextBatch() {
List<T> cards = query.get()
.gt(idGetter, maxId)
.orderByAsc(idGetter)
.last("LIMIT " + batchSize)
.list();
if (!cards.isEmpty()) {
T maxIdRecord = cards.get(cards.size() - 1);
maxId = idGetter.apply(maxIdRecord);
}
return cards;
}
};
}
}

View File

@ -0,0 +1,10 @@
package cn.axzo.im.utils;
/**
* @author yanglin
*/
public interface ThrowableConsumer<T> {
void accept(T t) throws Exception;
}