From 4e92024d56e5d95dd6e5952c3468075c329e8f40 Mon Sep 17 00:00:00 2001 From: yanglin Date: Fri, 11 Oct 2024 16:33:18 +0800 Subject: [PATCH] =?UTF-8?q?REQ-2752:=20=E5=88=B7=E5=AE=A1=E6=89=B9?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E6=8C=89=E9=92=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WorkflowButtonSyncClientController.java | 60 +++++++++++++++---- .../axzo/msg/center/utils/AsyncRunTasks.java | 31 ++++++++++ .../msg/center/utils/AsyncSupplyTasks.java | 34 +++++++++++ .../cn/axzo/msg/center/utils/AsyncTasks.java | 49 +++++++++++++++ .../axzo/msg/center/utils/FutureWrapper.java | 39 ++++++++++++ .../client/WorkflowButtonSyncClient.java | 2 +- .../request/WorkflowSyncButtonsRequest.java | 5 ++ 7 files changed, 206 insertions(+), 14 deletions(-) create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncRunTasks.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncSupplyTasks.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncTasks.java create mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/utils/FutureWrapper.java diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/WorkflowButtonSyncClientController.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/WorkflowButtonSyncClientController.java index 82533cd6..e4697faf 100644 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/WorkflowButtonSyncClientController.java +++ b/inside-notices/src/main/java/cn/axzo/msg/center/message/controller/WorkflowButtonSyncClientController.java @@ -1,12 +1,16 @@ package cn.axzo.msg.center.message.controller; import cn.axzo.msg.center.dal.MessageTemplateButtonV3Dao; +import cn.axzo.msg.center.dal.MessageTemplateV3Dao; import cn.axzo.msg.center.domain.entity.MessageTemplateButtonV3; +import cn.axzo.msg.center.domain.entity.MessageTemplateV3; import cn.axzo.msg.center.service.domain.UrlConfig; +import cn.axzo.msg.center.service.enums.MessageCategoryEnum; import cn.axzo.msg.center.service.enums.RouterCategoryEnum; import cn.axzo.msg.center.service.pending.client.WorkflowButtonSyncClient; import cn.axzo.msg.center.service.pending.request.WorkflowSyncButtonsRequest; import cn.axzo.msg.center.service.pending.request.WorkflowSyncButtonsRequest.WorkflowButton; +import cn.axzo.msg.center.utils.AsyncRunTasks; import cn.azxo.framework.common.model.CommonResponse; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -14,6 +18,8 @@ import org.springframework.web.bind.annotation.RestController; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toMap; @@ -26,31 +32,59 @@ import static java.util.stream.Collectors.toMap; @RequiredArgsConstructor public class WorkflowButtonSyncClientController implements WorkflowButtonSyncClient { + private final MessageTemplateV3Dao messageTemplateV3Dao; private final MessageTemplateButtonV3Dao messageTemplateButtonV3Dao; @Override - public CommonResponse syncButtons(WorkflowSyncButtonsRequest request) { + public CommonResponse syncButtons(WorkflowSyncButtonsRequest request) throws Exception { + Map code2Template = messageTemplateV3Dao.lambdaQuery() + .list().stream() + .collect(toMap(MessageTemplateV3::getCode, identity())); Map code2FlowButton = request.getButtons().stream() .collect(toMap(WorkflowButton::getCode, identity(), (oldValue, newValue) -> oldValue)); List buttons = messageTemplateButtonV3Dao.lambdaQuery().list(); + + int nThreads = 10; + AsyncRunTasks tasks = new AsyncRunTasks(Executors.newFixedThreadPool(nThreads)); + Semaphore semaphore = new Semaphore(nThreads); for (MessageTemplateButtonV3 button : buttons) { - WorkflowButton workflowButton = code2FlowButton.get(button.getCode()); - if (workflowButton == null) continue; - MessageTemplateButtonV3 update = new MessageTemplateButtonV3(); - update.setId(button.getId()); - if (button.getCategory() == RouterCategoryEnum.JUMP) { - UrlConfig urlConfig = new UrlConfig(); - urlConfig.applyUrlAsDefaults(workflowButton.getUrl()); - update.setUrlConfig(urlConfig); - } else { - update.setApiUrl(workflowButton.getUrl()); - } - messageTemplateButtonV3Dao.updateById(update); + semaphore.acquire(); + tasks.runAsync(() -> { + try { + sync(code2Template, code2FlowButton, button); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + semaphore.release(); + } + }); } + tasks.awaitTermination(); return CommonResponse.success(); } + private void sync(Map code2Template, + Map code2FlowButton, + MessageTemplateButtonV3 button) { + MessageTemplateV3 template = code2Template.get(button.getTemplateCode()); + if (template.getMsgCategory() != MessageCategoryEnum.APPROVAL_PENDING_MESSAGE) + return; + + WorkflowButton workflowButton = code2FlowButton.get(button.getCode()); + if (workflowButton == null) return; + MessageTemplateButtonV3 update = new MessageTemplateButtonV3(); + update.setId(button.getId()); + if (button.getCategory() == RouterCategoryEnum.JUMP) { + UrlConfig urlConfig = new UrlConfig(); + urlConfig.applyUrlAsDefaults(workflowButton.getUrl()); + update.setUrlConfig(urlConfig); + } else { + update.setApiUrl(workflowButton.getUrl()); + } + messageTemplateButtonV3Dao.updateById(update); + } + } \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncRunTasks.java b/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncRunTasks.java new file mode 100644 index 00000000..10876ec7 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncRunTasks.java @@ -0,0 +1,31 @@ +package cn.axzo.msg.center.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +/** + * @author yanglin + */ +public class AsyncRunTasks extends AsyncTasks { + + private final ExecutorService executor; + + /** + * use common pool + */ + public AsyncRunTasks() { + this(null); + } + + public AsyncRunTasks(ExecutorService executor) { + this.executor = executor; + } + + public void runAsync(Runnable runnable) { + CompletableFuture future = executor == null + ? CompletableFuture.runAsync(runnable) + : CompletableFuture.runAsync(runnable, executor); + add(future); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncSupplyTasks.java b/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncSupplyTasks.java new file mode 100644 index 00000000..68143451 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncSupplyTasks.java @@ -0,0 +1,34 @@ +package cn.axzo.msg.center.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +/** + * @author yanglin + */ +public class AsyncSupplyTasks extends AsyncTasks { + + private final ExecutorService executor; + + /** + * use common pool + */ + public AsyncSupplyTasks() { + this(null); + } + + public AsyncSupplyTasks(ExecutorService executor) { + this.executor = executor; + } + + public FutureWrapper supplyAsync(Supplier supplier) { + CompletableFuture future = executor == null + ? CompletableFuture.supplyAsync(supplier) + : CompletableFuture.supplyAsync(supplier, executor); + //noinspection unchecked + add((CompletableFuture) future); + return new FutureWrapper<>(future); + } + +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncTasks.java b/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncTasks.java new file mode 100644 index 00000000..dc7d8cb7 --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/utils/AsyncTasks.java @@ -0,0 +1,49 @@ +package cn.axzo.msg.center.utils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author yanglin + */ +public class AsyncTasks { + + private final List> futures = Collections.synchronizedList(new ArrayList<>()); + private final boolean removeFutureWhenComplete; + + public AsyncTasks() { + this(true); + } + + AsyncTasks(boolean removeFutureWhenComplete) { + this.removeFutureWhenComplete = removeFutureWhenComplete; + } + + public void add(CompletableFuture future) { + futures.add(future); + if (removeFutureWhenComplete) + future.whenComplete((unused, throwable) -> futures.remove(future)); + } + + CompletableFuture getFuture(int index) { + return futures.get(index); + } + + public void awaitTermination() { + collectAll().join(); + } + + public void awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + collectAll().get(timeout, unit); + } + + private CompletableFuture collectAll() { + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + } +} \ No newline at end of file diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/utils/FutureWrapper.java b/inside-notices/src/main/java/cn/axzo/msg/center/utils/FutureWrapper.java new file mode 100644 index 00000000..5ca9d08f --- /dev/null +++ b/inside-notices/src/main/java/cn/axzo/msg/center/utils/FutureWrapper.java @@ -0,0 +1,39 @@ +package cn.axzo.msg.center.utils; + +import cn.axzo.basics.common.exception.ServiceException; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author yanglin + */ +@Slf4j +@RequiredArgsConstructor(access = AccessLevel.PACKAGE) +public class FutureWrapper { + + private final CompletableFuture future; + + public T get() { + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + log.error("执行异常", e); + throw new ServiceException("执行异常"); + } + } + + public T get(long timeout, TimeUnit unit) { + try { + return future.get(timeout, unit); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("执行异常", e); + throw new ServiceException("执行异常"); + } + } +} \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/client/WorkflowButtonSyncClient.java b/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/client/WorkflowButtonSyncClient.java index 6e5185d3..52ef8303 100644 --- a/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/client/WorkflowButtonSyncClient.java +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/client/WorkflowButtonSyncClient.java @@ -18,6 +18,6 @@ import javax.validation.Valid; public interface WorkflowButtonSyncClient { @PostMapping(value = "/pending-message/workflow/syncButtons", produces = {MediaType.APPLICATION_JSON_VALUE}) - CommonResponse syncButtons(@RequestBody @Valid WorkflowSyncButtonsRequest request); + CommonResponse syncButtons(@RequestBody @Valid WorkflowSyncButtonsRequest request) throws Exception; } \ No newline at end of file diff --git a/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/WorkflowSyncButtonsRequest.java b/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/WorkflowSyncButtonsRequest.java index a1789ca4..0edc5fac 100644 --- a/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/WorkflowSyncButtonsRequest.java +++ b/msg-center-api/src/main/java/cn/axzo/msg/center/service/pending/request/WorkflowSyncButtonsRequest.java @@ -23,6 +23,11 @@ public class WorkflowSyncButtonsRequest { private String code; private RouterCategoryEnum category; private String url; + + @Override + public String toString() { + return code; + } } } \ No newline at end of file