乐趣区

关于源码分析:04篇-Nacos-Client服务订阅机制之核心流程

学习不必那么功利,二师兄带你从更高维度轻松浏览源码~

说起 Nacos 的服务订阅机制,对此不理解的敌人,可能感觉十分神秘,这篇文章就大家深入浅出的理解一下 Nacos 2.0 客户端的订阅实现。因为波及到的内容比拟多,就分几篇来讲,本篇为第一篇。

Nacos 订阅概述

Nacos 的订阅机制,如果用一句话来形容就是:Nacos 客户端通过一个定时工作,每 6 秒从注册核心获取实例列表,当发现实例发生变化时,公布变更事件,订阅者进行业务解决。该更新实例的更新实例,该更新本地缓存的更新本地缓存。

上图画出了订阅办法的主线流程,波及的内容较多,解决细节简单。这里只用把握住外围局部即可。上面就通过代码和流程图来逐渐剖析上述过程。

从订阅到定时工作开启

咱们这里聊的订阅机制,其实实质上就是服务发现的准实时感知。下面曾经看到了当执行订阅办法时,会触发定时工作,定时去拉服务器端的数据。所以,实质上,订阅机制就是实现服务发现的一种形式,对照的形式就是间接查问接口了。

NacosNamingService 中裸露的许多重载的 subscribe,重载的目标就是让大家少写一些参数,这些参数呢,Nacos 给默认解决了。最终这些重载办法都会调用到上面这个办法:

// NacosNamingService
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {if (null == listener) {return;}
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

办法中的事件监听咱们临时不聊,间接看 subscribe 办法,这里 clientProxy 类型为 NamingClientProxyDelegate。实例化 NacosNamingService 时该类被实例化,后面章节中曾经讲到,不再赘述。

而 clientProxy.subscribe 办法在 NamingClientProxyDelegate 中实现:

// NamingClientProxyDelegate
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 获取缓存中的 ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        // 如果为 null,则进行订阅逻辑解决,基于 gRPC 协定
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 定时调度 UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // ServiceInfo 本地缓存解决
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

这段办法是不是眼生啊?对的,在后面剖析《Nacos Client 服务发现》时咱们曾经讲过了。看来必由之路,查问服务列表和订阅最终都调用了同一个办法。

上篇讲了其余流程,咱们这里重点看任务调度:

// ServiceInfoUpdateService
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {return;}
    synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}
        // 构建 UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

该办法蕴含了构建 serviceKey、通过 serviceKey 判重,最初增加 UpdateTask。

而其中的 addTask 的实现就是发动了一个定时工作:

// ServiceInfoUpdateService
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

定时工作延时 1 秒执行。

跟踪到这里就告一阶段了。外围性能只有两个:调用订阅办法和发动定时工作。

定时工作都干了啥

UpdateTask 封装了订阅机制的外围业务逻辑,先来通过一张流程图看一下都做了啥。

有了上述流程图,根本就很清晰的理解 UpdateTask 所做的事件了。间接贴出 run 办法的所有代码:

public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        // 判断该注册的 Service 是否被订阅,如果没有订阅则不再执行
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
            return;
        }

        // 获取缓存的 service 信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            // 依据 serviceName 从注册核心服务端获取 Service 信息
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        // 过期服务(服务的最新更新工夫小于等于缓存刷新工夫),从注册核心从新查问
        if (serviceObj.getLastRefTime() <= lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 解决 Service 音讯
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        // 刷新更新工夫
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();
            return;
        }
        // 下次更新缓存工夫设置,默认为 6 秒
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失败数量为 0
        resetFailCount();} catch (Throwable e) {incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName:" + groupedServiceName, e);
    } finally {
        // 下次调度刷新工夫,下次执行的工夫与 failCount 无关
        // failCount=0,则下次调度工夫为 6 秒,最长为 1 分钟
        // 即当无异常情况下缓存实例的刷新工夫是 6 秒
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

首先在判断服务是否是被订阅过,实现办法是 ChangeNotifier#isSubscribed:

public boolean isSubscribed(String groupName, String serviceName, String clusters) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    return CollectionUtils.isNotEmpty(eventListeners);
}

查看该办法的源码会发现,这里的 listenerMap 正是最开始的 subscribe 办法中 registerListener 注册的 EventListener。

run 办法前面的业务解决基本上都雷同了,先判断缓存是否有 ServiceInfo 信息,如果没有则查问注册核心、解决 ServiceInfo、更新上次解决工夫。

而上面判断 ServiceInfo 是否生效,正是通过“上次更新工夫”与以后 ServiceInfo 中的“上次更新工夫”做比拟来判断。如果生效,也会查问注册核心、解决 ServiceInfo、更新上次解决工夫等一系列操作。

业务逻辑最初会计算下一次定时工作的执行工夫,通过 delayTime 来提早执行。delayTime 默认为 1000L * 6,也就是 6 秒。而在 finally 外面真的发动下一次定时工作。当出现异常时,下次执行的工夫与失败次数无关,但最长不超过 1 分钟。

小结

这一篇咱们讲了 Nacos 客户端服务订阅机制的源码,次要有以下步骤:

第一步:订阅办法的调用,并进行 EventListener 的注册,前面 UpdateTask 要用来进行判断;

第二步:通过委托代理类来解决订阅逻辑,此处与获取实例列表办法应用了同一个办法;

第三步:通过定时工作执行 UpdateTask 办法,默认执行距离为 6 秒,当产生异样时会缩短,但不超过 1 分钟;

第四步:UpdateTask 办法中会比拟本地是否存在缓存,缓存是否过期。当不存在或过期时,查问注册核心,获取最新实例,更新最初获取工夫,解决 ServiceInfo。

第五步:从新计算定时工作工夫,循环执行上述流程。

下一篇,咱们会在此基础上持续解说 ServiceInfoHolder#processServiceInfo 办法中是如何对获取到的实例信息进行解决的。

博主简介:《SpringBoot 技术底细》技术图书作者,热爱钻研技术,写技术干货文章。

公众号:「程序新视界」,博主的公众号,欢送关注~

技术交换:请分割博主微信号:zhuan2quan

退出移动版