乐趣区

关于golang:高并发系统的限流策略漏桶和令牌桶附源码剖析

原文链接: 高并发零碎的限流策略:漏桶和令牌桶(附源码分析)

前言

哈喽,大家好,我是正在学习 PS 技术的 asong,这是我并发编程系列的第5 篇文章,明天与大家聊一聊高并发零碎中的限流技术,限流又称为流量管制,是指限度达到零碎的并发申请数,当达到限度条件则能够拒绝请求,能够起到爱护上游服务,避免服务过载等作用。罕用的限流策略有漏桶算法、令牌桶算法、滑动窗口;下文次要与大家一起剖析一下漏桶算法和令牌桶算法,滑动窗口就不在这里这介绍了。好啦,废话不多话,开整。

文中测试代码已上传:https://github.com/asong2020/… 欢送 `star

漏桶算法

漏桶算法比拟好了解,假如咱们当初有一个水桶,咱们向这个水桶里添水,尽管咱们咱们无奈预计一次会添多少水,也无奈预计水流入的速度,然而能够固定出水的速度,不管添水的速率有多大,都依照固定的速率流出,如果桶满了,溢出的上方水间接摈弃。咱们把水当作 HTTP 申请,每次都把申请放到一个桶中,而后以固定的速率解决申请,说了这么多,不如看一个图加深了解(图片来自于网络,手残党不会画,多多包涵):

原理其实很简略,就看咱们怎么实现它了,uber团队有一个开源的 uber-go/ratelimit 库,这个库就是漏桶的一种实现,上面咱们一起来看一看他的实现思路。

样例

学习一个新货色的时候,往往是从会用开始的,缓缓能力明确其实现原理,所以咱们先来看看这个库是怎么应用的,这里咱们间接提供一个理论应用例子,配合 Gin 框架,咱们增加一个限流中间件,来达到申请限流的作用,测试代码如下:

// 定义全局限流器对象
var rateLimit ratelimit.Limiter

// 在 gin.HandlerFunc 退出限流逻辑
func leakyBucket() gin.HandlerFunc {prev := time.Now()
    return func(c *gin.Context) {now := rateLimit.Take()
        fmt.Println(now.Sub(prev)) // 为了打印工夫距离
        prev = now // 记录上一次的工夫,没有这个打印的会有问题
    }
}

