共计 10662 个字符,预计需要花费 27 分钟才能阅读完成。
限流作为当初微服务中常见的稳定性措施,在面试中必定也是常常会被问到的,我在面试的时候也常常喜爱问一下你对限流算法晓得哪一些?有看过源码吗?实现原理是什么?
第一局部先讲讲限流算法,最初再讲讲源码的实现原理。
限流算法
对于限流的算法大体上能够分为四类:固定窗口计数器、滑动窗口计数器、漏桶(也有称漏斗,英文 Leaky bucket)、令牌桶(英文 Token bucket)。
固定窗口
固定窗口,相比其余的限流算法,这应该是最简略的一种。
它简略地对一个固定的工夫窗口内的申请数量进行计数,如果超过申请数量的阈值,将被间接抛弃。
这个简略的限流算法优缺点都很显著。长处的话就是简略,毛病举个例子来说。
比方咱们下图中的黄色区域就是固定工夫窗口,默认工夫范畴是 60s,限流数量是 100。
如图中括号内所示,后面一段时间都没有流量,刚好前面 30 秒内来了 100 个申请,此时因为没有超过限流阈值,所以申请全副通过,而后下一个窗口的 20 秒内同样通过了 100 个申请。
所以变相的相当于在这个括号的 40 秒的工夫内就通过了 200 个申请,超过了咱们限流的阈值。
滑动窗口
为了优化这个问题,于是有了滑动窗口算法,顾名思义,滑动窗口就是工夫窗口在随着时间推移不停地挪动。
滑动窗口把一个固定工夫窗口再持续拆分成 N 个小窗口,而后对每个小窗口别离进行计数,所有小窗口申请之和不能超过咱们设定的限流阈值。
以下图举例子来说,假如咱们的窗口拆分成了 3 个小窗口,小窗口都是 20s,同样基于下面的例子,当在第三个 20s 的时候来了 100 个申请,能够通过。
而后工夫窗口滑动,下一个 20s 申请又来了 100 个申请,此时咱们滑动窗口的 60s 范畴内申请数量必定就超过 100 了啊,所以申请被回绝。
漏桶 Leaky bucket
漏桶算法,人如其名,他就是一个漏的桶,不论申请的数量有多少,最终都会以固定的进口流量大小匀速流出,如果申请的流量超过漏桶大小,那么超出的流量将会被抛弃。
也就是说流量流入的速度是不定的,然而流出的速度是恒定的。
这个和 MQ 削峰填谷的思维比拟相似,在面对忽然激增的流量的时候,通过漏桶算法能够做到匀速排队,固定速度限流。
漏桶算法的劣势是匀速,匀速是长处也是毛病,很多人说漏桶不能解决突增流量,这个说法并不精确。
漏桶原本就应该是为了解决间歇性的突增流量,流量一下起来了,而后零碎解决不过去,能够在闲暇的时候去解决,避免了突增流量导致系统解体,爱护了零碎的稳定性。
然而,换一个思路来想,其实这些突增的流量对于零碎来说齐全没有压力,你还在缓缓地匀速排队,其实是对系统性能的节约。
所以,对于这种有场景来说,令牌桶算法比漏桶就更有劣势。
令牌桶 token bucket
令牌桶算法是指零碎以肯定地速度往令牌桶里丢令牌,当一个申请过去的时候,会去令牌桶里申请一个令牌,如果可能获取到令牌,那么申请就能够失常进行,反之被抛弃。
当初的令牌桶算法,像 Guava 和 Sentinel 的实现都有冷启动 / 预热的形式,为了防止在流量激增的同时把零碎打挂,令牌桶算法会在最开始一段时间内 冷启动,随着流量的减少,零碎会依据流量大小动静地调整生成令牌的速度,最终直到申请达到零碎的阈值。
源码举例
咱们以 sentinel 举例,sentinel 中统计用到了滑动窗口算法,而后也有用到漏桶、令牌桶算法。
滑动窗口
sentinel
中就应用到了滑动窗口算法来进行统计,不过他的实现和我下面画的图有点不一样,实际上 sentinel 中的滑动窗口用一个圆形来形容更正当一点。
后期就是创立节点,而后 slot 串起来就是一个责任链模式,StatisticSlot 通过滑动窗口来统计数据,FlowSlot 是真正限流的逻辑,还有一些降级、零碎爱护的措施,最终造成了整个 sentinel 的限流形式。
滑动窗口的实现次要能够看 LeapArray
的代码,默认的话定义了工夫窗口的相干参数。
对于 sentinel 来说其实窗口分为 秒和 分钟 两个级别,秒的话窗口数量是 2,分钟则是 60 个窗口,每个窗口的工夫长度是 1s,总的工夫周期就是 60s,分成 60 个窗口,这里咱们就以分钟级别的统计来说。
public abstract class LeapArray<T> {
// 窗口工夫长度,毫秒数,默认 1000ms
protected int windowLengthInMs;
// 窗口数量,默认 60
protected int sampleCount;
// 毫秒工夫周期,默认 60*1000
protected int intervalInMs;
// 秒级工夫周期,默认 60
private double intervalInSecond;
// 工夫窗口数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
而后咱们要看的就是它是怎么计算出以后窗口的,其实源码里写的听分明的,然而如果你依照之前设想把他当做一条直线延长去想的话预计不太好了解。
首先计算数组索引下标和工夫窗口工夫这个都比较简单,难点应该大部分在于第三点窗口大于 old 这个是什么鬼,具体说下这几种状况。
- 数组中的工夫窗口是是空的,这个阐明工夫走到了咱们初始化的工夫之后了,此时 new 一个新的窗口通过 CAS 的形式去更新,而后返回这个新的窗口就好了。
- 第二种状况是刚好工夫窗口的工夫相等,那么间接返回,没啥好说的
- 第三种状况就是比拟难以了解的,能够参看两条工夫线的图,就比拟好了解了,第一次工夫窗口走完了达到 1200,而后圆形工夫窗口开始循环,新的工夫起始地位还是 1200,而后工夫窗口的工夫来到 1676,B2 的地位如果还是老的窗口那么就是 600,所以咱们要重置之前的工夫窗口的工夫为以后的工夫。
- 最初一种个别状况不太可能产生,除非时钟回拨这样子
从这个咱们能够发现就是针对每个 WindowWrap
工夫窗口都进行了统计,最初实际上在前面的几个中央都会用到工夫窗口统计的 QPS 后果,这里就不再赘述了,晓得即可。
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int) (timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {return timeMillis - timeMillis % windowLengthInMs;}
public WindowWrap<T> currentWindow(long timeMillis) {
// 以后工夫如果小于 0,返回空
if (timeMillis < 0) {return null;}
// 计算工夫窗口的索引
int idx = calculateTimeIdx(timeMillis);
// 计算以后工夫窗口的开始工夫
long windowStart = calculateWindowStart(timeMillis);
while (true) {
// 在窗口数组中取得窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 比方以后工夫是 888,依据计算失去的数组窗口地位是个空,所以间接创立一个新窗口就好了
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 这个更好了,刚好等于,间接返回就行
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* |_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* 这个要当成圆形了解就好了,之前如果是 1200 一个残缺的圆形,而后持续从 1200 开始,如果当初工夫是 1676,落在在 B2 的地位,* 窗口开始工夫是 1600,获取到的 old 工夫其实会是 600,所以必定是过期了,间接重置窗口就能够了
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {updateLock.unlock();
}
} else {Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 这个不太可能呈现,嗯。。时钟回拨
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
漏桶
sentinel 次要依据 FlowSlot
中的流控进行流量管制,其中 RateLimiterController
就是漏桶算法的实现,这个实现相比其余几个还是简略多了,略微看一下应该就明确了。
- 首先计算出以后申请平摊到 1s 内的工夫破费,而后去计算这一次申请预计工夫
- 如果小于以后工夫的话,那么以以后工夫为主,返回即可
- 反之如果超过以后工夫的话,这时候就要进行排队期待了,期待的时候要判断是否超过以后最大的等待时间,超过就间接抛弃
- 没有超过就更新上一次的通过工夫,而后再比拟一次是否超时,还超时就重置工夫,反之在等待时间范畴之内的话就期待,如果都不是那就能够通过了
public class RateLimiterController implements TrafficShapingController {
// 最大期待超时工夫,默认 500ms
private final int maxQueueingTimeMs;
// 限流数量
private final double count;
// 上一次的通过工夫
private final AtomicLong latestPassedTime = new AtomicLong(-1);
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {return true;}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {return false;}
long currentTime = TimeUtil.currentTimeMillis();
// 工夫平摊到 1s 内的破费
long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms
// 计算这一次申请预计的工夫
long expectedTime = costTime + latestPassedTime.get();
// 破费工夫小于以后工夫,pass,最初通过工夫 = 以后工夫
if (expectedTime <= currentTime) {latestPassedTime.set(currentTime);
return true;
} else {
// 预计通过的工夫超过以后工夫,要进行排队期待,从新获取一下,避免出现问题,差额就是须要期待的工夫
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 等待时间超过最大等待时间,抛弃
if (waitTime > maxQueueingTimeMs) {return false;} else {
// 反之,能够更新最初一次通过工夫了
long oldTime = latestPassedTime.addAndGet(costTime);
try {waitTime = oldTime - TimeUtil.currentTimeMillis();
// 更新后再判断,还是超过最大超时工夫,那么就抛弃,工夫重置
if (waitTime > maxQueueingTimeMs) {latestPassedTime.addAndGet(-costTime);
return false;
}
// 在工夫范畴之内的话,就期待
if (waitTime > 0) {Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {}}
}
return false;
}
}
令牌桶
最初是令牌桶,这个不在于实现的复制,而是你看源码会发现都算的些啥玩意儿。。。sentinel 的令牌桶实现基于 Guava,代码在 WarmUpController
中。
这个算法那些各种计算逻辑其实咱们能够不论(因为我也没看懂。。),然而流程上咱们是清晰的就能够了。
几个外围的参数看正文,构造方法里那些计算逻辑临时不论他是怎么算的(我也没整明确,然而不影响咱们了解),要害看 canPass
是怎么做的。
- 拿到以后窗口和上一个窗口的 QPS
- 填充令牌,也就是往桶里丢令牌,而后咱们先看填充令牌的逻辑
public class WarmUpController implements TrafficShapingController {
// 限流 QPS
protected double count;
// 冷启动系数,默认 =3
private int coldFactor;
// 戒备的令牌数
protected int warningToken = 0;
// 最大令牌数
private int maxToken;
// 斜率,产生令牌的速度
protected double slope;
// 存储的令牌数量
protected AtomicLong storedTokens = new AtomicLong(0);
// 最初一次填充令牌工夫
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {if (coldFactor <= 1) {throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
//stableInterval 稳固产生令牌的工夫周期,1/QPS
//warmUpPeriodInSec 预热 / 冷启动工夫 , 默认 10s
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 斜率的计算参考 Guava,当做一个固定改的公式
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 以后工夫窗口通过的 QPS
long passQps = (long) node.passQps();
// 上一个工夫窗口 QPS
long previousQps = (long) node.previousPassQps();
// 填充令牌
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的 qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
// 以后的令牌超过警戒线,取得超过警戒线的令牌数
long aboveToken = restToken - warningToken;
// 耗费的速度要比 warning 快,然而要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {return true;}
} else {if (passQps + acquireCount <= count) {return true;}
}
return false;
}
}
填充令牌的逻辑如下:
- 拿到以后的工夫,而后去掉毫秒数,失去的就是秒级工夫
- 判断工夫小于这里就是为了管制每秒丢一次令牌
- 而后就是
coolDownTokens
去计算咱们的冷启动 / 预热是怎么计算填充令牌的 - 前面计算以后剩下的令牌数这个就不说了,减去上一次耗费的就是桶里剩下的令牌
protected void syncToken(long passQps) {long currentTime = TimeUtil.currentTimeMillis();
// 去掉以后工夫的毫秒
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
// 管制每秒填充一次令牌
if (currentTime <= oldLastFillTime) {return;}
// 以后的令牌数量
long oldValue = storedTokens.get();
// 获取新的令牌数量,蕴含增加令牌的逻辑,这就是预热的逻辑
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 存储的令牌数量当然要减去上一次耗费的令牌
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
- 最开始的事实因为
lastFilledTime
和oldValue
都是 0,所以依据以后工夫戳会失去一个十分大的数字,最初和maxToken
取小的话就失去了最大的令牌数,所以第一次初始化的时候就会生成maxToken
的令牌 - 之后咱们假如零碎的 QPS 一开始很低,而后忽然飙高。所以开始的时候回始终走到高于警戒线的逻辑里去,而后
passQps
又很低,所以会始终处于把令牌桶填满的状态(currentTime - lastFilledTime.get()
会始终都是 1000,也就是 1 秒),所以每次都会填充最大 QPScount
数量的令牌 - 而后突增流量来了,QPS 霎时很高,缓缓地令牌数量就会耗费到警戒线之下,走到咱们
if
的逻辑里去,而后去依照count
数量减少令牌
private long coolDownTokens(long currentTime, long passQps) {long oldValue = storedTokens.get();
long newValue = oldValue;
// 水位低于警戒线,就生成令牌
if (oldValue < warningToken) {
// 如果桶中令牌低于警戒线,依据上一次的时间差,失去新的令牌数,因为去掉了毫秒,1 秒生成的令牌就是阈值 count
// 第一次都是 0 的话,会生成 count 数量的令牌
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
// 反之,如果是高于警戒线,要判断 QPS。因为 QPS 越高,生成令牌就要越慢,QPS 低的话生成令牌要越快
if (passQps < (int)count / coldFactor) {newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
// 不要超过最大令牌数
return Math.min(newValue, maxToken);
}
下面的逻辑理顺之后,咱们就能够持续看限流的局部逻辑:
- 令牌计算的逻辑实现,而后判断是不是超过警戒线,依照下面的说法,低 QPS 的状态必定是始终超过的,所以会依据斜率来计算出一个
warningQps
,因为咱们处于冷启动的状态,所以这个阶段就是要依据斜率来计算出一个 QPS 数量,让流量缓缓地达到零碎能接受的峰值。举个例子,如果count
是 100,那么在 QPS 很低的状况下,令牌桶始终处于满状态,然而零碎会管制 QPS,理论通过的 QPS 就是warningQps
,依据算法可能只有 10 或者 20(怎么算的不影响了解)。QPS 主键进步的时候,aboveToken
再逐步变小,整个warningQps
就在逐步变大,直到走到警戒线之下,到了else
逻辑里。 - 流量突增的状况,就是
else
逻辑里低于警戒线的状况,咱们令牌桶在不停地依据count
去减少令牌,这时候耗费令牌的速度超过咱们生成令牌的速度,可能就会导致始终处于警戒线之下,这时候判断当然就须要依据最高 QPS 去判断限流了。
long restToken = storedTokens.get();
if (restToken >= warningToken) {
// 以后的令牌超过警戒线,取得超过警戒线的令牌数
long aboveToken = restToken - warningToken;
// 耗费的速度要比 warning 快,然而要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {return true;}
} else {if (passQps + acquireCount <= count) {return true;}
}
所以,依照低 QPS 到突增高 QPS 的流程,来设想一下这个过程:
- 刚开始,零碎的 QPS 非常低,初始化咱们就间接把令牌桶塞满了
- 而后这个低 QPS 的状态继续了一段时间,因为咱们始终会填充最大 QPS 数量的令牌(因为取最小值,所以其实桶里令牌根本不会有变动),所以令牌桶始终处于满的状态,整个零碎的限流也处于一个比拟低的程度
这以上的局部始终处于警戒线之上,实际上就是叫做冷启动 / 预热的过程。
- 接着零碎的 QPS 忽然激增,令牌耗费速度太快,就算咱们每次减少最大 QPS 数量的令牌任然无奈维持耗费,所以桶里的令牌在一直低缩小,这个时候,冷启动阶段的限度 QPS 也在一直地进步,最初直到桶里的令牌低于警戒线
- 低于警戒线之后,零碎就会依照最高 QPS 去限流,这个过程就是零碎在逐步达到最高限流的过程
那这样一来,理论就达到了咱们解决突增流量的目标,整个零碎在漫漫地适应忽然飙高的 QPS,而后最终达到零碎的 QPS 阈值。
- 最初,如果 QPS 回复失常,那么又会逐步回到警戒线之上,就回到了最开始的过程。
总结
因为算法如果独自说的话都比较简单,一说大家都能够听明确,不须要几个字就能说明确,所以还是得弄点源码看看他人是怎么玩的,所以只管我很厌恶放源码,然而还是不得不干。
光靠他人说一点其实有点看不明确,依照程序读一遍的话心里就无数了。
那源码的话最难以了解的就是令牌桶的实现了,说实话那几个计算的逻辑我看了好几遍不晓得他算的什么鬼,然而思维咱们了解就行了,其余的逻辑相对来说就比拟容易了解。