update(REQ-2324) - 修复项目使用 Undertow 配合 Nacos 的注册中心时,停止应用会导致一些异常的输出
This commit is contained in:
parent
42affea2e1
commit
fefbbac8c9
@ -0,0 +1,26 @@
|
||||
package cn.axzo.workflow.server.common.hook;
|
||||
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
|
||||
/**
|
||||
* TODO
|
||||
*
|
||||
* @author wangli
|
||||
* @since 2024/6/6 14:35
|
||||
*/
|
||||
public class NacosShutdownHook implements SmartLifecycle {
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,191 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.discovery;
|
||||
|
||||
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
|
||||
import com.alibaba.cloud.nacos.NacosServiceManager;
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.listener.Event;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* @author xiaojing
|
||||
* @author yuhuangbin
|
||||
*/
|
||||
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
|
||||
|
||||
private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
|
||||
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
|
||||
|
||||
private ApplicationEventPublisher publisher;
|
||||
|
||||
private ScheduledFuture<?> watchFuture;
|
||||
|
||||
private NacosServiceManager nacosServiceManager;
|
||||
|
||||
private final NacosDiscoveryProperties properties;
|
||||
|
||||
private final ThreadPoolTaskScheduler taskScheduler;
|
||||
|
||||
public NacosWatch(NacosServiceManager nacosServiceManager,
|
||||
NacosDiscoveryProperties properties,
|
||||
ObjectProvider<ThreadPoolTaskScheduler> taskScheduler) {
|
||||
this.nacosServiceManager = nacosServiceManager;
|
||||
this.properties = properties;
|
||||
this.taskScheduler = taskScheduler.stream().findAny()
|
||||
.orElseGet(NacosWatch::getTaskScheduler);
|
||||
}
|
||||
|
||||
private static ThreadPoolTaskScheduler getTaskScheduler() {
|
||||
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||
taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
|
||||
taskScheduler.initialize();
|
||||
return taskScheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
|
||||
this.publisher = publisher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
this.stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (this.running.compareAndSet(false, true)) {
|
||||
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
|
||||
event -> new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
if (event instanceof NamingEvent) {
|
||||
List<Instance> instances = ((NamingEvent) event)
|
||||
.getInstances();
|
||||
Optional<Instance> instanceOptional = selectCurrentInstance(
|
||||
instances);
|
||||
instanceOptional.ifPresent(currentInstance -> {
|
||||
resetIfNeeded(currentInstance);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
NamingService namingService = nacosServiceManager
|
||||
.getNamingService(properties.getNacosProperties());
|
||||
try {
|
||||
namingService.subscribe(properties.getService(), properties.getGroup(),
|
||||
Arrays.asList(properties.getClusterName()), eventListener);
|
||||
} catch (Exception e) {
|
||||
log.error("namingService subscribe failed, properties:{}", properties, e);
|
||||
}
|
||||
|
||||
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
|
||||
this::nacosServicesWatch, this.properties.getWatchDelay());
|
||||
}
|
||||
}
|
||||
|
||||
private String buildKey() {
|
||||
return String.join(":", properties.getService(), properties.getGroup());
|
||||
}
|
||||
|
||||
private void resetIfNeeded(Instance instance) {
|
||||
if (!properties.getMetadata().equals(instance.getMetadata())) {
|
||||
properties.setMetadata(instance.getMetadata());
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<Instance> selectCurrentInstance(List<Instance> instances) {
|
||||
return instances.stream()
|
||||
.filter(instance -> properties.getIp().equals(instance.getIp())
|
||||
&& properties.getPort() == instance.getPort())
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (this.running.compareAndSet(true, false)) {
|
||||
if (this.watchFuture != null) {
|
||||
// shutdown current user-thread,
|
||||
// then the other daemon-threads will terminate automatic.
|
||||
this.taskScheduler.shutdown();
|
||||
this.watchFuture.cancel(true);
|
||||
}
|
||||
|
||||
EventListener eventListener = listenerMap.get(buildKey());
|
||||
try {
|
||||
NamingService namingService = nacosServiceManager
|
||||
.getNamingService(properties.getNacosProperties());
|
||||
namingService.unsubscribe(properties.getService(), properties.getGroup(),
|
||||
Arrays.asList(properties.getClusterName()), eventListener);
|
||||
} catch (Exception e) {
|
||||
log.error("namingService unsubscribe failed, properties:{}", properties,
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.running.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return Integer.MAX_VALUE - 1;
|
||||
}
|
||||
|
||||
public void nacosServicesWatch() {
|
||||
|
||||
// nacos doesn't support watch now , publish an event every 30 seconds.
|
||||
this.publisher.publishEvent(
|
||||
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.client.naming.core;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.common.lifecycle.Closeable;
|
||||
import com.alibaba.nacos.common.utils.IoUtils;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.common.utils.ThreadUtils;
|
||||
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
|
||||
|
||||
/**
|
||||
* Push receiver.
|
||||
*
|
||||
* @author xuanyin
|
||||
*/
|
||||
public class PushReceiver implements Runnable, Closeable {
|
||||
|
||||
private static final Charset UTF_8 = Charset.forName("UTF-8");
|
||||
|
||||
private static final int UDP_MSS = 64 * 1024;
|
||||
|
||||
private ScheduledExecutorService executorService;
|
||||
|
||||
private DatagramSocket udpSocket;
|
||||
|
||||
private HostReactor hostReactor;
|
||||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
public PushReceiver(HostReactor hostReactor) {
|
||||
try {
|
||||
this.hostReactor = hostReactor;
|
||||
this.udpSocket = new DatagramSocket();
|
||||
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setDaemon(true);
|
||||
thread.setName("com.alibaba.nacos.naming.push.receiver");
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
|
||||
this.executorService.execute(this);
|
||||
} catch (Exception e) {
|
||||
NAMING_LOGGER.error("[NA] init udp socket failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!closed) {
|
||||
try {
|
||||
|
||||
// byte[] is initialized with 0 full filled by default
|
||||
byte[] buffer = new byte[UDP_MSS];
|
||||
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
||||
|
||||
udpSocket.receive(packet);
|
||||
|
||||
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
|
||||
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
|
||||
|
||||
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
|
||||
String ack;
|
||||
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
|
||||
hostReactor.processServiceJson(pushPacket.data);
|
||||
|
||||
// send ack to server
|
||||
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
|
||||
+ "\"\"}";
|
||||
} else if ("dump".equals(pushPacket.type)) {
|
||||
// dump data to server
|
||||
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
|
||||
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
|
||||
+ "\"}";
|
||||
} else {
|
||||
// do nothing send ack only
|
||||
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
|
||||
+ "\", \"data\":" + "\"\"}";
|
||||
}
|
||||
|
||||
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
|
||||
packet.getSocketAddress()));
|
||||
} catch (Exception e) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
NAMING_LOGGER.error("[NA] error while receiving push data", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws NacosException {
|
||||
String className = this.getClass().getName();
|
||||
NAMING_LOGGER.info("{} do shutdown begin", className);
|
||||
ThreadUtils.shutdownThreadPool(executorService, NAMING_LOGGER);
|
||||
closed = true;
|
||||
udpSocket.close();
|
||||
NAMING_LOGGER.info("{} do shutdown stop", className);
|
||||
}
|
||||
|
||||
public static class PushPacket {
|
||||
|
||||
public String type;
|
||||
|
||||
public long lastRefTime;
|
||||
|
||||
public String data;
|
||||
}
|
||||
|
||||
public int getUdpPort() {
|
||||
return this.udpSocket.getLocalPort();
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user