上篇文章提到固定工夫窗口限流无奈解决忽然申请洪峰状况,本文讲述的令牌桶线路算法则能够比拟好的解决此场景。
工作原理
- 单位工夫依照肯定速率匀速的生产 token 放入桶内,直到达到桶容量下限。
- 解决申请,每次尝试获取一个或多个令牌,如果拿到则解决申请,失败则拒绝请求。
优缺点
长处
能够无效解决霎时的突发流量,桶内存量 token 即可作为流量缓冲区平滑解决突发流量。
毛病
实现较为简单。
代码实现
core/limit/tokenlimit.go
分布式环境下思考应用 redis 作为桶和令牌的存储容器,采纳 lua 脚本实现整个算法流程。
redis lua 脚本
-- 每秒生成token数量即token生成速度local rate = tonumber(ARGV[1])-- 桶容量local capacity = tonumber(ARGV[2])-- 以后工夫戳local now = tonumber(ARGV[3])-- 以后申请token数量local requested = tonumber(ARGV[4])-- 须要多少秒能力填满桶local fill_time = capacity/rate-- 向下取整,ttl为填满工夫的2倍local ttl = math.floor(fill_time*2)-- 以后工夫桶容量local last_tokens = tonumber(redis.call("get", KEYS[1]))-- 如果以后桶容量为0,阐明是第一次进入,则默认容量为桶的最大容量if last_tokens == nil thenlast_tokens = capacityend-- 上一次刷新的工夫local last_refreshed = tonumber(redis.call("get", KEYS[2]))-- 第一次进入则设置刷新工夫为0if last_refreshed == nil thenlast_refreshed = 0end-- 间隔上次申请的时间跨度local delta = math.max(0, now-last_refreshed)-- 间隔上次申请的时间跨度,总共能生产token的数量,如果超多最大容量则抛弃多余的tokenlocal filled_tokens = math.min(capacity, last_tokens+(delta*rate))-- 本次申请token数量是否足够local allowed = filled_tokens >= requested-- 桶残余数量local new_tokens = filled_tokens-- 容许本次token申请,计算残余数量if allowed thennew_tokens = filled_tokens - requestedend-- 设置残余token数量redis.call("setex", KEYS[1], ttl, new_tokens)-- 设置刷新工夫redis.call("setex", KEYS[2], ttl, now)return allowed
令牌桶限流器定义
type TokenLimiter struct { // 每秒生产速率 rate int // 桶容量 burst int // 存储容器 store *redis.Redis // redis key tokenKey string // 桶刷新工夫key timestampKey string // lock rescueLock sync.Mutex // redis衰弱标识 redisAlive uint32 // redis故障时采纳过程内 令牌桶限流器 rescueLimiter *xrate.Limiter // redis监控探测工作标识 monitorStarted bool}func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter { tokenKey := fmt.Sprintf(tokenFormat, key) timestampKey := fmt.Sprintf(timestampFormat, key) return &TokenLimiter{ rate: rate, burst: burst, store: store, tokenKey: tokenKey, timestampKey: timestampKey, redisAlive: 1, rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst), }}
获取令牌
func (lim *TokenLimiter) reserveN(now time.Time, n int) bool { // 判断redis是否衰弱 // redis故障时采纳过程内限流器 // 兜底保障 if atomic.LoadUint32(&lim.redisAlive) == 0 { return lim.rescueLimiter.AllowN(now, n) } // 执行脚本获取令牌 resp, err := lim.store.Eval( script, []string{ lim.tokenKey, lim.timestampKey, }, []string{ strconv.Itoa(lim.rate), strconv.Itoa(lim.burst), strconv.FormatInt(now.Unix(), 10), strconv.Itoa(n), }) // redis allowed == false // Lua boolean false -> r Nil bulk reply // 非凡解决key不存在的状况 if err == redis.Nil { return false } else if err != nil { logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err) // 执行异样,开启redis衰弱探测工作 // 同时采纳过程内限流器作为兜底 lim.startMonitor() return lim.rescueLimiter.AllowN(now, n) } code, ok := resp.(int64) if !ok { logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp) lim.startMonitor() return lim.rescueLimiter.AllowN(now, n) } // redis allowed == true // Lua boolean true -> r integer reply with value of 1 return code == 1}
redis 故障时兜底策略
兜底策略的设计思考得十分细节,当 redis
不可用的时候,启动单机版的 ratelimit
做备用限流,确保根本的限流可用,服务不会被冲垮。
// 开启redis衰弱探测func (lim *TokenLimiter) startMonitor() { lim.rescueLock.Lock() defer lim.rescueLock.Unlock() // 避免反复开启 if lim.monitorStarted { return } // 设置工作和衰弱标识 lim.monitorStarted = true atomic.StoreUint32(&lim.redisAlive, 0) // 衰弱探测 go lim.waitForRedis()}// redis衰弱探测定时工作func (lim *TokenLimiter) waitForRedis() { ticker := time.NewTicker(pingInterval) // 衰弱探测胜利时回调此函数 defer func() { ticker.Stop() lim.rescueLock.Lock() lim.monitorStarted = false lim.rescueLock.Unlock() }() for range ticker.C { // ping属于redis内置衰弱探测命令 if lim.store.Ping() { // 衰弱探测胜利,设置衰弱标识 atomic.StoreUint32(&lim.redisAlive, 1) return } }}
我的项目地址
https://github.com/zeromicro/go-zero
欢送应用 go-zero
并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并点击 交换群 获取社区群二维码。