学习不必那么功利,二师兄带你从更高维度轻松浏览源码~

说起Nacos的服务订阅机制,对此不理解的敌人,可能感觉十分神秘,这篇文章就大家深入浅出的理解一下Nacos 2.0客户端的订阅实现。因为波及到的内容比拟多,就分几篇来讲,本篇为第一篇。

Nacos订阅概述

Nacos的订阅机制,如果用一句话来形容就是:Nacos客户端通过一个定时工作,每6秒从注册核心获取实例列表,当发现实例发生变化时,公布变更事件,订阅者进行业务解决。该更新实例的更新实例,该更新本地缓存的更新本地缓存。

上图画出了订阅办法的主线流程,波及的内容较多,解决细节简单。这里只用把握住外围局部即可。上面就通过代码和流程图来逐渐剖析上述过程。

从订阅到定时工作开启

咱们这里聊的订阅机制,其实实质上就是服务发现的准实时感知。下面曾经看到了当执行订阅办法时,会触发定时工作,定时去拉服务器端的数据。所以,实质上,订阅机制就是实现服务发现的一种形式,对照的形式就是间接查问接口了。

NacosNamingService中裸露的许多重载的subscribe,重载的目标就是让大家少写一些参数,这些参数呢,Nacos给默认解决了。最终这些重载办法都会调用到上面这个办法:

// NacosNamingServicepublic void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)        throws NacosException {    if (null == listener) {        return;    }    String clusterString = StringUtils.join(clusters, ",");    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);    clientProxy.subscribe(serviceName, groupName, clusterString);}

办法中的事件监听咱们临时不聊,间接看subscribe办法,这里clientProxy类型为NamingClientProxyDelegate。实例化NacosNamingService时该类被实例化,后面章节中曾经讲到,不再赘述。

而clientProxy.subscribe办法在NamingClientProxyDelegate中实现:

// NamingClientProxyDelegate@Overridepublic ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);    // 获取缓存中的ServiceInfo    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);    if (null == result) {        // 如果为null,则进行订阅逻辑解决,基于gRPC协定        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);    }    // 定时调度UpdateTask    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);    // ServiceInfo本地缓存解决    serviceInfoHolder.processServiceInfo(result);    return result;}

这段办法是不是眼生啊?对的,在后面剖析《Nacos Client服务发现》时咱们曾经讲过了。看来必由之路,查问服务列表和订阅最终都调用了同一个办法。

上篇讲了其余流程,咱们这里重点看任务调度:

// ServiceInfoUpdateServicepublic void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);    if (futureMap.get(serviceKey) != null) {        return;    }    synchronized (futureMap) {        if (futureMap.get(serviceKey) != null) {            return;        }        // 构建UpdateTask        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));        futureMap.put(serviceKey, future);    }}

该办法蕴含了构建serviceKey、通过serviceKey判重,最初增加UpdateTask。

而其中的addTask的实现就是发动了一个定时工作:

// ServiceInfoUpdateServiceprivate synchronized ScheduledFuture<?> addTask(UpdateTask task) {    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);}

定时工作延时1秒执行。

跟踪到这里就告一阶段了。外围性能只有两个:调用订阅办法和发动定时工作。

定时工作都干了啥

UpdateTask封装了订阅机制的外围业务逻辑,先来通过一张流程图看一下都做了啥。

有了上述流程图,根本就很清晰的理解UpdateTask所做的事件了。间接贴出run办法的所有代码:

public void run() {    long delayTime = DEFAULT_DELAY;    try {        // 判断该注册的Service是否被订阅,如果没有订阅则不再执行        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {            NAMING_LOGGER                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);            return;        }        // 获取缓存的service信息        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);        if (serviceObj == null) {            // 依据serviceName从注册核心服务端获取Service信息            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);            serviceInfoHolder.processServiceInfo(serviceObj);            lastRefTime = serviceObj.getLastRefTime();            return;        }        // 过期服务(服务的最新更新工夫小于等于缓存刷新工夫),从注册核心从新查问        if (serviceObj.getLastRefTime() <= lastRefTime) {            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);            // 解决Service音讯            serviceInfoHolder.processServiceInfo(serviceObj);        }        // 刷新更新工夫        lastRefTime = serviceObj.getLastRefTime();        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {            incFailCount();            return;        }        // 下次更新缓存工夫设置,默认为6秒        // TODO multiple time can be configured.        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;        // 重置失败数量为0        resetFailCount();    } catch (Throwable e) {        incFailCount();        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);    } finally {        // 下次调度刷新工夫,下次执行的工夫与failCount无关        // failCount=0,则下次调度工夫为6秒,最长为1分钟        // 即当无异常情况下缓存实例的刷新工夫是6秒        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);    }}

首先在判断服务是否是被订阅过,实现办法是ChangeNotifier#isSubscribed:

public boolean isSubscribed(String groupName, String serviceName, String clusters) {    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);    return CollectionUtils.isNotEmpty(eventListeners);}

查看该办法的源码会发现,这里的listenerMap正是最开始的subscribe办法中registerListener注册的EventListener。

run办法前面的业务解决基本上都雷同了,先判断缓存是否有ServiceInfo信息,如果没有则查问注册核心、解决ServiceInfo、更新上次解决工夫。

而上面判断ServiceInfo是否生效,正是通过“上次更新工夫”与以后ServiceInfo中的“上次更新工夫”做比拟来判断。如果生效,也会查问注册核心、解决ServiceInfo、更新上次解决工夫等一系列操作。

业务逻辑最初会计算下一次定时工作的执行工夫,通过delayTime来提早执行。delayTime默认为 1000L * 6,也就是6秒。而在finally外面真的发动下一次定时工作。当出现异常时,下次执行的工夫与失败次数无关,但最长不超过1分钟。

小结

这一篇咱们讲了Nacos客户端服务订阅机制的源码,次要有以下步骤:

第一步:订阅办法的调用,并进行EventListener的注册,前面UpdateTask要用来进行判断;

第二步:通过委托代理类来解决订阅逻辑,此处与获取实例列表办法应用了同一个办法;

第三步:通过定时工作执行UpdateTask办法,默认执行距离为6秒,当产生异样时会缩短,但不超过1分钟;

第四步:UpdateTask办法中会比拟本地是否存在缓存,缓存是否过期。当不存在或过期时,查问注册核心,获取最新实例,更新最初获取工夫,解决ServiceInfo。

第五步:从新计算定时工作工夫,循环执行上述流程。

下一篇,咱们会在此基础上持续解说ServiceInfoHolder#processServiceInfo办法中是如何对获取到的实例信息进行解决的。

博主简介:《SpringBoot技术底细》技术图书作者,热爱钻研技术,写技术干货文章。

公众号:「程序新视界」,博主的公众号,欢送关注~

技术交换:请分割博主微信号:zhuan2quan