聊聊NacosNamingService的selectOneHealthyInstance

26次阅读

共计 7807 个字符,预计需要花费 20 分钟才能阅读完成。

本文主要研究一下 NacosNamingService 的 selectOneHealthyInstance

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

public class NacosNamingService implements NamingService {
    private static final String DEFAULT_PORT = "8080";
    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    /**
     * Each Naming instance should have different namespace.
     */
    private String namespace;

    private String endpoint;

    private String serverList;

    private String cacheDir;

    private String logName;

    private HostReactor hostReactor;

    private BeatReactor beatReactor;

    private EventDispatcher eventDispatcher;

    private NamingProxy serverProxy;

    //......

    @Override
    public Instance selectOneHealthyInstance(String serviceName) throws NacosException {return selectOneHealthyInstance(serviceName, new ArrayList<String>());
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException {return selectOneHealthyInstance(serviceName, groupName, true);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException {return selectOneHealthyInstance(serviceName, new ArrayList<String>(), subscribe);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException {return selectOneHealthyInstance(serviceName, groupName, new ArrayList<String>(), subscribe);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException {return selectOneHealthyInstance(serviceName, clusters, true);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException {return selectOneHealthyInstance(serviceName, groupName, clusters, true);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe)
        throws NacosException {return selectOneHealthyInstance(serviceName, Constants.DEFAULT_GROUP, clusters, subscribe);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {if (subscribe) {
            return Balancer.RandomByWeight.selectHost(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
        } else {
            return Balancer.RandomByWeight.selectHost(hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
        }
    }

    //......
}
  • selectOneHealthyInstance 跟 selectInstances 类似,只不过它返回的是单个 instance;selectOneHealthyInstance 也是先从 hostReactor 获取 serviceInfo
  • 如果 subscribe 为 true,则执行 hostReactor.getServiceInfo 获取 serviceInfo,否则执行 hostReactor.getServiceInfoDirectlyFromServer 获取 serviceInfo
  • 获取到 serviceInfo 之后,selectOneHealthyInstance 通过 Balancer.RandomByWeight.selectHost 方法来选取单个 healthy 的 instance

Balancer

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/Balancer.java

public class Balancer {

    /**
     * report status to server
     */
    public final static List<String> UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER = new CopyOnWriteArrayList<String>();

    public static class RandomByWeight {public static List<Instance> selectAll(ServiceInfo serviceInfo) {List<Instance> hosts = serviceInfo.getHosts();

            if (CollectionUtils.isEmpty(hosts)) {throw new IllegalStateException("no host to srv for serviceInfo:" + serviceInfo.getName());
            }

            return hosts;
        }

        public static Instance selectHost(ServiceInfo dom) {List<Instance> hosts = selectAll(dom);

            if (CollectionUtils.isEmpty(hosts)) {throw new IllegalStateException("no host to srv for service:" + dom.getName());
            }

            return getHostByRandomWeight(hosts);
        }
    }

    /**
     * Return one host from the host list by random-weight.
     *
     * @param hosts The list of the host.
     * @return The random-weight result of the host
     */
    protected static Instance getHostByRandomWeight(List<Instance> hosts) {NAMING_LOGGER.debug("entry randomWithWeight");
        if (hosts == null || hosts.size() == 0) {NAMING_LOGGER.debug("hosts == null || hosts.size() == 0");
            return null;
        }

        Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");

        NAMING_LOGGER.debug("new Chooser");

        List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();
        for (Instance host : hosts) {if (host.isHealthy()) {hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));
            }
        }
        NAMING_LOGGER.debug("for (Host host : hosts)");
        vipChooser.refresh(hostsWithWeight);
        NAMING_LOGGER.debug("vipChooser.refresh");
        return vipChooser.randomWithWeight();}
}
  • Balancer 的 RandomByWeight 提供了 selectAll 及 selectHost 方法;selectAll 针对 serviceInfo.getHosts() 进行了空判断,空的话会抛出 IllegalStateException
  • selectHost 方法内部调用了 selectAll 方法,其最后通过 getHostByRandomWeight 来选取单个 healthy 的 instance
  • getHostByRandomWeight 方法首先创建一个 Chooser,然后选取 healthy 的 instance 构造 hostsWithWeight,再通过 vipChooser.refresh(hostsWithWeight) 进行 refresh,最后通过 vipChooser.randomWithWeight() 选取单个 healthy 的 instance

Chooser

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java

public class Chooser<K, T> {

    private K uniqueKey;

    private volatile Ref<T> ref;

    public T random() {
        List<T> items = ref.items;
        if (items.size() == 0) {return null;}
        if (items.size() == 1) {return items.get(0);
        }
        return items.get(ThreadLocalRandom.current().nextInt(items.size()));
    }

    public T randomWithWeight() {
        Ref<T> ref = this.ref;
        double random = ThreadLocalRandom.current().nextDouble(0, 1);
        int index = Arrays.binarySearch(ref.weights, random);
        if (index < 0) {index = -index - 1;} else {return ref.items.get(index);
        }

        if (index >= 0 && index < ref.weights.length) {if (random < ref.weights[index]) {return ref.items.get(index);
            }
        }

        /* This should never happen, but it ensures we will return a correct
         * object in case there is some floating point inequality problem
         * wrt the cumulative probabilities. */
        return ref.items.get(ref.items.size() - 1);
    }

    public Chooser(K uniqueKey) {this(uniqueKey, new ArrayList<Pair<T>>());
    }

    public Chooser(K uniqueKey, List<Pair<T>> pairs) {Ref<T> ref = new Ref<T>(pairs);
        ref.refresh();
        this.uniqueKey = uniqueKey;
        this.ref = ref;
    }

    public K getUniqueKey() {return uniqueKey;}

    public Ref<T> getRef() {return ref;}

    public void refresh(List<Pair<T>> itemsWithWeight) {Ref<T> newRef = new Ref<T>(itemsWithWeight);
        newRef.refresh();
        newRef.poller = this.ref.poller.refresh(newRef.items);
        this.ref = newRef;
    }

    //......
}
  • Chooser 的 refresh 方法会根据 itemsWithWeight 创建 Ref,然后执行 Ref 的 refresh 方法;randomWithWeight 方法通过 Arrays.binarySearch(ref.weights, random) 创建初始 index,然后根据 index 从 ref.items 获取元素

Ref

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java

    public class Ref<T> {private List<Pair<T>> itemsWithWeight = new ArrayList<Pair<T>>();
        private List<T> items = new ArrayList<T>();
        private Poller<T> poller = new GenericPoller<T>(items);
        private double[] weights;

        @SuppressWarnings("unchecked")
        public Ref(List<Pair<T>> itemsWithWeight) {this.itemsWithWeight = itemsWithWeight;}

        public void refresh() {Double originWeightSum = (double) 0;

            for (Pair<T> item : itemsWithWeight) {double weight = item.weight();
                //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
                if (weight <= 0) {continue;}

                items.add(item.item());
                if (Double.isInfinite(weight)) {weight = 10000.0D;}
                if (Double.isNaN(weight)) {weight = 1.0D;}
                originWeightSum += weight;
            }

            double[] exactWeights = new double[items.size()];
            int index = 0;
            for (Pair<T> item : itemsWithWeight) {double singleWeight = item.weight();
                //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
                if (singleWeight <= 0) {continue;}
                exactWeights[index++] = singleWeight / originWeightSum;
            }

            weights = new double[items.size()];
            double randomRange = 0D;
            for (int i = 0; i < index; i++) {weights[i] = randomRange + exactWeights[i];
                randomRange += exactWeights[i];
            }

            double doublePrecisionDelta = 0.0001;

            if (index == 0 || (Math.abs(weights[index - 1] - 1) < doublePrecisionDelta)) {return;}
            throw new IllegalStateException("Cumulative Weight caculate wrong , the sum of probabilities does not equals 1.");
        }

        //......
    }
  • Ref 的 refresh 方法主要是初始化 items 及 weights

小结

  • selectOneHealthyInstance 跟 selectInstances 类似,只不过它返回的是单个 instance;selectOneHealthyInstance 也是先从 hostReactor 获取 serviceInfo
  • 如果 subscribe 为 true,则执行 hostReactor.getServiceInfo 获取 serviceInfo,否则执行 hostReactor.getServiceInfoDirectlyFromServer 获取 serviceInfo
  • 获取到 serviceInfo 之后,selectOneHealthyInstance 通过 Balancer.RandomByWeight.selectHost 方法来选取单个 healthy 的 instance

doc

  • NacosNamingService

正文完
 0