共计 6662 个字符,预计需要花费 17 分钟才能阅读完成。
序
本文主要研究一下 ribbon 的 ServerListSubsetFilter
ServerListSubsetFilter
ribbon-loadbalancer-2.3.0-sources.jar!/com/netflix/loadbalancer/ServerListSubsetFilter.java
public class ServerListSubsetFilter<T extends Server> extends ZoneAffinityServerListFilter<T> implements IClientConfigAware, Comparator<T>{private Random random = new Random(); | |
private volatile Set<T> currentSubset = Sets.newHashSet(); | |
private DynamicIntProperty sizeProp = new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.size", 20); | |
private DynamicFloatProperty eliminationPercent = | |
new DynamicFloatProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f); | |
private DynamicIntProperty eliminationFailureCountThreshold = | |
new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationFailureThresold", 0); | |
private DynamicIntProperty eliminationConnectionCountThreshold = | |
new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationConnectionThresold", 0); | |
@Override | |
public void initWithNiwsConfig(IClientConfig clientConfig) {super.initWithNiwsConfig(clientConfig); | |
sizeProp = new DynamicIntProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() + ".ServerListSubsetFilter.size", 20); | |
eliminationPercent = | |
new DynamicFloatProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f); | |
eliminationFailureCountThreshold = new DynamicIntProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() | |
+ ".ServerListSubsetFilter.eliminationFailureThresold", 0); | |
eliminationConnectionCountThreshold = new DynamicIntProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() | |
+ ".ServerListSubsetFilter.eliminationConnectionThresold", 0); | |
} | |
/** | |
* Given all the servers, keep only a stable subset of servers to use. This method | |
* keeps the current list of subset in use and keep returning the same list, with exceptions | |
* to relatively unhealthy servers, which are defined as the following: | |
* <p> | |
* <ul> | |
* <li>Servers with their concurrent connection count exceeding the client configuration for | |
* {@code <clientName>.<nameSpace>.ServerListSubsetFilter.eliminationConnectionThresold} (default is 0) | |
* <li>Servers with their failure count exceeding the client configuration for | |
* {@code <clientName>.<nameSpace>.ServerListSubsetFilter.eliminationFailureThresold} (default is 0) | |
* <li>If the servers evicted above is less than the forced eviction percentage as defined by client configuration | |
* {@code <clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent} (default is 10%, or 0.1), the | |
* remaining servers will be sorted by their health status and servers will worst health status will be | |
* forced evicted. | |
* </ul> | |
* <p> | |
* After the elimination, new servers will be randomly chosen from all servers pool to keep the | |
* number of the subset unchanged. | |
* | |
*/ | |
@Override | |
public List<T> getFilteredListOfServers(List<T> servers) {List<T> zoneAffinityFiltered = super.getFilteredListOfServers(servers); | |
Set<T> candidates = Sets.newHashSet(zoneAffinityFiltered); | |
Set<T> newSubSet = Sets.newHashSet(currentSubset); | |
LoadBalancerStats lbStats = getLoadBalancerStats(); | |
for (T server: currentSubset) { | |
// this server is either down or out of service | |
if (!candidates.contains(server)) {newSubSet.remove(server); | |
} else {ServerStats stats = lbStats.getSingleServerStat(server); | |
// remove the servers that do not meet health criteria | |
if (stats.getActiveRequestsCount() > eliminationConnectionCountThreshold.get() | |
|| stats.getFailureCount() > eliminationFailureCountThreshold.get()) {newSubSet.remove(server); | |
// also remove from the general pool to avoid selecting them again | |
candidates.remove(server); | |
} | |
} | |
} | |
int targetedListSize = sizeProp.get(); | |
int numEliminated = currentSubset.size() - newSubSet.size(); | |
int minElimination = (int) (targetedListSize * eliminationPercent.get()); | |
int numToForceEliminate = 0; | |
if (targetedListSize < newSubSet.size()) { | |
// size is shrinking | |
numToForceEliminate = newSubSet.size() - targetedListSize;} else if (minElimination > numEliminated) {numToForceEliminate = minElimination - numEliminated;} | |
if (numToForceEliminate > newSubSet.size()) {numToForceEliminate = newSubSet.size(); | |
} | |
if (numToForceEliminate > 0) {List<T> sortedSubSet = Lists.newArrayList(newSubSet); | |
Collections.sort(sortedSubSet, this); | |
List<T> forceEliminated = sortedSubSet.subList(0, numToForceEliminate); | |
newSubSet.removeAll(forceEliminated); | |
candidates.removeAll(forceEliminated); | |
} | |
// after forced elimination or elimination of unhealthy instances, | |
// the size of the set may be less than the targeted size, | |
// then we just randomly add servers from the big pool | |
if (newSubSet.size() < targetedListSize) {int numToChoose = targetedListSize - newSubSet.size(); | |
candidates.removeAll(newSubSet); | |
if (numToChoose > candidates.size()) { | |
// Not enough healthy instances to choose, fallback to use the | |
// total server pool | |
candidates = Sets.newHashSet(zoneAffinityFiltered); | |
candidates.removeAll(newSubSet); | |
} | |
List<T> chosen = randomChoose(Lists.newArrayList(candidates), numToChoose); | |
for (T server: chosen) {newSubSet.add(server); | |
} | |
} | |
currentSubset = newSubSet; | |
return Lists.newArrayList(newSubSet); | |
} | |
/** | |
* Randomly shuffle the beginning portion of server list (according to the number passed into the method) | |
* and return them. | |
* | |
* @param servers | |
* @param toChoose | |
* @return | |
*/ | |
private List<T> randomChoose(List<T> servers, int toChoose) {int size = servers.size(); | |
if (toChoose >= size || toChoose < 0) {return servers;} | |
for (int i = 0; i < toChoose; i++) {int index = random.nextInt(size); | |
T tmp = servers.get(index); | |
servers.set(index, servers.get(i)); | |
servers.set(i, tmp); | |
} | |
return servers.subList(0, toChoose); | |
} | |
/** | |
* Function to sort the list by server health condition, with | |
* unhealthy servers before healthy servers. The servers are first sorted by | |
* failures count, and then concurrent connection count. | |
*/ | |
@Override | |
public int compare(T server1, T server2) {LoadBalancerStats lbStats = getLoadBalancerStats(); | |
ServerStats stats1 = lbStats.getSingleServerStat(server1); | |
ServerStats stats2 = lbStats.getSingleServerStat(server2); | |
int failuresDiff = (int) (stats2.getFailureCount() - stats1.getFailureCount()); | |
if (failuresDiff != 0) {return failuresDiff;} else {return (stats2.getActiveRequestsCount() - stats1.getActiveRequestsCount()); | |
} | |
} | |
} |
- ServerListSubsetFilter 继承了 ZoneAffinityServerListFilter,实现了 IClientConfigAware、Comparator 接口
- initWithNiwsConfig 方法以 IClientConfig 的 clientName 和 nameSpace 作为前缀,创建了 ServerListSubsetFilter.size (默认 20)、ServerListSubsetFilter.forceEliminatePercent (默认 0.1)、ServerListSubsetFilter.eliminationFailureThresold (默认 0)、ServerListSubsetFilter.eliminationConnectionThresold (默认 0) 这几个动态配置
- getFilteredListOfServers 方法先调用父类 ZoneAffinityServerListFilter 的 getFilteredListOfServers 过滤出 zoneAffinityFiltered 作为 candidates,然后遍历 currentSubset,剔除已不在 candidates 中的 server,并根据 ServerStats 剔除 activeRequestsCount 或 failureCount 超出阈值的 server;接着计算 numToForceEliminate,按 failureCount (相同时再按 activeRequestsCount) 从高到低排序,使用 subList 方法取出排在最前面的 forceEliminated 并移除;最后通过 randomChoose 从 candidates 中随机补足到目标大小,得到新的 newSubSet,并用它重置 currentSubset
小结
在 server list 非常多的场景下,没有必要在连接池中保持这么多的连接,ServerListSubsetFilter 可以在这种场景下对 server list 进行精简,通过剔除相对不健康 (failureCount、activeRequestCount) 的 server 来达到此目标
doc
- ServerListSubsetFilter
正文完