关于分布式:分布式Dubbo1负载均衡策略

7次阅读

共计 25035 个字符,预计需要花费 63 分钟才能阅读完成。

Dubbo 源码剖析 – 集群容错之 LoadBalance

1. 简介

LoadBalance 中文意思为负载平衡,它的职责是将网络申请,或者其余模式的负载“均摊”到不同的机器上。防止集群中局部服务器压力过大,而另一些服务器比拟闲暇的状况。

通过负载平衡,能够让每台服务器获取到适宜本人解决能力的负载。在为高负载的服务器分流的同时,还能够防止资源节约,两全其美。负载平衡可分为软件负载平衡和硬件负载平衡。

在咱们日常开发中,个别很难接触到硬件负载平衡。但软件负载平衡还是可能接触到一些的,比方 Nginx。

在 Dubbo 中,也有负载平衡的概念和相应的实现。Dubbo 须要对服务消费者的调用申请进行调配,防止多数服务提供者负载过大。服务提供者负载过大,会导致局部服务调用超时。因而将负载平衡到每个服务提供者上,是十分必要的。

Dubbo 提供了 4 种负载平衡实现,别离是基于 权重随机算法 RandomLoadBalance、基于 起码沉闷调用数算法 LeastActiveLoadBalance、基于 hash 一致性 ConsistentHashLoadBalance,以及基于 加权轮询算法 RoundRobinLoadBalance

这几个负载平衡算法代码不是很长,然而想看懂也不是很容易,须要大家对这几个算法的原理有肯定理解才行。如果不是很理解,也没不必太放心。我会在剖析每个算法的源码之前,对算法原理进行简略的解说,帮忙大家建设初步的印象。

我在写 Dubbo 源码剖析系列文章之初,过后 Dubbo 最新的版本为 2.6.4。近期,Dubbo 2.6.5 公布了,其中就有对负载平衡局部代码批改。因而我在剖析完 2.6.4 版本后的源码后,会另外剖析 2.6.5 更新的局部。本篇文章内容十分之丰盛,须要大家急躁浏览。好了,其余的就不多说了,进入正题吧。

2. 源码剖析

在 Dubbo 中,所有负载平衡实现类均继承自 AbstractLoadBalance,该类实现了 LoadBalance 接口办法,并封装了一些公共的逻辑。所以在剖析负载平衡实现之前,先来看一下 AbstractLoadBalance 的逻辑。首先来看一下负载平衡的入口办法 select,如下:

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {if (invokers == null || invokers.isEmpty())
        return null;
    // 如果 invokers 列表中仅有一个 Invoker,间接返回即可,无需进行负载平衡
    if (invokers.size() == 1)
        return invokers.get(0);
    
    // 调用 doSelect 办法进行负载平衡,该办法为形象办法,由子类实现
    return doSelect(invokers, url, invocation);
}

protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

select 办法的逻辑比较简单,首先会检测 invokers 汇合的合法性,而后再检测 invokers 汇合元素数量。如果只蕴含一个 Invoker,间接返回该 Inovker 即可。如果蕴含多个 Invoker,此时须要通过负载平衡算法抉择一个 Invoker。具体的负载平衡算法由子类实现,接下来章节会对这些子类进行详细分析。

AbstractLoadBalance 除了实现了 LoadBalance 接口办法,还封装了一些公共逻辑 —— 服务提供者权重计算逻辑。具体实现如下:

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    // 从 url 中获取 weight 配置值
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
    if (weight > 0) {
        // 获取服务提供者启动工夫戳
        long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
        if (timestamp > 0L) {
            // 计算服务提供者运行时长
            int uptime = (int) (System.currentTimeMillis() - timestamp);
            // 获取服务预热工夫,默认为 10 分钟
            int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
            // 如果服务运行工夫小于预热工夫,则从新计算服务权重,即降权
            if (uptime > 0 && uptime < warmup) {
                // 从新计算服务权重
                weight = calculateWarmupWeight(uptime, warmup, weight);
            }
        }
    }
    return weight;
}

