关于golang:Go-分布式令牌桶限流-兜底策略

8次阅读

共计 3437 个字符,预计需要花费 9 分钟才能阅读完成。

上篇文章提到固定工夫窗口限流无奈解决忽然申请洪峰状况,本文讲述的令牌桶线路算法则能够比拟好的解决此场景。

工作原理

  1. 单位工夫依照肯定速率匀速的生产 token 放入桶内,直到达到桶容量下限。
  2. 解决申请,每次尝试获取一个或多个令牌,如果拿到则解决申请,失败则拒绝请求。

优缺点

长处

能够无效解决霎时的突发流量,桶内存量 token 即可作为流量缓冲区平滑解决突发流量。

毛病

实现较为简单。

代码实现

core/limit/tokenlimit.go

分布式环境下思考应用 redis 作为桶和令牌的存储容器,采纳 lua 脚本实现整个算法流程。

redis lua 脚本

-- 每秒生成 token 数量即 token 生成速度
local rate = tonumber(ARGV[1])
-- 桶容量
local capacity = tonumber(ARGV[2])
-- 以后工夫戳
local now = tonumber(ARGV[3])
-- 以后申请 token 数量
local requested = tonumber(ARGV[4])
-- 须要多少秒能力填满桶
local fill_time = capacity/rate
-- 向下取整,ttl 为填满工夫的 2 倍
local ttl = math.floor(fill_time*2)
-- 以后工夫桶容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
-- 如果以后桶容量为 0, 阐明是第一次进入, 则默认容量为桶的最大容量
if last_tokens == nil then
last_tokens = capacity
end
-- 上一次刷新的工夫
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
-- 第一次进入则设置刷新工夫为 0
if last_refreshed == nil then
last_refreshed = 0
end
-- 间隔上次申请的时间跨度
local delta = math.max(0, now-last_refreshed)
-- 间隔上次申请的时间跨度, 总共能生产 token 的数量, 如果超多最大容量则抛弃多余的 token
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 本次申请 token 数量是否足够
local allowed = filled_tokens >= requested
-- 桶残余数量
local new_tokens = filled_tokens
-- 容许本次 token 申请, 计算残余数量
if allowed then
new_tokens = filled_tokens - requested
end
-- 设置残余 token 数量
redis.call("setex", KEYS[1], ttl, new_tokens)
-- 设置刷新工夫
redis.call("setex", KEYS[2], ttl, now)

return allowed

令牌桶限流器定义

type TokenLimiter struct {
    // 每秒生产速率
    rate int
    // 桶容量
    burst int
    // 存储容器
    store *redis.Redis
    // redis key
    tokenKey       string
    // 桶刷新工夫 key
    timestampKey   string
    // lock
    rescueLock     sync.Mutex
    // redis 衰弱标识
    redisAlive     uint32
    // redis 故障时采纳过程内 令牌桶限流器
    rescueLimiter  *xrate.Limiter
    // redis 监控探测工作标识
    monitorStarted bool
}

func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)

    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

获取令牌

func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
    // 判断 redis 是否衰弱
    // redis 故障时采纳过程内限流器
    // 兜底保障
    if atomic.LoadUint32(&lim.redisAlive) == 0 {return lim.rescueLimiter.AllowN(now, n)
    }
    // 执行脚本获取令牌
    resp, err := lim.store.Eval(
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> r Nil bulk reply
    // 非凡解决 key 不存在的状况
    if err == redis.Nil {return false} else if err != nil {logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // 执行异样,开启 redis 衰弱探测工作
        // 同时采纳过程内限流器作为兜底
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    code, ok := resp.(int64)
    if !ok {logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    // redis allowed == true
    // Lua boolean true -> r integer reply with value of 1
    return code == 1
}

redis 故障时兜底策略

兜底策略的设计思考得十分细节,当 redis 不可用的时候,启动单机版的 ratelimit 做备用限流,确保根本的限流可用,服务不会被冲垮。

// 开启 redis 衰弱探测
func (lim *TokenLimiter) startMonitor() {lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()
    // 避免反复开启
    if lim.monitorStarted {return}

    // 设置工作和衰弱标识
    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)
    // 衰弱探测
    go lim.waitForRedis()}

// redis 衰弱探测定时工作
func (lim *TokenLimiter) waitForRedis() {ticker := time.NewTicker(pingInterval)
    // 衰弱探测胜利时回调此函数
    defer func() {ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()}()

    for range ticker.C {
        // ping 属于 redis 内置衰弱探测命令
        if lim.store.Ping() {
            // 衰弱探测胜利,设置衰弱标识
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

我的项目地址

https://github.com/zeromicro/go-zero

欢送应用 go-zerostar 反对咱们!

微信交换群

关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。

正文完
 0