Preface
This article takes a look at selectOneHealthyInstance in NacosNamingService.
NacosNamingService
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
public class NacosNamingService implements NamingService {

    private static final String DEFAULT_PORT = "8080";

    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    /**
     * Each Naming instance should have different namespace.
     */
    private String namespace;

    private String endpoint;

    private String serverList;

    private String cacheDir;

    private String logName;

    private HostReactor hostReactor;

    private BeatReactor beatReactor;

    private EventDispatcher eventDispatcher;

    private NamingProxy serverProxy;

    //......

    @Override
    public Instance selectOneHealthyInstance(String serviceName) throws NacosException {
        return selectOneHealthyInstance(serviceName, new ArrayList<String>());
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException {
        return selectOneHealthyInstance(serviceName, groupName, true);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException {
        return selectOneHealthyInstance(serviceName, new ArrayList<String>(), subscribe);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException {
        return selectOneHealthyInstance(serviceName, groupName, new ArrayList<String>(), subscribe);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException {
        return selectOneHealthyInstance(serviceName, clusters, true);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException {
        return selectOneHealthyInstance(serviceName, groupName, clusters, true);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe)
        throws NacosException {
        return selectOneHealthyInstance(serviceName, Constants.DEFAULT_GROUP, clusters, subscribe);
    }

    @Override
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
        if (subscribe) {
            return Balancer.RandomByWeight.selectHost(
                hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
        } else {
            return Balancer.RandomByWeight.selectHost(
                hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
        }
    }

    //......
}
- selectOneHealthyInstance is similar to selectInstances, except that it returns a single instance; like selectInstances, it first obtains the serviceInfo from hostReactor
- If subscribe is true, it calls hostReactor.getServiceInfo to obtain the serviceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer
- Once it has the serviceInfo, selectOneHealthyInstance calls Balancer.RandomByWeight.selectHost to pick a single healthy instance
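The subscribe branch can be illustrated with a small stand-alone sketch. It does not use the Nacos client. It assumes, as the method names suggest but the listing above does not show, that the subscribed path serves results from a locally held cache while the non-subscribed path queries the server on every call. The class SubscribeLookupSketch, its key format, and the Function standing in for the server are all hypothetical; only the comma-joined cluster list mirrors the original code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the subscribe / non-subscribe lookup paths.
final class SubscribeLookupSketch {

    private final Map<String, List<String>> cache = new HashMap<String, List<String>>();
    // Stand-in for a remote query; NOT the real NamingProxy API.
    private final Function<String, List<String>> server;
    private int serverCalls = 0;

    SubscribeLookupSketch(Function<String, List<String>> server) {
        this.server = server;
    }

    // Hypothetical key format; only the "," join of clusters follows the original code.
    static String key(String serviceName, String groupName, List<String> clusters) {
        return groupName + "|" + serviceName + "|" + String.join(",", clusters);
    }

    List<String> lookup(String key, boolean subscribe) {
        if (subscribe) {
            // Assumed behaviour: reuse the cached entry, asking the server only on a miss.
            List<String> cached = cache.get(key);
            if (cached == null) {
                cached = queryServer(key);
                cache.put(key, cached);
            }
            return cached;
        }
        // Non-subscribed path: always go to the server.
        return queryServer(key);
    }

    private List<String> queryServer(String key) {
        serverCalls++;
        return server.apply(key);
    }

    int serverCalls() {
        return serverCalls;
    }

    public static void main(String[] args) {
        SubscribeLookupSketch sketch =
            new SubscribeLookupSketch(k -> Arrays.asList("10.0.0.1:8080"));
        String key = key("demo-service", "DEFAULT_GROUP", Arrays.asList("c1", "c2"));
        sketch.lookup(key, true);
        sketch.lookup(key, true);
        sketch.lookup(key, false);
        System.out.println("server calls: " + sketch.serverCalls());
    }
}
```

In this model, repeated subscribed lookups for the same key reach the server once, whereas each non-subscribed lookup reaches it again.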
Balancer
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/Balancer.java
public class Balancer {

    /**
     * report status to server
     */
    public final static List<String> UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER = new CopyOnWriteArrayList<String>();

    public static class RandomByWeight {

        public static List<Instance> selectAll(ServiceInfo serviceInfo) {
            List<Instance> hosts = serviceInfo.getHosts();

            if (CollectionUtils.isEmpty(hosts)) {
                throw new IllegalStateException("no host to srv for serviceInfo:" + serviceInfo.getName());
            }

            return hosts;
        }

        public static Instance selectHost(ServiceInfo dom) {
            List<Instance> hosts = selectAll(dom);

            if (CollectionUtils.isEmpty(hosts)) {
                throw new IllegalStateException("no host to srv for service:" + dom.getName());
            }

            return getHostByRandomWeight(hosts);
        }
    }

    /**
     * Return one host from the host list by random-weight.
     *
     * @param hosts The list of the host.
     * @return The random-weight result of the host
     */
    protected static Instance getHostByRandomWeight(List<Instance> hosts) {
        NAMING_LOGGER.debug("entry randomWithWeight");
        if (hosts == null || hosts.size() == 0) {
            NAMING_LOGGER.debug("hosts == null || hosts.size() == 0");
            return null;
        }

        Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");

        NAMING_LOGGER.debug("new Chooser");

        List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();
        for (Instance host : hosts) {
            if (host.isHealthy()) {
                hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));
            }
        }
        NAMING_LOGGER.debug("for (Host host : hosts)");
        vipChooser.refresh(hostsWithWeight);
        NAMING_LOGGER.debug("vipChooser.refresh");
        return vipChooser.randomWithWeight();
    }
}
- Balancer's RandomByWeight provides the selectAll and selectHost methods; selectAll checks whether serviceInfo.getHosts() is empty and throws an IllegalStateException if it is
- selectHost calls selectAll internally and then uses getHostByRandomWeight to pick a single healthy instance
- getHostByRandomWeight first creates a Chooser, then collects the healthy instances (paired with their weights) into hostsWithWeight, refreshes the chooser via vipChooser.refresh(hostsWithWeight), and finally picks a single healthy instance via vipChooser.randomWithWeight()
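The overall effect of getHostByRandomWeight, filtering out unhealthy hosts and then choosing among the rest in proportion to their weights, can be sketched without the Nacos classes. The sketch below is an illustration, not the library's implementation: Host is a hypothetical stand-in for Instance, the selection uses a plain linear scan instead of Chooser's binary search, the random value is passed in so the result is deterministic, and returning null when no candidate remains is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class HealthyWeightedPickSketch {

    /** Hypothetical stand-in for the client's Instance class. */
    static final class Host {
        final String name;
        final boolean healthy;
        final double weight;

        Host(String name, boolean healthy, double weight) {
            this.name = name;
            this.healthy = healthy;
            this.weight = weight;
        }
    }

    /**
     * Picks one healthy host with probability proportional to its weight.
     * The caller supplies random in [0, 1) to keep the sketch deterministic.
     */
    static Host pick(List<Host> hosts, double random) {
        List<Host> candidates = new ArrayList<Host>();
        double total = 0;
        for (Host h : hosts) {
            // Keep only healthy hosts with a positive weight.
            if (h.healthy && h.weight > 0) {
                candidates.add(h);
                total += h.weight;
            }
        }
        if (candidates.isEmpty()) {
            return null; // sketch-only behaviour
        }
        double threshold = random * total;
        double running = 0;
        for (Host h : candidates) {
            running += h.weight;
            if (threshold < running) {
                return h;
            }
        }
        // Fallback for floating-point rounding at the upper edge.
        return candidates.get(candidates.size() - 1);
    }

    public static void main(String[] args) {
        List<Host> hosts = Arrays.asList(
            new Host("a", true, 1.0),
            new Host("b", false, 5.0),
            new Host("c", true, 3.0));
        System.out.println(pick(hosts, 0.10).name);
        System.out.println(pick(hosts, 0.90).name);
    }
}
```

With these inputs, host b is never returned regardless of its weight, because it is filtered out before any weights are compared.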
Chooser
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java
public class Chooser<K, T> {

    private K uniqueKey;

    private volatile Ref<T> ref;

    public T random() {
        List<T> items = ref.items;
        if (items.size() == 0) {
            return null;
        }
        if (items.size() == 1) {
            return items.get(0);
        }
        return items.get(ThreadLocalRandom.current().nextInt(items.size()));
    }

    public T randomWithWeight() {
        Ref<T> ref = this.ref;
        double random = ThreadLocalRandom.current().nextDouble(0, 1);
        int index = Arrays.binarySearch(ref.weights, random);
        if (index < 0) {
            index = -index - 1;
        } else {
            return ref.items.get(index);
        }

        if (index >= 0 && index < ref.weights.length) {
            if (random < ref.weights[index]) {
                return ref.items.get(index);
            }
        }

        /* This should never happen, but it ensures we will return a correct
         * object in case there is some floating point inequality problem
         * wrt the cumulative probabilities. */
        return ref.items.get(ref.items.size() - 1);
    }

    public Chooser(K uniqueKey) {
        this(uniqueKey, new ArrayList<Pair<T>>());
    }

    public Chooser(K uniqueKey, List<Pair<T>> pairs) {
        Ref<T> ref = new Ref<T>(pairs);
        ref.refresh();
        this.uniqueKey = uniqueKey;
        this.ref = ref;
    }

    public K getUniqueKey() {
        return uniqueKey;
    }

    public Ref<T> getRef() {
        return ref;
    }

    public void refresh(List<Pair<T>> itemsWithWeight) {
        Ref<T> newRef = new Ref<T>(itemsWithWeight);
        newRef.refresh();
        newRef.poller = this.ref.poller.refresh(newRef.items);
        this.ref = newRef;
    }

    //......
}
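- Chooser's refresh method creates a new Ref from itemsWithWeight and then calls that Ref's refresh method; randomWithWeight draws a random number in [0, 1) and uses Arrays.binarySearch(ref.weights, random) to compute the initial index (a negative result is converted to the insertion point via -index - 1), then uses that index to fetch the element from ref.items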
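The index computation in randomWithWeight can be isolated into a small stand-alone sketch. It assumes a non-empty array of ascending cumulative weights whose last entry is about 1; the class and method names (CumulativePickSketch, pickIndex) are hypothetical. It relies on the documented contract of Arrays.binarySearch: when the key is absent, the return value is (-(insertion point) - 1), where the insertion point is the index of the first element greater than the key.

```java
import java.util.Arrays;

final class CumulativePickSketch {

    /**
     * Maps a random value in [0, 1) to an item index, given ascending
     * cumulative weights such as {0.25, 0.75, 1.0}.
     */
    static int pickIndex(double[] cumulative, double random) {
        if (cumulative.length == 0) {
            throw new IllegalArgumentException("cumulative weights must not be empty");
        }
        int index = Arrays.binarySearch(cumulative, random);
        if (index >= 0) {
            // Exact hit on a boundary: that boundary's item is chosen.
            return index;
        }
        // Not found: recover the insertion point, i.e. the first boundary > random.
        index = -index - 1;
        if (index < cumulative.length) {
            return index;
        }
        // Guard against floating-point drift in the last boundary.
        return cumulative.length - 1;
    }

    public static void main(String[] args) {
        double[] cumulative = {0.25, 0.75, 1.0};
        System.out.println(pickIndex(cumulative, 0.10));
        System.out.println(pickIndex(cumulative, 0.50));
        System.out.println(pickIndex(cumulative, 0.90));
    }
}
```

Because the middle item owns the interval between 0.25 and 0.75, it is selected for half of all random values, which is how a larger weight translates into a higher selection probability.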
Ref
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java
public class Ref<T> {
    private List<Pair<T>> itemsWithWeight = new ArrayList<Pair<T>>();
    private List<T> items = new ArrayList<T>();
    private Poller<T> poller = new GenericPoller<T>(items);
    private double[] weights;

    @SuppressWarnings("unchecked")
    public Ref(List<Pair<T>> itemsWithWeight) {
        this.itemsWithWeight = itemsWithWeight;
    }

    public void refresh() {
        Double originWeightSum = (double) 0;

        for (Pair<T> item : itemsWithWeight) {

            double weight = item.weight();
            //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
            if (weight <= 0) {
                continue;
            }

            items.add(item.item());
            if (Double.isInfinite(weight)) {
                weight = 10000.0D;
            }
            if (Double.isNaN(weight)) {
                weight = 1.0D;
            }
            originWeightSum += weight;
        }

        double[] exactWeights = new double[items.size()];
        int index = 0;
        for (Pair<T> item : itemsWithWeight) {
            double singleWeight = item.weight();
            //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
            if (singleWeight <= 0) {
                continue;
            }
            exactWeights[index++] = singleWeight / originWeightSum;
        }

        weights = new double[items.size()];
        double randomRange = 0D;
        for (int i = 0; i < index; i++) {
            weights[i] = randomRange + exactWeights[i];
            randomRange += exactWeights[i];
        }

        double doublePrecisionDelta = 0.0001;

        if (index == 0 || (Math.abs(weights[index - 1] - 1) < doublePrecisionDelta)) {
            return;
        }
        throw new IllegalStateException("Cumulative Weight caculate wrong , the sum of probabilities does not equals 1.");
    }

    //......
}
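- Ref's refresh method initializes items and weights: items keeps only the entries whose weight is greater than zero, and weights holds their normalized cumulative weights, the last of which must be within 0.0001 of 1 or an IllegalStateException is thrown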
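As a worked example of the normalization, raw weights 1, 0, 2, 1 drop the zero entry, sum to 4, and give probabilities 0.25, 0.5, 0.25, so the cumulative array is 0.25, 0.75, 1.0. The sketch below reproduces just that calculation under simplifying assumptions: the class and method names are hypothetical, it works on a bare double[] instead of Pair objects, and it assumes finite weights, so the special handling of infinite and NaN values in the original is left out.

```java
import java.util.Arrays;

final class CumulativeWeightsSketch {

    /**
     * Builds normalized cumulative weights, skipping entries <= 0.
     * Assumes every weight is finite (no NaN or infinity).
     */
    static double[] cumulative(double[] rawWeights) {
        double sum = 0;
        int kept = 0;
        for (double w : rawWeights) {
            if (w <= 0) {
                continue; // zero or negative weights take no part in selection
            }
            sum += w;
            kept++;
        }
        double[] result = new double[kept];
        double running = 0;
        int i = 0;
        for (double w : rawWeights) {
            if (w <= 0) {
                continue;
            }
            running += w / sum; // add this entry's share of the total
            result[i++] = running;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cumulative(new double[] {1, 0, 2, 1})));
    }
}
```

An empty result means no entry had a positive weight, which corresponds to the index == 0 early return in the original method.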
Summary
- selectOneHealthyInstance is similar to selectInstances, except that it returns a single instance; like selectInstances, it first obtains the serviceInfo from hostReactor
- If subscribe is true, it calls hostReactor.getServiceInfo to obtain the serviceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer
- Once it has the serviceInfo, selectOneHealthyInstance calls Balancer.RandomByWeight.selectHost to pick a single healthy instance
doc
- NacosNamingService