乐趣区

聊聊nacos-NamingProxy的getServiceList

本文主要研究一下 nacos NamingProxy 的 getServiceList

NamingProxy.initRefreshSrvIfNeed

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java

public class NamingProxy {

    private static final int DEFAULT_SERVER_PORT = 8848;

    private int serverPort = DEFAULT_SERVER_PORT;

    private String namespaceId;

    private String endpoint;

    private String nacosDomain;

    private List<String> serverList;

    private List<String> serversFromEndpoint = new ArrayList<String>();

    private long lastSrvRefTime = 0L;

    private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);

    private Properties properties;

    public NamingProxy(String namespaceId, String endpoint, String serverList) {

        this.namespaceId = namespaceId;
        this.endpoint = endpoint;
        if (StringUtils.isNotEmpty(serverList)) {this.serverList = Arrays.asList(serverList.split(","));
            if (this.serverList.size() == 1) {this.nacosDomain = serverList;}
        }

        initRefreshSrvIfNeed();}

    private void initRefreshSrvIfNeed() {if (StringUtils.isEmpty(endpoint)) {return;}

        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.naming.serverlist.updater");
                t.setDaemon(true);
                return t;
            }
        });

        executorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {refreshSrvIfNeed();
            }
        }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);

        refreshSrvIfNeed();}

    //......

    private void refreshSrvIfNeed() {
        try {if (!CollectionUtils.isEmpty(serverList)) {NAMING_LOGGER.debug("server list provided by user:" + serverList);
                return;
            }

            if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {return;}

            List<String> list = getServerListFromEndpoint();

            if (CollectionUtils.isEmpty(list)) {throw new Exception("Can not acquire Nacos list");
            }

            if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {NAMING_LOGGER.info("[SERVER-LIST] server list is updated:" + list);
            }

            serversFromEndpoint = list;
            lastSrvRefTime = System.currentTimeMillis();} catch (Throwable e) {NAMING_LOGGER.warn("failed to update server list", e);
        }
    }

    public List<String> getServerListFromEndpoint() {

        try {
            String urlString = "http://" + endpoint + "/nacos/serverlist";
            List<String> headers = builderHeaders();

            HttpClient.HttpResult result = HttpClient.httpGet(urlString, headers, null, UtilAndComs.ENCODING);
            if (HttpURLConnection.HTTP_OK != result.code) {
                throw new IOException("Error while requesting:" + urlString + "'. Server returned:"
                    + result.code);
            }

            String content = result.content;
            List<String> list = new ArrayList<String>();
            for (String line : IoUtils.readLines(new StringReader(content))) {if (!line.trim().isEmpty()) {list.add(line.trim());
                }
            }

            return list;

        } catch (Exception e) {e.printStackTrace();
        }

        return null;
    }

    //......
}
  • NamingProxy 的构造器执行了 initRefreshSrvIfNeed 方法,该方法在 endpoint 不为空的时候,会注册一个定时任务,每隔 vipSrvRefInterMillis 时间执行一次 refreshSrvIfNeed 方法,同时立马调用了 refreshSrvIfNeed 方法
  • refreshSrvIfNeed 方法在 serverList 为空,且距离 lastSrvRefTime 大于等于 vipSrvRefInterMillis 时会通过 getServerListFromEndpoint() 方法获取 serverList 更新 serversFromEndpoint 及 lastSrvRefTime
  • getServerListFromEndpoint 方法会向 endpoint 请求 /serverlist 接口,获取 server 端返回的 serverList

NamingProxy.getServiceList

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java

public class NamingProxy {

    private static final int DEFAULT_SERVER_PORT = 8848;

    private int serverPort = DEFAULT_SERVER_PORT;

    private String namespaceId;

    private String endpoint;

    private String nacosDomain;

    private List<String> serverList;

    private List<String> serversFromEndpoint = new ArrayList<String>();

    private long lastSrvRefTime = 0L;