static int calculateWarmupWeight(int uptime, int warmup, int weight) {// 计算权重,上面代码逻辑上形似于 (uptime / warmup) * weight。// 随着服务运行工夫 uptime 增大,权重计算值 ww 会缓缓靠近配置值 weight
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

下面是权重的计算过程,该过程次要用于保障当服务运行时长小于服务预热工夫时,对服务进行降权,防止让服务在启动之初就处于高负载状态。服务预热是一个优化伎俩,与此类似的还有 JVM 预热。次要目标是让服务启动后“低功率”运行一段时间,使其效率缓缓晋升至最佳状态。对于预热方面的更多常识,大家感兴趣能够本人搜寻一下。

对于 AbstractLoadBalance 就先剖析到这,接下来剖析各个实现类的代码。首先,咱们从 Dubbo 缺省的实现类 RandomLoadBalance 看起。

2.1 RandomLoadBalance

RandomLoadBalance 是加权随机算法的具体实现,它的算法思维很简略。

假如咱们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为 10。当初把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范畴在 [0, 10) 之间的随机数,而后计算这个随机数会落到哪个区间上。比方数字 3 会落到服务器 A 对应的区间上,此时返回服务器 A 即可。

权重越大的机器,在坐标轴上对应的区间范畴就越大,因而随机数生成器生成的数字就会有更大的概率落到此区间内。只有随机数生成器产生的随机数散布性很好,在通过屡次抉择后,每个服务器被选中的次数比例靠近其权重比例。

比方,通过一万次抉择后,服务器 A 被选中的次数大概为 5000 次,服务器 B 被选中的次数约为 3000 次,服务器 C 被选中的次数约为 2000 次。

以上就是 RandomLoadBalance 背地的算法思维,比较简单,不多说了,上面开始剖析源码。

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size();
        int totalWeight = 0;
        boolean sameWeight = true;
        // 上面这个循环有两个作用,第一是计算总权重 totalWeight,// 第二是检测每个服务提供者的权重是否雷同,若不雷同,则将 sameWeight 置为 false
        for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);
            // 累加权重
            totalWeight += weight;
            // 检测以后服务提供者的权重与上一个服务提供者的权重是否雷同,// 不雷同的话,则将 sameWeight 置为 false。if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {sameWeight = false;}
        }
        
        // 上面的 if 分支次要用于获取随机数,并计算随机数落在哪个区间上
        if (totalWeight > 0 && !sameWeight) {// 随机获取一个 [0, totalWeight) 之间的数字
            int offset = random.nextInt(totalWeight);
            // 循环让 offset 数减去服务提供者权重值,当 offset 小于 0 时,返回相应的 Invoker。// 还是用下面的例子进行阐明,servers = [A, B, C],weights = [5, 3, 2],offset = 7。// 第一次循环,offset - 5 = 2 > 0,阐明 offset 必定不会落在服务器 A 对应的区间上。// 第二次循环,offset - 3 = -1 < 0,表明 offset 落在服务器 B 对应的区间上
            for (int i = 0; i < length; i++) {
                // 让随机值 offset 减去权重值
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    // 返回相应的 Invoker
                    return invokers.get(i);
                }
            }
        }
        
        // 如果所有服务提供者权重值雷同,此时间接随机返回一个即可
        return invokers.get(random.nextInt(length));
    }
}

RandomLoadBalance 的算法思维比较简单,在通过屡次申请后,可能将调用申请依照权重值进行“平均”调配。当然 RandomLoadBalance 也存在肯定的毛病,当调用次数比拟少时,Random 产生的随机数可能会比拟集中,此时少数申请会落到同一台服务器上。这个毛病并不是很重大,少数状况下能够疏忽。RandomLoadBalance 是一个简略,高效的负载平衡实现,因而 Dubbo 抉择它作为缺省实现。

对于 RandomLoadBalance 就先到这了,接下来剖析 LeastActiveLoadBalance。

2.2 LeastActiveLoadBalance

LeastActiveLoadBalance 翻译过去是最小沉闷数负载平衡,所谓的最小沉闷数可了解为起码连接数。即服务提供者目前正在解决的申请数(一个申请对应一条连贯)起码,表明该服务提供者效率高,单位工夫内可解决更多的申请。此时应优先将申请调配给该服务提供者。

在具体实现中,每个服务提供者对应一个沉闷数 active。初始状况下,所有服务提供者沉闷数均为 0。每收到一个申请,沉闷数加 1,实现申请后则将沉闷数减 1。在服务运行一段时间后,性能好的服务提供者解决申请的速度更快,因而沉闷数降落的也越快。此时这样的服务提供者可能优先获取到新的服务申请,这就是最小沉闷数负载平衡算法的根本思维。

除了最小沉闷数,LeastActiveLoadBalance 在实现上还引入了权重值。所以精确的来说 LeastActiveLoadBalance 是基于加权最小沉闷数算法实现的。

举个例子阐明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的沉闷数雷同,此时 Dubbo 会依据它们的权重去调配申请,权重越大,获取到新申请的可能性就越大。如果两个服务提供者权重雷同,此时随机抉择一个即可。

