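I am currently building a messaging middle platform that provides business teams with a range of message-channel capabilities. During system design, besides enforcing Quota limits on how business teams use the platform, we also apply traffic control to incoming requests (tens of thousands of QPS) so that a surge of concurrent traffic cannot bring the service down. What follows is my own summary of the traffic-control approaches I surveyed.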




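  • Traffic optimization: hotlink protection
  • Front-end optimization: reduce HTTP requests, merge CSS or JS files, add asynchronous requests, enable browser caching and file compression, use CDN acceleration, set up a dedicated image server
  • Server-side optimization: static page generation, concurrent processing, queue-based processing
  • Database optimization: database caching, sharding across databases and tables, partitioning, read/write splitting
  • Web server optimization: load balancing, nginx reverse proxy
  • Service degradation: if a service is not on the core path, switch it off
  • Traffic control: rate limiting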




  • Counter
  • Leaky bucket
  • Token bucket
  • Sliding window


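The counter is a fairly simple rate-limiting algorithm and is widely used; many systems apply it at the API level. Requests are counted over a fixed period and the count is compared with a threshold; when the period ends, the counter is reset to 0.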

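package counter

import (
    "fmt"
    "time"
)

func CounterDemo() {
    // init rate limiter: at most 100 requests per 60s window
    limitCount := int64(100)
    interval := 60 * time.Second
    rateLimiter := NewRateLimiter(limitCount, interval)

    for i := 0; i < 800; i++ {
        if rateLimiter.Grant() {
            fmt.Println("Continue to process")
        } else {
            fmt.Println("Exceed rate limit")
        }
    }
}

type RateLimiter struct {
    limitCount   int64         // max requests allowed per window
    interval     time.Duration // window length
    requestCount int64         // requests admitted in the current window
    startAt      time.Time     // start of the current window
}

func NewRateLimiter(limitCount int64, interval time.Duration) *RateLimiter {
    return &RateLimiter{
        startAt:    time.Now(),
        interval:   interval,
        limitCount: limitCount,
    }
}

// Grant reports whether one more request may pass.
// It is not safe for concurrent use without external locking.
func (rl *RateLimiter) Grant() bool {
    now := time.Now()
    if !now.Before(rl.startAt.Add(rl.interval)) {
        // the window has expired: start a new one and clear the counter
        rl.startAt = now
        rl.requestCount = 0
    }
    if rl.requestCount < rl.limitCount {
        rl.requestCount++
        return true
    }
    return false
}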

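This implementation has a problem at the window boundary: if 100 requests have already passed during the first 1s of a 1min window, every request in the remaining 59s has to be rejected. This is known as the spike phenomenon.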
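To make the spike concrete, here is a minimal sketch of a fixed-window counter that takes the current time as an explicit offset instead of calling time.Now(), so the scenario can be replayed deterministically. The names fixedWindow, allowAt and replay are hypothetical and exist only for this illustration.

```go
package main

import (
    "fmt"
    "time"
)

// fixedWindow is a hypothetical fixed-window counter. Time is passed in
// as an offset from an arbitrary origin so the example is deterministic.
type fixedWindow struct {
    limit    int64
    interval time.Duration
    start    time.Duration // start of the current window
    count    int64         // requests admitted in the current window
}

// allowAt reports whether a request arriving at offset now is admitted.
func (w *fixedWindow) allowAt(now time.Duration) bool {
    if now-w.start >= w.interval {
        // Window expired: open a new one at the current offset.
        w.start = now
        w.count = 0
    }
    if w.count < w.limit {
        w.count++
        return true
    }
    return false
}

// replay sends n requests at the given offset and returns how many pass.
func replay(w *fixedWindow, at time.Duration, n int) int {
    passed := 0
    for i := 0; i < n; i++ {
        if w.allowAt(at) {
            passed++
        }
    }
    return passed
}

func main() {
    w := &fixedWindow{limit: 100, interval: time.Minute}
    fmt.Println(replay(w, 500*time.Millisecond, 150)) // 100: the burst uses up the whole quota
    fmt.Println(replay(w, 30*time.Second, 50))        // 0: the rest of the window is rejected
    fmt.Println(replay(w, 61*time.Second, 50))        // 50: a new window has started
}
```

The middle call shows the cost of the scheme: traffic arriving at a perfectly reasonable rate half a minute later is still rejected because the early burst consumed the quota.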



package leakyBucket

import (
    "fmt"
    "time"
)

func LeakyBucketDemo() {
    // init rate limiter
    rate := int64(5)
    size := int64(10)
    rateLimiter := NewRateLimiter(rate, size)
    for i := 0; i < 800; i++ {
        if rateLimiter.Grant() {
            fmt.Println("Continue to process")
        } else {
            fmt.Println("Exceed rate limit")
        }
    }
}

type RateLimiter struct {
    // time of the last leak
    startAt time.Time
    // bucket size
    size int64
    // now the water in bucket
    water int64
    // water discharge rate, request/s
    rate int64
}

func NewRateLimiter(rate, size int64) *RateLimiter {
    return &RateLimiter{
        startAt: time.Now(),
        rate:    rate, // rate of processing requests, request/s
        size:    size,
    }
}

// Grant is not safe for concurrent use without external locking.
func (rl *RateLimiter) Grant() bool {
    // calculating water output; rate is per second, elapsed time is in ms
    now := time.Now()
    out := now.Sub(rl.startAt).Milliseconds() * rl.rate / 1000

    if out > 0 {
        // remain water after the leak
        rl.water = max(0, rl.water-out)
        // only move the reference time when something actually leaked,
        // otherwise rapid calls would never accumulate any elapsed time
        rl.startAt = now
    }

    if rl.water+1 <= rl.size {
        // there is room in the bucket: add this request to it
        rl.water++
        return true
    }
    return false
}

func max(a, b int64) int64 {
    if a > b {
        return a
    }
    return b
}




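The basic principle of this algorithm is also easy to understand. There is a bucket that holds up to a maximum number of Tokens (its capacity). Whenever a consumer wants to call a service or consume a resource, it takes one or more Tokens out of the bucket. The consumer can consume the service only if it is able to take out the required number of Tokens. If the bucket does not hold the required number of Tokens, the consumer has to wait until enough Tokens are available.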

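package tokenBucket

import (
    "fmt"
    "time"
)

func tokenBucketDemo() {
    tokenRate := int64(5)
    size := int64(10)
    rateLimiter := NewRateLimiter(tokenRate, size)
    for i := 0; i < 800; i++ {
        if rateLimiter.Grant() {
            fmt.Println("Continue to process")
        } else {
            fmt.Println("Exceed rate limit")
        }
    }
}

type RateLimiter struct {
    startAt   time.Time // time of the last refill
    size      int64     // bucket capacity
    tokens    int64     // tokens currently in the bucket
    tokenRate int64     // tokens generated per second
}

// NewRateLimiter returns a limiter whose bucket starts empty.
func NewRateLimiter(tokenRate, size int64) *RateLimiter {
    return &RateLimiter{
        startAt:   time.Now(),
        tokenRate: tokenRate,
        size:      size,
    }
}

// Grant is not safe for concurrent use without external locking.
func (rl *RateLimiter) Grant() bool {
    now := time.Now()
    // tokens generated since the last refill; tokenRate is per second
    in := now.Sub(rl.startAt).Milliseconds() * rl.tokenRate / 1000

    if in > 0 {
        rl.tokens = min(rl.size, rl.tokens+in)
        rl.startAt = now
    }

    if rl.tokens > 0 {
        // consume one token for this request
        rl.tokens--
        return true
    }
    return false
}

func min(a, b int64) int64 {
    if a > b {
        return b
    }
    return a
}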
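The description of the token bucket mentions taking out "one or more" Tokens and waiting when there are not enough. A minimal sketch of that multi-token case follows. It assumes a bucket that starts full and is driven by explicit time offsets so the result is reproducible; bucket, refill and takeAt are hypothetical names, and the fractional refill progress is simply discarded whenever tokens are added.

```go
package main

import (
    "fmt"
    "time"
)

// bucket is a hypothetical token bucket driven by explicit time offsets.
type bucket struct {
    capacity int64         // maximum number of tokens
    rate     int64         // tokens added per second
    tokens   int64         // tokens currently available
    last     time.Duration // offset of the last refill
}

// refill adds the tokens generated between b.last and now, up to capacity.
func (b *bucket) refill(now time.Duration) {
    added := int64(now-b.last) * b.rate / int64(time.Second)
    if added > 0 {
        b.tokens += added
        if b.tokens > b.capacity {
            b.tokens = b.capacity
        }
        b.last = now
    }
}

// takeAt tries to remove n tokens at offset now. It returns false and
// leaves the bucket untouched when fewer than n tokens are available,
// in which case the caller has to wait and retry later.
func (b *bucket) takeAt(now time.Duration, n int64) bool {
    b.refill(now)
    if n > b.tokens {
        return false
    }
    b.tokens -= n
    return true
}

func main() {
    b := &bucket{capacity: 10, rate: 5, tokens: 10}
    fmt.Println(b.takeAt(0, 8))                    // true: 2 tokens left
    fmt.Println(b.takeAt(0, 3))                    // false: only 2 tokens available
    fmt.Println(b.takeAt(200*time.Millisecond, 3)) // true: 1 token was added after 200ms
}
```

Unlike the fixed-window counter, a full bucket lets a burst of up to capacity requests through at once, after which admission settles to the refill rate.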



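  1. Two parameters have to be set (the average rate and the threshold), and they are not necessarily easy to tune.
  2. Several separate operations are involved (in the leaky bucket, for example, every check also has to update the start time), and these operations cannot be completed as a single atomic step.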



package slidingWindow

import (
    "fmt"
    "sync"
    "time"
)

func SlidingWindowDemo() {
    // allow 10 requests per second
    rateLimiter := NewRateLimiter(time.Second, 10, func() Window {
        return NewLocalWindow()
    })

    if rateLimiter.Grant() {
        fmt.Println("Continue to process")
    } else {
        fmt.Println("Exceed rate limit")
    }
}

// Window represents a fixed-window
type Window interface {
    // Start returns the start boundary
    Start() time.Time

    // Count returns the accumulated count
    Count() int64

    // AddCount increments the accumulated count by n
    AddCount(n int64)

    // Reset sets the state of the window with the given settings
    Reset(s time.Time, c int64)
}

type NewWindow func() Window

type LocalWindow struct {
    start int64 // start boundary, in nanoseconds since the Unix epoch
    count int64
}

func NewLocalWindow() *LocalWindow {
    return &LocalWindow{}
}

func (w *LocalWindow) Start() time.Time {
    return time.Unix(0, w.start)
}

func (w *LocalWindow) Count() int64 {
    return w.count
}

func (w *LocalWindow) AddCount(n int64) {
    w.count += n
}

func (w *LocalWindow) Reset(s time.Time, c int64) {
    w.start = s.UnixNano()
    w.count = c
}

type RateLimiter struct {
    size  time.Duration
    limit int64

    mu sync.Mutex

    curr Window
    prev Window
}

func NewRateLimiter(size time.Duration, limit int64, newWindow NewWindow) *RateLimiter {
    currWin := newWindow()

    // The previous window is static (i.e. no add changes will happen within it),
    // so we always create it as an instance of LocalWindow
    prevWin := NewLocalWindow()

    return &RateLimiter{
        size:  size,
        limit: limit,
        curr:  currWin,
        prev:  prevWin,
    }
}

// Size returns the time duration of one window size
func (rl *RateLimiter) Size() time.Duration {
    return rl.size
}

// Limit returns the maximum events permitted to happen during one window size
func (rl *RateLimiter) Limit() int64 {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    return rl.limit
}

// SetLimit sets a new limit
func (rl *RateLimiter) SetLimit(limit int64) {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    rl.limit = limit
}

// shorthand for GrantN(time.Now(), 1)
func (rl *RateLimiter) Grant() bool {
    return rl.GrantN(time.Now(), 1)
}

// reports whether n events may happen at time now
func (rl *RateLimiter) GrantN(now time.Time, n int64) bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    // roll the windows forward before estimating the count
    rl.advance(now)

    elapsed := now.Sub(rl.curr.Start())
    weight := float64(rl.size-elapsed) / float64(rl.size)
    count := int64(weight*float64(rl.prev.Count())) + rl.curr.Count()

    if count+n > rl.limit {
        return false
    }

    // record the granted events in the current window
    rl.curr.AddCount(n)
    return true
}

// advance updates the current/previous windows resulting from the passage of time
func (rl *RateLimiter) advance(now time.Time) {
    // Calculate the start boundary of the expected current-window.
    newCurrStart := now.Truncate(rl.size)

    diffSize := newCurrStart.Sub(rl.curr.Start()) / rl.size
    if diffSize >= 1 {
        // The current-window is at least one-window-size behind the expected one.
        newPrevCount := int64(0)
        if diffSize == 1 {
            // The new previous-window will overlap with the old current-window,
            // so it inherits the count.
            newPrevCount = rl.curr.Count()
        }

        rl.prev.Reset(newCurrStart.Add(-rl.size), newPrevCount)
        // The new current-window always has zero count.
        rl.curr.Reset(newCurrStart, 0)
    }
}
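The core of the sliding window is the weighted estimate computed in GrantN: the previous window's count is scaled by the fraction of it that still overlaps the sliding window, and the current window's count is added on top. The sketch below isolates that arithmetic with plain integers; estimate is a hypothetical helper, and the numbers are made up for illustration.

```go
package main

import "fmt"

// estimate returns the approximate number of requests in the sliding
// window: the previous window's count weighted by its remaining overlap,
// plus the current window's count. elapsedMs is how far the current
// window has progressed; sizeMs is the window length.
func estimate(prevCount, currCount, elapsedMs, sizeMs int64) int64 {
    weight := float64(sizeMs-elapsedMs) / float64(sizeMs)
    return int64(weight*float64(prevCount)) + currCount
}

func main() {
    const limit = 10

    // 1000ms windows. The previous window saw 8 requests, the current
    // one has seen 3 so far, and we are 250ms into the current window.
    got := estimate(8, 3, 250, 1000)
    fmt.Println(got)            // 9, i.e. 0.75*8 + 3
    fmt.Println(got+1 <= limit) // true: one more request still fits

    // At the very start of the current window the previous window
    // counts in full, so the same counts are already over the limit.
    fmt.Println(estimate(8, 3, 0, 1000)+1 <= limit) // false
}
```

Because the previous window fades out gradually, a burst just before a boundary keeps counting against requests just after it, which is what smooths out the boundary behaviour of the plain counter.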


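The four rate-limiting approaches above mostly target concurrency within a single instance. Below are several options for rate limiting across a service cluster: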

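Nginx rate limiting

The rate-limiting module officially provided by Nginx uses the leaky bucket algorithm, which keeps the real-time request processing rate from exceeding a preset threshold. There are two main settings:

  • limit_req_zone: limits the number of requests from an IP per unit of time
  • limit_conn_zone: limits the number of simultaneous connections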

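Redis rate limiting

Use the INCR command provided by Redis to count requests, and cap the number of requests allowed through within a given time window.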
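As a rough sketch of how such a counter could be structured, the example below models only the two Redis commands involved, INCR and EXPIRE, behind a small interface, and backs it with an in-memory stand-in so it runs without a Redis server. Everything here is an assumption for illustration: store, memStore, allow and the "rl:" key layout are hypothetical, and a real deployment would issue the commands through a Redis client library instead.

```go
package main

import "fmt"

// store models the two Redis commands the scheme relies on. It is a
// hypothetical interface for this sketch, not a real client API.
type store interface {
    Incr(key string) int64            // like INCR: returns the value after incrementing
    Expire(key string, seconds int64) // like EXPIRE: sets a time to live on the key
}

// memStore is an in-memory stand-in so the example runs without Redis.
// It records TTLs but does not enforce them.
type memStore struct {
    counts map[string]int64
    ttls   map[string]int64
}

func newMemStore() *memStore {
    return &memStore{counts: map[string]int64{}, ttls: map[string]int64{}}
}

func (m *memStore) Incr(key string) int64 {
    m.counts[key]++
    return m.counts[key]
}

func (m *memStore) Expire(key string, seconds int64) {
    m.ttls[key] = seconds
}

// allow admits a request for id if no more than limit requests have been
// counted in the window containing nowUnix (seconds since the epoch).
func allow(s store, id string, limit, windowSec, nowUnix int64) bool {
    // One key per caller per window, e.g. "rl:user-1:28333333".
    key := fmt.Sprintf("rl:%s:%d", id, nowUnix/windowSec)
    n := s.Incr(key)
    if n == 1 {
        // First hit in this window: let the key expire with the window.
        s.Expire(key, windowSec)
    }
    return n <= limit
}

func main() {
    s := newMemStore()
    passed := 0
    for i := 0; i < 5; i++ {
        if allow(s, "user-1", 3, 60, 1700000000) {
            passed++
        }
    }
    fmt.Println(passed)                                // 3
    fmt.Println(allow(s, "user-1", 3, 60, 1700000060)) // true: the next window uses a new key
}
```

Note that the increment and the expiry are two separate commands here, so a failure between them could leave a key without a time to live; running both steps inside one server-side Lua script is a common way to make them atomic.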


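Kong officially provides a design for a distributed sliding window algorithm, and Kong currently supports cluster-wide rate-limiting configuration. It supports the cluster scenario by centrally storing the counts for each sliding window and consumer. A Go implementation worth recommending is slidingwindow.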


In distributed scenarios, the industry also implements rate limiting with combinations such as Nginx+Lua and Redis+Lua.




