实例列表的获取主要是通过 HostReactor#getServiceInfo 方法。在「Nacos – 启动」中,namingService.subscribe 注册监听的时候,也会调用这个方法。
getServiceInfo
/**
 * Returns the service info for a service/cluster pair, pulling it from the server on first use.
 *
 * <p>When the failover switch is on, the value is taken from {@code failoverReactor}. Otherwise the
 * entry is looked up through {@code getServiceInfo0}. A missing entry is created, stored in
 * {@code serviceInfoMap} and refreshed synchronously. If the entry exists and the service name is
 * present in {@code updatingMap}, the caller waits up to {@code UPDATE_HOLD_INTERVAL} milliseconds on
 * the cached object. On every non-failover path a periodic update task is scheduled if absent.
 *
 * @param serviceName name of the service
 * @param clusters    clusters value used to build the cache key and passed to the refresh
 * @return the failover copy when failover is on, otherwise whatever {@code serviceInfoMap} holds for the key
 */
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    // In failover mode the result comes from failoverReactor rather than serviceInfoMap.
    NAMING_LOGGER.debug("failover-mode:" + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        // Nothing cached yet: register a placeholder and fetch from the server right now.
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        try {
            updateServiceNow(serviceName, clusters);
        } finally {
            // Always clear the marker; otherwise an unchecked exception from the refresh would make
            // every later caller take the wait branch below.
            updatingMap.remove(serviceName);
        }
    } else if (updatingMap.containsKey(serviceName)) {
        // Another caller is refreshing this service; hold briefly instead of also calling the server.
        if (UPDATE_HOLD_INTERVAL > 0) {
            synchronized (serviceObj) {
                try {
                    // updateService calls notifyAll on the object returned by getServiceInfo0 when done.
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    // Restore the interrupt flag so callers further up can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    // Make sure a periodic refresh task exists for this service.
    scheduleUpdateIfAbsent(serviceName, clusters);
    // Re-read from the map: the refresh may have replaced the placeholder with a newer object.
    return serviceInfoMap.get(serviceObj.getKey());
}
updateServiceNow
从服务器获取,NamingProxy 会调用 NamingProxy#reqApi,它会随机取一个 server,调用 NamingProxy#callServer。NamingProxy 的代码就略了。
/**
 * Refreshes the given service immediately by delegating to {@code updateService}.
 *
 * <p>A {@code NacosException} is logged and not propagated.
 *
 * @param serviceName name of the service to refresh
 * @param clusters    clusters value passed through to {@code updateService}
 */
private void updateServiceNow(String serviceName, String clusters) {
    try {
        updateService(serviceName, clusters);
    } catch (NacosException ex) {
        final String message = "[NA] failed to update serviceName:" + serviceName;
        NAMING_LOGGER.error(message, ex);
    }
}
/**
 * Queries the server for the service's instance list and feeds a non-empty response to
 * {@code processServiceJson}.
 *
 * <p>Whether or not the query succeeds, threads waiting on the entry that was cached before the
 * query are woken with {@code notifyAll}.
 *
 * @param serviceName name of the service to query
 * @param clusters    clusters value passed to the server query
 * @throws NacosException declared by this method; presumably raised by the server query
 */
public void updateService(String serviceName, String clusters) throws NacosException {
    // Capture the entry before the query so waiters on that same object can be notified afterwards.
    final ServiceInfo cachedBefore = getServiceInfo0(serviceName, clusters);
    try {
        final String response =
                serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        if (StringUtils.isNotEmpty(response)) {
            // Parse the returned string and update the cache.
            processServiceJson(response);
        }
    } finally {
        if (cachedBefore != null) {
            synchronized (cachedBefore) {
                cachedBefore.notifyAll();
            }
        }
    }
}
processServiceJson
主要是判断是否有更新;有更新就调用 serviceChanged 发出通知,并写入文件。
/**
 * Parses a service JSON string, stores the result in {@code serviceInfoMap}, and reports changes.
 *
 * <p>If an entry was already cached, the old and new host lists are compared by
 * {@code Instance#toInetAddr()} to find new, removed and modified hosts. When any are found, the
 * change is passed to {@code eventDispatcher.serviceChanged} and written through {@code DiskCache}.
 * If nothing was cached, the parsed info is always treated as a change.
 *
 * @param json service info JSON string
 * @return the parsed service info; or the previously cached entry (possibly {@code null}) when the
 *         parsed info has a {@code null} host list or fails {@code validate()}
 */
public ServiceInfo processServiceJson(String json) {
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        // Unusable payload: keep whatever was cached before.
        return oldService;
    }

    boolean changed = false;
    if (oldService != null) {
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            // Only warned about; the received data still replaces the cached entry below.
            NAMING_LOGGER.warn("out of date data received, old-t:" + oldService.getLastRefTime() + ", new-t:"
                    + serviceInfo.getLastRefTime());
        }
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

        // Index both host lists by address so they can be compared.
        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }
        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        // Classify hosts in the new list: unknown address -> new; known address whose string form differs -> modified.
        for (Map.Entry<String, Instance> entry : newHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            } else if (!StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
                modHosts.add(host);
            }
        }
        // Hosts present before but missing from the new list were removed.
        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            if (!newHostMap.containsKey(entry.getKey())) {
                remvHosts.add(entry.getValue());
            }
        }

        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service:" + serviceInfo.getKey() + "->"
                    + JacksonUtils.toJson(newHosts));
        }
        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service:" + serviceInfo.getKey() + "->"
                    + JacksonUtils.toJson(remvHosts));
        }
        if (modHosts.size() > 0) {
            changed = true;
            updateBeatInfo(modHosts);
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service:" + serviceInfo.getKey() + "->"
                    + JacksonUtils.toJson(modHosts));
        }

        serviceInfo.setJsonFromServer(json);
        // On any change, notify through serviceChanged and write the info to the disk cache.
        if (changed) {
            eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }
    } else {
        // Nothing cached before: always treated as a change.
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service:" + serviceInfo.getKey() + "->"
                + JacksonUtils.toJson(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }

    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service:" + serviceInfo.getKey() + "->"
                + JacksonUtils.toJson(serviceInfo.getHosts()));
    }
    return serviceInfo;
}
scheduleUpdateIfAbsent
把任务加入线程池
/**
 * Registers an {@code UpdateTask} for the service/cluster pair unless {@code futureMap} already
 * holds a future under that key.
 *
 * <p>The map is checked once without the lock and again while synchronized on {@code futureMap}
 * before a task is added.
 *
 * @param serviceName name of the service
 * @param clusters    clusters value used to build the key and passed to the task
 */
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    final String taskKey = ServiceInfo.getKey(serviceName, clusters);
    // A task is already registered for this key: nothing to do.
    if (futureMap.get(taskKey) != null) {
        return;
    }
    synchronized (futureMap) {
        // Check again under the lock; another thread may have registered the task in the meantime.
        if (futureMap.get(taskKey) == null) {
            // Add the task via addTask and remember its future under the key.
            futureMap.put(taskKey, addTask(new UpdateTask(serviceName, clusters)));
        }
    }
}
UpdateTask.run
定时任务更新,默认每 10 秒更新一次;如果失败了,延迟就每次乘以 2,比如第一次 1 秒,第二次 2 秒,第三次 4 秒,最多是 2 的 6 次方,最大等待 60 秒。
/**
 * Performs one periodic refresh of the service and reschedules this task.
 *
 * <p>The next delay is {@code delayTime << failCount}, capped at {@code DEFAULT_DELAY * 60}
 * milliseconds. {@code delayTime} starts at {@code DEFAULT_DELAY} and becomes the service's
 * {@code getCacheMillis()} after a refresh that returned hosts. An empty host list or any
 * throwable increments the fail count.
 *
 * <p>When the service is neither subscribed nor present in {@code futureMap}, the task logs that
 * it is stopped and is not rescheduled.
 */
public void run() {
    long delayTime = DEFAULT_DELAY;
    // Set when the task decides to stop, so the finally block does not put it back on the executor.
    boolean stopped = false;
    try {
        final String key = ServiceInfo.getKey(serviceName, clusters);
        ServiceInfo serviceObj = serviceInfoMap.get(key);
        // Nothing cached for this key: pull from the server and try again on the next run.
        if (serviceObj == null) {
            updateService(serviceName, clusters);
            return;
        }

        if (serviceObj.getLastRefTime() <= lastRefTime) {
            // Cache has not been refreshed since the last run: pull and re-read the entry.
            updateService(serviceName, clusters);
            serviceObj = serviceInfoMap.get(key);
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }
        lastRefTime = serviceObj.getLastRefTime();

        if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap.containsKey(key)) {
            // abort the update task
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            stopped = true;
            return;
        }

        // An empty host list counts as a failure and lengthens the next delay.
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        delayTime = serviceObj.getCacheMillis();
        // Success: reset the failure counter.
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName:" + serviceName, e);
    } finally {
        // Put the task back on the executor unless it was stopped above.
        if (!stopped) {
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }
}
总结
结合 Nacos – HostReactor 的创建。