对于 LeastActiveLoadBalance 的背景常识就先介绍到这里,上面开始剖析源码。

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size();
        // 最小的沉闷数
        int leastActive = -1;
        // 具备雷同“最小沉闷数”的服务者提供者(以下用 Invoker 代称)数量
        int leastCount = 0; 
        // leastIndexs 用于记录具备雷同“最小沉闷数”的 Invoker 在 invokers 列表中的下标信息
        int[] leastIndexs = new int[length];
        int totalWeight = 0;
        // 第一个最小沉闷数的 Invoker 权重值,用于与其余具备雷同最小沉闷数的 Invoker 的权重进行比照,// 以检测是否所有具备雷同最小沉闷数的 Invoker 的权重均相等
        int firstWeight = 0;
        boolean sameWeight = true;

        // 遍历 invokers 列表
        for (int i = 0; i < length; i++) {Invoker<T> invoker = invokers.get(i);
            // 获取 Invoker 对应的沉闷数
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // 获取权重 - ⭐️
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
            // 发现更小的沉闷数,从新开始
            if (leastActive == -1 || active < leastActive) {
                // 应用以后沉闷数 active 更新最小沉闷数 leastActive
                leastActive = active;
                // 更新 leastCount 为 1
                leastCount = 1;
                // 记录以后下标值到 leastIndexs 中
                leastIndexs[0] = i;
                totalWeight = weight;
                firstWeight = weight;
                sameWeight = true;

            // 以后 Invoker 的沉闷数 active 与最小沉闷数 leastActive 雷同 
            } else if (active == leastActive) {
                // 在 leastIndexs 中记录下以后 Invoker 在 invokers 汇合中的下标
                leastIndexs[leastCount++] = i;
                // 累加权重
                totalWeight += weight;
                // 检测以后 Invoker 的权重与 firstWeight 是否相等,// 不相等则将 sameWeight 置为 false
                if (sameWeight && i > 0
                    && weight != firstWeight) {sameWeight = false;}
            }
        }
        
        // 当只有一个 Invoker 具备最小沉闷数,此时间接返回该 Invoker 即可
        if (leastCount == 1) {return invokers.get(leastIndexs[0]);
        }

        // 有多个 Invoker 具备雷同的最小沉闷数,但他们的权重不同
        if (!sameWeight && totalWeight > 0) {// 随机获取一个 [0, totalWeight) 之间的数字
            int offsetWeight = random.nextInt(totalWeight);
            // 循环让随机数减去具备最小沉闷数的 Invoker 的权重值,// 当 offset 小于等于 0 时,返回相应的 Invoker
            for (int i = 0; i < leastCount; i++) {int leastIndex = leastIndexs[i];
                // 获取权重值,并让随机数减去权重值 - ⭐️
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 如果权重雷同或权重为 0 时,随机返回一个 Invoker
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }
}

如上,为了帮忙大家了解代码,我在下面的代码中写了大量的正文。上面简略总结一下以上代码所做的事件,如下:

  1. 遍历 invokers 列表,寻找沉闷数最小的 Invoker
  2. 如果有多个 Invoker 具备雷同的最小沉闷数,此时记录下这些 Invoker 在 invokers 汇合中的下标,以及累加它们的权重,比拟它们之间的权重值是否相等
  3. 如果只有一个 Invoker 具备最小的沉闷数,此时间接返回该 Invoker 即可
  4. 如果有多个 Invoker 具备最小沉闷数,且它们的权重不相等,此时解决形式和 RandomLoadBalance 统一
  5. 如果有多个 Invoker 具备最小沉闷数,但它们的权重相等,此时随机返回一个即可

以上就是 LeastActiveLoadBalance 大抵的实现逻辑,大家在浏览的源码的过程中要留神辨别沉闷数与权重这两个概念,不要一概而论。

以上剖析是基于 Dubbo 2.6.4 版本进行了,因为近期 Dubbo 2.6.5 公布了,对负载平衡局部的代码进行了一些更新。这其中就蕴含了本节剖析的 LeastActiveLoadBalance,所以上面简略阐明一下 Dubbo 2.6.5 对 LeastActiveLoadBalance 进行了怎么的批改。回到下面的源码中,我在下面的代码中标注了两个黄色的五角星⭐️。两处标记对应的代码别离如下:

int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);

问题出在服务预热阶段,第一行代码间接从 url 中去权重值,未被降权过。第二行代码获取到的是通过降权后的权重。第一行代码获取到的权重值最终会被累加到权重总和 totalWeight 中,这个时候会导致一个问题。offsetWeight 是一个在 0, totalWeight) 范畴内的随机数,而它所减去的是通过降权的权重。很有可能在通过 leastCount 次运算后,offsetWeight 依然是大于 0 的,导致无奈选中 Invoker。这个问题对应的 issue 为 [#904,在 pull request #2172 中被修复。具体的修复逻辑是将标注一处的代码批改为:

// afterWarmup 等价于下面的 weight 变量,这样命名是为了强调该变量通过 warmup 降权解决了
int afterWarmup = getWeight(invoker, invocation);

另外,2.6.4 版本中的 LeastActiveLoadBalance 还要一个缺点,即当一组 Invoker 具备雷同的最小沉闷数,且其中一个 Invoker 的权重值为 1,此时这个 Invoker 无奈被选中。缺点代码如下:

int offsetWeight = random.nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {int leastIndex = leastIndexs[i];
    offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
    if (offsetWeight <= 0)    // ❌
        return invokers.get(leastIndex);
}

问题就出在了 offsetWeight <= 0 上,举例说明,假如有一组 Invoker 的权重为 5、2、1,offsetWeight 最大值为 7。假如 offsetWeight = 7,你会发现,当 for 循环进行第二次遍历后 offsetWeight = 7 – 5 – 2 = 0,提前返回了。此时,权重为 1 的 Invoker 就没有机会被选中。这个批改起来也不难,能够将 offsetWeight < 0,不过 Dubbo 的是将offsetWeight + 1,也就是:

int offsetWeight = random.nextInt(totalWeight) + 1;

两种改变都行,不过我认为感觉第一种形式更好一点,可与 RandomLoadBalance 逻辑保持一致。这里 + 1 有点突兀,大家读到这里要顺便思考一下为什么要 +1。

以上就是 Dubob 2.6.5 对 LeastActiveLoadBalance 的更新,不是很难了解,就不多说了。接下来剖析基于一致性 hash 思维的 ConsistentHashLoadBalance。

2.3 ConsistentHashLoadBalance

一致性 hash 算法由麻省理工学院的 Karger 及其合作者于 1997 年提供出的,算法提出之初是用于大规模缓存零碎的负载平衡。

它的工作过程是这样的,首先依据 ip 获取其余的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 – 1] 的圆环上。当有查问或写入申请时,则为缓存项的 key 生成一个 hash 值。而后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查问或写入缓存项。如果以后节点挂了,则在下一次查问或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。

