共计 3437 个字符,预计需要花费 9 分钟才能阅读完成。
上篇文章提到固定工夫窗口限流无奈解决忽然申请洪峰状况,本文讲述的令牌桶线路算法则能够比拟好的解决此场景。
工作原理
- 单位工夫依照肯定速率匀速的生产 token 放入桶内,直到达到桶容量下限。
- 解决申请,每次尝试获取一个或多个令牌,如果拿到则解决申请,失败则拒绝请求。
优缺点
长处
能够无效解决霎时的突发流量,桶内存量 token 即可作为流量缓冲区平滑解决突发流量。
毛病
实现较为简单。
代码实现
core/limit/tokenlimit.go
分布式环境下思考应用 redis 作为桶和令牌的存储容器,采纳 lua 脚本实现整个算法流程。
redis lua 脚本
-- 每秒生成 token 数量即 token 生成速度 | |
local rate = tonumber(ARGV[1]) | |
-- 桶容量 | |
local capacity = tonumber(ARGV[2]) | |
-- 以后工夫戳 | |
local now = tonumber(ARGV[3]) | |
-- 以后申请 token 数量 | |
local requested = tonumber(ARGV[4]) | |
-- 须要多少秒能力填满桶 | |
local fill_time = capacity/rate | |
-- 向下取整,ttl 为填满工夫的 2 倍 | |
local ttl = math.floor(fill_time*2) | |
-- 以后工夫桶容量 | |
local last_tokens = tonumber(redis.call("get", KEYS[1])) | |
-- 如果以后桶容量为 0, 阐明是第一次进入, 则默认容量为桶的最大容量 | |
if last_tokens == nil then | |
last_tokens = capacity | |
end | |
-- 上一次刷新的工夫 | |
local last_refreshed = tonumber(redis.call("get", KEYS[2])) | |
-- 第一次进入则设置刷新工夫为 0 | |
if last_refreshed == nil then | |
last_refreshed = 0 | |
end | |
-- 间隔上次申请的时间跨度 | |
local delta = math.max(0, now-last_refreshed) | |
-- 间隔上次申请的时间跨度, 总共能生产 token 的数量, 如果超多最大容量则抛弃多余的 token | |
local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) | |
-- 本次申请 token 数量是否足够 | |
local allowed = filled_tokens >= requested | |
-- 桶残余数量 | |
local new_tokens = filled_tokens | |
-- 容许本次 token 申请, 计算残余数量 | |
if allowed then | |
new_tokens = filled_tokens - requested | |
end | |
-- 设置残余 token 数量 | |
redis.call("setex", KEYS[1], ttl, new_tokens) | |
-- 设置刷新工夫 | |
redis.call("setex", KEYS[2], ttl, now) | |
return allowed |
令牌桶限流器定义
type TokenLimiter struct { | |
// 每秒生产速率 | |
rate int | |
// 桶容量 | |
burst int | |
// 存储容器 | |
store *redis.Redis | |
// redis key | |
tokenKey string | |
// 桶刷新工夫 key | |
timestampKey string | |
// lock | |
rescueLock sync.Mutex | |
// redis 衰弱标识 | |
redisAlive uint32 | |
// redis 故障时采纳过程内 令牌桶限流器 | |
rescueLimiter *xrate.Limiter | |
// redis 监控探测工作标识 | |
monitorStarted bool | |
} | |
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {tokenKey := fmt.Sprintf(tokenFormat, key) | |
timestampKey := fmt.Sprintf(timestampFormat, key) | |
return &TokenLimiter{ | |
rate: rate, | |
burst: burst, | |
store: store, | |
tokenKey: tokenKey, | |
timestampKey: timestampKey, | |
redisAlive: 1, | |
rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst), | |
} | |
} |
获取令牌
func (lim *TokenLimiter) reserveN(now time.Time, n int) bool { | |
// 判断 redis 是否衰弱 | |
// redis 故障时采纳过程内限流器 | |
// 兜底保障 | |
if atomic.LoadUint32(&lim.redisAlive) == 0 {return lim.rescueLimiter.AllowN(now, n) | |
} | |
// 执行脚本获取令牌 | |
resp, err := lim.store.Eval( | |
script, | |
[]string{ | |
lim.tokenKey, | |
lim.timestampKey, | |
}, | |
[]string{strconv.Itoa(lim.rate), | |
strconv.Itoa(lim.burst), | |
strconv.FormatInt(now.Unix(), 10), | |
strconv.Itoa(n), | |
}) | |
// redis allowed == false | |
// Lua boolean false -> r Nil bulk reply | |
// 非凡解决 key 不存在的状况 | |
if err == redis.Nil {return false} else if err != nil {logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err) | |
// 执行异样,开启 redis 衰弱探测工作 | |
// 同时采纳过程内限流器作为兜底 | |
lim.startMonitor() | |
return lim.rescueLimiter.AllowN(now, n) | |
} | |
code, ok := resp.(int64) | |
if !ok {logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp) | |
lim.startMonitor() | |
return lim.rescueLimiter.AllowN(now, n) | |
} | |
// redis allowed == true | |
// Lua boolean true -> r integer reply with value of 1 | |
return code == 1 | |
} |
redis 故障时兜底策略
兜底策略的设计思考得十分细节,当 redis
不可用的时候,启动单机版的 ratelimit
做备用限流,确保根本的限流可用,服务不会被冲垮。
// 开启 redis 衰弱探测 | |
func (lim *TokenLimiter) startMonitor() {lim.rescueLock.Lock() | |
defer lim.rescueLock.Unlock() | |
// 避免反复开启 | |
if lim.monitorStarted {return} | |
// 设置工作和衰弱标识 | |
lim.monitorStarted = true | |
atomic.StoreUint32(&lim.redisAlive, 0) | |
// 衰弱探测 | |
go lim.waitForRedis()} | |
// redis 衰弱探测定时工作 | |
func (lim *TokenLimiter) waitForRedis() {ticker := time.NewTicker(pingInterval) | |
// 衰弱探测胜利时回调此函数 | |
defer func() {ticker.Stop() | |
lim.rescueLock.Lock() | |
lim.monitorStarted = false | |
lim.rescueLock.Unlock()}() | |
for range ticker.C { | |
// ping 属于 redis 内置衰弱探测命令 | |
if lim.store.Ping() { | |
// 衰弱探测胜利,设置衰弱标识 | |
atomic.StoreUint32(&lim.redisAlive, 1) | |
return | |
} | |
} | |
} |
我的项目地址
https://github.com/zeromicro/go-zero
欢送应用 go-zero
并 star 反对咱们!
微信交换群
关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。
正文完