乐趣区

关于源码分析:限速神器RateLimiter源码解析-京东云技术团队

作者:京东科技 李玉亮

目录指引

限流场景

软件系统中个别有两种场景会用到限流:

场景一、高并发的用户端场景。 尤其是 C 端系统,常常面对海量用户申请,如不做限流,遇到霎时高并发的场景,则可能压垮零碎。

场景二、外部交易解决场景。 如某类交易工作解决时有速率要求,再如上下游调用时上游对上游有速率要求。

•无论哪种场景,都须要对申请解决的速率进行限度,或者单个申请解决的速率绝对固定,或者批量申请的解决速率绝对固定,见下图:

罕用的限流算法有如下几种:

算法一、信号量算法。 保护最大的并发申请数(如连接数),当并发申请数达到阈值时报错或期待,如线程池。

算法二、漏桶算法。 模仿一个按固定速率漏出的桶,当流入的申请量大于桶的容量时溢出。

算法三、令牌桶算法。 以固定速率向桶内发放令牌。申请解决时, 先从桶里获取令牌,只服务有令牌的申请。

本次要介绍的 RateLimiter 应用的是令牌桶算法。RateLimiter 是 google 的 guava 包中的一个笨重限流组件,它次要有两个 java 类文件,RateLimiter.java 和 SmoothRateLimiter.java。两个类文件共有 java 代码 301 行、正文 420 行,正文比 java 代码还要多,写的十分具体,前面的介绍也有相干内容是翻译自其正文,有些形容英文原版更加精确清晰,有趣味的也能够联合原版正文进行更具体的理解。

应用介绍

RateLimiter 应用时只需引入 guava jar 便可,最新的版本是 31.1-jre, 本文介绍的源码也是此版本。

            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>31.1-jre</version>
            </dependency>

源码中提供了两个直观的应用示例。

示例一、有一系列工作列表要提交执行,管制提交速率不超过每秒 2 个。

 final RateLimiter rateLimiter = RateLimiter.create(2.0); // 创立一个每秒 2 个许可的 RateLimiter 对象.
 void submitTasks(List<Runnable> tasks, Executor executor) {for (Runnable task : tasks) {rateLimiter.acquire(); // 此处可能有期待
     executor.execute(task);
   }
 }

示例二、以不超过 5kb/ s 的速率产生数据流。

 final RateLimiter rateLimiter = RateLimiter.create(5000.0); // 创立一个每秒 5k 个许可的 RateLimiter 对象
 void submitPacket(byte[] packet) {rateLimiter.acquire(packet.length);
   networkService.send(packet);
 }

能够看出 RateLimiter 的应用非常简单,只须要结构限速器,调用获取许可办法便可,不须要开释许可.

算法介绍

在介绍之前,先说一下 RateLimiter 中的几个名词:

许可( permit ): 代表一个令牌,获取到许可的申请能力放行。

资源利用有余( underunilization ): 许可的发放个别是匀速的,但申请未必是匀速的,有时会有无申请(资源利用有余)的场景,令牌桶会有储存机制。

储存许可( storedPermit ): 令牌桶反对对闲暇资源进行许可储存,许可申请时优先应用储存许可。

陈腐许可( freshPermit ): 当储存许可为空时,采纳透支形式,下发陈腐许可,同时设置下次许可失效工夫为本次陈腐许可的完结工夫。

•如下为一个许可发放示例,矩形代表整个令牌桶,许可产生速度为 1 个 / 秒,令牌桶里有一个储存桶,容量为 2。

以上示例中,在 T1 储存容量为 0,许可申请时间接返回 1 个陈腐许可,储存容量随着时间推移,增长至最大值 2,在 T2 时收到 3 个许可的申请,此时会先从储存桶中取出 2 个,而后再产生 1 个陈腐许可,0.5s 后在 T3 时刻又来了 1 个许可申请,因为最近的许可 0.5s 后才会下发,因而先 sleep0.5s 再下发。