大抵成果如下,每个缓存节点在圆环上占据一个地位。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比方上面绿色点对应的缓存项存储到 cache-2 节点中。因为 cache-3 挂了,本来应该存到该节点中的缓存想最终会存储到 cache-4 节点中。

对于一致性 hash 算法,我这里只做扫盲。具体的细节不探讨,大家请自行补充相干的背景常识。上面来看看一致性 hash 在 Dubbo 中的利用。咱们把上图的缓存节点替换成 Dubbo 的服务提供者,于是失去了下图:

这里雷同色彩的节点均属于同一个服务提供者,比方 Invoker1-1,Invoker1-2,……, Invoker1-160。这样做的目标是通过引入虚构节点,让 Invoker 在圆环上分散开来,防止数据歪斜问题。所谓数据歪斜是指,因为节点不够扩散,导致大量申请落到了同一个节点上,而其余节点只会接管到了大量的申请。比方:

如上,因为 Invoker-1 和 Invoker-2 在圆环上散布不均,导致系统中 75% 的申请都会落到 Invoker-1 上,只有 25% 的申请会落到 Invoker-2 上。解决这个问题方法是引入虚构节点,通过虚构节点平衡各个节点的申请量。

到这里背景常识就遍及完了,接下来开始剖析源码。咱们先从 ConsistentHashLoadBalance 的 doSelect 办法开始看起,如下:

public class ConsistentHashLoadBalance extends AbstractLoadBalance {

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = 
        new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

        // 获取 invokers 原始的 hashcode
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 如果 invokers 是一个新的 List 对象,意味着服务提供者数量产生了变动,可能新增也可能缩小了。// 此时 selector.identityHashCode != identityHashCode 条件成立
        if (selector == null || selector.identityHashCode != identityHashCode) {
            // 创立新的 ConsistentHashSelector
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }

        // 调用 ConsistentHashSelector 的 select 办法抉择 Invoker
        return selector.select(invocation);
    }
    
    private static final class ConsistentHashSelector<T> {...}
}

如上,doSelect 办法次要做了一些前置工作,比方检测 invokers 列表是不是变动过,以及创立 ConsistentHashSelector。这些工作做完后,接下来开始调用 select 办法执行负载平衡逻辑。在剖析 select 办法之前,咱们先来看一下一致性 hash 选择器 ConsistentHashSelector 的初始化过程,如下:

private static final class ConsistentHashSelector<T> {

    // 应用 TreeMap 存储 Invoker 虚构节点
    private final TreeMap<Long, Invoker<T>> virtualInvokers;

    private final int replicaNumber;

    private final int identityHashCode;

