关于golang:高并发下的流量控制

6次阅读

共计 6980 个字符,预计需要花费 18 分钟才能阅读完成。

背景

目前在做一个音讯中台,提供给业务方各种音讯通道能力。咱们在零碎设计过程中,除了有对业务方在应用时做 Quota 限度;也有对申请做流量管制(几 w + QPS),避免并发流量上来时打垮服务。上面是集体在调研流量管制计划的一些梳理和总结。

高并发解决方案概述

并发通常是指并发拜访,也就是在某个工夫点,有多少个拜访申请同时到来。机器的性能是无限的,如果这个量级达到肯定水平,就会造成零碎压力,影响零碎性能。

应答高并发流量的几种解决方案:

  • 流量优化:防盗链解决
  • 前端优化:缩小 HTTP 申请,合并 CSS 或 js, 增加异步申请,启用流量器缓存和文件压缩,CDN 减速,建设独立图片服务器
  • 服务端优化:页面动态化,并发解决,队列解决
  • 数据库优化:数据库缓存,分库分表,分区操作,读写拆散,
  • Web 服务器优化:负载平衡,nginx 反向代理
  • 服务降级: 如果不是外围链路,就把这个服务去掉
  • 流量管制:限流

流量管制

高并发最无效和罕用的解决方案是流量管制,也就是限流。为应答服务的高可用,通过对大流量的申请进行限流,拦挡掉大部分申请,只容许一部分申请真正进入后端服务器,这样就能够避免大量申请造成零碎压力过大导致系统解体的状况,从而爱护服务失常可用。

罕用的限流算法

  • 计算器
  • 漏桶
  • 令牌桶
  • 滑动窗口

计数器

计数器是一种比较简单的限流算法,用处比拟宽泛,在接口层面,很多中央应用这种形式限流。在一段时间内,进行计数,与阀值进行比拟,到了工夫临界点,再将计数器清 0。

package counter

import (
    "fmt"
    "time"
)

func CounterDemo() {
    // init rate limiter
    limitCount := int64(100)
    interval := 60 * time.Second
    rateLimiter := NewRateLimiter(limitCount, interval)

    for i := 0; i < 800; i++ {if rateLimiter.Grant() {fmt.Println("Continue to process")
            continue
        }
        fmt.Println("Exceed rate limit")
    }
}

type RateLimiter struct {
    limitCount   int64
    interval     time.Duration
    requestCount int64
    startAt      time.Time
}

func NewRateLimiter(limitCount int64, interval time.Duration) *RateLimiter {
    return &RateLimiter{startAt:    time.Now(),
        interval:   interval,
        limitCount: limitCount,
    }
}

func (rl *RateLimiter) Grant() bool {now := time.Now()
    if now.Before(rl.startAt.Add(rl.interval)) {
        if rl.requestCount < rl.limitCount {
            rl.requestCount++
            return true
        }
        return false
    }

    rl.startAt = time.Now()
    rl.requestCount = 0
    return false
}

这种实现形式存在一个工夫临界点问题:如果在单位工夫 1min 内的前 1s,曾经通过了 100 个申请,那前面的 59s,只能把申请回绝,这种景象称为 突刺景象

漏桶

因为计数器存在突刺景象,能够应用漏桶算法来解决。漏桶提供了一种简略、直观的办法,通过队列来限度速率,能够把队列看作是一个寄存申请的桶。当一个申请被注册时,会被加到队列的末端。每隔一段时间,队列中的第一个事件就会被解决。这也被称为先进先出(FIFO)队列。如果队列已满,那么额定的申请就会被抛弃(或泄露)。

package leakyBucket

import (
    "fmt"
    "time"
)

func LeakyBucketDemo() {
    // init rate limiter
    rate := int64(5)
    size := int64(10)
    rateLimiter := NewRateLimiter(rate, size)
    for i := 0; i < 800; i++ {if rateLimiter.Grant() {fmt.Println("Continue to process")
            continue
        }
        fmt.Println("Exceed rate limit")
    }
}

type RateLimiter struct {
    startAt time.Time
    // bucket size
    size int64
    // now the water in bucket
    water int64
    // rater discharge rate
    rate int64
}

func NewRateLimiter(rate, size int64) *RateLimiter {
    return &RateLimiter{startAt: time.Now(),
        rate:    rate, // rate of processing requests, request/s
        size:    size,
    }
}