func main() {rateLimit = ratelimit.New(10)
    r := gin.Default()
    r.GET("/ping", leakyBucket(), func(c *gin.Context) {c.JSON(200, true)
    })
    r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

咱们简略应用压测工具 ab 测试一下:ab -n 10 -c 2 http://127.0.0.1:8080/ping,执行后果局部如下:

察看后果可知,每次解决申请的工夫距离是 10ms,并且前面的申请耗时越来越久,为什么会这样呢?这里先卖个小关子,看完 uber 的实现你就晓得了~

源码实现

咱们首先来看一下其外围构造:

type limiter struct {
    sync.Mutex
    last       time.Time
    sleepFor   time.Duration
    perRequest time.Duration
    maxSlack   time.Duration
    clock      Clock
}
type Limiter interface {
    // Take should block to make sure that the RPS is met.
    Take() time.Time}

限制器接口只提供了一个办法 take()take() 办法会阻塞确保两次申请之间的工夫走完,具体实现咱们在上面进行剖析。实现限制器接口的构造体中各个字段的意义如下:

  • sync.Mutext:互斥锁,管制并发的作用
  • last:记录上一次的时刻
  • sleepFor:间隔解决下一次申请须要期待的工夫
  • perRequest:每次申请的工夫距离
  • maxSlack:最大松弛量,用来解决突发流量
  • clock:一个时钟或模仿时钟,提供了 nowsleep办法,是实例化速率限制器

要是用该限制器,首先须要通过 New 办法进行初始化,一个必传的参数是 rate,代表的是每秒申请量(RPS),还有一个可选参数,参数类型option,也就是咱们能够自定义limit,不过个别应用场景不多,这里就不过多介绍了。我次要看一下他是怎么保障固定速率的,截取New 办法局部代码如下:

l := &limiter{perRequest: time.Second / time.Duration(rate),
        maxSlack:   -10 * time.Second / time.Duration(rate),
    }

依据咱们传入的申请数量,能计算出 1 s 内要通过n 个申请,每个申请之间的间隔时间是多少,这样在 take 办法中就能够依据这个字段来解决申请的固定速率问题,这里还初始化了最大松弛化字段,他的值是正数,默认最大松弛量是 10 个申请的工夫距离。

接下来咱们次要看一下 take 办法:

func (t *limiter) Take() time.Time {t.Lock()
    defer t.Unlock()
    now := t.clock.Now()
    if t.last.IsZero() {
        t.last = now
        return t.last
    }
    t.sleepFor += t.perRequest - now.Sub(t.last)
    if t.sleepFor < t.maxSlack {t.sleepFor = t.maxSlack}
    if t.sleepFor > 0 {t.clock.Sleep(t.sleepFor)
        t.last = now.Add(t.sleepFor)
        t.sleepFor = 0
    } else {t.last = now}

    return t.last
}

take()办法的执行步骤如下:

  • 为了管制并发,所以进入该办法就须要进行上锁,该锁的粒度比拟大,整个办法都加上了锁
  • 通过 IsZero 办法来判断以后是否是第一次申请,如果是第一次申请,间接取 now 工夫即可返回。
  • 如果不是第一次申请,就须要计算间隔解决下一次申请须要期待的工夫,这里有一个要留神点的是累加须要期待的工夫,目标是能够给前面的对消应用
  • 如果以后累加须要期待的工夫大于最大松弛量了,将期待的工夫设置为最大松弛量的工夫。
  • 如果以后申请多余的工夫无奈齐全对消此次的所需量,调用 sleep 办法进行阻塞,同时清空期待的工夫。如果 sleepFor 小于 0,阐明此次申请工夫距离大于预期距离,也就说无需期待能够间接解决申请。

步骤其实不是很多,次要须要留神一个知识点 —— 最大松弛量。

漏桶算法有个人造缺点就是无奈应答突发流量(匀速,两次申请 req1req2 之间的提早至多应该 >=perRequest),举个例子阐明:假如咱们当初有三个申请 req1req2req3 按程序解决,每个申请解决距离为 100ms,req1申请解决实现之后 150ms,req2申请到来,根据限速策略能够对 req2 立刻解决,当 req2 实现后,50ms 后,req3 到来,这个时候间隔上次申请还有余 100ms,因而还须要期待 50ms 能力继续执行, 然而,对于这种状况,实际上这三个申请一共耗费了 250ms 才实现,并不是预期的 200ms

对于下面这种状况,咱们能够把之前距离比拟长的申请的工夫匀给前面的申请判断限流时应用,缩小申请期待的工夫了,然而当两个申请之间达到的距离比拟大时,就会产生很大的可对消工夫,以至于前面大量申请霎时达到时,也无奈对消这个工夫,那样就曾经失去了限流的意义,所以引入了最大松弛量 (maxSlack) 的概念, 该值为负值,示意容许对消的最长工夫,避免以上状况的呈现。

以上就是漏桶实现的基本思路了,整体还是很简略的,你学会了吗?

令牌桶算法

令牌桶其实和漏桶的原理相似,令牌桶就是设想有一个固定大小的桶,零碎会以恒定速率向桶中放 Token,桶满则临时不放。从网上找了图,表述十分失当:

对于令牌桶限流算法的实现,Github有一个高效的基于令牌桶限流算法实现的限流库:github.com/juju/ratelimitGolangtimer/rate 也是令牌桶的一种实现,本文就不介绍 juju/ratelimit 库了,有趣味的本人学习一下的他的实现思维吧,咱们次要来看一看 time/rate 是如何实现的。

样例

还是老样子,咱们还是联合 gin 写一个限流中间件看看他是怎么应用的,例子如下:

import (
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
    "golang.org/x/time/rate"
)

var rateLimit *rate.Limiter

func tokenBucket() gin.HandlerFunc {return func(c *gin.Context) {if rateLimit.Allow() {c.String(http.StatusOK, "rate limit,Drop")
            c.Abort()
            return
        }
        c.Next()}
}

func main() {limit := rate.Every(100 * time.Millisecond)
    rateLimit = rate.NewLimiter(limit, 10)
    r := gin.Default()
    r.GET("/ping", tokenBucket(), func(c *gin.Context) {c.JSON(200, true)
    })
    r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

下面的例子咱们首先调用 NewLimiter 办法结构一个限流器,第一个参数是 r limit,代表每秒能够向Token 桶中产生多少 token,第二个参数是b int,代表Token 桶的容量大小,对于下面的例子,示意每 100ms 往桶中放一个 token,也就是1s 钟产生 10 个,桶的容量就是 10。生产token 的办法这里咱们应用 Allow 办法,Allow 实际上就是 AllowN(time.Now(),1)AllowN 办法示意,截止到某一时刻,目前桶中数目是否至多为 n 个,满足则返回 true,同时从桶中生产 n token。反之返回不生产 Token。对应下面的例子,当桶中的数目有余于 1 个时,就会丢掉该申请。

源码分析

Limit 类型

time/rate自定义了一个 limit 类型,其实他实质就是 float64 的别名,Limit定了事件的最大频率,示意每秒事件的数据量,0 就示意无限度。Inf是有限的速率限度;它容许所有事件(即便突发为 0)。还提供 Every 办法来指定向 Token 桶中搁置 Token 的距离,计算出每秒工夫的数据量。

type Limit float64

// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)

// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
    if interval <= 0 {return Inf}
    return 1 / Limit(interval.Seconds())
}

Limiter构造体

type Limiter struct {
    mu     sync.Mutex
    limit  Limit
    burst  int
    tokens float64
    // last is the last time the limiter's tokens field was updated
    last time.Time
    // lastEvent is the latest time of a rate-limited event (past or future)
    lastEvent time.Time
}

各个字段含意如下:

  • mu:互斥锁、为了管制并发
  • limit:每秒容许解决的事件数量,即每秒处理事件的频率
  • burst:令牌桶的最大数量,如果 burst 为 0,并且 limit == Inf,则容许解决任何事件,否则不容许
  • tokens:令牌桶中可用的令牌数量
  • last:记录上次 limiter 的 tokens 被更新的工夫
  • lastEventlastEvent记录速率受限制 (桶中没有令牌) 的工夫点,该工夫点可能是过来的,也可能是未来的 (Reservation 预约的完结工夫点)

Reservation构造体

type Reservation struct {
    ok        bool
    lim       *Limiter
    tokens    int
    timeToAct time.Time
    // This is the Limit at reservation time, it can change later.
    limit Limit
}

各个字段含意如下:

  • ok:到截至工夫是否能够获取足够的令牌
  • limlimiter对象
  • tokens:须要获取的令牌数量
  • timeToAct:须要期待的工夫点
  • limit:代表预约的工夫,是能够更改的。

reservation就是一个预约令牌的操作,timeToAct是本次预约须要期待到的指定工夫点才有足够预约的令牌。

Limiter生产 token

Limite r 有三个 token 的生产办法,别离是 AllowReserveWait,最终三种生产形式都调用了 reserveNadvance 这两个办法来生成和生产 Token。所以咱们次要看看 reserveNadvance 函数的具体实现。

  • advance办法的实现:
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
    //last 不能在以后工夫 now 之后,否则计算出来的 elapsed 为正数,会导致令牌桶数量缩小
  last := lim.last
    if now.Before(last) {last = now}

    // 依据令牌桶的缺数计算出令牌桶未进行更新的最大工夫
    maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
    elapsed := now.Sub(last) // 令牌桶未进行更新的时间段
    if elapsed > maxElapsed {elapsed = maxElapsed}

    // 依据未更新的工夫 (未向桶中退出令牌的时间段) 计算出产生的令牌数
    delta := lim.limit.tokensFromDuration(elapsed)
    tokens := lim.tokens + delta // 计算出可用的令牌数
    if burst := float64(lim.burst); tokens > burst {tokens = burst}

    return now, last, tokens
}

