关于java-ee:限流算法与Guava-RateLimiter解析

9次阅读

共计 7006 个字符,预计需要花费 18 分钟才能阅读完成。

在分布式系统中,应答高并发拜访时,缓存、限流、降级是爱护零碎失常运行的罕用办法。当申请量突发暴涨时,如果不加以限度拜访,则可能导致整个零碎解体,服务不可用。同时有一些业务场景,比方短信验证码,或者其它第三方 API 调用,也须要提供必要的拜访限度反对。还有一些资源耗费过大的申请,比方数据导出等(参考 记一次线上 Java 服务 CPU 100% 处理过程),也有限度拜访频率的需要。

常见的限流算法有令牌桶算法,漏桶算法,与计数器算法。本文次要对三个算法的基本原理及 Google Guava 包中令牌桶算法的实现 RateLimiter 进行介绍,下一篇文章介绍最近写的一个以 RateLimiter 为参考的分布式限流实现及计数器限流实现。

令牌桶算法

令牌桶算法的原理就是以一个恒定的速度往桶里放入令牌,每一个申请的解决都须要从桶里先获取一个令牌,当桶里没有令牌时,则申请不会被解决,要么排队期待,要么降级解决,要么间接拒绝服务。当桶里令牌满时,新增加的令牌会被抛弃或回绝。

令牌桶算法的解决示意图如下(图片来自网络)

令牌桶算法次要是能够管制申请的均匀解决速率,它容许预生产,即能够提前生产令牌,以应答突发申请,然而前面的申请须要为预生产买单(期待更长的工夫),以满足申请解决的均匀速率是肯定的。

漏桶算法

漏桶算法的原理是水(申请)先进入漏桶中,漏桶以肯定的速度出水(解决申请),当水流入速度大于流出速度导致水在桶内逐步沉积直到桶满时,水会溢出(申请被回绝)。

漏桶算法的解决示意图如下(图片来自网络)

漏桶算法次要是管制申请的解决速率,平滑网络上的突发流量,申请能够以任意速度进入漏桶中,但申请的解决则以恒定的速度进行。

计数器算法

计数器算法是限流算法中最简略的一种算法,限度在一个工夫窗口内,至少解决多少个申请。比方每分钟最多解决 10 个申请,则从第一个申请进来的工夫为终点,60s 的工夫窗口内只容许最多解决 10 个申请。下一个工夫窗口又以前一时间窗口过后第一个申请进来的工夫为终点。常见的比方一分钟内只能获取一次短信验证码的性能能够通过计数器算法来实现。

Guava RateLimiter 解析

Guava 是 Google 开源的一个工具包,其中的 RateLimiter 是实现了令牌桶算法的一个限流工具类。在 pom.xml 中增加 guava 依赖,即可应用 RateLimiter

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>

如下测试代码示例了 RateLimiter 的用法,

public static void main(String[] args) {RateLimiter rateLimiter = RateLimiter.create(1); // 创立一个每秒产生一个令牌的令牌桶
    for(int i=1;i<=5;i++) {double waitTime = rateLimiter.acquire(i); // 一次获取 i 个令牌
        System.out.println("acquire:" + i + "waitTime:" + waitTime);
    }
}

运行后,输入如下,

acquire:1 waitTime:0.0
acquire:2 waitTime:0.997729
acquire:3 waitTime:1.998076
acquire:4 waitTime:3.000303
acquire:5 waitTime:4.000223

第一次获取一个令牌时,期待 0s 立刻可获取到(这里之所以不须要期待是因为令牌桶的预生产个性),第二次获取两个令牌,等待时间 1s,这个 1s 就是后面获取一个令牌时因为预生产没有期待延到这次来期待的工夫,这次获取两个又是预生产,所以下一次获取(取 3 个时)就要期待这次预生产须要的 2s 了,依此类推。可见预生产不须要期待的工夫都由下一次来买单,以保障肯定的均匀解决速率(上例为 1s 一次)。

RateLimiter 有两种实现:

  1. SmoothBursty:令牌的生成速度恒定。应用 RateLimiter.create(double permitsPerSecond) 创立的是 SmoothBursty 实例。
  2. SmoothWarmingUp:令牌的生成速度继续晋升,直到达到一个稳固的值。WarmingUp,顾名思义就是有一个热身的过程。应用 RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) 时创立就是 SmoothWarmingUp 实例,其中 warmupPeriod 就是热身达到稳固速度的工夫。

类构造如下

要害属性及办法解析(以 SmoothBursty 为例)

1. 要害属性

/** 桶中以后领有的令牌数. */
double storedPermits;

/** 桶中最多能够保留多少秒存入的令牌数 */
double maxBurstSeconds;

/** 桶中能存储的最大令牌数,等于 storedPermits*maxBurstSeconds. */
double maxPermits;

/** 放入令牌的工夫距离 */
double stableIntervalMicros;

/** 下次可获取令牌的工夫点,能够是过来也能够是未来的工夫点 */
private long nextFreeTicketMicros = 0L;

2. 要害办法

调用 RateLimiter.create(double permitsPerSecond) 办法时,创立的是 SmoothBursty 实例,默认设置 maxBurstSeconds 为 1s。SleepingStopwatch 是 guava 中的一个时钟类实现。

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
}

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {super(stopwatch);
    this.maxBurstSeconds = maxBurstSeconds;
}