func (rl *RateLimiter) Grant() bool {
    // calculating water output
    now := time.Now()
    out := int64(now.Sub(rl.startAt).Milliseconds()) * rl.rate

    // remain water after the leak
    rl.water = max(0, rl.water-out)

    rl.startAt = now
    if rl.water+1 < rl.size {
        rl.water++
        return true
    }
    return false
}

func max(a, b int64) int64 {
    if a > b {return a}
    return b
}

漏桶算法的长处是,它能将突发的申请平滑化,并以近似均匀的速度解决。然而,霎时高并发的流量可能会使申请占满队列,使最新的申请无奈失去解决,也不能保障申请在固定工夫内失去解决。

令牌桶

令牌桶算法是对漏桶算法的一种改良,桶算法可能限度申请调用的速率,而令牌桶算法可能在限度调用的均匀速率的同时还容许肯定水平的突发调用。

该算法的基本原理也很容易了解。就是有一个桶,外面有一个最大数量的 Token(容量)。每当一个消费者想要调用一个服务或生产一个资源时,他就会取出一个或多个 Token。只有当消费者可能取出所需数量的 Token 时,他能力生产一项服务。如果桶中没有所需数量的令牌,他须要期待,直到桶中有足够的令牌。

package tokenBucket

import (
    "fmt"
    "time"
)

func tokenBucketDemo() {tokenRate := int64(5)
    size := int64(10)
    rateLimiter := NewRateLimiter(tokenRate, size)
    for i := 0; i < 800; i++ {if rateLimiter.Grant() {fmt.Println("Continue to process")
            continue
        }
        fmt.Println("Exceed rate limit")
    }
}

type RateLimiter struct {
    startAt   time.Time
    size      int64
    tokens    int64
    tokenRate int64
}

func NewRateLimiter(tokenRate, size int64) *RateLimiter {
    return &RateLimiter{startAt:   time.Now(),
        tokenRate: tokenRate,
        size:      size,
    }
}

func (rl *RateLimiter) Grant() bool {now := time.Now()
    in := now.Sub(rl.startAt).Milliseconds() * rl.tokenRate

    rl.tokens = min(rl.size, rl.tokens+in)
    rl.startAt = now

    if rl.tokens > 0 {
        rl.tokens--
        return true
    }
    return false
}

func min(a, b int64) int64 {
    if a > b {return b}
    return a
}

滑动窗口

漏桶和令牌桶算法存在两个毛病:

  1. 须要设置两个参数 (均匀速率和阈值),不肯定容易调试好
  2. 波及多个不同操作(比方漏桶算法,每次校验都须要再更新开始工夫),无奈原子化实现这些操作

这里举荐一种更优良的限流算法:滑动窗口。它能够灵便地扩大速率限度,并且性能良好。能较好解决下面这两个缺点,同时防止了漏桶的刹时大流量问题,以及计数器实现的突刺景象。

滑动窗口是把固定工夫片,进行划分,并且随着工夫进行挪动,这样就奇妙的避开了计数器的突刺景象。也就是说这些固定数量的能够挪动的格子,将会进行计数判断阀值,因而格子的数量影响着滑动窗口算法的精度。

package slidingWindow

import (
    "fmt"
    "sync"
    "time"
)

func SlidingWindowDemo() {
    // allow 10 requests per second
    rateLimiter := NewRateLimiter(time.Second, 10, func() Window {return NewLocalWindow()
    })

    if rateLimiter.Grant() {fmt.Println("Continue to process")
    } else {fmt.Println("Exceed rate limit")
    }

}

// Window represents a fixed-window
type Window interface {
    // Start returns the start boundary
    Start() time.Time

    // Count returns the accumulated count
    Count() int64

    // AddCount increments the accumulated count by n
    AddCount(n int64)

    // Reset sets the state of the window with the given settings
    Reset(s time.Time, c int64)
}

type NewWindow func() Window

type LocalWindow struct {
    start int64
    count int64
}

func NewLocalWindow() *LocalWindow {return &LocalWindow{}
}

func (w *LocalWindow) Start() time.Time {return time.Unix(0, w.start)
}

func (w *LocalWindow) Count() int64 {return w.count}