advance办法的作用是更新令牌桶的状态,计算出令牌桶未更新的工夫 (elapsed),依据elapsed 算出须要向桶中退出的令牌数delta,而后算出桶中可用的令牌数newTokens.

  • reserveN办法的实现:reserveN AllowN, ReserveN WaitN的辅助办法,用于判断在 maxFutureReserve 工夫内是否有足够的令牌。
// @param n 要生产的 token 数量
// @param maxFutureReserve 违心期待的最长工夫
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()
    // 如果没有限度
    if lim.limit == Inf {lim.mu.Unlock()
        return Reservation{
            ok:        true, // 桶中有足够的令牌
            lim:       lim,
            tokens:    n,
            timeToAct: now,
        }
    }
    // 更新令牌桶的状态,tokens 为目前可用的令牌数量
    now, last, tokens := lim.advance(now)
  // 计算取完之后桶还能剩能下多少 token
    tokens -= float64(n)
    var waitDuration time.Duration
  // 如果 token < 0, 阐明目前的 token 不够,须要期待一段时间
    if tokens < 0 {waitDuration = lim.limit.durationFromTokens(-tokens)
    }
    ok := n <= lim.burst && waitDuration <= maxFutureReserve
    r := Reservation{
        ok:    ok,
        lim:   lim,
        limit: lim.limit,
    }
  // timeToAct 示意当桶中满足 token 数目等于 n 的工夫
    if ok {
        r.tokens = n
        r.timeToAct = now.Add(waitDuration)
    }
  // 更新桶外面的 token 数目
    // 更新 last 工夫
    // lastEvent
    if ok {
        lim.last = now
        lim.tokens = tokens
        lim.lastEvent = r.timeToAct
    } else {lim.last = last}
    lim.mu.Unlock()
    return r
}

