From 9ce69db978a9152ee275b03b9893b0ccba33d010 Mon Sep 17 00:00:00 2001 From: yanglin Date: Tue, 24 Dec 2024 15:22:17 +0800 Subject: [PATCH] =?UTF-8?q?REQ-3201:=20=E9=87=8D=E6=96=B0=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/PrivateMessageController.java | 3 +- .../message/service/card/CardManager.java | 15 ++-- .../service/card/CardSendExecutor.java | 58 -------------- .../msg/center/utils/BatchController.java | 78 +++++++++++++++++++ .../axzo/msg/center/utils/RecordCursor.java | 2 +- 5 files changed, 90 insertions(+), 66 deletions(-) delete mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/utils/BatchController.java diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/PrivateMessageController.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/PrivateMessageController.java index 9c536336..c0394224 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/PrivateMessageController.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/PrivateMessageController.java @@ -35,7 +35,6 @@ import cn.axzo.msg.center.notices.manager.api.dto.request.plat.CreateTemplateReq import cn.axzo.msg.center.notices.manager.api.dto.request.plat.TingyunInterfaceListRequest; import cn.axzo.msg.center.notices.service.api.PlatService; import cn.axzo.msg.center.service.enums.MqMessageType; -import cn.axzo.msg.center.service.pending.request.CardUpdateRequest; import cn.axzo.msg.center.service.pending.request.ResendMessageRequest; import cn.axzo.msg.center.service.pending.request.RevokeByTemplateCodeRequest; import cn.axzo.trade.web.annotation.EnableResponseAdvice; @@ -254,7 +253,7 @@ public class PrivateMessageController { @PostMapping("/resendCards") @EnableResponseAdvice(enable = false) - public Object resendCards(@RequestBody @Valid ResendMessageRequest request) { + public Object resendCards(@RequestBody @Valid ResendMessageRequest request) throws Exception { cardManager.resend(request); return "done..."; } diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java index 8c379b1d..9e62ea77 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardManager.java @@ -36,6 +36,7 @@ import cn.axzo.msg.center.service.pending.request.CardUpdateStateRequest; import cn.axzo.msg.center.service.pending.request.ResendMessageRequest; import cn.axzo.msg.center.service.pending.request.SetActionPerformedRequest; import cn.axzo.msg.center.service.pending.response.CardSendResponse; +import cn.axzo.msg.center.utils.BatchController; import cn.axzo.msg.center.utils.RecordCursor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -92,7 +93,7 @@ public class CardManager { sendLogger.reloadAndLogCards("send:enqueue"); }); PushDeviceSnapshots deviceSnapshots = pushDeviceService.createDeviceSnapshots(); - CardSendExecutor groupExecutor = new CardSendExecutor(request, executor); + BatchController groupExecutor = new BatchController("sendCard", request, executor); for (CardGroup group : sendModel.getCardGroups()) { groupExecutor.submit(() -> { SendTemplateMessageParam imRequest = cardSupport.buildImSendRequest( @@ -208,14 +209,18 @@ public class CardManager { return result; } - public void resend(ResendMessageRequest request) { + public void resend(ResendMessageRequest request) throws Exception { TemplateModelV3 templateModel = cardSupport.ensureImChannelPresent(request.getTemplateCode()); BizAssertions.assertNotNull(templateModel, "找不到对应的模板, templateCode={}", request.getTemplateCode()); + BatchController resendExecutor = new BatchController("resendCard", request, executor); for (List cards : cardsCursor(request)) { - if (request.isRebuildContent()) - rebuildCardContent(templateModel, cards); - updateMessages(cards); + resendExecutor.submit(() -> { + if (request.isRebuildContent()) + rebuildCardContent(templateModel, cards); + updateMessages(cards); + }); } + resendExecutor.awaitTermination(10, TimeUnit.MINUTES); } RecordCursor cardsCursor(CardUpdateRequest request) { diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java deleted file mode 100644 index 4ef3ca2a..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/service/card/CardSendExecutor.java +++ /dev/null @@ -1,58 +0,0 @@ -package cn.axzo.msg.center.message.service.card; - -import cn.axzo.msg.center.utils.AsyncRunTasks; -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * @author yanglin - */ -@Slf4j -class CardSendExecutor { - - private final AsyncRunTasks asyncTasks; - private final Object request; - private final AtomicReference error = new AtomicReference<>(); - private int submitCount = 0; - - CardSendExecutor(Object request, ExecutorService executor) { - this.asyncTasks = new AsyncRunTasks(executor); - this.request = request; - } - - void submit(Runnable sender) { - submitCount++; - if (submitCount <= 4) { - log.info("using sync card send, submitBatchSize={}, request={}", submitCount, request); - sender.run(); - return; - } - log.info("using async card send, submitBatchSize={}, request={}", submitCount, request); - asyncTasks.runAsync(() -> { - Exception exception = error.get(); - if (exception != null) { - log.warn("其它分组发送消息失败, 放弃当前分组, request={}", request); - return; - } - try { - sender.run(); - } catch (Exception e) { - log.warn("发送消息失败. request={}", request, e); - error.compareAndSet(null, e); - } - }); - } - - void awaitTermination(long timeout, TimeUnit unit) throws Exception { - asyncTasks.awaitTermination(timeout, unit); - Exception exception = error.get(); - if (exception != null) { - log.warn("发送消息失败, request={}", request, exception); - throw exception; - } - } - -} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/utils/BatchController.java b/inside-notices/src/main/java/cn/axzo/msg/center/utils/BatchController.java new file mode 100644 index 00000000..0fc5a3a7 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/utils/BatchController.java @@ -0,0 +1,78 @@ +package cn.axzo.msg.center.utils; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author yanglin + */ +@Slf4j +public class BatchController { + + private final String operation; + private final AsyncRunTasks asyncTasks; + private final Object request; + private final AtomicReference error = new AtomicReference<>(); + private final int parallelismThreshold; + private int submitCount = 0; + + public BatchController(String operation, Object request, + ExecutorService executor) { + this(operation, request, executor, 4); + } + + public BatchController(String operation, Object request, + ExecutorService executor, int parallelismThreshold) { + this.operation = operation; + this.asyncTasks = new AsyncRunTasks(executor); + this.request = request; + this.parallelismThreshold = parallelismThreshold; + } + + public void submit(Runnable task) { + submitCount++; + if (submitCount <= parallelismThreshold) { + info("using sync"); + task.run(); + return; + } + info(String.format("using async because of parallelismThreshold > %d", parallelismThreshold)); + asyncTasks.runAsync(() -> { + Exception exception = error.get(); + if (exception != null) { + warn("operation aborted because of other batch's error", exception); + return; + } + try { + task.run(); + } catch (Exception e) { + warn("operation failed", e); + error.compareAndSet(null, e); + } + }); + } + + public void awaitTermination(long timeout, TimeUnit unit) throws Exception { + asyncTasks.awaitTermination(timeout, unit); + Exception exception = error.get(); + if (exception != null) { + warn("operation failed", exception); + throw exception; + } + } + + private void info(String message) { + log.info("{}. operation={}, submitBatchSize={}, request={}", + message, operation, submitCount, JSON.toJSONString(request)); + } + + private void warn(String message, Exception e) { + log.info("{}. operation={}, submitBatchSize={}, request={}", + message, operation, submitCount, JSON.toJSONString(request), e); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/utils/RecordCursor.java b/inside-notices/src/main/java/cn/axzo/msg/center/utils/RecordCursor.java index 0f76b51f..9edfc409 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/utils/RecordCursor.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/utils/RecordCursor.java @@ -36,7 +36,7 @@ public class RecordCursor implements Iterable> { return new Iterator>() { long maxId = 0; - List batch; + volatile List batch; Boolean batchConsumed; @Override