乐趣区

关于java:解析-Dubbo-的-LoadBalance-策略源码

零 后期筹备

0 FBI WARNING

文章异样啰嗦且绕弯。

1 版本

Dubbo 版本 : dubbo 3.0

2 LoadBalance 简介

Dubbo LoadBalance 是 Dubbo Consumer 中用于负载平衡的组件,位于 Cluster 层中。

一 Interface

LoadBalance 的组件遵循 Dubbo 的个别设计法则,接口在 dubbo-cluster 模块中:

package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.Adaptive;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance;
import java.util.List;

@SPI(RandomLoadBalance.NAME)  // RandomLoadBalance.NAME = random
public interface LoadBalance {

    /**
     * 能够在 url 里传入 loadbalance 参数来切换负载平衡策略,默认依据 spi 机制,会应用 random
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

二 模版类

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import java.util.List;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_SERVICE_REFERENCE_PATH;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;

/**
 * 负载平衡组件模板
 */
public abstract class AbstractLoadBalance implements LoadBalance {static int calculateWarmupWeight(int uptime, int warmup, int weight) {// ww = ( 以后工夫 - 启动工夫) / 预热工夫 * 权重
        // 取 ww 和 权重 中的最小值
        // 如果 以后工夫 还在 预热工夫 内,那么此处 ww 必然小于 权重
        // 如果 以后工夫 和 启动工夫 相差十分近,或者 预热工夫 很长,那么此处 ww 有可能会小于 1,此处会返回 1
        // 如果 以后工夫 小于 启动工夫,那么是服务的工夫问题,ww 就会小于 0,此处会返回 1

        // 从 getWeight(...) 办法可知,此处 ww 必然小于 weight
        int ww = (int) (uptime / ((float) warmup / weight));
        return ww < 1 ? 1 : (Math.min(ww, weight));
    }

    /**
     * 接口形象办法 select 的实现,也是模版的外围办法
     * 
     * @param invokers   所有的服务提供者信息的封装
     * @param url        以后调用者的 url
     * @param invocation  要发送的信息
     */
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // 如果没有服务提供者,此处返回 null
        if (CollectionUtils.isEmpty(invokers)) {return null;}

        // 服务的提供者只有一个,间接返回就能够了,没有负载平衡的必要
        if (invokers.size() == 1) {return invokers.get(0);
        }

        // 有多个,那么此处须要不同的策略自行实现具体逻辑
        return doSelect(invokers, url, invocation);
    }

    /**
     * 模板办法的具体实现,从列表中抉择一个 invoker
     *
     * @param invokers 所有的服务提供者信息的封装
     * @param url     以后调用者的 url
     * @param invocation 要发送的信息
     */
    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);


    /**
     * 获取 invoker 权重的办法,在 random 和 robin      * 中很重要的办法
     */
    int getWeight(Invoker<?> invoker, Invocation invocation) {
        int weight;

        // 获取 url
        URL url = invoker.getUrl();

        // REGISTRY_SERVICE_REFERENCE_PATH = org.apache.dubbo.registry.RegistryService
        // REGISTRY_KEY = registry
        // WEIGHT_KEY = weight
        // DEFAULT_WEIGHT = 100
        // TIMESTAMP_KEY = timestamp
        // WARMUP_KEY = warmup
        // DEFAULT_WARMUP = 600000

        if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
            // 入参 registry.weight 和 100
            weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
        } else {

            // provider 的权重
            weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
            if (weight > 0) { // 权重大于 0
                // provider 的启动的工夫戳
                long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
                if (timestamp > 0L) {long uptime = System.currentTimeMillis() - timestamp;
                    if (uptime < 0) {
                        // 启动的工夫戳小于以后工夫戳,这种状况可能是存在服务器工夫问题
                        // 此处为何返回 1?return 1;
                    }

                    // warmup 是预热工夫,如果以后工夫内,这个 provider 还处于预热当中
                    // 那么就会调用到 calculateWarmupWeight(...) 办法
                    int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                    if (uptime > 0 && uptime < warmup) {weight = calculateWarmupWeight((int)uptime, warmup, weight);
                    }
                }
            }
        }

        // 权重不能低于 0
        return Math.max(weight, 0);
    }
}