下面的代码我曾经进行了正文,这里在总结一下流程:

  • 首选判断是否领有速率限度,没有速率限度也就是桶中统一领有足够的令牌。
  • 计算从上次取 Token 的工夫到以后时刻,期间一共新产生了多少 Token:咱们只在取 Token 之前生成新的 Token,也就意味着每次取 Token 的距离,实际上也是生成 Token 的距离。咱们能够利用 tokensFromDuration, 轻易的算出这段时间一共产生 Token 的数目。所以以后 Token 数目 = 新产生的 Token 数目 + 之前残余的 Token 数目 – 要生产的 Token 数目。
  • 如果生产后残余 Token 数目大于零,阐明此时 Token 桶内仍不为空,此时 Token 短缺,无需调用侧期待。
    如果 Token 数目小于零,则需期待一段时间。那么这个时候,咱们能够利用 durationFromTokens 将以后负值的 Token 数转化为须要期待的工夫。
  • 将须要期待的工夫等相干后果返回给调用方

其实整个过程就是利用了 Token 数能够和工夫互相转化 的原理。而如果 Token 数为负,则须要期待相应工夫即可。

下面提到了 durationFromTokenstokensFromDuration 这两个办法,是要害,他们的实现如下:

func (limit Limit) durationFromTokens(tokens float64) time.Duration {seconds := tokens / float64(limit)
    return time.Nanosecond * time.Duration(1e9*seconds)
}
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
    // Split the integer and fractional parts ourself to minimize rounding errors.
    // See golang.org/issues/34861.
    sec := float64(d/time.Second) * float64(limit)
    nsec := float64(d%time.Second) * float64(limit)
    return sec + nsec/1e9
}
  • durationFromTokens:性能是计算出生成N 个新的 Token 一共须要多久。
  • tokensFromDuration:给定一段时长,这段时间一共能够生成多少个 Token。

仔细的网友会发现 tokensFromDuration 办法既然是计算一段时间一共能够生成多少个 Token,为什么不间接进行相乘呢?其实Golang 最后的版本就是采纳 d.Seconds() * float64(limit) 间接相乘实现的,尽管看上去一点问题没有,然而这里是两个小数相乘,会带来精度损失,所以采纳当初这种办法实现,别离求出秒的整数局部和小数局部,进行相乘后再相加,这样能够失去最准确的精度。

limiter偿还Token

既然咱们能够生产 Token,那么对应也能够勾销此次生产,将token 偿还,当调用 Cancel() 函数时,生产的 Token 数将会尽可能归还给 Token 桶。偿还也并不是那么简略,接下咱们咱们看看偿还 token 是如何实现的。