    private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);

    private Properties properties;

    //......

    public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException {return getServiceList(pageNo, pageSize, groupName, null);
    }

    public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {Map<String, String> params = new HashMap<String, String>(4);
        params.put("pageNo", String.valueOf(pageNo));
        params.put("pageSize", String.valueOf(pageSize));
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.GROUP_NAME, groupName);

        if (selector != null) {switch (SelectorType.valueOf(selector.getType())) {
                case none:
                    break;
                case label:
                    ExpressionSelector expressionSelector = (ExpressionSelector) selector;
                    params.put("selector", JSON.toJSONString(expressionSelector));
                    break;
                default:
                    break;
            }
        }

        String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/service/list", params);

        JSONObject json = JSON.parseObject(result);
        ListView<String> listView = new ListView<String>();
        listView.setCount(json.getInteger("count"));
        listView.setData(JSON.parseObject(json.getString("doms"), new TypeReference<List<String>>() {}));

        return listView;
    }

    public String reqAPI(String api, Map<String, String> params) throws NacosException {

        List<String> snapshot = serversFromEndpoint;
        if (!CollectionUtils.isEmpty(serverList)) {snapshot = serverList;}

        return reqAPI(api, params, snapshot);
    }

    public String reqAPI(String api, Map<String, String> params, List<String> servers) {return reqAPI(api, params, servers, HttpMethod.GET);
    }

    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {throw new IllegalArgumentException("no server available");
        }

        Exception exception = new Exception();

        if (servers != null && !servers.isEmpty()) {Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {String server = servers.get(index);
                try {return callServer(api, params, server, method);
                } catch (NacosException e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                }

                index = (index + 1) % servers.size();}

            throw new IllegalStateException("failed to req API:" + api + "after all servers(" + servers + ") tried:"
                + exception.getMessage());
        }

        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {return callServer(api, params, nacosDomain);
            } catch (Exception e) {
                exception = e;
                NAMING_LOGGER.error("[NA] req api:" + api + "failed, server(" + nacosDomain, e);
            }
        }

        throw new IllegalStateException("failed to req API:/api/" + api + "after all servers(" + servers + ") tried:"
            + exception.getMessage());

    }
                
    //......
}
  • getServiceList 方法有个 AbstractSelector 参数,它会往请求的参数里头添加 selector 参数,目前 label 类型会添加 ExpressionSelector,之后调用 reqAPI 方法请求 /service/list 接口
  • reqAPI 方法首先将 serversFromEndpoint 赋值给 snapshot,但是 serverList 不为空的情况下会重置 snapshot 为 serverList,然后进行 reqAPI 请求
  • reqAPI 方法会根据 servers.size() 随机一个 index,然后以 servers.size() 为最大循环次数开始 for 循环,循环里头根据 index 获取 server 然后通过 callServer 请求,请求成功则跳出循环返回,请求失败则递增 index 并对 servers.size() 取余继续下次循环,如果都请求失败则最后抛出 IllegalStateException

小结

  • NamingProxy 的构造器执行了 initRefreshSrvIfNeed 方法,该方法在 endpoint 不为空的时候,会注册一个定时任务,每隔 vipSrvRefInterMillis 时间执行一次 refreshSrvIfNeed 方法
  • refreshSrvIfNeed 方法在 serverList 为空,且距离 lastSrvRefTime 大于等于 vipSrvRefInterMillis 时会通过 getServerListFromEndpoint() 方法获取 serverList 更新 serversFromEndpoint 及 lastSrvRefTime
  • getServiceList 方法优先以 serverList 作为 server 端地址列表,如果它为空再以 serversFromEndpoint 为准,然后通过 reqAPI 方法请求的时候,随机选择一个 server 进行请求,最多请求 server.size() 次,请求成功则跳出循环返回,请求失败则递增 index 并对 servers.size() 取余继续下次循环,如果都请求失败则最后抛出 IllegalStateException

doc

  • NamingProxy
退出移动版