原文链接: 高并发零碎的限流策略:漏桶和令牌桶(附源码分析)

前言

哈喽,大家好,我是正在学习PS技术的asong,这是我并发编程系列的第5篇文章,明天与大家聊一聊高并发零碎中的限流技术,限流又称为流量管制,是指限度达到零碎的并发申请数,当达到限度条件则能够拒绝请求,能够起到爱护上游服务,避免服务过载等作用。罕用的限流策略有漏桶算法、令牌桶算法、滑动窗口;下文次要与大家一起剖析一下漏桶算法和令牌桶算法,滑动窗口就不在这里这介绍了。好啦,废话不多话,开整。

文中测试代码已上传:https://github.com/asong2020/... 欢送`star

漏桶算法

漏桶算法比拟好了解,假如咱们当初有一个水桶,咱们向这个水桶里添水,尽管咱们咱们无奈预计一次会添多少水,也无奈预计水流入的速度,然而能够固定出水的速度,不管添水的速率有多大,都依照固定的速率流出,如果桶满了,溢出的上方水间接摈弃。咱们把水当作HTTP申请,每次都把申请放到一个桶中,而后以固定的速率解决申请,说了这么多,不如看一个图加深了解(图片来自于网络,手残党不会画,多多包涵):

原理其实很简略,就看咱们怎么实现它了,uber团队有一个开源的uber-go/ratelimit库,这个库就是漏桶的一种实现,上面咱们一起来看一看他的实现思路。

样例

学习一个新货色的时候,往往是从会用开始的,缓缓能力明确其实现原理,所以咱们先来看看这个库是怎么应用的,这里咱们间接提供一个理论应用例子,配合Gin框架,咱们增加一个限流中间件,来达到申请限流的作用,测试代码如下:

// 定义全局限流器对象var rateLimit ratelimit.Limiter// 在 gin.HandlerFunc 退出限流逻辑func leakyBucket() gin.HandlerFunc {    prev := time.Now()    return func(c *gin.Context) {        now := rateLimit.Take()        fmt.Println(now.Sub(prev)) // 为了打印工夫距离        prev = now // 记录上一次的工夫,没有这个打印的会有问题    }}func main() {    rateLimit = ratelimit.New(10)    r := gin.Default()    r.GET("/ping", leakyBucket(), func(c *gin.Context) {        c.JSON(200, true)    })    r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")}

咱们简略应用压测工具ab测试一下:ab -n 10 -c 2 http://127.0.0.1:8080/ping,执行后果局部如下:

察看后果可知,每次解决申请的工夫距离是10ms,并且前面的申请耗时越来越久,为什么会这样呢? 这里先卖个小关子,看完uber的实现你就晓得了~

源码实现

咱们首先来看一下其外围构造:

