update(REQ-2324) - 修复项目使用 Undertow 配合 Nacos 的注册中心时,停止应用会导致一些异常的输出
This commit is contained in:
parent
3f89d77d02
commit
1d1f9f240d
@ -0,0 +1,26 @@
|
|||||||
|
package cn.axzo.workflow.server.common.hook;
|
||||||
|
|
||||||
|
import org.springframework.context.SmartLifecycle;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO
|
||||||
|
*
|
||||||
|
* @author wangli
|
||||||
|
* @since 2024/6/6 14:35
|
||||||
|
*/
|
||||||
|
public class NacosShutdownHook implements SmartLifecycle {
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,191 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2013-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.nacos.discovery;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
|
||||||
|
import com.alibaba.cloud.nacos.NacosServiceManager;
|
||||||
|
import com.alibaba.nacos.api.naming.NamingService;
|
||||||
|
import com.alibaba.nacos.api.naming.listener.Event;
|
||||||
|
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||||
|
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
||||||
|
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.ObjectProvider;
|
||||||
|
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.context.ApplicationEventPublisherAware;
|
||||||
|
import org.springframework.context.SmartLifecycle;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xiaojing
|
||||||
|
* @author yuhuangbin
|
||||||
|
*/
|
||||||
|
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
|
||||||
|
|
||||||
|
private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
|
||||||
|
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
|
||||||
|
|
||||||
|
private ApplicationEventPublisher publisher;
|
||||||
|
|
||||||
|
private ScheduledFuture<?> watchFuture;
|
||||||
|
|
||||||
|
private NacosServiceManager nacosServiceManager;
|
||||||
|
|
||||||
|
private final NacosDiscoveryProperties properties;
|
||||||
|
|
||||||
|
private final ThreadPoolTaskScheduler taskScheduler;
|
||||||
|
|
||||||
|
public NacosWatch(NacosServiceManager nacosServiceManager,
|
||||||
|
NacosDiscoveryProperties properties,
|
||||||
|
ObjectProvider<ThreadPoolTaskScheduler> taskScheduler) {
|
||||||
|
this.nacosServiceManager = nacosServiceManager;
|
||||||
|
this.properties = properties;
|
||||||
|
this.taskScheduler = taskScheduler.stream().findAny()
|
||||||
|
.orElseGet(NacosWatch::getTaskScheduler);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ThreadPoolTaskScheduler getTaskScheduler() {
|
||||||
|
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||||
|
taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
|
||||||
|
taskScheduler.initialize();
|
||||||
|
return taskScheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
|
||||||
|
this.publisher = publisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAutoStartup() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(Runnable callback) {
|
||||||
|
this.stop();
|
||||||
|
callback.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
if (this.running.compareAndSet(false, true)) {
|
||||||
|
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
|
||||||
|
event -> new EventListener() {
|
||||||
|
@Override
|
||||||
|
public void onEvent(Event event) {
|
||||||
|
if (event instanceof NamingEvent) {
|
||||||
|
List<Instance> instances = ((NamingEvent) event)
|
||||||
|
.getInstances();
|
||||||
|
Optional<Instance> instanceOptional = selectCurrentInstance(
|
||||||
|
instances);
|
||||||
|
instanceOptional.ifPresent(currentInstance -> {
|
||||||
|
resetIfNeeded(currentInstance);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
NamingService namingService = nacosServiceManager
|
||||||
|
.getNamingService(properties.getNacosProperties());
|
||||||
|
try {
|
||||||
|
namingService.subscribe(properties.getService(), properties.getGroup(),
|
||||||
|
Arrays.asList(properties.getClusterName()), eventListener);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("namingService subscribe failed, properties:{}", properties, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
|
||||||
|
this::nacosServicesWatch, this.properties.getWatchDelay());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String buildKey() {
|
||||||
|
return String.join(":", properties.getService(), properties.getGroup());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resetIfNeeded(Instance instance) {
|
||||||
|
if (!properties.getMetadata().equals(instance.getMetadata())) {
|
||||||
|
properties.setMetadata(instance.getMetadata());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<Instance> selectCurrentInstance(List<Instance> instances) {
|
||||||
|
return instances.stream()
|
||||||
|
.filter(instance -> properties.getIp().equals(instance.getIp())
|
||||||
|
&& properties.getPort() == instance.getPort())
|
||||||
|
.findFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (this.running.compareAndSet(true, false)) {
|
||||||
|
if (this.watchFuture != null) {
|
||||||
|
// shutdown current user-thread,
|
||||||
|
// then the other daemon-threads will terminate automatic.
|
||||||
|
this.taskScheduler.shutdown();
|
||||||
|
this.watchFuture.cancel(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
EventListener eventListener = listenerMap.get(buildKey());
|
||||||
|
try {
|
||||||
|
NamingService namingService = nacosServiceManager
|
||||||
|
.getNamingService(properties.getNacosProperties());
|
||||||
|
namingService.unsubscribe(properties.getService(), properties.getGroup(),
|
||||||
|
Arrays.asList(properties.getClusterName()), eventListener);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("namingService unsubscribe failed, properties:{}", properties,
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return this.running.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPhase() {
|
||||||
|
return Integer.MAX_VALUE - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void nacosServicesWatch() {
|
||||||
|
|
||||||
|
// nacos doesn't support watch now , publish an event every 30 seconds.
|
||||||
|
this.publisher.publishEvent(
|
||||||
|
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,140 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.nacos.client.naming.core;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
|
import com.alibaba.nacos.common.lifecycle.Closeable;
|
||||||
|
import com.alibaba.nacos.common.utils.IoUtils;
|
||||||
|
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||||
|
import com.alibaba.nacos.common.utils.StringUtils;
|
||||||
|
import com.alibaba.nacos.common.utils.ThreadUtils;
|
||||||
|
|
||||||
|
import java.net.DatagramPacket;
|
||||||
|
import java.net.DatagramSocket;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
|
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push receiver.
|
||||||
|
*
|
||||||
|
* @author xuanyin
|
||||||
|
*/
|
||||||
|
public class PushReceiver implements Runnable, Closeable {
|
||||||
|
|
||||||
|
private static final Charset UTF_8 = Charset.forName("UTF-8");
|
||||||
|
|
||||||
|
private static final int UDP_MSS = 64 * 1024;
|
||||||
|
|
||||||
|
private ScheduledExecutorService executorService;
|
||||||
|
|
||||||
|
private DatagramSocket udpSocket;
|
||||||
|
|
||||||
|
private HostReactor hostReactor;
|
||||||
|
|
||||||
|
private volatile boolean closed = false;
|
||||||
|
|
||||||
|
public PushReceiver(HostReactor hostReactor) {
|
||||||
|
try {
|
||||||
|
this.hostReactor = hostReactor;
|
||||||
|
this.udpSocket = new DatagramSocket();
|
||||||
|
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
Thread thread = new Thread(r);
|
||||||
|
thread.setDaemon(true);
|
||||||
|
thread.setName("com.alibaba.nacos.naming.push.receiver");
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.executorService.execute(this);
|
||||||
|
} catch (Exception e) {
|
||||||
|
NAMING_LOGGER.error("[NA] init udp socket failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (!closed) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
// byte[] is initialized with 0 full filled by default
|
||||||
|
byte[] buffer = new byte[UDP_MSS];
|
||||||
|
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
||||||
|
|
||||||
|
udpSocket.receive(packet);
|
||||||
|
|
||||||
|
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
|
||||||
|
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
|
||||||
|
|
||||||
|
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
|
||||||
|
String ack;
|
||||||
|
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
|
||||||
|
hostReactor.processServiceJson(pushPacket.data);
|
||||||
|
|
||||||
|
// send ack to server
|
||||||
|
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
|
||||||
|
+ "\"\"}";
|
||||||
|
} else if ("dump".equals(pushPacket.type)) {
|
||||||
|
// dump data to server
|
||||||
|
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
|
||||||
|
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
|
||||||
|
+ "\"}";
|
||||||
|
} else {
|
||||||
|
// do nothing send ack only
|
||||||
|
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
|
||||||
|
+ "\", \"data\":" + "\"\"}";
|
||||||
|
}
|
||||||
|
|
||||||
|
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
|
||||||
|
packet.getSocketAddress()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
NAMING_LOGGER.error("[NA] error while receiving push data", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() throws NacosException {
|
||||||
|
String className = this.getClass().getName();
|
||||||
|
NAMING_LOGGER.info("{} do shutdown begin", className);
|
||||||
|
ThreadUtils.shutdownThreadPool(executorService, NAMING_LOGGER);
|
||||||
|
closed = true;
|
||||||
|
udpSocket.close();
|
||||||
|
NAMING_LOGGER.info("{} do shutdown stop", className);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class PushPacket {
|
||||||
|
|
||||||
|
public String type;
|
||||||
|
|
||||||
|
public long lastRefTime;
|
||||||
|
|
||||||
|
public String data;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getUdpPort() {
|
||||||
|
return this.udpSocket.getLocalPort();
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user