并通过调用 SmoothBursty.doSetRate(double, long) 办法进行初始化,该办法中:

  1. 调用 resync(nowMicros) 对 storedPermits 与 nextFreeTicketMicros 进行了调整——如果以后工夫晚于 nextFreeTicketMicros,则计算这段时间内产生的令牌数,累加到 storedPermits 上,并更新下次可获取令牌工夫 nextFreeTicketMicros 为以后工夫。
  2. 计算 stableIntervalMicros 的值,1/permitsPerSecond。
  3. 调用 doSetRate(double, double) 办法计算 maxPermits 值(maxBurstSeconds*permitsPerSecond),并依据旧的 maxPermits 值对 storedPermits 进行调整。

源码如下所示

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {resync(nowMicros);
        double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
        this.stableIntervalMicros = stableIntervalMicros;
        doSetRate(permitsPerSecond, stableIntervalMicros);
}

/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
        storedPermits = min(maxPermits, storedPermits + newPermits);
        nextFreeTicketMicros = nowMicros;
        }
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
                // if we don't special-case this, we would get storedPermits == NaN, below
                storedPermits = maxPermits;
        } else {
                storedPermits =
                        (oldMaxPermits == 0.0)
                                ? 0.0 // initial state
                                : storedPermits * maxPermits / oldMaxPermits;
        }
}

调用 acquire(int) 办法获取指定数量的令牌时,

  1. 调用 reserve(int) 办法,该办法最终调用 reserveEarliestAvailable(int, long) 来更新下次可取令牌工夫点与以后存储的令牌数,并返回本次可取令牌的工夫点,依据该工夫点计算须要期待的工夫
  2. 阻塞期待 1 中返回的等待时间
  3. 返回期待的工夫(秒)

源码如下所示

/** 获取指定数量(permits)的令牌,阻塞直到获取到令牌,返回期待的工夫 */
@CanIgnoreReturnValue
public double acquire(int permits) {long microsToWait = reserve(permits);
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {checkPermits(permits);
        synchronized (mutex()) {return reserveAndGetWaitLength(permits, stopwatch.readMicros());
        }
}

/** 返回须要期待的工夫 */
final long reserveAndGetWaitLength(int permits, long nowMicros) {long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
        return max(momentAvailable - nowMicros, 0);
}

/** 针对此次须要获取的令牌数更新下次可取令牌工夫点与存储的令牌数,返回本次可取令牌的工夫点 */
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {resync(nowMicros); // 更新以后数据
        long returnValue = nextFreeTicketMicros;
        double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可生产的令牌数
        double freshPermits = requiredPermits - storedPermitsToSpend; // 须要新增的令牌数
        long waitMicros =
                storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                        + (long) (freshPermits * stableIntervalMicros); // 须要期待的工夫

        this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 更新下次可取令牌的工夫点
        this.storedPermits -= storedPermitsToSpend; // 更新以后存储的令牌数
        return returnValue;
}

acquire(int) 办法是获取不到令牌时始终阻塞,直到获取到令牌,tryAcquire(int,long,TimeUnit) 办法则是在指定超时工夫内尝试获取令牌,如果获取到或超时工夫到则返回是否获取胜利

  1. 先判断是否能在指定超时工夫内获取到令牌,通过 nextFreeTicketMicros <= timeoutMicros + nowMicros 是否为 true 来判断,即可取令牌工夫早于以后工夫加超时工夫则可取(预生产的个性),否则不可获取。
  2. 如果不可获取,立刻返回 false。
  3. 如果可获取,则调用 reserveAndGetWaitLength(permits, nowMicros) 来更新下次可取令牌工夫点与以后存储的令牌数,返回等待时间(逻辑与后面雷同),并阻塞期待相应的工夫,返回 true。

源码如下所示

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {long timeoutMicros = max(unit.toMicros(timeout), 0);
        checkPermits(permits);
        long microsToWait;
        synchronized (mutex()) {long nowMicros = stopwatch.readMicros();
                if (!canAcquire(nowMicros, timeoutMicros)) { // 判断是否能在超时工夫内获取指定数量的令牌
                        return false;
                } else {microsToWait = reserveAndGetWaitLength(permits, nowMicros);
                }
        }
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; // 只有可取工夫小于以后工夫 + 超时工夫,则可获取(可预生产的个性!)}

@Override
final long queryEarliestAvailable(long nowMicros) {return nextFreeTicketMicros;}

以上就是 SmoothBursty 实现的根本解决流程。留神两点:

  1. RateLimiter 通过限度前面申请的等待时间,来反对肯定水平的突发申请——预生产的个性。
  2. RateLimiter 令牌桶的实现并不是起一个线程一直往桶里放令牌,而是以一种提早计算的形式(参考 resync 函数),在每次获取令牌之前计算该段时间内能够产生多少令牌,将产生的令牌退出令牌桶中并更新数据来实现,比起一个线程来一直往桶里放令牌高效得多。(想想如果须要针对每个用户限度某个接口的拜访,则针对每个用户都得创立一个 RateLimiter,并起一个线程来管制令牌寄存的话,如果在线用户数有几十上百万,起线程来管制是一件如许恐怖的事件)

总结

本文介绍了限流的三种根本算法,其中令牌桶算法与漏桶算法次要用来限度申请解决的速度,可将其归为限速,计数器算法则是用来限度一个工夫窗口内申请解决的数量,可将其归为限量(对速度不限度)。Guava 的 RateLimiter 是令牌桶算法的一种实现,但 RateLimiter 只实用于单机利用,在分布式环境下就不实用了。尽管已有一些开源我的项目可用于分布式环境下的限流治理,如阿里的 Sentinel,但对于小型我的项目来说,引入 Sentinel 可能显得有点过重,但限流的需要在小型我的项目中也是存在的,下一篇文章就介绍下基于 RateLimiter 的分布式下的限流实现。


[转载请注明出处]
作者:雨歌
欢送关注作者公众号:半路雨歌,查看更多技术干货文章

正文完
 0