func (r *Reservation) CancelAt(now time.Time) {
    if !r.ok {return}

    r.lim.mu.Lock()
    defer r.lim.mu.Unlock()
  /*
  1. 如果无需限流
    2. tokens 为 0 (须要获取的令牌数量为 0)
    3. 曾经过了截至工夫
    以上三种状况无需解决勾销操作
    */
    if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {return}

    // 计算出须要还原的令牌数量
    // 这里的 r.lim.lastEvent 可能是本次 Reservation 的完结工夫,也可能是起初的 Reservation 的完结工夫,所以要把本次完结工夫点 (r.timeToAct) 之后产生的令牌数减去
    restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  // 当小于 0,示意曾经都预支完了,不能偿还了
    if restoreTokens <= 0 {return}
    // 从新计算令牌桶的状态
    now, _, tokens := r.lim.advance(now)
    // 还原以后令牌桶的令牌数量,以后的令牌数 tokens 加上须要还原的令牌数 restoreTokens
    tokens += restoreTokens
  // 如果 tokens 大于桶的最大容量,则将 tokens 置为桶的最大容量
    if burst := float64(r.lim.burst); tokens > burst {tokens = burst}
    // update state
    r.lim.last = now // 记录桶的更新工夫
    r.lim.tokens = tokens // 更新令牌数量
 // 如果都相等,阐明跟没生产一样。间接还原成上次的状态吧
    if r.timeToAct == r.lim.lastEvent {prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
        if !prevEvent.Before(now) {r.lim.lastEvent = prevEvent}
    }

    return
}

正文曾经增加,就不在具体解释了,重点是这一行代码:restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))r.tokens指的是本次生产的 token 数,r.timeToAcr指的是 Token 桶能够满足本次生产数目的时刻,也就是生产的时刻 + 期待的时长,r.lim.lastEvent指的是最近一次生产的 timeToAct 的值,通过 r.limit.tokensFromDuration 办法得出的后果指的是从该次生产到以后工夫,一共又生产了多少 Token 数目,所以最终得出这一段的代码含意是:

要偿还的Token = 该次生产的Token – 新生产的token

好啦,源码就临时剖析到这了,因为规范库的实现的代码量有点大,还有一部分在这里没有说,留给大家本人去分析吧~。

总结

本文重点介绍了漏桶算法和令牌桶算法,漏桶算法和令牌桶算法的次要区别在于,” 漏桶算法 ” 可能强行限度数据的传输速率(或申请频率),而 ” 令牌桶算法 ” 在可能限度数据的均匀传输速率外,还容许某种程度的突发传输。在某些状况下,漏桶算法不可能无效地应用网络资源,因为漏桶的漏出速率是固定的,所以即便网络中没有产生拥塞,漏桶算法也不能使某一个独自的数据流达到端口速率。因而,漏桶算法对于存在突发个性的流量来说不足效率。而令牌桶算法则可能满足这些具备突发个性的流量。通常,漏桶算法与令牌桶算法联合起来为网络流量提供更高效的管制。

文中测试代码已上传:https://github.com/asong2020/… 欢送star

好啦,这篇文章就到这里啦,素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!

创立了一个 Golang 学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:加我 vx 拉你入群,或者公众号获取入群二维码

结尾给大家发一个小福利吧,最近我在看 [微服务架构设计模式] 这一本书,讲的很好,本人也收集了一本 PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang 梦工厂],后盾回复:[微服务],即可获取。

我翻译了一份 GIN 中文文档,会定期进行保护,有须要的小伙伴后盾回复 [gin] 即可下载。

翻译了一份 Machinery 中文文档,会定期进行保护,有须要的小伙伴们后盾回复 [machinery] 即可获取。

我是 asong,一名普普通通的程序猿,让咱们一起缓缓变强吧。欢送各位的关注,咱们下期见~~~

举荐往期文章:

  • Go 看源码必会常识之 unsafe 包
  • 源码分析 panic 与 recover,看不懂你打我好了!
  • 详解并发编程根底之原子操作(atomic 包)
  • 详解 defer 实现机制
  • 真的了解 interface 了嘛
  • Leaf—Segment 分布式 ID 生成零碎(Golang 实现版本)
  • 十张动图带你搞懂排序算法(附 go 实现代码)
  • go 参数传递类型
  • 手把手教姐姐写音讯队列
  • 常见面试题之缓存雪崩、缓存穿透、缓存击穿
  • 详解 Context 包,看这一篇就够了!!!
  • go-ElasticSearch 入门看这一篇就够了(一)
  • 面试官:go 中 for-range 应用过吗?这几个问题你能解释一下起因吗
退出移动版