    private final int[] argumentIndex;

    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        // 获取虚构节点数,默认为 160
        this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
        // 获取参加 hash 计算的参数下标值,默认对第一个参数进行 hash 运算
        String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
        argumentIndex = new int[index.length];
        for (int i = 0; i < index.length; i++) {argumentIndex[i] = Integer.parseInt(index[i]);
        }
        for (Invoker<T> invoker : invokers) {String address = invoker.getUrl().getAddress();
            for (int i = 0; i < replicaNumber / 4; i++) {
                // 对 address + i 进行 md5 运算,失去一个长度为 16 的字节数组
                byte[] digest = md5(address + i);
                // 对 digest 局部字节进行 4 次 hash 运算,失去四个不同的 long 型正整数
                for (int h = 0; h < 4; h++) {
                    // h = 0 时,取 digest 中下标为 0 ~ 3 的 4 个字节进行位运算
                    // h = 1 时,取 digest 中下标为 4 ~ 7 的 4 个字节进行位运算
                    // h = 2, h = 3 时过程同上
                    long m = hash(digest, h);
                    // 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中,// virtualInvokers 中的元素要有序,因而选用 TreeMap 作为存储构造
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }
}

ConsistentHashSelector 的构造方法执行了一系列的初始化逻辑,比方从配置中获取虚构节点数以及参加 hash 计算的参数下标,默认状况下只应用第一个参数进行 hash。须要特地阐明的是,ConsistentHashLoadBalance 的负载平衡逻辑只受参数值影响,具备雷同参数值的申请将会被调配给同一个服务提供者。ConsistentHashLoadBalance 不 care 权重,因而应用时须要留神一下。

在获取虚构节点数和参数下标配置后,接下来要做的事件是计算虚构节点 hash 值,并将虚构节点存储到 TreeMap 中。到此,ConsistentHashSelector 初始化工作就实现了。接下来,咱们再来看看 select 办法的逻辑。

public Invoker<T> select(Invocation invocation) {
    // 将参数转为 key
    String key = toKey(invocation.getArguments());
    // 对参数 key 进行 md5 运算
    byte[] digest = md5(key);
    // 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 办法,// 寻找适合的 Invoker
    return selectForKey(hash(digest, 0));
}

private Invoker<T> selectForKey(long hash) {
    // 到 TreeMap 中查找第一个节点值大于或等于以后 hash 的 Invoker
    Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
    // 如果 hash 大于 Invoker 在圆环上最大的地位,此时 entry = null,// 须要将 TreeMap 的头结点赋值给 entry
    if (entry == null) {entry = virtualInvokers.firstEntry();
    }

    // 返回 Invoker
    return entry.getValue();}

如上,抉择的过程比较简单了。首先是对参数进行 md5 以及 hash 运算,失去一个 hash 值。而后再拿这个值到 TreeMap 中查找指标 Invoker 即可。

到此对于 ConsistentHashLoadBalance 就剖析完了。在浏览 ConsistentHashLoadBalance 之前,大家肯定要先补充背景常识。否者即便这里只有一百多行代码,也很难看懂。好了,本节先剖析到这。

2.4 RoundRobinLoadBalance

本节,咱们来看一下 Dubbo 中的加权轮询负载平衡的实现 RoundRobinLoadBalance。

在详细分析源码前,咱们还是先来理解一下什么是加权轮询。

这里从最简略的轮询开始讲起,所谓轮询就是将申请轮流调配给一组服务器。举个例子,咱们有三台服务器 A、B、C。咱们将第一个申请调配给服务器 A,第二个申请调配给服务器 B,第三个申请调配给服务器 C,第四个申请再次调配给服务器 A。这个过程就叫做轮询。

轮询是一种无状态负载平衡算法,实现简略,实用于每台服务器性能相近的场景下。

显然,现实情况下,咱们并不能保障每台服务器性能均相近。如果咱们将等量的申请调配给性能较差的服务器,这显然是不合理的。

因而,这个时候咱们须要加权轮询算法,对轮询过程进行干涉,使得性能好的服务器能够失去更多的申请,性能差的失去的少一些。每台服务器可能失去的申请数比例,靠近或等于他们的权重比。

比方服务器 A、B、C 权重比为 5:2:1。那么在 8 次申请中,服务器 A 将获取到其中的 5 次申请,服务器 B 获取到其中的 2 次申请,服务器 C 则获取到其中的 1 次申请。

以上就是加权轮询的算法思维,搞懂了这个思维,接下来咱们就能够剖析源码了。咱们先来看一下 2.6.4 版本的 RoundRobinLoadBalance。

public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = 
        new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // key = 全限定类名 + "." + 办法名,比方 com.xxx.DemoService.sayHello
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size();
        // 最大权重
        int maxWeight = 0;
        // 最小权重
        int minWeight = Integer.MAX_VALUE;
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        // 权重总和
        int weightSum = 0;

        // 上面这个循环次要用于查找最大和最小权重,计算权重总和等
        for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);
            // 获取最大和最小权重
            maxWeight = Math.max(maxWeight, weight);
            minWeight = Math.min(minWeight, weight);
            if (weight > 0) {
                // 将 weight 封装到 IntegerWrapper 中
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                // 累加权重
                weightSum += weight;
            }
        }

        // 查找 key 对应的对应 AtomicPositiveInteger 实例,为空则创立。// 这里能够把 AtomicPositiveInteger 看成一个黑盒,大家只有晓得
        // AtomicPositiveInteger 用于记录服务的调用编号即可。至于细节,// 大家如果感兴趣,能够自行剖析
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }

        // 获取以后的调用编号
        int currentSequence = sequence.getAndIncrement();
        // 如果 最小权重 < 最大权重,表明服务提供者之间的权重是不相等的
        if (maxWeight > 0 && minWeight < maxWeight) {
            // 应用调用编号对权重总和进行取余操作
            int mod = currentSequence % weightSum;
            // 进行 maxWeight 次遍历
            for (int i = 0; i < maxWeight; i++) {
                // 遍历 invokerToWeightMap
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    // 获取 Invoker
                    final Invoker<T> k = each.getKey();
                    // 获取权重包装类 IntegerWrapper
                    final IntegerWrapper v = each.getValue();
                    
                    // 如果 mod = 0,且权重大于 0,此时返回相应的 Invoker
                    if (mod == 0 && v.getValue() > 0) {return k;}
                    
                    // mod != 0,且权重大于 0,此时对权重和 mod 别离进行自减操作
                    if (v.getValue() > 0) {v.decrement();
                        mod--;
                    }
                }
            }
        }
        
        // 服务提供者之间的权重相等,此时通过轮询抉择 Invoker
        return invokers.get(currentSequence % length);
    }

    // IntegerWrapper 是一个 int 包装类,次要蕴含了一个自减办法。// 与 Integer 不同,Integer 是不可变类,而 IntegerWrapper 是可变类
    private static final class IntegerWrapper {
        private int value;

        public void decrement() {this.value--;}
        
        // 省略局部代码
    }
}