RateLimiter 的外围性能是限速,咱们首先想到的限速计划是记住最初一次下发令牌许可 (permit) 工夫,下次许可申请时,如果与最初一次下发许可工夫的距离小于 1 /QPS,则进行 sleep 至 1 /QPS,否则间接发放,但该办法不能感知到资源利用有余的场景。一方面,隔了很长一段再来申请许可,则可能零碎此时绝对闲暇,可下发更多的许能够充分利用资源;另一方面,隔了很长一段时间再来申请许可,也可能意味着解决申请的资源变冷(如缓存生效),解决效率会降落。因而在 RateLimiter 中,减少了资源利用有余(underutilization)的治理,在代码中体现为储存许可(storedPermits),储存许可值最开始为 0,随着工夫的减少,始终增长为最大储存许可数。许可获取时,首先从储存许可中获取,而后再依据下次陈腐许可获取工夫来进行陈腐许可获取。这里要说的是 RateLimiter 是记住了下次令牌发放的工夫,相似于透支的性能,以后许可获取时立即返回,同时记录下次获取许可的工夫。

代码构造和主体流程

代码构造

整体类图如下:

RateLimiter 类

RateLimiter 类是顶级类,也是惟一裸露给使用者的类,它提供了工厂办法来创立 RateLimiter 办法。create(double permitsPerSecond) 办法创立的是突发限速器,create(double permitsPerSecond, Duration warmupPeriod)办法创立的是预热限速器。同时它提供了 acquire 办法用于获取令牌,提供了 tryAcquire 办法用于尝试获取令牌。该类的外部实现上,一方面有一个 SleepingStopWatch 用于 sleep 操作,另一方面有一个 mutexDoNotUseDirectly 变量和 mutex()办法进行互斥加锁。

SmoothRateLimiter 类

该类继承了 RateLimiter 类,是一个抽象类,含意为平滑限速器,限度速率是平滑的,maxPermits 和 storedPermits 保护了最大存储许可数量和以后存储许可数量;stableIntervalMicros 指规定的稳固许可发放距离,nextFreeTicketMicros 指下一个闲暇许可工夫。

SmoothBursty 类

平滑突发限速器,该类继承了 SmoothRateLimiter,它存储许可的发放频率同设置的 stableIntervalMicros,有一个成员变量 maxBurstSeconds,代表最多存储多长时间的令牌许可。

SmoothWarmingUp 类

平滑预热限速器,继承了 SmoothRateLimiter,与 SmoothBursty 平级,它的预热算法须要肯定的了解老本。

主体流程

获取许可的主体流程如下:

主体流程次要是对储存许可数量和陈腐许可数量进行计算和更新,失去以后许可申请的等待时间。SmoothBursty 算法和 SmoothWarmingUp 算法共用这一套主体流程,差别次要是储存许可的管理策略,两种算法的不同策略在两个子类中各自实现,SmoothBursty 算法绝对简略一些,上面先介绍该算法,而后再介绍 SmoothWarmingUp 算法。

SmoothBursty 算法

限速器创立

采纳的是工厂模式创立,源码如下:

  public static RateLimiter create(double permitsPerSecond) {
    // permitsPerSecond 指每秒容许的许可数. 该办法调用了上面的办法
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }
  // 创立 SmoothBursty(固定储存 1s 的储存许可), 而后设置速率
  static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

1、SmoothBursty 的构造方法绝对简略:

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }

