学习不必那么功利,二师兄带你从更高维度轻松浏览源码~
说起 Nacos 的服务订阅机制,对此不理解的敌人,可能感觉十分神秘,这篇文章就大家深入浅出的理解一下 Nacos 2.0 客户端的订阅实现。因为波及到的内容比拟多,就分几篇来讲,本篇为第一篇。
Nacos 订阅概述
Nacos 的订阅机制,如果用一句话来形容就是:Nacos 客户端通过一个定时工作,每 6 秒从注册核心获取实例列表,当发现实例发生变化时,公布变更事件,订阅者进行业务解决。该更新实例的更新实例,该更新本地缓存的更新本地缓存。
上图画出了订阅办法的主线流程,波及的内容较多,解决细节简单。这里只用把握住外围局部即可。上面就通过代码和流程图来逐渐剖析上述过程。
从订阅到定时工作开启
咱们这里聊的订阅机制,其实实质上就是服务发现的准实时感知。下面曾经看到了当执行订阅办法时,会触发定时工作,定时去拉服务器端的数据。所以,实质上,订阅机制就是实现服务发现的一种形式,对照的形式就是间接查问接口了。
NacosNamingService 中裸露的许多重载的 subscribe,重载的目标就是让大家少写一些参数,这些参数呢,Nacos 给默认解决了。最终这些重载办法都会调用到上面这个办法:
// NacosNamingService
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {if (null == listener) {return;}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
办法中的事件监听咱们临时不聊,间接看 subscribe 办法,这里 clientProxy 类型为 NamingClientProxyDelegate。实例化 NacosNamingService 时该类被实例化,后面章节中曾经讲到,不再赘述。
而 clientProxy.subscribe 办法在 NamingClientProxyDelegate 中实现:
// NamingClientProxyDelegate
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
// 获取缓存中的 ServiceInfo
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (null == result) {
// 如果为 null,则进行订阅逻辑解决,基于 gRPC 协定
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
// 定时调度 UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
// ServiceInfo 本地缓存解决
serviceInfoHolder.processServiceInfo(result);
return result;
}
这段办法是不是眼生啊?对的,在后面剖析《Nacos Client 服务发现》时咱们曾经讲过了。看来必由之路,查问服务列表和订阅最终都调用了同一个办法。
上篇讲了其余流程,咱们这里重点看任务调度:
// ServiceInfoUpdateService
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
if (futureMap.get(serviceKey) != null) {return;}
synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}
// 构建 UpdateTask
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
futureMap.put(serviceKey, future);
}
}
该办法蕴含了构建 serviceKey、通过 serviceKey 判重,最初增加 UpdateTask。
而其中的 addTask 的实现就是发动了一个定时工作:
// ServiceInfoUpdateService
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
定时工作延时 1 秒执行。
跟踪到这里就告一阶段了。外围性能只有两个:调用订阅办法和发动定时工作。
定时工作都干了啥
UpdateTask 封装了订阅机制的外围业务逻辑,先来通过一张流程图看一下都做了啥。
有了上述流程图,根本就很清晰的理解 UpdateTask 所做的事件了。间接贴出 run 办法的所有代码:
public void run() {
long delayTime = DEFAULT_DELAY;
try {
// 判断该注册的 Service 是否被订阅,如果没有订阅则不再执行
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
NAMING_LOGGER
.info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
return;
}
// 获取缓存的 service 信息
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
// 依据 serviceName 从注册核心服务端获取 Service 信息
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
lastRefTime = serviceObj.getLastRefTime();
return;
}
// 过期服务(服务的最新更新工夫小于等于缓存刷新工夫),从注册核心从新查问
if (serviceObj.getLastRefTime() <= lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
// 解决 Service 音讯
serviceInfoHolder.processServiceInfo(serviceObj);
}
// 刷新更新工夫
lastRefTime = serviceObj.getLastRefTime();
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();
return;
}
// 下次更新缓存工夫设置,默认为 6 秒
// TODO multiple time can be configured.
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
// 重置失败数量为 0
resetFailCount();} catch (Throwable e) {incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName:" + groupedServiceName, e);
} finally {
// 下次调度刷新工夫,下次执行的工夫与 failCount 无关
// failCount=0,则下次调度工夫为 6 秒,最长为 1 分钟
// 即当无异常情况下缓存实例的刷新工夫是 6 秒
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
首先在判断服务是否是被订阅过,实现办法是 ChangeNotifier#isSubscribed:
public boolean isSubscribed(String groupName, String serviceName, String clusters) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
return CollectionUtils.isNotEmpty(eventListeners);
}
查看该办法的源码会发现,这里的 listenerMap 正是最开始的 subscribe 办法中 registerListener 注册的 EventListener。
run 办法前面的业务解决基本上都雷同了,先判断缓存是否有 ServiceInfo 信息,如果没有则查问注册核心、解决 ServiceInfo、更新上次解决工夫。
而上面判断 ServiceInfo 是否生效,正是通过“上次更新工夫”与以后 ServiceInfo 中的“上次更新工夫”做比拟来判断。如果生效,也会查问注册核心、解决 ServiceInfo、更新上次解决工夫等一系列操作。
业务逻辑最初会计算下一次定时工作的执行工夫,通过 delayTime 来提早执行。delayTime 默认为 1000L * 6,也就是 6 秒。而在 finally 外面真的发动下一次定时工作。当出现异常时,下次执行的工夫与失败次数无关,但最长不超过 1 分钟。
小结
这一篇咱们讲了 Nacos 客户端服务订阅机制的源码,次要有以下步骤:
第一步:订阅办法的调用,并进行 EventListener 的注册,前面 UpdateTask 要用来进行判断;
第二步:通过委托代理类来解决订阅逻辑,此处与获取实例列表办法应用了同一个办法;
第三步:通过定时工作执行 UpdateTask 办法,默认执行距离为 6 秒,当产生异样时会缩短,但不超过 1 分钟;
第四步:UpdateTask 办法中会比拟本地是否存在缓存,缓存是否过期。当不存在或过期时,查问注册核心,获取最新实例,更新最初获取工夫,解决 ServiceInfo。
第五步:从新计算定时工作工夫,循环执行上述流程。
下一篇,咱们会在此基础上持续解说 ServiceInfoHolder#processServiceInfo 办法中是如何对获取到的实例信息进行解决的。
博主简介:《SpringBoot 技术底细》技术图书作者,热爱钻研技术,写技术干货文章。
公众号:「程序新视界」,博主的公众号,欢送关注~
技术交换:请分割博主微信号:zhuan2quan