三 负载平衡策略实现

在 Dubbo 3.0 中,负载平衡策略存在以下几种:

  • RandomLoadBalance(随机)
  • RoundRobinLoadBalance(轮询)
  • ShortestResponseLoadBalance(最短反馈)
  • LeastActiveLoadBalance(起码沉闷)
  • ConsistentHashLoadBalance(一致性 hash)

笔者这里临时只列举前三种,前面两种有缘补充 (实际上是因为还没看完)。

1 RandomLoadBalance

默认策略,实际上是思考了权重之后的随机抉择,如果每个服务提供者的权重都统一,那么就应用 java 的随机函数去抉择一个。

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 思考权重值之后的随机负载平衡
 */
public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // 获取服务提供者的数量
        int length = invokers.size();

        // 默认所有的服务提供者是雷同的权重
        boolean sameWeight = true;

        // 权重数组
        int[] weights = new int[length];

        // 获取第一个服务提供者的权重
        int firstWeight = getWeight(invokers.get(0), invocation);
        // 存入数组
        weights[0] = firstWeight;

        // 权重的和
        int totalWeight = firstWeight;

        // 轮询所有的提供者的权重并记录下来
        for (int i = 1; i < length; i++) {
            // 此处和上方代码相似
            int weight = getWeight(invokers.get(i), invocation);
            weights[i] = weight;
            totalWeight += weight;

            // 如果遇到不一样的就把标识改成 false
            if (sameWeight && weight != firstWeight) {sameWeight = false;}
        }

        // 不同权重模式下的随机计算
        // 大略思路是 row 一个随机值,并依照程序进行相减,察看落在哪个区间内
        if (totalWeight > 0 && !sameWeight) {int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < length; i++) {offset -= weights[i];
                if (offset < 0) {return invokers.get(i);
                }
            }
        }

        // 雷同权重下的随机计算
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }

}

2 RoundRobinLoadBalance

轮询负载平衡策略,实质上也是思考了权重之后的轮循。如果 A 服务提供者的权重是 B 服务提供者的两倍,那么实践上 A 被轮循到的次数就会是 B 的两倍。

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Round robin load balance.
 *
 * 轮询负载平衡策略
 */
public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";

    private static final int RECYCLE_PERIOD = 60000;

    /**
     * 权重的封装
     */
    protected static class WeightedRoundRobin {
        private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;

        public int getWeight() {return weight;}

        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }

        public long increaseCurrent() {return current.addAndGet(weight);
        }

        public void sel(int total) {current.addAndGet(-1 * total);
        }

        public long getLastUpdate() {return lastUpdate;}

        public void setLastUpdate(long lastUpdate) {this.lastUpdate = lastUpdate;}
    }

    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();

    /**
     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     *
     * @param invokers
     * @param invocation
     * @return
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {return map.keySet();
        }
        return null;
    }

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // key = serviceKey + methodName
        // 这个 key 代表一个 provider 接口
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

        // 获取权重记录,如果没有的话会创立一个空 map
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();


        Invoker<T> selectedInvoker = null; // 被选中的 provider
        WeightedRoundRobin selectedWRR = null; // 被选中的 provider 的权重 entity
        for (Invoker<T> invoker : invokers) {

            // 此处如果存在权重记录就间接返回,不存在就初始化一个
            // identifyString 是缓存的 key
            String identifyString = invoker.getUrl().toIdentityString();

            /*
                 获取权重的封装对象,如果没有的话会创立一个
                 WeightedRoundRobin 保护两个重要的参数,一个数 current,代表该 provider 以后的调用权重
                 一个是 weight,代表该 provider 恒定的配置权重
             */
            int weight = getWeight(invoker, invocation);
            WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {WeightedRoundRobin wrr = new WeightedRoundRobin();
                wrr.setWeight(weight);
                return wrr;
            });

            // 改权重数据
            if (weight != weightedRoundRobin.getWeight()) {weightedRoundRobin.setWeight(weight);
            }

            // cur = weightedRoundRobin.current + weightedRoundRobin.weight
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);

            // 此处的 cur > maxCurrent,实质上选出了所有 provider 中 current 最大的一个
            // 此处联合上述逻辑,相当于给每个 provider 的 current 减少了一次 weight
            // 并选出了 current 最大的那一个,作为调用方
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }


        // 对 map 进行自检
        // 如果超过 60 秒都没有被调用,此处即认为服务曾经异样,就会移除
        if (invokers.size() != map.size()) {map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
        }
        if (selectedInvoker != null) {
            // weightedRoundRobin.current = weightedRoundRobin.current - totalWeight
            // 相当于
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }


        /**
         * 上述逻辑简图
         * 假如三个服务 s1,s2,s3 权重均为 10
         *
         * 第一轮叠加权重后的 current:* 10 10 10
         * 第一轮抉择推送 s1,推送实现后的 current:* -20 10 10
         *
         * 第二轮叠加权重后的 current:* -10 20 20
         * 第二轮抉择推送 s2,推送实现后的 current:* -10 -10 20
         *
         * 第三轮叠加权重后的 current:* 0 0 30
         * 第三轮抉择推送 s3,推送实现后的 current:* 0 0 0
         *
         * 第四轮叠加权重后的 current:* 10 10 10
         * 第四轮抉择推送 s1,推送实现后的 current:* -20 10 10
         *
         *
         * 以此类推。*/

        // 上述代码出问题的状况下默认选第一个
        return invokers.get(0);
    }
}

