go-tokenlimit
one of rate limit – token bucket, written in go
我的项目地址:
https://github.com/sado0823/g…
what?
之前聊 bbr 自适应限流的时候, 说到了 ` 令牌桶 ` 和 ` 漏桶 ` 的限流算法, 明天的配角就是 ` 令牌桶 ` - `token bucket`.
能够概括如下:
- 令牌桶: 限流 (`Token Bucket`)
- 漏桶: 整流 (`Leaky Bucket`)
令牌桶算法的原理是零碎会以一个恒定的速度往桶里放入令牌,而如果申请须要被解决,则须要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。如果令牌曾经满了,则抛弃令牌
why?
置信大家都很相熟, 在 web 服务中, 有时手动的调整流量限速的阈值, 并且容许肯定的流量峰值场景, 这种状况下 ` 令牌桶 ` 就能够派上用场了
限流特点是:
- 当 bucket 满了的时候, 将不在放入新的 token, 不能超过最先限度值
- 依照肯定的速率进行 token 的减少
- 容许一次性耗费全副 token, 即容许肯定的流量峰值存在
how?
根本思维
通过保护一个具备容量下限的 token
桶, 有 token
则申请通过, 无则失败, 一直的耗费和补充这个桶, 从而达到限流的成果
执行过程
- 首先有一个
桶
, 外面装满了token
, 存在一个容量下限
- 用恒定的速率往
桶
中填入token
, 填满则抛弃 token
- 每次申请进来, 取走一个
token
, 如果没有可用的token
, 则拒绝请求
源码剖析
该实现为通过 lua
脚本, 在 redis
中保护 桶
的计数, 如果 redis
宕机了, 则由 go
自带的"golang.org/x/time/rate"
进行兜底
lua.lua
local rate = tonumber(ARGV[1])
local cap = tonumber(ARGV[2]) -- 最大容量
local now = tonumber(ARGV[3]) -- 以后工夫戳
local requested = tonumber(ARGV[4]) -- 须要去除的 token 数量
local fill_time = cap/rate
local ttl = math.floor(fill_time*2) -- 给肯定的过期工夫, 使得 token 复原平滑
-- KEYS[1]: token key 上次残余值
local last_tokens = tonumber(redis.call("get",KEYS[1]))
if last_tokens == nil then
last_tokens = cap
end
-- KEYS[2]: token refreshed timestamp 上一次刷新 token 的工夫戳
local last_refreshed = tonumber(redis.call("get",KEYS[2]))
if last_refreshed == nil then
last_refreshed = 0
end
-- 计算工夫差值
local delta = math.max(0, now-last_refreshed)
-- 工夫差值 x 减少速率 + 上次残余值, 计算出以后可用值
local left = math.min(cap,last_tokens+delta*rate)
local new_tokens = left
-- 可用值 > 申请值, 则通过
local allowed = left >= requested
if allowed then
new_tokens = left - requested
end
redis.call("setex", KEYS[1], ttl, new_tokens)
redis.call("setex", KEYS[2], ttl, now)
return allowed
tokenlimit.go
主函数里应用的是一个interface
, 为当前增加别的存储引擎留下了可能性 (大概率也不会用别的存储了 …)
RedisI interface {Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error)
IsErrNil(err error) bool
Ping() bool}
tokenlimit.go
最终实现限流器的构造体
Limiter struct {
rate int // generate token number each second
cap int // at most token to store
local *xrate.Limiter // limiter in process
remote RedisI // for distributed situation, can use redis
tokenKey string
tsKey string // timestamp key, tag get token time
remoteMu sync.Mutex
remoteAlive uint32 // ping remote server is alive or not
monitorStarted bool
}
// New a Limiter
func New(rate, cap int, store RedisI, key string) *Limiter {
return &Limiter{
rate: rate,
cap: cap,
local: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), cap),
remote: store,
tokenKey: fmt.Sprintf(tokenFormat, key),
tsKey: fmt.Sprintf(timestampFormat, key),
remoteAlive: 1,
}
}
tokenlimit.go
新建 redis
存储的助手函数, 提供 dsn
即可
func NewStore(addr string) RedisI {return redisx.New(addr)
}
example
func test() {
const (
total = 100
rate = 5
burst = 10
)
limiter := New(rate, total, NewStore("127.0.0.1:6379"), "test-token-limit")
var allowed int
for i := 0; i < total; i++ {time.Sleep(time.Second / time.Duration(total))
if limiter.Allow() {allowed++}
}
fmt.Printf("Test_Remote: allowed:%d, burst:%d, rate:%d \n", allowed, burst, rate)
}
references
- go-zero
- wiki
- token-bucket