From fefbbac8c979eb4182f8ee9771d5e4b56215f046 Mon Sep 17 00:00:00 2001 From: wangli <274027703@qq.com> Date: Thu, 6 Jun 2024 15:35:06 +0800 Subject: [PATCH] =?UTF-8?q?update(REQ-2324)=20-=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E4=BD=BF=E7=94=A8=20Undertow=20=E9=85=8D?= =?UTF-8?q?=E5=90=88=20Nacos=20=E7=9A=84=E6=B3=A8=E5=86=8C=E4=B8=AD?= =?UTF-8?q?=E5=BF=83=E6=97=B6=EF=BC=8C=E5=81=9C=E6=AD=A2=E5=BA=94=E7=94=A8?= =?UTF-8?q?=E4=BC=9A=E5=AF=BC=E8=87=B4=E4=B8=80=E4=BA=9B=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E7=9A=84=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/common/hook/NacosShutdownHook.java | 26 +++ .../cloud/nacos/discovery/NacosWatch.java | 191 ++++++++++++++++++ .../client/naming/core/PushReceiver.java | 140 +++++++++++++ 3 files changed, 357 insertions(+) create mode 100644 workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/hook/NacosShutdownHook.java create mode 100644 workflow-engine-server/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java create mode 100644 workflow-engine-server/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java diff --git a/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/hook/NacosShutdownHook.java b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/hook/NacosShutdownHook.java new file mode 100644 index 000000000..b2eaa31a8 --- /dev/null +++ b/workflow-engine-server/src/main/java/cn/axzo/workflow/server/common/hook/NacosShutdownHook.java @@ -0,0 +1,26 @@ +package cn.axzo.workflow.server.common.hook; + +import org.springframework.context.SmartLifecycle; + +/** + * TODO + * + * @author wangli + * @since 2024/6/6 14:35 + */ +public class NacosShutdownHook implements SmartLifecycle { + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public boolean isRunning() { + return false; + } +} diff --git a/workflow-engine-server/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java b/workflow-engine-server/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java new file mode 100644 index 000000000..bfeeb41ed --- /dev/null +++ b/workflow-engine-server/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java @@ -0,0 +1,191 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.nacos.discovery; + +import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.cloud.nacos.NacosServiceManager; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; +import com.alibaba.nacos.api.naming.pojo.Instance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.cloud.client.discovery.event.HeartbeatEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.context.SmartLifecycle; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author xiaojing + * @author yuhuangbin + */ +public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle { + + private static final Logger log = LoggerFactory.getLogger(NacosWatch.class); + + private Map listenerMap = new ConcurrentHashMap<>(16); + + private final AtomicBoolean running = new AtomicBoolean(false); + + private final AtomicLong nacosWatchIndex = new AtomicLong(0); + + private ApplicationEventPublisher publisher; + + private ScheduledFuture watchFuture; + + private NacosServiceManager nacosServiceManager; + + private final NacosDiscoveryProperties properties; + + private final ThreadPoolTaskScheduler taskScheduler; + + public NacosWatch(NacosServiceManager nacosServiceManager, + NacosDiscoveryProperties properties, + ObjectProvider taskScheduler) { + this.nacosServiceManager = nacosServiceManager; + this.properties = properties; + this.taskScheduler = taskScheduler.stream().findAny() + .orElseGet(NacosWatch::getTaskScheduler); + } + + private static ThreadPoolTaskScheduler getTaskScheduler() { + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler"); + taskScheduler.initialize(); + return taskScheduler; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { + this.publisher = publisher; + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public void stop(Runnable callback) { + this.stop(); + callback.run(); + } + + @Override + public void start() { + if (this.running.compareAndSet(false, true)) { + EventListener eventListener = listenerMap.computeIfAbsent(buildKey(), + event -> new EventListener() { + @Override + public void onEvent(Event event) { + if (event instanceof NamingEvent) { + List instances = ((NamingEvent) event) + .getInstances(); + Optional instanceOptional = selectCurrentInstance( + instances); + instanceOptional.ifPresent(currentInstance -> { + resetIfNeeded(currentInstance); + }); + } + } + }); + + NamingService namingService = nacosServiceManager + .getNamingService(properties.getNacosProperties()); + try { + namingService.subscribe(properties.getService(), properties.getGroup(), + Arrays.asList(properties.getClusterName()), eventListener); + } catch (Exception e) { + log.error("namingService subscribe failed, properties:{}", properties, e); + } + + this.watchFuture = this.taskScheduler.scheduleWithFixedDelay( + this::nacosServicesWatch, this.properties.getWatchDelay()); + } + } + + private String buildKey() { + return String.join(":", properties.getService(), properties.getGroup()); + } + + private void resetIfNeeded(Instance instance) { + if (!properties.getMetadata().equals(instance.getMetadata())) { + properties.setMetadata(instance.getMetadata()); + } + } + + private Optional selectCurrentInstance(List instances) { + return instances.stream() + .filter(instance -> properties.getIp().equals(instance.getIp()) + && properties.getPort() == instance.getPort()) + .findFirst(); + } + + @Override + public void stop() { + if (this.running.compareAndSet(true, false)) { + if (this.watchFuture != null) { + // shutdown current user-thread, + // then the other daemon-threads will terminate automatic. + this.taskScheduler.shutdown(); + this.watchFuture.cancel(true); + } + + EventListener eventListener = listenerMap.get(buildKey()); + try { + NamingService namingService = nacosServiceManager + .getNamingService(properties.getNacosProperties()); + namingService.unsubscribe(properties.getService(), properties.getGroup(), + Arrays.asList(properties.getClusterName()), eventListener); + } catch (Exception e) { + log.error("namingService unsubscribe failed, properties:{}", properties, + e); + } + } + } + + @Override + public boolean isRunning() { + return this.running.get(); + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE - 1; + } + + public void nacosServicesWatch() { + + // nacos doesn't support watch now , publish an event every 30 seconds. + this.publisher.publishEvent( + new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement())); + + } + +} diff --git a/workflow-engine-server/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java b/workflow-engine-server/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java new file mode 100644 index 000000000..dd9e4e795 --- /dev/null +++ b/workflow-engine-server/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java @@ -0,0 +1,140 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.core; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.lifecycle.Closeable; +import com.alibaba.nacos.common.utils.IoUtils; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.common.utils.ThreadUtils; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.nio.charset.Charset; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER; + +/** + * Push receiver. + * + * @author xuanyin + */ +public class PushReceiver implements Runnable, Closeable { + + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + private static final int UDP_MSS = 64 * 1024; + + private ScheduledExecutorService executorService; + + private DatagramSocket udpSocket; + + private HostReactor hostReactor; + + private volatile boolean closed = false; + + public PushReceiver(HostReactor hostReactor) { + try { + this.hostReactor = hostReactor; + this.udpSocket = new DatagramSocket(); + this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("com.alibaba.nacos.naming.push.receiver"); + return thread; + } + }); + + this.executorService.execute(this); + } catch (Exception e) { + NAMING_LOGGER.error("[NA] init udp socket failed", e); + } + } + + @Override + public void run() { + while (!closed) { + try { + + // byte[] is initialized with 0 full filled by default + byte[] buffer = new byte[UDP_MSS]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + + udpSocket.receive(packet); + + String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); + NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); + + PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); + String ack; + if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { + hostReactor.processServiceJson(pushPacket.data); + + // send ack to server + ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + + "\"\"}"; + } else if ("dump".equals(pushPacket.type)) { + // dump data to server + ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) + + "\"}"; + } else { + // do nothing send ack only + ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + + "\", \"data\":" + "\"\"}"; + } + + udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, + packet.getSocketAddress())); + } catch (Exception e) { + if (closed) { + return; + } + NAMING_LOGGER.error("[NA] error while receiving push data", e); + } + } + } + + @Override + public void shutdown() throws NacosException { + String className = this.getClass().getName(); + NAMING_LOGGER.info("{} do shutdown begin", className); + ThreadUtils.shutdownThreadPool(executorService, NAMING_LOGGER); + closed = true; + udpSocket.close(); + NAMING_LOGGER.info("{} do shutdown stop", className); + } + + public static class PushPacket { + + public String type; + + public long lastRefTime; + + public String data; + } + + public int getUdpPort() { + return this.udpSocket.getLocalPort(); + } +}