如上,RoundRobinLoadBalance 的每行代码都不是很难了解,然而将它们组合到一起之后,如同就看不懂了。这里对下面代码的次要逻辑进行总结,如下:

  1. 找到最大权重值,并计算出权重和
  2. 应用调用编号对权重总和进行取余操作,失去 mod
  3. 检测 mod 的值是否等于 0,且 Invoker 权重是否大于 0,如果两个条件均满足,则返回该 Invoker
  4. 如果下面条件不满足,且 Invoker 权重大于 0,此时对 mod 和权重进行递加
  5. 再次循环,反复步骤 3、4

以上过程对应的原理不太好解释,所以上面间接举例说明把。假如咱们有三台服务器 servers = [A, B, C],对应的权重为 weights = [2, 5, 1]。接下来对下面的逻辑进行简略的模仿。

mod = 0:满足条件,此时间接返回服务器 A

mod = 1:须要进行一次递加操作能力满足条件,此时返回服务器 B

mod = 2:须要进行两次递加操作能力满足条件,此时返回服务器 C

mod = 3:须要进行三次递加操作能力满足条件,通过递加后,服务器权重为 [1, 4, 0],此时返回服务器 A

mod = 4:须要进行四次递加操作能力满足条件,通过递加后,服务器权重为 [0, 4, 0],此时返回服务器 B

mod = 5:须要进行五次递加操作能力满足条件,通过递加后,服务器权重为 [0, 3, 0],此时返回服务器 B

mod = 6:须要进行六次递加操作能力满足条件,通过递加后,服务器权重为 [0, 2, 0],此时返回服务器 B

mod = 7:须要进行七次递加操作能力满足条件,通过递加后,服务器权重为 [0, 1, 0],此时返回服务器 B

通过 8 次调用后,咱们失去的负载平衡后果为 [A, B, C, A, B, B, B, B],次数比 A:B:C = 2:5:1,等于权重比。当 sequence = 8 时,mod = 0,此时重头再来。从下面的模仿过程能够看出,当 mod >= 3 后,服务器 C 就不会被选中了,因为它的权重被减为 0 了。当 mod >= 4 后,服务器 A 的权重被减为 0,尔后 A 就不会再被选中。

以上是 2.6.4 版本的 RoundRobinLoadBalance 剖析过程,大家如果看不懂,本人能够定义一些权重组合进行模仿。也能够写点测试用例,进行调试剖析,总之不要死看。

2.6.4 版本的 RoundRobinLoadBalance 存在着比较严重的性能问题,该问题最后是在 issue #2578 中被反馈进去。问题出在了 Invoker 的返回机会上,RoundRobinLoadBalance 须要在mod == 0 && v.getValue() > 0 条件成立的状况下才会被返回相应的 Invoker。如果 mod 很大,比方 10000,50000,甚至更大时,doSelect 办法须要进行很屡次计算能力将 mod 减为 0。由此可知,doSelect 的效率与 mod 无关,工夫复杂度为 O(mod)。mod 又受最大权重 maxWeight 的影响,因而当某个服务提供者配置了十分大的权重,此时 RoundRobinLoadBalance 会产生比较严重的性能问题。这个问题被反馈后,社区很快做了回应。并对 RoundRobinLoadBalance 的代码进行了重构,将工夫复杂度优化至了常量级别。这个优化能够说很好了,上面咱们来学习一下优化后的代码。

public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private final ConcurrentMap<String, AtomicPositiveInteger> indexSeqs = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size();
        int maxWeight = 0;
        int minWeight = Integer.MAX_VALUE;
        final List<Invoker<T>> invokerToWeightList = new ArrayList<>();
        
        // 查找最大和最小权重
        for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight);
            minWeight = Math.min(minWeight, weight);
            if (weight > 0) {invokerToWeightList.add(invokers.get(i));
            }
        }
        
        // 获取以后服务对应的调用序列对象 AtomicPositiveInteger
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            // 创立 AtomicPositiveInteger,默认值为 0
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        
        // 获取下标序列对象 AtomicPositiveInteger
        AtomicPositiveInteger indexSeq = indexSeqs.get(key);
        if (indexSeq == null) {
            // 创立 AtomicPositiveInteger,默认值为 -1
            indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
            indexSeq = indexSeqs.get(key);
        }

        if (maxWeight > 0 && minWeight < maxWeight) {length = invokerToWeightList.size();
            while (true) {int index = indexSeq.incrementAndGet() % length;
                int currentWeight = sequence.get() % maxWeight;

                // 每循环一轮(index = 0),从新计算 currentWeight
                if (index == 0) {currentWeight = sequence.incrementAndGet() % maxWeight;
                }
                
                // 检测 Invoker 的权重是否大于 currentWeight,大于则返回
                if (getWeight(invokerToWeightList.get(index), invocation) > currentWeight) {return invokerToWeightList.get(index);
                }
            }
        }
        
        // 所有 Invoker 权重相等,此时进行一般的轮询即可
        return invokers.get(sequence.incrementAndGet() % length);
    }
}

下面代码的逻辑是这样的,每进行一轮循环,从新计算 currentWeight。如果以后 Invoker 权重大于 currentWeight,则返回该 Invoker。还是举例说明吧,假如服务器 [A, B, C] 对应权重 [5, 2, 1]。

第一轮循环,currentWeight = 1,可返回 A 和 B

第二轮循环,currentWeight = 2,返回 A

第三轮循环,currentWeight = 3,返回 A

