Nacos Source Code Analysis 03

Published: 2024-12-17 16:31


Contents: Nacos Source Code Analysis 03 - Server-Side Health Check · 1. Nacos Health Check · 1.1. Version 1.x · 1.2. Version 2.x · 1.3. Excerpt from "Nacos Architecture & Principles" · 2. ConnectionManager

Nacos Source Code Analysis 03 - Server-Side Health Check

This blog series is based on official source code version 2.0.3.

1. Nacos Health Check

1.1. Version 1.x

  In Nacos 1.x, an ephemeral instance keeps itself healthy by having the client (the service provider) periodically send heartbeat packets to Nacos. Persistent instances do not rely on client heartbeats; instead the server periodically probes the client to check its health (TCP port probing, HTTP status-code probing).

1.2. Version 2.x

  Since Nacos 2.0, the health-check logic for persistent instances is unchanged, but ephemeral instances no longer use heartbeat packets; instead, the server decides whether an ephemeral instance is healthy by checking whether its gRPC long connection is still alive.

1.3. Excerpt from "Nacos Architecture & Principles"

The book appears to have been written before the 2.0 release; some implementation details it describes differ from the actual code.

  Both ZooKeeper and Eureka implement a TTL mechanism: if a client does not send a heartbeat to the registry within a certain time, that client is removed. Eureka goes a step further by allowing a custom method for checking the instance's own health to be registered along with the service. In scenarios where instances can keep reporting heartbeats this is a fairly good experience, and within the Dubbo and Spring Cloud ecosystems it has become the default behavior users expect. Nacos also supports this TTL mechanism, though it differs somewhat from how ConfigServer works inside Alibaba. Nacos currently lets ephemeral instances maintain liveness through heartbeat reporting: the default heartbeat interval is 5 seconds, the server marks an instance unhealthy after 15 seconds without a heartbeat, and removes the ephemeral instance after 30 seconds without one.
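The TTL timeline in the excerpt (5 s heartbeat, unhealthy at 15 s, removal at 30 s) can be sketched as follows. This is a minimal illustration, not the actual Nacos 1.x implementation; the class, field, and method names are all invented for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a 1.x-style TTL health check.
public class TtlHealthCheckSketch {

    static final long UNHEALTHY_TIMEOUT_MS = 15_000L; // no heartbeat for 15s -> unhealthy
    static final long DELETE_TIMEOUT_MS = 30_000L;    // no heartbeat for 30s -> removed

    final Map<String, Long> lastBeat = new ConcurrentHashMap<>();
    final Map<String, Boolean> healthy = new ConcurrentHashMap<>();

    // Called when a client heartbeat arrives (every 5s by default in Nacos 1.x).
    void beat(String instanceId, long nowMs) {
        lastBeat.put(instanceId, nowMs);
        healthy.put(instanceId, true);
    }

    // One pass of the server-side sweep; in a real registry this runs on a schedule.
    void sweep(long nowMs) {
        lastBeat.forEach((id, last) -> {
            long idle = nowMs - last;
            if (idle >= DELETE_TIMEOUT_MS) {
                lastBeat.remove(id);   // 30s silent: evict the ephemeral instance
                healthy.remove(id);
            } else if (idle >= UNHEALTHY_TIMEOUT_MS) {
                healthy.put(id, false); // 15s silent: mark unhealthy, keep registered
            }
        });
    }

    public static void main(String[] args) {
        TtlHealthCheckSketch s = new TtlHealthCheckSketch();
        s.beat("inst-1", 0L);
        s.sweep(10_000L); // idle 10s: still healthy
        System.out.println(s.healthy.get("inst-1"));         // true
        s.sweep(16_000L); // idle 16s: marked unhealthy
        System.out.println(s.healthy.get("inst-1"));         // false
        s.sweep(31_000L); // idle 31s: removed
        System.out.println(s.healthy.containsKey("inst-1")); // false
    }
}
```

Injecting the clock (`nowMs`) instead of calling `System.currentTimeMillis()` inside the sweep makes the state transitions easy to test deterministically.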

2. ConnectionManager

  The ConnectionManager class manages all client long connections.

  Health-check strategy: every 3 s, the server scans for clients that have had no communication for more than 20 s and sends each one a ClientDetectionRequest probe. If the client responds successfully within 1 s, the check passes; otherwise unregister is called to remove the Connection.

If a client keeps communicating with the server, the server never needs to probe it actively. The 3-second cadence comes from how the scheduled task is set up: an initial delay of 1 s and a fixed delay of 3 s. The 20-second idle threshold is the constant ConnectionManager.KEEP_ALIVE_TIME.

```java
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

@PostConstruct
public void start() {
    // Start the unhealthy-connection eviction task.
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                int totalCount = connections.size();
                Loggers.REMOTE_DIGEST.info("Connection check task start");
                MetricsMonitor.getLongConnectionMonitor().set(totalCount);
                // Collect connections that have been idle for more than 20s.
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                int currentSdkClientCount = currentSdkClientCount();
                boolean isLoaderClient = loadClient >= 0;
                int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
                int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);

                Loggers.REMOTE_DIGEST
                        .info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}",
                                totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),
                                currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount);

                List<String> expelClient = new LinkedList<>();
                Map<String, AtomicInteger> expelForIp = new HashMap<>(16);

                //1. calculate expel count of ip.
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String appName = client.getMetaInfo().getAppName();
                    String clientIp = client.getMetaInfo().getClientIp();
                    if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
                        //get limit for current ip.
                        int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
                        if (countLimitOfIp < 0) {
                            int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
                            countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
                        }
                        if (countLimitOfIp < 0) {
                            countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
                        }
                        if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
                            AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
                            if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
                                expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
                            }
                        }
                    }
                }

                Loggers.REMOTE_DIGEST
                        .info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size());

                if (expelForIp.size() > 0) {
                    Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp);
                }

                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();
                //2. get expel connection for ip limit.
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String clientIp = client.getMetaInfo().getClientIp();
                    AtomicInteger integer = expelForIp.get(clientIp);
                    if (integer != null && integer.intValue() > 0) {
                        integer.decrementAndGet();
                        expelClient.add(client.getMetaInfo().getConnectionId());
                        expelCount--;
                    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }
                }

                //3. if total count is still over limit.
                if (expelCount > 0) {
                    for (Map.Entry<String, Connection> entry : entries) {
                        Connection client = entry.getValue();
                        if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
                                .isSdkSource() && expelCount > 0) {
                            expelClient.add(client.getMetaInfo().getConnectionId());
                            expelCount--;
                            outDatedConnections.remove(client.getMetaInfo().getConnectionId());
                        }
                    }
                }

                String serverIp = null;
                String serverPort = null;
                if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
                    String[] split = redirectAddress.split(Constants.COLON);
                    serverIp = split[0];
                    serverPort = split[1];
                }

                for (String expelledClientId : expelClient) {
                    try {
                        Connection connection = getConnection(expelledClientId);
                        if (connection != null) {
                            ConnectResetRequest connectResetRequest = new ConnectResetRequest();
                            connectResetRequest.setServerIp(serverIp);
                            connectResetRequest.setServerPort(serverPort);
                            connection.asyncRequest(connectResetRequest, null);
                            Loggers.REMOTE_DIGEST
                                    .info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}",
                                            expelledClientId, connectResetRequest.getServerIp(),
                                            connectResetRequest.getServerPort());
                        }
                    } catch (ConnectionAlreadyClosedException e) {
                        unregister(expelledClientId);
                    } catch (Exception e) {
                        Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId:{}",
                                expelledClientId, e);
                    }
                }

                //4. client active detection.
                Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                // Probe every out-dated connection asynchronously.
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    Set<String> successConnections = new HashSet<>();
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }

                                    @Override
                                    public long getTimeout() {
                                        return 1000L;
                                    }

                                    @Override
                                    public void onResponse(Response response) {
                                        latch.countDown();
                                        if (response != null && response.isSuccess()) {
                                            connection.freshActiveTime();
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });
                                Loggers.REMOTE_DIGEST
                                        .info("[{}]send connection active request ", outDateConnectionId);
                            } else {
                                latch.countDown();
                            }
                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            Loggers.REMOTE_DIGEST
                                    .error("[{}]Error occurs when check client active detection ,error={}",
                                            outDateConnectionId, e);
                            latch.countDown();
                        }
                    }

                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    Loggers.REMOTE_DIGEST
                            .info("Out dated connection check successCount={}", successConnections.size());
                    // Unregister every client that failed to respond in time.
                    for (String outDateConnectionId : outDatedConnections) {
                        if (!successConnections.contains(outDateConnectionId)) {
                            Loggers.REMOTE_DIGEST
                                    .info("[{}]Unregister Out dated connection....", outDateConnectionId);
                            unregister(outDateConnectionId);
                        }
                    }
                }

                //reset loader client
                if (isLoaderClient) {
                    loadClient = -1;
                    redirectAddress = null;
                }

                Loggers.REMOTE_DIGEST.info("Connection check task end");
            } catch (Throwable e) {
                Loggers.REMOTE.error("Error occurs during connection check... ", e);
            }
        }
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}

// Unregister a connection.
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}
```
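The idle check at the heart of step 4 can be isolated as a small predicate. This is our own simplification for illustration; only the 20 s constant mirrors `ConnectionManager.KEEP_ALIVE_TIME`, the class and method names are invented.

```java
// Illustrative sketch of ConnectionManager's idle-connection criterion.
public class IdleCheckSketch {

    // Mirrors ConnectionManager.KEEP_ALIVE_TIME in Nacos 2.0.3: 20 seconds.
    static final long KEEP_ALIVE_TIME = 20_000L;

    // A connection becomes "out dated" once it has been silent for KEEP_ALIVE_TIME;
    // only such connections receive a ClientDetectionRequest probe. Any traffic
    // refreshes lastActiveTime, so busy connections are never probed.
    static boolean isOutdated(long nowMs, long lastActiveTimeMs) {
        return nowMs - lastActiveTimeMs >= KEEP_ALIVE_TIME;
    }

    public static void main(String[] args) {
        System.out.println(isOutdated(25_000L, 0L));      // true: idle for 25s
        System.out.println(isOutdated(25_000L, 10_000L)); // false: idle for only 15s
    }
}
```

Since the check task runs with a fixed delay of 3 s, an idle connection is typically probed within roughly 20-23 s of its last activity.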


  After the connection is removed, ConnectionBasedClientManager (which extends ClientConnectionEventListener) removes the corresponding Client and publishes a ClientDisconnectEvent.

```java
@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}
```

  ClientDisconnectEvent triggers several reactions:

Distro protocol: the removed client's data is synchronized across the cluster.

Two index caches are cleared: the Service-to-publishing-Client mapping in ClientServiceIndexesManager, and the Service-to-Instance mapping in ServiceStorage.

Service subscription: ClientDisconnectEvent indirectly triggers a ServiceChangedEvent, which notifies subscribing clients of the service change.
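The fan-out above can be sketched as a minimal publish/subscribe flow. The lambdas below merely stand in for the Distro-sync, index-cleanup, and subscriber-notification handlers; they are not Nacos's actual subscriber classes, and `publish` is a stand-in for `NotifyCenter.publishEvent`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of the ClientDisconnectEvent fan-out.
public class DisconnectEventSketch {

    record ClientDisconnectEvent(String clientId) {}

    static final List<Consumer<ClientDisconnectEvent>> SUBSCRIBERS = new ArrayList<>();
    static final List<String> HANDLED = new ArrayList<>();

    // Stand-in for NotifyCenter.publishEvent: deliver the event to every subscriber.
    static void publish(ClientDisconnectEvent event) {
        SUBSCRIBERS.forEach(s -> s.accept(event));
    }

    public static void main(String[] args) {
        // Each subscriber mirrors one of the reactions listed above.
        SUBSCRIBERS.add(e -> HANDLED.add("distro-sync-removal:" + e.clientId()));
        SUBSCRIBERS.add(e -> HANDLED.add("clear-service-indexes:" + e.clientId()));
        SUBSCRIBERS.add(e -> HANDLED.add("notify-subscribers:" + e.clientId()));

        publish(new ClientDisconnectEvent("conn-123"));
        System.out.println(HANDLED.size()); // 3: all three reactions ran
    }
}
```

Decoupling the disconnect path from its consumers this way is what lets Nacos add reactions (Distro sync, cache cleanup, push notification) without touching ConnectionManager itself.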

