实例列表的获取主要是HostReactor#getServiceInfo方法。在Nacos - 启动中,namingService.subscribe注册监听的时候,也会调用这个方法。

getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {    // 如果产生故障转移,就从文件缓存里取       NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());    String key = ServiceInfo.getKey(serviceName, clusters);    if (failoverReactor.isFailoverSwitch()) {        return failoverReactor.getService(key);    }    // 从serviceInfoMap里取    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);    // serviceInfoMap没有    if (null == serviceObj) {        serviceObj = new ServiceInfo(serviceName, clusters);                serviceInfoMap.put(serviceObj.getKey(), serviceObj);                updatingMap.put(serviceName, new Object());        // 内存没有,从服务器取        updateServiceNow(serviceName, clusters);        updatingMap.remove(serviceName);            } else if (updatingMap.containsKey(serviceName)) {        // 如果正在更新,则wait,防止多线程同时调用服务器        if (UPDATE_HOLD_INTERVAL > 0) {            // hold a moment waiting for update finish            synchronized (serviceObj) {                try {                    serviceObj.wait(UPDATE_HOLD_INTERVAL);                } catch (InterruptedException e) {                    NAMING_LOGGER                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);                }            }        }    }    // 开启定时工作更新服务列表    scheduleUpdateIfAbsent(serviceName, clusters);    // 从内存里取    return serviceInfoMap.get(serviceObj.getKey());}

updateServiceNow

从服务器获取,NamingProxy会调用NamingProxy#reqApi,它会随机取一个server,调用NamingProxy#callServer。NamingProxy的代码就略了。

private void updateServiceNow(String serviceName, String clusters) {    try {        updateService(serviceName, clusters);    } catch (NacosException e) {        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);    }}public void updateService(String serviceName, String clusters) throws NacosException {    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);    try {        // 从服务器取        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);                if (StringUtils.isNotEmpty(result)) {            // 解析返回的字符串            processServiceJson(result);        }    } finally {        if (oldService != null) {            synchronized (oldService) {                oldService.notifyAll();            }        }    }}

processServiceJson

主要是判断是否有更新,有更新则发送给serviceChanged,并写入文件。

public ServiceInfo processServiceJson(String json) {    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {                return oldService;    }        boolean changed = false;        if (oldService != null) {                if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "                    + serviceInfo.getLastRefTime());        }                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);                Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());        for (Instance host : oldService.getHosts()) {            oldHostMap.put(host.toInetAddr(), host);        }                Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());        for (Instance host : serviceInfo.getHosts()) {            newHostMap.put(host.toInetAddr(), host);        }                Set<Instance> modHosts = new HashSet<Instance>();        Set<Instance> newHosts = new HashSet<Instance>();        Set<Instance> remvHosts = new HashSet<Instance>();        // 上面是批改、新增、移除的筛选。        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(                newHostMap.entrySet());        for (Map.Entry<String, Instance> entry : newServiceHosts) {            Instance host = entry.getValue();            String key = entry.getKey();            if (oldHostMap.containsKey(key) && !StringUtils                    .equals(host.toString(), oldHostMap.get(key).toString())) {                modHosts.add(host);                continue;            }                        if (!oldHostMap.containsKey(key)) {                newHosts.add(host);            }        }                for 
(Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {            Instance host = entry.getValue();            String key = entry.getKey();            if (newHostMap.containsKey(key)) {                continue;            }                        if (!newHostMap.containsKey(key)) {                remvHosts.add(host);            }                    }                if (newHosts.size() > 0) {            changed = true;            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "                    + JacksonUtils.toJson(newHosts));        }                if (remvHosts.size() > 0) {            changed = true;            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "                    + JacksonUtils.toJson(remvHosts));        }                if (modHosts.size() > 0) {            changed = true;            updateBeatInfo(modHosts);            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "                    + JacksonUtils.toJson(modHosts));        }                serviceInfo.setJsonFromServer(json);        // 有更新发送给serviceChanged,并写入文件        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {            eventDispatcher.serviceChanged(serviceInfo);            DiskCache.write(serviceInfo, cacheDir);        }            } else {        changed = true;        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "                + JacksonUtils.toJson(serviceInfo.getHosts()));        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);        // 有更新发送给serviceChanged,并写入文件        eventDispatcher.serviceChanged(serviceInfo);        serviceInfo.setJsonFromServer(json);        DiskCache.write(serviceInfo, cacheDir);    }        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());        if (changed) {        
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "                + JacksonUtils.toJson(serviceInfo.getHosts()));    }        return serviceInfo;}

scheduleUpdateIfAbsent

把任务加入线程池

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {    // 工作曾经有了就不加了    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {        return;    }        synchronized (futureMap) {        // // 工作曾经有了就不加了        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {            return;        }        // 把工作退出线程池        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));        // 退出工作        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);    }}

UpdateTask.run

定时任务更新,默认每10秒更新,如果失败了,就每次乘以2,比如第一次1秒,第二次2秒,第三次4秒,最多是2的6次方,最大等待60秒。

public void run() {    long delayTime = DEFAULT_DELAY;        try {        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));        // serviceInfoMap没有就间接更新        if (serviceObj == null) {            updateService(serviceName, clusters);            return;        }                if (serviceObj.getLastRefTime() <= lastRefTime) {            updateService(serviceName, clusters);            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));        } else {            // if serviceName already updated by push, we should not override it            // since the push data may be different from pull through force push            refreshOnly(serviceName, clusters);        }                lastRefTime = serviceObj.getLastRefTime();                if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap                .containsKey(ServiceInfo.getKey(serviceName, clusters))) {            // abort the update task            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);            return;        }        // 失败了就减少失败次数        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {            incFailCount();            return;        }        delayTime = serviceObj.getCacheMillis();        // 胜利就重置为1        resetFailCount();    } catch (Throwable e) {        incFailCount();        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);    } finally {        // 从新到线程池        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);    }}

总结

结合Nacos - HostReactor的创建。