第四轮循环,currentWeight = 4,返回 A

第五轮循环,currentWeight = 0,返回 A, B, C

如上,这里的一轮循环是指 index 再次变为 0 所经验过的循环,这里能够把 index = 0 看做是一轮循环的开始。每一轮循环的次数与 Invoker 的数量无关,Invoker 数量通常不会太多,所以咱们能够认为下面代码的工夫复杂度为常数级。

重构后的 RoundRobinLoadBalance 看起来曾经很不错了,然而在代码更新不久后,很有又被重构了。这次重构起因是新的 RoundRobinLoadBalance 在某些状况下选出的服务器序列不够平均。比方,服务器 [A, B, C] 对应权重 [5, 1, 1]。当初进行 7 次负载平衡,抉择进去的序列为 [A, A, A, A, A, B, C]。前 5 个申请全副都落在了服务器 A 上,散布不够平均。这将会使服务器 A 短时间内接管大量的申请,压力陡增。而 B 和 C 无申请,处于闲暇状态。咱们冀望的后果是这样的 [A, A, B, A, C, A, A],不同服务器能够交叉获取申请。为了减少负载平衡后果的平滑性,社区再次对 RoundRobinLoadBalance 的实现进行了重构。这次重构参考自 Nginx 的平滑加权轮询负载平衡,实现原理是这样的。每个服务器对应两个权重,别离为 weight 和 currentWeight。其中 weight 是固定的,currentWeight 是会动静调整,初始值为 0。当有新的申请进来时,遍历服务器列表,让它的 currentWeight 加上本身权重。遍历实现后,找到最大的 currentWeight,并将其减去权重总和,而后返回相应的服务器即可。

下面形容不是很好了解,上面还是举例说明吧。依然应用服务器 [A, B, C] 对应权重 [5, 1, 1] 的例子进行阐明,当初有 7 个申请顺次进入负载平衡逻辑,抉择过程如下:

申请编号 currentWeight 数组 抉择后果 减去权重总和后的 currentWeight 数组
1 [5, 1, 1] A [-2, 1, 1]
2 [3, 2, 2] A [-4, 2, 2]
3 [1, 3, 3] B [1, -4, 3]
4 [6, -3, 4] A [-1, -3, 4]
5 [4, -2, 5] C [4, -2, -2]
6 [9, -1, -1] A [2, -1, -1]
7 [7, 0, 0] A [0, 0, 0]

如上,通过平滑性解决后,失去的服务器序列为 [A, A, B, A, C, A, A],相比之前的序列 [A, A, A, A, A, B, C],散布性要好一些。初始状况下 currentWeight = [0, 0, 0],第 7 个申请解决完后,currentWeight 再次变为 [0, 0, 0],是不是很神奇。这个后果也不难理解,在 7 次计算过程中,每个服务器的 currentWeight 都减少了本身权重 weight 7,失去 currentWeight = [35, 7, 7],A 被选中 5 次,要被减去 5 7。B 和 C 被选中 1 次,要被减去 1 * 7。于是 currentWeight = [35, 7, 7] – [35, 7, 7] = [0, 0, 0]。

