From fea19159db768f152ecd16939f82349886015776 Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Thu, 21 Nov 2024 09:26:03 +0800 Subject: [PATCH] =?UTF-8?q?feat(REQ-3004)=20-=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AE=A1=E6=89=B9=E5=AE=9E=E4=BE=8B=E6=95=B0=E6=8D=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=20ES=20=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/conf/SupportRefreshProperties.java | 3 + .../SelfBoradcastRocketConfiguration.java | 18 +++--- .../ElasticSearchBatchSyncListener.java | 57 +++++++++++++++++++ 3 files changed, 69 insertions(+), 9 deletions(-) create mode 100644 workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java diff --git a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java index 09f4e3e26..e087425a4 100644 --- a/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java +++ b/workflow-engine-core/src/main/java/cn/axzo/workflow/core/conf/SupportRefreshProperties.java @@ -74,4 +74,7 @@ public class SupportRefreshProperties { */ @Value("${workflow.useNewToAdminApi:false}") private Boolean useNewToAdminApi; + + @Value("${workflow.esSyncBatchSize:10}") + private Integer esSyncBatchSize; } diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/SelfBoradcastRocketConfiguration.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/SelfBoradcastRocketConfiguration.java index 61e746101..77969ded1 100644 --- a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/SelfBoradcastRocketConfiguration.java +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/SelfBoradcastRocketConfiguration.java @@ -18,18 +18,18 @@ import javax.annotation.Resource; * @author wangli * @since 2024-11-06 15:05 */ -@Configuration(proxyBeanMethods = false) +//@Configuration(proxyBeanMethods = false) public class SelfBoradcastRocketConfiguration { public static final String DEFAULT_EVENT = "topic_workflow_engine_"; - @Component - @ConditionalOnProperty(name = "rocketmq.name-server") - @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}", - consumerGroup = "GID_${spring.application.name}_workflow_engine_${spring.profiles.active}_consumer", - consumeMode = ConsumeMode.ORDERLY, - maxReconsumeTimes = 3, - nameServer = "${rocketmq.name-server}" - ) +// @Component +// @ConditionalOnProperty(name = "rocketmq.name-server") +// @RocketMQMessageListener(topic = DEFAULT_EVENT + "${spring.profiles.active}", +// consumerGroup = "GID_${spring.application.name}_workflow_engine_${spring.profiles.active}_consumer", +// consumeMode = ConsumeMode.ORDERLY, +// maxReconsumeTimes = 3, +// nameServer = "${rocketmq.name-server}" +// ) public static class WorkflowEngineBroadcastConsumer extends BaseListener implements RocketMQListener { @Resource private EventConsumer eventConsumer; diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java new file mode 100644 index 000000000..8e80a01e5 --- /dev/null +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/mq/inside/consumer/ElasticSearchBatchSyncListener.java @@ -0,0 +1,57 @@ +package cn.axzo.workflow.server.mq.inside.consumer; + +import cn.axzo.workflow.core.conf.SupportRefreshProperties; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; + +/** + * 批量同步 ES + * + * @author wangli + * @since 2024-11-20 18:06 + */ +@Component +@Slf4j +public class ElasticSearchBatchSyncListener { + @Resource + private SupportRefreshProperties refreshProperties; + @Resource + private RocketMQProperties rocketMQProperties; + @Value("GID_${spring.application.name}_workflow_engine_${spring.profiles.active}_consumer") + private String consumerGroup; + @Value("topic_workflow_engine_${spring.profiles.active}") + private String topic; + @SneakyThrows + @PostConstruct + public void init() { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); + consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.subscribe(topic, "*"); + consumer.setConsumeMessageBatchMaxSize(refreshProperties.getEsSyncBatchSize()); + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + log.warn("拉取消息条数:{}", msgs.size()); + try { + for (MessageExt msg : msgs) { + String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8); + } + } catch (Exception e) { + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); +// consumer.start(); + } +}