关于nacos:Nacos-实例列表获取

51次阅读

共计 6581 个字符,预计需要花费 17 分钟才能阅读完成。

实例列表获取次要是 HostReactor#getServiceInfo 办法。Nacos – 启动中 namingService.subscribe 注册监听的时候,也会调用这个办法。

getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    // 如果产生故障转移,就从文件缓存里取   
    NAMING_LOGGER.debug("failover-mode:" + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);
    }
    // 从 serviceInfoMap 里取
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // serviceInfoMap 没有
    if (null == serviceObj) {serviceObj = new ServiceInfo(serviceName, clusters);
        
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        
        updatingMap.put(serviceName, new Object());
        // 内存没有,从服务器取
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
        
    } else if (updatingMap.containsKey(serviceName)) {
        // 如果正在更新,则 wait,防止多线程同时调用服务器
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // 开启定时工作更新服务列表
    scheduleUpdateIfAbsent(serviceName, clusters);
    // 从内存里取
    return serviceInfoMap.get(serviceObj.getKey());
}

updateServiceNow

从服务器获取,NamingProxy 会调用 NamingProxy#reqApi,他会随机取一个 server,调用 NamingProxy#callServer。NamingProxy 的代码就略了。

private void updateServiceNow(String serviceName, String clusters) {
    try {updateService(serviceName, clusters);
    } catch (NacosException e) {NAMING_LOGGER.error("[NA] failed to update serviceName:" + serviceName, e);
    }
}

public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // 从服务器取
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        
        if (StringUtils.isNotEmpty(result)) {
            // 解析返回的字符串
            processServiceJson(result);
        }
    } finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();
            }
        }
    }
}

processServiceJson

次要是判断是否有更新,有更新发送给 serviceChanged,并写入文件

public ServiceInfo processServiceJson(String json) {ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {return oldService;}
    
    boolean changed = false;
    
    if (oldService != null) {if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t:" + oldService.getLastRefTime() + ", new-t:"
                    + serviceInfo.getLastRefTime());
        }
        
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        
        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {oldHostMap.put(host.toInetAddr(), host);
        }
        
        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {newHostMap.put(host.toInetAddr(), host);
        }
        
        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();
        // 上面是批改、新增、移除的筛选。List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils
                    .equals(host.toString(), oldHostMap.get(key).toString())) {modHosts.add(host);
                continue;
            }
            
            if (!oldHostMap.containsKey(key)) {newHosts.add(host);
            }
        }
        
        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {continue;}
            
            if (!newHostMap.containsKey(key)) {remvHosts.add(host);
            }
            
        }
        
        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service:" + serviceInfo.getKey() + "->"
                    + JacksonUtils.toJson(newHosts));
        }
        
        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service:" + serviceInfo.getKey() + "->"
                    + JacksonUtils.toJson(remvHosts));
        }
        
        if (modHosts.size() > 0) {
            changed = true;
            updateBeatInfo(modHosts);
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service:" + serviceInfo.getKey() + "->"
                    + JacksonUtils.toJson(modHosts));
        }
        
        serviceInfo.setJsonFromServer(json);
        // 有更新发送给 serviceChanged,并写入文件
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }
        
    } else {
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service:" + serviceInfo.getKey() + "->"
                + JacksonUtils.toJson(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        // 有更新发送给 serviceChanged,并写入文件
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
    
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    
    if (changed) {NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service:" + serviceInfo.getKey() + "->"
                + JacksonUtils.toJson(serviceInfo.getHosts()));
    }
    
    return serviceInfo;
}

scheduleUpdateIfAbsent

把工作退出线程池

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    // 工作曾经有了就不加了
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}
    
    synchronized (futureMap) {
        // // 工作曾经有了就不加了
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}
        // 把工作退出线程池
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        // 退出工作
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

UpdateTask.run

定时工作更新,默认每 10 秒更新,如果失败了,就每次乘以 2,比方第一次 1 秒,第二次 2 秒,第三次 4 秒,最多是 2 的 6 次方,最大期待 60 秒。

public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        // serviceInfoMap 没有就间接更新
        if (serviceObj == null) {updateService(serviceName, clusters);
            return;
        }
        
        if (serviceObj.getLastRefTime() <= lastRefTime) {updateService(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }
        
        lastRefTime = serviceObj.getLastRefTime();
        
        if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            // abort the update task
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            return;
        }
        // 失败了就减少失败次数
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();
            return;
        }
        delayTime = serviceObj.getCacheMillis();
        // 胜利就重置为 1
        resetFailCount();} catch (Throwable e) {incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName:" + serviceName, e);
    } finally {
        // 从新到线程池
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

总结

联合 Nacos – HostReactor 的创立。

正文完
 0