学习不必那么功利,二师兄带你从更高维度轻松浏览源码~
说起Nacos的服务订阅机制,对此不理解的敌人,可能感觉十分神秘,这篇文章就大家深入浅出的理解一下Nacos 2.0客户端的订阅实现。因为波及到的内容比拟多,就分几篇来讲,本篇为第一篇。
Nacos订阅概述
Nacos的订阅机制,如果用一句话来形容就是:Nacos客户端通过一个定时工作,每6秒从注册核心获取实例列表,当发现实例发生变化时,公布变更事件,订阅者进行业务解决。该更新实例的更新实例,该更新本地缓存的更新本地缓存。
上图画出了订阅办法的主线流程,波及的内容较多,解决细节简单。这里只用把握住外围局部即可。上面就通过代码和流程图来逐渐剖析上述过程。
从订阅到定时工作开启
咱们这里聊的订阅机制,其实实质上就是服务发现的准实时感知。下面曾经看到了当执行订阅办法时,会触发定时工作,定时去拉服务器端的数据。所以,实质上,订阅机制就是实现服务发现的一种形式,对照的形式就是间接查问接口了。
NacosNamingService中裸露的许多重载的subscribe,重载的目标就是让大家少写一些参数,这些参数呢,Nacos给默认解决了。最终这些重载办法都会调用到上面这个办法:
// NacosNamingServicepublic void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { if (null == listener) { return; } String clusterString = StringUtils.join(clusters, ","); changeNotifier.registerListener(groupName, serviceName, clusterString, listener); clientProxy.subscribe(serviceName, groupName, clusterString);}
办法中的事件监听咱们临时不聊,间接看subscribe办法,这里clientProxy类型为NamingClientProxyDelegate。实例化NacosNamingService时该类被实例化,后面章节中曾经讲到,不再赘述。
而clientProxy.subscribe办法在NamingClientProxyDelegate中实现:
// NamingClientProxyDelegate@Overridepublic ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); // 获取缓存中的ServiceInfo ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (null == result) { // 如果为null,则进行订阅逻辑解决,基于gRPC协定 result = grpcClientProxy.subscribe(serviceName, groupName, clusters); } // 定时调度UpdateTask serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); // ServiceInfo本地缓存解决 serviceInfoHolder.processServiceInfo(result); return result;}
这段办法是不是眼生啊?对的,在后面剖析《Nacos Client服务发现》时咱们曾经讲过了。看来必由之路,查问服务列表和订阅最终都调用了同一个办法。
上篇讲了其余流程,咱们这里重点看任务调度:
// ServiceInfoUpdateServicepublic void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) { String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); if (futureMap.get(serviceKey) != null) { return; } synchronized (futureMap) { if (futureMap.get(serviceKey) != null) { return; } // 构建UpdateTask ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters)); futureMap.put(serviceKey, future); }}
该办法蕴含了构建serviceKey、通过serviceKey判重,最初增加UpdateTask。
而其中的addTask的实现就是发动了一个定时工作:
// ServiceInfoUpdateServiceprivate synchronized ScheduledFuture<?> addTask(UpdateTask task) { return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);}
定时工作延时1秒执行。
跟踪到这里就告一阶段了。外围性能只有两个:调用订阅办法和发动定时工作。
定时工作都干了啥
UpdateTask封装了订阅机制的外围业务逻辑,先来通过一张流程图看一下都做了啥。
有了上述流程图,根本就很清晰的理解UpdateTask所做的事件了。间接贴出run办法的所有代码:
public void run() { long delayTime = DEFAULT_DELAY; try { // 判断该注册的Service是否被订阅,如果没有订阅则不再执行 if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) { NAMING_LOGGER .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters); return; } // 获取缓存的service信息 ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (serviceObj == null) { // 依据serviceName从注册核心服务端获取Service信息 serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); serviceInfoHolder.processServiceInfo(serviceObj); lastRefTime = serviceObj.getLastRefTime(); return; } // 过期服务(服务的最新更新工夫小于等于缓存刷新工夫),从注册核心从新查问 if (serviceObj.getLastRefTime() <= lastRefTime) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); // 解决Service音讯 serviceInfoHolder.processServiceInfo(serviceObj); } // 刷新更新工夫 lastRefTime = serviceObj.getLastRefTime(); if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } // 下次更新缓存工夫设置,默认为6秒 // TODO multiple time can be configured. delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; // 重置失败数量为0 resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e); } finally { // 下次调度刷新工夫,下次执行的工夫与failCount无关 // failCount=0,则下次调度工夫为6秒,最长为1分钟 // 即当无异常情况下缓存实例的刷新工夫是6秒 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); }}
首先在判断服务是否是被订阅过,实现办法是ChangeNotifier#isSubscribed:
public boolean isSubscribed(String groupName, String serviceName, String clusters) { String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); return CollectionUtils.isNotEmpty(eventListeners);}
查看该办法的源码会发现,这里的listenerMap正是最开始的subscribe办法中registerListener注册的EventListener。
run办法前面的业务解决基本上都雷同了,先判断缓存是否有ServiceInfo信息,如果没有则查问注册核心、解决ServiceInfo、更新上次解决工夫。
而上面判断ServiceInfo是否生效,正是通过“上次更新工夫”与以后ServiceInfo中的“上次更新工夫”做比拟来判断。如果生效,也会查问注册核心、解决ServiceInfo、更新上次解决工夫等一系列操作。
业务逻辑最初会计算下一次定时工作的执行工夫,通过delayTime来提早执行。delayTime默认为 1000L * 6,也就是6秒。而在finally外面真的发动下一次定时工作。当出现异常时,下次执行的工夫与失败次数无关,但最长不超过1分钟。
小结
这一篇咱们讲了Nacos客户端服务订阅机制的源码,次要有以下步骤:
第一步:订阅办法的调用,并进行EventListener的注册,前面UpdateTask要用来进行判断;
第二步:通过委托代理类来解决订阅逻辑,此处与获取实例列表办法应用了同一个办法;
第三步:通过定时工作执行UpdateTask办法,默认执行距离为6秒,当产生异样时会缩短,但不超过1分钟;
第四步:UpdateTask办法中会比拟本地是否存在缓存,缓存是否过期。当不存在或过期时,查问注册核心,获取最新实例,更新最初获取工夫,解决ServiceInfo。
第五步:从新计算定时工作工夫,循环执行上述流程。
下一篇,咱们会在此基础上持续解说ServiceInfoHolder#processServiceInfo办法中是如何对获取到的实例信息进行解决的。
博主简介:《SpringBoot技术底细》技术图书作者,热爱钻研技术,写技术干货文章。
公众号:「程序新视界」,博主的公众号,欢送关注~
技术交换:请分割博主微信号:zhuan2quan