type limiter struct {    sync.Mutex    last       time.Time    sleepFor   time.Duration    perRequest time.Duration    maxSlack   time.Duration    clock      Clock}type Limiter interface {    // Take should block to make sure that the RPS is met.    Take() time.Time}

限制器接口只提供了一个办法take()take()办法会阻塞确保两次申请之间的工夫走完,具体实现咱们在上面进行剖析。实现限制器接口的构造体中各个字段的意义如下:

  • sync.Mutext:互斥锁,管制并发的作用
  • last:记录上一次的时刻
  • sleepFor:间隔解决下一次申请须要期待的工夫
  • perRequest:每次申请的工夫距离
  • maxSlack:最大松弛量,用来解决突发流量
  • clock:一个时钟或模仿时钟,提供了nowsleep办法,是实例化速率限制器

要是用该限制器,首先须要通过New办法进行初始化,一个必传的参数是rate,代表的是每秒申请量(RPS),还有一个可选参数,参数类型option,也就是咱们能够自定义limit,不过个别应用场景不多,这里就不过多介绍了。我次要看一下他是怎么保障固定速率的,截取New办法局部代码如下:

l := &limiter{        perRequest: time.Second / time.Duration(rate),        maxSlack:   -10 * time.Second / time.Duration(rate),    }

依据咱们传入的申请数量,能计算出1s内要通过n个申请,每个申请之间的间隔时间是多少,这样在take办法中就能够依据这个字段来解决申请的固定速率问题,这里还初始化了最大松弛化字段,他的值是正数,默认最大松弛量是10个申请的工夫距离。

接下来咱们次要看一下take办法:

func (t *limiter) Take() time.Time {    t.Lock()    defer t.Unlock()    now := t.clock.Now()    if t.last.IsZero() {        t.last = now        return t.last    }    t.sleepFor += t.perRequest - now.Sub(t.last)    if t.sleepFor < t.maxSlack {        t.sleepFor = t.maxSlack    }    if t.sleepFor > 0 {        t.clock.Sleep(t.sleepFor)        t.last = now.Add(t.sleepFor)        t.sleepFor = 0    } else {        t.last = now    }    return t.last}

take()办法的执行步骤如下:

  • 为了管制并发,所以进入该办法就须要进行上锁,该锁的粒度比拟大,整个办法都加上了锁
  • 通过IsZero办法来判断以后是否是第一次申请,如果是第一次申请,间接取now工夫即可返回。
  • 如果不是第一次申请,就须要计算间隔解决下一次申请须要期待的工夫,这里有一个要留神点的是累加须要期待的工夫,目标是能够给前面的对消应用
  • 如果以后累加须要期待的工夫大于最大松弛量了,将期待的工夫设置为最大松弛量的工夫。
  • 如果以后申请多余的工夫无奈齐全对消此次的所需量,调用sleep办法进行阻塞,同时清空期待的工夫。如果sleepFor小于0,阐明此次申请工夫距离大于预期距离,也就说无需期待能够间接解决申请。

步骤其实不是很多,次要须要留神一个知识点 —— 最大松弛量。

漏桶算法有个人造缺点就是无奈应答突发流量(匀速,两次申请 req1req2 之间的提早至多应该 >=perRequest),举个例子阐明:假如咱们当初有三个申请req1req2req3按程序解决,每个申请解决距离为100ms,req1申请解决实现之后150ms,req2申请到来,根据限速策略能够对 req2 立刻解决,当 req2 实现后,50ms 后, req3 到来,这个时候间隔上次申请还有余 100ms,因而还须要期待 50ms 能力继续执行, 然而,对于这种状况,实际上这三个申请一共耗费了 250ms 才实现,并不是预期的 200ms

对于下面这种状况,咱们能够把之前距离比拟长的申请的工夫匀给前面的申请判断限流时应用,缩小申请期待的工夫了,然而当两个申请之间达到的距离比拟大时,就会产生很大的可对消工夫,以至于前面大量申请霎时达到时,也无奈对消这个工夫,那样就曾经失去了限流的意义,所以引入了最大松弛量 (maxSlack) 的概念, 该值为负值,示意容许对消的最长工夫,避免以上状况的呈现。

以上就是漏桶实现的基本思路了,整体还是很简略的,你学会了吗?

令牌桶算法

令牌桶其实和漏桶的原理相似,令牌桶就是设想有一个固定大小的桶,零碎会以恒定速率向桶中放 Token,桶满则临时不放。从网上找了图,表述十分失当:

对于令牌桶限流算法的实现,Github有一个高效的基于令牌桶限流算法实现的限流库:github.com/juju/ratelimitGolangtimer/rate也是令牌桶的一种实现,本文就不介绍juju/ratelimit库了,有趣味的本人学习一下的他的实现思维吧,咱们次要来看一看time/rate是如何实现的。

样例

还是老样子,咱们还是联合gin写一个限流中间件看看他是怎么应用的,例子如下:

import (    "net/http"    "time"    "github.com/gin-gonic/gin"    "golang.org/x/time/rate")var rateLimit *rate.Limiterfunc tokenBucket() gin.HandlerFunc {    return func(c *gin.Context) {        if rateLimit.Allow() {            c.String(http.StatusOK, "rate limit,Drop")            c.Abort()            return        }        c.Next()    }}func main() {    limit := rate.Every(100 * time.Millisecond)    rateLimit = rate.NewLimiter(limit, 10)    r := gin.Default()    r.GET("/ping", tokenBucket(), func(c *gin.Context) {        c.JSON(200, true)    })    r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")}

下面的例子咱们首先调用NewLimiter办法结构一个限流器,第一个参数是r limit,代表每秒能够向Token桶中产生多少token,第二个参数是b int,代表Token桶的容量大小,对于下面的例子,示意每100ms往桶中放一个token,也就是1s钟产生10个,桶的容量就是10。生产token的办法这里咱们应用Allow办法,Allow 实际上就是 AllowN(time.Now(),1)AllowN 办法示意,截止到某一时刻,目前桶中数目是否至多为 n 个,满足则返回 true,同时从桶中生产 n token。反之返回不生产 Token。对应下面的例子,当桶中的数目有余于1个时,就会丢掉该申请。

源码分析

Limit类型

time/rate自定义了一个limit类型,其实他实质就是float64的别名,Limit定了事件的最大频率,示意每秒事件的数据量,0就示意无限度。Inf是有限的速率限度;它容许所有事件(即便突发为0)。还提供 Every 办法来指定向 Token 桶中搁置 Token 的距离,计算出每秒工夫的数据量。

type Limit float64// Inf is the infinite rate limit; it allows all events (even if burst is zero).const Inf = Limit(math.MaxFloat64)// Every converts a minimum time interval between events to a Limit.func Every(interval time.Duration) Limit {    if interval <= 0 {        return Inf    }    return 1 / Limit(interval.Seconds())}

Limiter构造体

type Limiter struct {    mu     sync.Mutex    limit  Limit    burst  int    tokens float64    // last is the last time the limiter's tokens field was updated    last time.Time    // lastEvent is the latest time of a rate-limited event (past or future)    lastEvent time.Time}

各个字段含意如下:

  • mu:互斥锁、为了管制并发
  • limit:每秒容许解决的事件数量,即每秒处理事件的频率
  • burst:令牌桶的最大数量,如果burst为0,并且limit == Inf,则容许解决任何事件,否则不容许
  • tokens:令牌桶中可用的令牌数量
  • last:记录上次limiter的tokens被更新的工夫
  • lastEventlastEvent记录速率受限制(桶中没有令牌)的工夫点,该工夫点可能是过来的,也可能是未来的(Reservation预约的完结工夫点)

Reservation构造体

type Reservation struct {    ok        bool    lim       *Limiter    tokens    int    timeToAct time.Time    // This is the Limit at reservation time, it can change later.    limit Limit}

各个字段含意如下:

  • ok:到截至工夫是否能够获取足够的令牌
  • limlimiter对象
  • tokens:须要获取的令牌数量
  • timeToAct:须要期待的工夫点
  • limit:代表预约的工夫,是能够更改的。

reservation就是一个预约令牌的操作,timeToAct是本次预约须要期待到的指定工夫点才有足够预约的令牌。

Limiter生产token

Limiter有三个token的生产办法,别离是AllowReserveWait,最终三种生产形式都调用了 reserveNadvance这两个办法来生成和生产 Token。所以咱们次要看看reserveNadvance函数的具体实现。

  • advance办法的实现:
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {    //last不能在以后工夫now之后,否则计算出来的elapsed为正数,会导致令牌桶数量缩小  last := lim.last    if now.Before(last) {        last = now    }    //依据令牌桶的缺数计算出令牌桶未进行更新的最大工夫    maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)    elapsed := now.Sub(last) //令牌桶未进行更新的时间段    if elapsed > maxElapsed {        elapsed = maxElapsed    }    //依据未更新的工夫(未向桶中退出令牌的时间段)计算出产生的令牌数    delta := lim.limit.tokensFromDuration(elapsed)    tokens := lim.tokens + delta //计算出可用的令牌数    if burst := float64(lim.burst); tokens > burst {        tokens = burst    }    return now, last, tokens}

advance办法的作用是更新令牌桶的状态,计算出令牌桶未更新的工夫(elapsed),依据elapsed算出须要向桶中退出的令牌数delta,而后算出桶中可用的令牌数newTokens.

  • reserveN办法的实现:reserveN AllowN, ReserveN WaitN的辅助办法,用于判断在maxFutureReserve工夫内是否有足够的令牌。
// @param n 要生产的token数量// @param maxFutureReserve 违心期待的最长工夫func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {    lim.mu.Lock()    // 如果没有限度    if lim.limit == Inf {        lim.mu.Unlock()        return Reservation{            ok:        true, //桶中有足够的令牌            lim:       lim,            tokens:    n,            timeToAct: now,        }    }    //更新令牌桶的状态,tokens为目前可用的令牌数量    now, last, tokens := lim.advance(now)  // 计算取完之后桶还能剩能下多少token    tokens -= float64(n)    var waitDuration time.Duration  // 如果token < 0, 阐明目前的token不够,须要期待一段时间    if tokens < 0 {        waitDuration = lim.limit.durationFromTokens(-tokens)    }    ok := n <= lim.burst && waitDuration <= maxFutureReserve    r := Reservation{        ok:    ok,        lim:   lim,        limit: lim.limit,    }  // timeToAct示意当桶中满足token数目等于n的工夫    if ok {        r.tokens = n        r.timeToAct = now.Add(waitDuration)    }  // 更新桶外面的token数目    // 更新last工夫    // lastEvent    if ok {        lim.last = now        lim.tokens = tokens        lim.lastEvent = r.timeToAct    } else {        lim.last = last    }    lim.mu.Unlock()    return r}

下面的代码我曾经进行了正文,这里在总结一下流程:

  • 首选判断是否领有速率限度,没有速率限度也就是桶中统一领有足够的令牌。
  • 计算从上次取 Token 的工夫到以后时刻,期间一共新产生了多少 Token:咱们只在取 Token 之前生成新的 Token,也就意味着每次取 Token 的距离,实际上也是生成 Token 的距离。咱们能够利用 tokensFromDuration, 轻易的算出这段时间一共产生 Token 的数目。所以以后 Token 数目 = 新产生的 Token 数目 + 之前残余的 Token 数目 - 要生产的 Token 数目。
  • 如果生产后残余 Token 数目大于零,阐明此时 Token 桶内仍不为空,此时 Token 短缺,无需调用侧期待。
    如果 Token 数目小于零,则需期待一段时间。那么这个时候,咱们能够利用 durationFromTokens 将以后负值的 Token 数转化为须要期待的工夫。
  • 将须要期待的工夫等相干后果返回给调用方

其实整个过程就是利用了 Token 数能够和工夫互相转化 的原理。而如果 Token 数为负,则须要期待相应工夫即可。

下面提到了durationFromTokenstokensFromDuration这两个办法,是要害,他们的实现如下:

func (limit Limit) durationFromTokens(tokens float64) time.Duration {    seconds := tokens / float64(limit)    return time.Nanosecond * time.Duration(1e9*seconds)}func (limit Limit) tokensFromDuration(d time.Duration) float64 {    // Split the integer and fractional parts ourself to minimize rounding errors.    // See golang.org/issues/34861.    sec := float64(d/time.Second) * float64(limit)    nsec := float64(d%time.Second) * float64(limit)    return sec + nsec/1e9}
  • durationFromTokens:性能是计算出生成N 个新的 Token 一共须要多久。
  • tokensFromDuration:给定一段时长,这段时间一共能够生成多少个 Token。

仔细的网友会发现tokensFromDuration办法既然是计算一段时间一共能够生成多少个 Token,为什么不间接进行相乘呢?其实Golang最后的版本就是采纳d.Seconds() * float64(limit)间接相乘实现的,尽管看上去一点问题没有,然而这里是两个小数相乘,会带来精度损失,所以采纳当初这种办法实现,别离求出秒的整数局部和小数局部,进行相乘后再相加,这样能够失去最准确的精度。

limiter偿还Token

既然咱们能够生产Token,那么对应也能够勾销此次生产,将token偿还,当调用 Cancel() 函数时,生产的 Token 数将会尽可能归还给 Token 桶。偿还也并不是那么简略,接下咱们咱们看看偿还token是如何实现的。

func (r *Reservation) CancelAt(now time.Time) {    if !r.ok {        return    }    r.lim.mu.Lock()    defer r.lim.mu.Unlock()  /*  1.如果无需限流    2. tokens为0 (须要获取的令牌数量为0)    3. 曾经过了截至工夫    以上三种状况无需解决勾销操作    */    if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {        return    }    //计算出须要还原的令牌数量    //这里的r.lim.lastEvent可能是本次Reservation的完结工夫,也可能是起初的Reservation的完结工夫,所以要把本次完结工夫点(r.timeToAct)之后产生的令牌数减去    restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))  // 当小于0,示意曾经都预支完了,不能偿还了    if restoreTokens <= 0 {        return    }    //从新计算令牌桶的状态    now, _, tokens := r.lim.advance(now)    //还原以后令牌桶的令牌数量,以后的令牌数tokens加上须要还原的令牌数restoreTokens    tokens += restoreTokens  //如果tokens大于桶的最大容量,则将tokens置为桶的最大容量    if burst := float64(r.lim.burst); tokens > burst {        tokens = burst    }    // update state    r.lim.last = now //记录桶的更新工夫    r.lim.tokens = tokens //更新令牌数量 // 如果都相等,阐明跟没生产一样。间接还原成上次的状态吧    if r.timeToAct == r.lim.lastEvent {        prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))        if !prevEvent.Before(now) {            r.lim.lastEvent = prevEvent        }    }    return}

正文曾经增加,就不在具体解释了,重点是这一行代码:restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))r.tokens指的是本次生产的token数,r.timeToAcr指的是Token桶能够满足本次生产数目的时刻,也就是生产的时刻+期待的时长,r.lim.lastEvent指的是最近一次生产的timeToAct的值,通过r.limit.tokensFromDuration办法得出的后果指的是从该次生产到以后工夫,一共又生产了多少Token数目,所以最终得出这一段的代码含意是:

要偿还的Token = 该次生产的Token - 新生产的token

好啦,源码就临时剖析到这了,因为规范库的实现的代码量有点大,还有一部分在这里没有说,留给大家本人去分析吧~。

总结

本文重点介绍了漏桶算法和令牌桶算法,漏桶算法和令牌桶算法的次要区别在于,"漏桶算法"可能强行限度数据的传输速率(或申请频率),而"令牌桶算法"在可能限度数据的均匀传输速率外,还容许某种程度的突发传输。在某些状况下,漏桶算法不可能无效地应用网络资源,因为漏桶的漏出速率是固定的,所以即便网络中没有产生拥塞,漏桶算法也不能使某一个独自的数据流达到端口速率。因而,漏桶算法对于存在突发个性的流量来说不足效率。而令牌桶算法则可能满足这些具备突发个性的流量。通常,漏桶算法与令牌桶算法联合起来为网络流量提供更高效的管制。

文中测试代码已上传:https://github.com/asong2020/... 欢送star

好啦,这篇文章就到这里啦,素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!

创立了一个Golang学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:加我vx拉你入群,或者公众号获取入群二维码

结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,本人也收集了一本PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang梦工厂],后盾回复:[微服务],即可获取。

我翻译了一份GIN中文文档,会定期进行保护,有须要的小伙伴后盾回复[gin]即可下载。

翻译了一份Machinery中文文档,会定期进行保护,有须要的小伙伴们后盾回复[machinery]即可获取。

我是asong,一名普普通通的程序猿,让咱们一起缓缓变强吧。欢送各位的关注,咱们下期见~~~

举荐往期文章:

  • Go看源码必会常识之unsafe包
  • 源码分析panic与recover,看不懂你打我好了!
  • 详解并发编程根底之原子操作(atomic包)
  • 详解defer实现机制
  • 真的了解interface了嘛
  • Leaf—Segment分布式ID生成零碎(Golang实现版本)
  • 十张动图带你搞懂排序算法(附go实现代码)
  • go参数传递类型
  • 手把手教姐姐写音讯队列
  • 常见面试题之缓存雪崩、缓存穿透、缓存击穿
  • 详解Context包,看这一篇就够了!!!
  • go-ElasticSearch入门看这一篇就够了(一)
  • 面试官:go中for-range应用过吗?这几个问题你能解释一下起因吗