以上就是平滑加权轮询的计算过程,当初大家应该对平滑加权轮询算法了有了一些理解。接下来,咱们来看看 Dubbo-2.6.5 是如何实现下面的计算过程的。

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";
    
    private static int RECYCLE_PERIOD = 60000;
    
    protected static class WeightedRoundRobin {
        // 服务提供者权重
        private int weight;
        // 以后权重
        private AtomicLong current = new AtomicLong(0);
        // 最初一次更新工夫
        private long lastUpdate;
        
        public void setWeight(int weight) {
            this.weight = weight;
            // 初始状况下,current = 0
            current.set(0);
        }
        public long increaseCurrent() {
            // current = current + weight;return current.addAndGet(weight);
        }
        public void sel(int total) {
            // current = current - total;
            current.addAndGet(-1 * total);
        }
    }

    // 嵌套 Map 构造,存储的数据结构示例如下:// {
    //     "UserService.query": {
    //         "url1": WeightedRoundRobin@123, 
    //         "url2": WeightedRoundRobin@456, 
    //     },
    //     "UserService.update": {
    //         "url1": WeightedRoundRobin@123, 
    //         "url2": WeightedRoundRobin@456,
    //     }
    // }
    // 最外层为服务类名 + 办法名,第二层为 url 到 WeightedRoundRobin 的映射关系。// 这里咱们能够将 url 看成是服务提供者的 id
    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    
    // 原子更新锁
    private AtomicBoolean updateLock = new AtomicBoolean();
    
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // 获取 url 到 WeightedRoundRobin 映射表,如果为空,则创立一个新的
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        
        // 获取以后工夫
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;

        // 上面这个循环次要做了这样几件事件://   1. 遍历 Invoker 列表,检测以后 Invoker 是否有
        //      对应的 WeightedRoundRobin,没有则创立
        //   2. 检测 Invoker 权重是否产生了变动,若变动了,//      则更新 WeightedRoundRobin 的 weight 字段
        //   3. 让 current 字段加上本身权重,等价于 current += weight
        //   4. 设置 lastUpdate 字段,即 lastUpdate = now
        //   5. 寻找具备最大 current 的 Invoker 以及 WeightedRoundRobin,//      暂存起来,留作后用
        //   6. 计算权重总和
        for (Invoker<T> invoker : invokers) {String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            int weight = getWeight(invoker, invocation);
            if (weight < 0) {weight = 0;}
            
            // 检测以后 Invoker 是否有对应的 WeightedRoundRobin,没有则创立
            if (weightedRoundRobin == null) {weightedRoundRobin = new WeightedRoundRobin();
                // 设置 Invoker 权重
                weightedRoundRobin.setWeight(weight);
                // 存储 url 惟一标识 identifyString 到 weightedRoundRobin 的映射关系
                map.putIfAbsent(identifyString, weightedRoundRobin);
                weightedRoundRobin = map.get(identifyString);
            }
            // Invoker 权重不等于 WeightedRoundRobin 中保留的权重,阐明权重变动了,此时进行更新
            if (weight != weightedRoundRobin.getWeight()) {weightedRoundRobin.setWeight(weight);
            }
            
            // 让 current 加上本身权重,等价于 current += weight
            long cur = weightedRoundRobin.increaseCurrent();
            // 设置 lastUpdate,示意近期更新过
            weightedRoundRobin.setLastUpdate(now);
            // 找出最大的 current 
            if (cur > maxCurrent) {
                maxCurrent = cur;
                // 将具备最大 current 权重的 Invoker 赋值给 selectedInvoker
                selectedInvoker = invoker;
                // 将 Invoker 对应的 weightedRoundRobin 赋值给 selectedWRR,留作后用
                selectedWRR = weightedRoundRobin;
            }
            
            // 计算权重总和
            totalWeight += weight;
        }

        // 对 <identifyString, WeightedRoundRobin> 进行查看,过滤掉长时间未被更新的节点。// 该节点可能挂了,invokers 中不蕴含该节点,所以该节点的 lastUpdate 长时间无奈被更新。// 若未更新时长超过阈值后,就会被移除掉,默认阈值为 60 秒。if (!updateLock.get() && invokers.size() != map.size()) {if (updateLock.compareAndSet(false, true)) {
                try {ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    // 拷贝
                    newMap.putAll(map);
                    
                    // 遍历批改,也就是移除过期记录
                    Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {it.remove();
                        }
                    }
                    
                    // 更新援用
                    methodWeightMap.put(key, newMap);
                } finally {updateLock.set(false);
                }
            }
        }

        if (selectedInvoker != null) {
            // 让 current 减去权重总和,等价于 current -= totalWeight
            selectedWRR.sel(totalWeight);
            // 返回具备最大 current 的 Invoker
            return selectedInvoker;
        }
        
        // should not happen here
        return invokers.get(0);
    }
}

以上就是 Dubbo-2.6.5 版本的 RoundRobinLoadBalance,大家如果可能了解平滑加权轮询算法的计算过程,再配合我写的正文,了解下面的代码应该不难。

以上就是对于 RoundRobinLoadBalance 全副的剖析,内容有点多,大家缓缓消化吧。好了,本节先到这。

3. 总结

本篇文章对 Dubbo 中的几种负载平衡实现进行了具体的剖析,总的来说,这篇文章写的还是有点累的。次要是每介绍一种负载平衡实现,就要介绍一下相干背景。另一方面,这里很多货色对于我来说,也齐全是新的。在此之前,我对负载平衡算法并没太多理解。这篇文章基本上是边学边写的,总共耗时 5 天。原本想简略写写算了,但最初还是决定写具体点。好在,当初写完了,我也能够放松一下了。

本篇文章是我的 Dubbo 源码剖析系列文章对于集群容错局部的最初一篇文章,写完感觉学到了很多货色。通过坚定不移的浏览代码,写技术文章,使得我对 Dubbo 有了更深刻的理解。当然,这还远远不够。后续还有很多货色要理解,比方 Nacos、Sentinel 等。长路漫漫,步履不停。

好了,本篇文章到这里就完结了。感激大家的浏览。

参考

  • 负载平衡之加权轮询算法 – CSDN
  • dubbo 源码 - 预热 warmup 过程 – 简书
  • 一致性哈希算法原理 – 博客园

附录:Dubbo 源码剖析系列文章

工夫 文章
2018-10-01 Dubbo 源码剖析 – SPI 机制
2018-10-13 Dubbo 源码剖析 – 自适应拓展原理
2018-10-31 Dubbo 源码剖析 – 服务导出
2018-11-12 Dubbo 源码剖析 – 服务援用
2018-11-17 Dubbo 源码剖析 – 集群容错之 Directory
2018-11-20 Dubbo 源码剖析 – 集群容错之 Router
2018-11-22 Dubbo 源码剖析 – 集群容错之 Cluster
2018-11-29 Dubbo 源码剖析 – 集群容错之 LoadBalance
2019-01-09 Dubbo 源码剖析 – 服务调用过程
  • 本文链接: https://www.tianxiaobo.com/20…
  • 版权申明: 本博客所有文章除特地申明外,均采纳 常识共享署名 - 非商业性应用 - 禁止演绎 4.0 国内许可协定 许可协定。转载请注明出处!
正文完
 0