REQ-3201: 重新发送消息

This commit is contained in:
yanglin 2024-12-24 15:22:17 +08:00
parent 16f0179538
commit 9ce69db978
5 changed files with 90 additions and 66 deletions

View File

@ -35,7 +35,6 @@ import cn.axzo.msg.center.notices.manager.api.dto.request.plat.CreateTemplateReq
import cn.axzo.msg.center.notices.manager.api.dto.request.plat.TingyunInterfaceListRequest;
import cn.axzo.msg.center.notices.service.api.PlatService;
import cn.axzo.msg.center.service.enums.MqMessageType;
import cn.axzo.msg.center.service.pending.request.CardUpdateRequest;
import cn.axzo.msg.center.service.pending.request.ResendMessageRequest;
import cn.axzo.msg.center.service.pending.request.RevokeByTemplateCodeRequest;
import cn.axzo.trade.web.annotation.EnableResponseAdvice;
@ -254,7 +253,7 @@ public class PrivateMessageController {
@PostMapping("/resendCards")
@EnableResponseAdvice(enable = false)
public Object resendCards(@RequestBody @Valid ResendMessageRequest request) {
public Object resendCards(@RequestBody @Valid ResendMessageRequest request) throws Exception {
cardManager.resend(request);
return "done...";
}

View File

@ -36,6 +36,7 @@ import cn.axzo.msg.center.service.pending.request.CardUpdateStateRequest;
import cn.axzo.msg.center.service.pending.request.ResendMessageRequest;
import cn.axzo.msg.center.service.pending.request.SetActionPerformedRequest;
import cn.axzo.msg.center.service.pending.response.CardSendResponse;
import cn.axzo.msg.center.utils.BatchController;
import cn.axzo.msg.center.utils.RecordCursor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -92,7 +93,7 @@ public class CardManager {
sendLogger.reloadAndLogCards("send:enqueue");
});
PushDeviceSnapshots deviceSnapshots = pushDeviceService.createDeviceSnapshots();
CardSendExecutor groupExecutor = new CardSendExecutor(request, executor);
BatchController groupExecutor = new BatchController("sendCard", request, executor);
for (CardGroup group : sendModel.getCardGroups()) {
groupExecutor.submit(() -> {
SendTemplateMessageParam imRequest = cardSupport.buildImSendRequest(
@ -208,14 +209,18 @@ public class CardManager {
return result;
}
public void resend(ResendMessageRequest request) {
public void resend(ResendMessageRequest request) throws Exception {
TemplateModelV3 templateModel = cardSupport.ensureImChannelPresent(request.getTemplateCode());
BizAssertions.assertNotNull(templateModel, "找不到对应的模板, templateCode={}", request.getTemplateCode());
BatchController resendExecutor = new BatchController("resendCard", request, executor);
for (List<Card> cards : cardsCursor(request)) {
if (request.isRebuildContent())
rebuildCardContent(templateModel, cards);
updateMessages(cards);
resendExecutor.submit(() -> {
if (request.isRebuildContent())
rebuildCardContent(templateModel, cards);
updateMessages(cards);
});
}
resendExecutor.awaitTermination(10, TimeUnit.MINUTES);
}
RecordCursor<Card> cardsCursor(CardUpdateRequest request) {

View File

@ -1,58 +0,0 @@
package cn.axzo.msg.center.message.service.card;
import cn.axzo.msg.center.utils.AsyncRunTasks;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author yanglin
*/
@Slf4j
class CardSendExecutor {
private final AsyncRunTasks asyncTasks;
private final Object request;
private final AtomicReference<Exception> error = new AtomicReference<>();
private int submitCount = 0;
CardSendExecutor(Object request, ExecutorService executor) {
this.asyncTasks = new AsyncRunTasks(executor);
this.request = request;
}
void submit(Runnable sender) {
submitCount++;
if (submitCount <= 4) {
log.info("using sync card send, submitBatchSize={}, request={}", submitCount, request);
sender.run();
return;
}
log.info("using async card send, submitBatchSize={}, request={}", submitCount, request);
asyncTasks.runAsync(() -> {
Exception exception = error.get();
if (exception != null) {
log.warn("其它分组发送消息失败, 放弃当前分组, request={}", request);
return;
}
try {
sender.run();
} catch (Exception e) {
log.warn("发送消息失败. request={}", request, e);
error.compareAndSet(null, e);
}
});
}
void awaitTermination(long timeout, TimeUnit unit) throws Exception {
asyncTasks.awaitTermination(timeout, unit);
Exception exception = error.get();
if (exception != null) {
log.warn("发送消息失败, request={}", request, exception);
throw exception;
}
}
}

View File

@ -0,0 +1,78 @@
package cn.axzo.msg.center.utils;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author yanglin
*/
@Slf4j
public class BatchController {
private final String operation;
private final AsyncRunTasks asyncTasks;
private final Object request;
private final AtomicReference<Exception> error = new AtomicReference<>();
private final int parallelismThreshold;
private int submitCount = 0;
public BatchController(String operation, Object request,
ExecutorService executor) {
this(operation, request, executor, 4);
}
public BatchController(String operation, Object request,
ExecutorService executor, int parallelismThreshold) {
this.operation = operation;
this.asyncTasks = new AsyncRunTasks(executor);
this.request = request;
this.parallelismThreshold = parallelismThreshold;
}
public void submit(Runnable task) {
submitCount++;
if (submitCount <= parallelismThreshold) {
info("using sync");
task.run();
return;
}
info(String.format("using async because of parallelismThreshold > %d", parallelismThreshold));
asyncTasks.runAsync(() -> {
Exception exception = error.get();
if (exception != null) {
warn("operation aborted because of other batch's error", exception);
return;
}
try {
task.run();
} catch (Exception e) {
warn("operation failed", e);
error.compareAndSet(null, e);
}
});
}
public void awaitTermination(long timeout, TimeUnit unit) throws Exception {
asyncTasks.awaitTermination(timeout, unit);
Exception exception = error.get();
if (exception != null) {
warn("operation failed", exception);
throw exception;
}
}
private void info(String message) {
log.info("{}. operation={}, submitBatchSize={}, request={}",
message, operation, submitCount, JSON.toJSONString(request));
}
private void warn(String message, Exception e) {
log.info("{}. operation={}, submitBatchSize={}, request={}",
message, operation, submitCount, JSON.toJSONString(request), e);
}
}

View File

@ -36,7 +36,7 @@ public class RecordCursor<T> implements Iterable<List<T>> {
return new Iterator<List<T>>() {
long maxId = 0;
List<T> batch;
volatile List<T> batch;
Boolean batchConsumed;
@Override