3 ShortestResponseLoadBalance

依据响应工夫和以后服务的申请量去取得一个最优解。如果存在多个最优解,则思考权重,如果仅有一个则权重有效。

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcStatus;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 依据最优解抉择服务提供者
 */
public class ShortestResponseLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "shortestresponse";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // 可调用的服务提供者的数量
        int length = invokers.size();
        // 初始化一个最短 response 工夫
        long shortestResponse = Long.MAX_VALUE;
        // 初始化一个最短 response 总数
        int shortestCount = 0;
        // The index of invokers having the same estimated shortest response time
        int[] shortestIndexes = new int[length];
        // 每个服务提供者的权重
        int[] weights = new int[length];
        // 权重和
        int totalWeight = 0;
        // 调用均匀返回工夫最短的服务提供者的权重
        int firstWeight = 0;
        // 权重是否雷同
        boolean sameWeight = true;

        // 轮询所有的服务提供者
        for (int i = 0; i < length; i++) {Invoker<T> invoker = invokers.get(i);
            // 获取服务提供者的状态
            RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            // 均匀服务调用胜利返回工夫
            long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
            // 正在沉闷的申请数
            int active = rpcStatus.getActive();
            // 此处用均匀工夫乘以沉闷数,取得打分
            // 如果服务提供方很强壮,均匀工夫很短,然而申请调配的很多,这里分数也会比拟高
            // 分数越低,优先级越高
            long estimateResponse = succeededAverageElapsed * active;

            // 获取权重
            int afterWarmup = getWeight(invoker, invocation);
            weights[i] = afterWarmup;

            /**
             * 计算最短数组,shortestResponse 记录以后最短的
             */
            if (estimateResponse < shortestResponse) {
                // 如果以后服务提供者的得分低于最低的得分,则更新最低得分,// 并将最优提供者数组的首地位为以后的提供者
                shortestResponse = estimateResponse;
                shortestCount = 1;
                shortestIndexes[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;
            } else if (estimateResponse == shortestResponse) {
                // 如果相等,则可能存在多个最优解
                shortestIndexes[shortestCount++] = i;
                totalWeight += afterWarmup;
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {sameWeight = false;}
            }
        }

        // 最优解只有一个的状况,间接选最优解进行调用
        if (shortestCount == 1) {return invokers.get(shortestIndexes[0]);
        }
        // 最优解不止一个,且最优解之间的权重不同,那么此处依据权重去随机抉择一个
        if (!sameWeight && totalWeight > 0) {int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < shortestCount; i++) {int shortestIndex = shortestIndexes[i];
                offsetWeight -= weights[shortestIndex];
                if (offsetWeight < 0) {return invokers.get(shortestIndex);
                }
            }
        }

        // 最优解不止一个,且权重雷同,则随机抉择
        return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
    }
}

本文仅为集体的学习笔记,可能存在谬误或者表述不清的中央,有缘补充

退出移动版