2、rateLimiter.setRate 的定义在父类 RateLimiter 中

  public final void setRate(double permitsPerSecond) {
    checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

该办法应用 synchronized(mutex())办法对互斥锁进行同步,以保障多线程调用的平安,而后调用子类的 doSetRate 办法。第二个参数 nowMicros 传的值是调用了 stopwatch 的办法,将限速器创立的工夫定义为 0,而后计算了以后工夫和创立工夫的时间差,因而采纳的是绝对工夫。

2.1 mutex 办法的实现如下:

  // Can't be initialized in the constructor because mocks don't call the constructor.
  // 从上行正文可看出,这是因为 mock 才用了懒加载, 实际上即时加载代码更简洁
  @CheckForNull private volatile Object mutexDoNotUseDirectly;
  // 双重查看锁的懒加载模式
  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

该办法应用了双重查看锁来对锁对象 mutexDoNotUseDirectly 进行懒加载,另外该办法通过 mutex 长期变量来解决了双重查看锁生效的问题。

2.2 doSetRate 办法的主体实现在 SmoothRateLimiter 类中:

  final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 同步储存许可和工夫
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

该办法在限速器创立时会调用,创立后调用限速器的 setRate 重置速率时也会调用。

2.2.1 resync 办法用于基于以后工夫刷新计算最新的 storedPermis 和 nextFreeTicketMicros.

  /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

该办法从事实场景上讲,代表的是随着工夫的流逝,储存许可一直减少,但从技术实现的角度,并不是真正的继续刷新,而是仅在须要时调用刷新。该办法如果以后工夫小于等于下次许可工夫,则储存许可数量和下次许可工夫不须要刷新;否则通过 (以后工夫 - 下次许可工夫)/ 储存许可的发放距离 计算出的值域最大储存数量取小,则为已储存的许可数量,须要留神的是储存许可数量是 double 类型的。

限速器应用

限速器罕用的办法次要有 accquire 和 tryAccquire。

先说一下 accquire 办法,共有两个共有办法,一个是无参的,每次获取 1 个许可,再一个是整数参数的,每次调用获取多个许可。

  // 获取 1 个许可
  public double acquire() {return acquire(1);
  }
 
  // 获取多个许可
  public double acquire(int permits) {
    // 留出 permits 个许可,失去须要 sleep 的微秒数.
    long microsToWait = reserve(permits);
    // 该办法如果小于等于零则间接返回,否则 sleep
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    // 返回休眠的秒数.
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

从以上源码可看出,获取许可的逻辑很简略:留出 permits 个许可,依据返回值决定是否 sleep 期待。留出许可的办法实现如下:

 // 预留出 permits 个许可
  final long reserve(int permits) {checkPermits(permits);
    synchronized (mutex()) {return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }
   
  // 预留出 permits 个需要,失去须要期待的工夫
  final long reserveAndGetWaitLength(int permits, long nowMicros) {long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }
  abstract long reserveEarliestAvailable(int permits, long nowMicros);

reserveEarliestAvailable 为形象办法,实现在 SmoothRateLimiter 类中,该办法是外围主链路办法,该办法先从储存许可中获取,如果数量足够则间接返回,否则先将全副储存许可取出,再计算还须要的等待时间,逻辑如下:

  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 刷新储存许可和下个令牌工夫
    resync(nowMicros);
    // 返回值为以后的下次闲暇工夫
    long returnValue = nextFreeTicketMicros;
    // 要耗费的储存数量为须要的储存数量
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // 陈腐许可数 = 须要的许可数 - 应用的储存许可
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 等待时间 = 储存许可等待时间(实现方决定)+ 陈腐许可等待时间(数量 * 固定速率)
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
    // 透支后的下次许可可用工夫 = 以后工夫(nextFreeTicketMicros)+ 等待时间(waitMicros)
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    // 储存许可数量缩小
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

该办法有两点阐明:1、returnValue 为之前计算的下次闲暇工夫(后面有说 RateLimiter 采纳预支的模式,本次间接返回,同时计算下次的最早闲暇工夫)2、储存许可的等待时间不同的实现方逻辑不同,SmoothBursty 算法认为储存许可间接可用,所以返回 0, 前面的 SmoothWarmingUp 算法认为储存许可须要耗费比失常速率更多的预热工夫,有肯定算法逻辑.

至此整个 accquire 办法的调用链路剖析完结,上面再看 tryAccquire 办法就比较简单了,tryAccquire 比 accquire 差别的逻辑在于 tryAccquire 办法会判断下次许可工夫 - 以后工夫是否大于超时工夫,如果是则间接返回 false,否则进行 sleep 并返回 true. 办法源码如下:

  public boolean tryAcquire(Duration timeout) {return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
  }

  public boolean tryAcquire(long timeout, TimeUnit unit) {return tryAcquire(1, timeout, unit);
  }

  public boolean tryAcquire(int permits) {return tryAcquire(permits, 0, MICROSECONDS);
  }

  public boolean tryAcquire() {return tryAcquire(1, 0, MICROSECONDS);
  }

  public boolean tryAcquire(int permits, Duration timeout) {return tryAcquire(permits, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {long nowMicros = stopwatch.readMicros();
      // 判断超时微秒数是否可等到下个许可工夫
      if (!canAcquire(nowMicros, timeoutMicros)) {return false;} else {microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    // 休眠期待
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }
  
  // 下次许可工夫 - 超时工夫 <= 以后工夫
  private boolean canAcquire(long nowMicros, long timeoutMicros) {return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }

SmoothWarmingUp 算法

SmoothWarmingUp 算法的主体解决流程同 SmoothBurstry 算法,次要在储存许可工夫计算上的两个办法进行了新实现,该算法不像 SmoothBurstry 算法那么直观好了解,须要先理解算法逻辑,再看源码。

算法阐明

该算法在源码正文中曾经形容的比拟清晰了,次要思维是限流器的初始储存许可数量便是最大储存许可值,储存许可执行时按肯定算法由慢到快的产生,直至设定的固定速率,以此来达到预热过程。该算法波及到一些数学知识,如果不是很感兴趣,则理解其次要思维便可。上面具体说一下该算法。

说到该算法前,咱们再回头看一下 SmoothRateLimiter 的储存许可,储存许可有以后数量和最大数量,另外还有两个算法逻辑,一个是储存许可生产的速率管制,再一个是储存许可生产速率的管制,在 Bursty 算法中,生产的速率同设定的固定速率,而生产的速率为无穷大(立即生产,不占用工夫);在 WarmingUp 算法中,需对照下图进行剖析:

该图可这样了解,每个储存许可的生产耗时为右侧梯形面积,梯形面积 =(上边长 + 下边长)/2 * 高. 能够看到每个储存许可的面积越来越小,直到固定速率的长方形面积。

在限速器初始化时,输出的变量有固定速率和预热工夫,另外冷却因子是固定值 3;在作者算法中,首先计算的是阈值许可数 = 0.5 * 预热周期 / 固定速率. 而后计算的是最大许可数,咱们晓得了梯形的面积、上边(大速率)、下边(小速率),便能推到出高,最大许可 = 阀值许可数 + 高。

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

在具体应用中,一个是生产的速率,固定为预热工夫 / 最大许可数,源码如下:

  double coolDownIntervalMicros() {return warmupPeriodMicros / maxPermits;}

再一个是生产的速率,按如上曲线从右至左的面积 = 梯形面积 + 长方形面积,梯形面积 =(上边 + 下边) /2 * 高,源码如下:

    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

源码剖析

理解了以上算法后,再看上面的源码就绝对简略了。

  static final class SmoothWarmingUp extends SmoothRateLimiter {
    // 预热工夫
    private final long warmupPeriodMicros;
    // 斜率
    private double slope;
    // 阈值许可
    private double thresholdPermits;
    // 冷却因子
    private double coldFactor;

    SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }
    
    // 参数初始化
    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    // 有 storedPermits 个储存许可,要应用 permitsToTake 个时的等待时间计算
    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }
    // 许可耗时 = 固定速率 + 许可值 * 斜率
    private double permitsToTime(double permits) {return stableIntervalMicros + permits * slope;}
    // 冷却距离固定为预热工夫 / 最大许可数.
    @Override
    double coolDownIntervalMicros() {return warmupPeriodMicros / maxPermits;}
  }

思考总结

sleep 阐明和绝对工夫

RateLimiter 外部应用类 StopWatch 进行了一个绝对工夫的度量,RateLimiter 创立时,工夫为 0,而后向后累计,sleep 时不受 interrupt 异样影响。

double 浮点数

RateLimiter 裸露的 API 的许可数量入参为整数类型,但外部计算时理论是浮点 double 类型,反对小数许可数量,一方面浮点存在失落精度,另一方面也不便于了解;是否能够应用整数值得思考。

只反对单机

RateLimiter 的这几种算法只反对单机限流,如要反对集群限流,一种形式是先依据负载平衡的权重计算出单机的限速值,再进行单节点限速;另一种形式是参考该组件应用 redis 等中心化数量治理的中间件,但性能和稳定性会升高一些。

扩展性

RateLimiter 提供了无限的扩大能力,自带的 SmoothBursty 和 SmoothWarmingUp 类不是公开类,不能间接创立或调整参数,如敞开储存性能或调整预热系数等。这种场景须要继承 SmoothRateLimiter 进行重写,储存许可的生产和生产算法是容易变动和重写的点,将整个源码拷贝进去进行二次批改也是一种计划。

退出移动版