func (w *LocalWindow) AddCount(n int64) {w.count += n}

func (w *LocalWindow) Reset(s time.Time, c int64) {w.start = s.UnixNano()
    w.count = c
}

type RateLimiter struct {
    size  time.Duration
    limit int64

    mu sync.Mutex

    curr Window
    prev Window
}

func NewRateLimiter(size time.Duration, limit int64, newWindow NewWindow) *RateLimiter {currWin := newWindow()

    // The previous window is static (i.e. no add changes will happen within it),
    // so we always create it as an instance of LocalWindow
    prevWin := NewLocalWindow()

    return &RateLimiter{
        size:  size,
        limit: limit,
        curr:  currWin,
        prev:  prevWin,
    }

}

// Size returns the time duration of one window size
func (rl *RateLimiter) Size() time.Duration {return rl.size}

// Limit returns the maximum events permitted to happen during one window size
func (rl *RateLimiter) Limit() int64 {rl.mu.Lock()
    defer rl.mu.Unlock()
    return rl.limit
}

func (rl *RateLimiter) SetLimit(limit int64) {rl.mu.Lock()
    defer rl.mu.Unlock()
    rl.limit = limit
}

// shorthand for GrantN(time.Now(), 1)
func (rl *RateLimiter) Grant() bool {return rl.GrantN(time.Now(), 1)
}

// reports whether n events may happen at time now
func (rl *RateLimiter) GrantN(now time.Time, n int64) bool {rl.mu.Lock()
    defer rl.mu.Unlock()

    rl.advance(now)

    elapsed := now.Sub(rl.curr.Start())
    weight := float64(rl.size-elapsed) / float64(rl.size)
    count := int64(weight*float64(rl.prev.Count())) + rl.curr.Count()

    if count+n > rl.limit {return false}

    rl.curr.AddCount(n)
    return true
}

// advance updates the current/previous windows resulting from the passage of time
func (rl *RateLimiter) advance(now time.Time) {
    // Calculate the start boundary of the expected current-window.
    newCurrStart := now.Truncate(rl.size)

    diffSize := newCurrStart.Sub(rl.curr.Start()) / rl.size
    if diffSize >= 1 {
        // The current-window is at least one-window-size behind the expected one.
        newPrevCount := int64(0)
        if diffSize == 1 {
            // The new previous-window will overlap with the old current-window,
            // so it inherits the count.
            newPrevCount = rl.curr.Count()}

        rl.prev.Reset(newCurrStart.Add(-rl.size), newPrevCount)
        // The new current-window always has zero count.
        rl.curr.Reset(newCurrStart, 0)
    }
}

集群限流

下面的 4 种限流形式,更多是针对单实例下的并发场景,上面介绍几种服务集群的限流计划:

Nginx 限流

Nginx 官网提供的限速模块应用的是 漏桶算法,保障申请的实时处理速度不会超过预设的阈值,次要有两个设置:

  • limit_req_zone: 限度 IP 在单位工夫内的申请数
  • limit_req_conn: 限度同一时间连接数

Redis 限流

通过 Redis 提供的 incr 命令,在规定的工夫窗口,容许通过的最大申请数

分布式滑动窗口限流

Kong 官网提供了一种分布式滑动窗口算法的设计, 目前反对在 Kong 上做集群限流配置。它通过集中存储每个滑动窗口和 consumer 的计数,从而反对集群场景。这里举荐一个 Go 版本的实现:slidingwindow

其余

另外业界在分布式场景下,也有 通过 Nginx+Lua 和 Redis+Lua 等形式来实现限流

总结

本文次要是本人在学习和调研高并发场景下的限流计划的总结。目前业界风行的限流算法包含计数器、漏桶、令牌桶和滑动窗口,每种算法都有本人的劣势,理论利用中能够依据本人业务场景做抉择,而分布式场景下的限流计划,也根本通过以上限流算法来实现。在高并发下流量管制的一个准则是:先让申请先到队列,并做流量管制,不让流量间接打到零碎上。

参考

  1. 对高并发流量管制的一点思考
  2. How to Design a Scalable Rate Limiting Algorithm
  3. slidingwindow
  4. Token Bucket Rate Limiting
  5. Rate limiting Spring MVC endpoints with bucket4j
  6. How we built rate limiting capable of scaling to millions of domains
正文完
 0