乐趣区

关于go:bbr自适应限流算法

1) 个别限流

个别咱们会抉择 ` 漏斗桶 / 令牌桶 ` 算法来进行限流, 的确可能爱护零碎不被拖垮。其 ` 核心思想 ` 有两点:
1) 设置指标, 固定一个漏斗或者固定发送令牌的速度
2) 超过指标限度流量进入

依据这两个特点, 咱们很容易推出会遇到什么 ` 问题 `:
1) 指标不好定, 设置流量的阈值是什么?
2) 当忽然呈现流量顶峰的时候, 是须要人工染指去调整的

总结就是传统限流比拟被动, 不可能自适应流量的变动

2) 自适应限流

对于自适应限流来说, 个别都是联合零碎的 `Load`、`CPU` 使用率以及利用的入口 `QPS`、` 均匀响应工夫 ` 和 ` 并发量 ` 等几个维度的监控指标,通过自适应的流控策略, 让零碎的入口流量和零碎的负载达到一个均衡,让零碎尽可能跑在最大吞吐量的同时保证系统整体的稳定性

3) 实现

咱们参考kratosgo-zero , 来看一下自适应限流具体是如何实现的

1) 根本公式

# 1) 计算单点 cpu, 得出一个 [0~1000]的数字示意 0~100% 的 cpu
cpu = (周期内用户应用 / 周期内零碎总共应用) * 1e3

# 2) 滑动窗口 cpu 计算 (指数加权均匀算法)
// 个别 decay=0.95, 示意消退率
// t 示意工夫周期, t-1 示意上一个工夫周期
windowCpu =  cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)

# 3) 计算是否应该抛弃
1) cpu 大于预约值, 比方 900
2) 周期内申请数超过容许的最大申请数, 计算形式如下
// winBucketPerSec: 每秒内的采样数量,
// 计算形式:
// int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)),
// conf.Window 默认值 10s, conf.WinBucket 默认值 100.
// 简化下公式: 1/(10/100) = 10, 所以每秒内的采样数就是 10
// maxQPS = maxPass * winBucketPerSec
// minRT = min average response time in milliseconds
// maxQPS * minRT / milliseconds_per_second
maxFlight = b.maxPass()*b.minRT()*b.winBucketPerSec)/1e3

2) 计算 cpu

此处只计算 linux 下的 cpu, 依据 cgroup计算

文件门路: internal/cpu/cgroup.go

# 1) cgroup 文件地址, 读取相干信息
/proc/{pid}/cgroup

// 失去相似如下信息 (我这里读的是某个 docker 过程的数据)
11:cpuset:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
10:memory:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
9:devices:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
8:blkio:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
7:hugetlb:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
6:perf_event:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
5:freezer:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
4:net_cls,net_prio:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
3:pids:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
2:cpu,cpuacct:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30
1:name=systemd:/docker/290247cde1fff59d5322068be83a7c7629f4454ac0960a89e6856ea041970b30

# 2) /sys/fs/cgroup 再把对应 cpu 拼上 cgroup 根门路读取对应信息

# 3) 最初计算出 cpu 的使用率

3) 计算滑动窗口 cpu

算法: 指数加权均匀算法 (moving average )

工夫周期: time.Millisecond * 500, 有 1s 的冷却工夫

公式 cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)

窗口: 10s的窗口内划分100 个 bucket, 消退率 decay=0.95

文件门路: internal/middleware/bbr.go:CpuProc()

// CpuProc update cpu in every 250 Millisecond
func CpuProc() {ticker := time.NewTicker(time.Millisecond * 250)
    defer func() {ticker.Stop()
        if err := recover(); err != nil {fmt.Println("cpuProc fail, e:", err)
            go CpuProc()}
    }()

    for range ticker.C {stat := &internal.Stat{}
        internal.LoadStat(stat)
        preCpu := atomic.LoadInt64(&cpu)

        // cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)
        curCpu := int64(float64(preCpu)*decay + float64(stat.Usage)*(1.0-decay))

        atomic.StoreInt64(&cpu, curCpu)
        fmt.Printf("old-self-cpu: %v, now-self-cpu:%v \n", preCpu, curCpu)
    }

}

4) 计算窗口内容许的最大申请数

公式: maxFlight = b.maxPass()*b.minRT()*b.winBucketPerSec)/1e3

文件:internal/middleware/bbr.go::maxFlight()

实际上就是每个 bucket 内最大的申请通过数和最小的响应工夫相乘, 即为maxFlight

如果 cpu 大于预设值或者申请数大于maxFlight, 则断定为须要丢掉申请

1) maxPass

// maxPass 单个采样窗口在一个采样周期中的最大的申请数,
// 默认的采样窗口是 10s, 采样 bucket 数量 100
func (b *BBR) maxPass() int64 {maxPassCache := b.maxPassCache.Load()
    if maxPassCache != nil {ps := maxPassCache.(*CounterCache)
        if b.timespan(ps.time) < 1 {return ps.val}
    }

    rawMaxPass := int64(b.passStat.Reduce(func(iterator metric.Iterator) float64 {
        var result = 1.0
        for i := 1; iterator.Next() && i < b.conf.WinBucket; i++ {bucket := iterator.Bucket()
            count := 0.0
            for _, point := range bucket.Points {count += point}
            result = math.Max(result, count)
        }
        return result
    }))

    if rawMaxPass == 0 {rawMaxPass = 1}

    b.maxPassCache.Store(&CounterCache{
        val:  rawMaxPass,
        time: time.Now(),})

    return rawMaxPass
}

2) minRT

// minRT 单个采样窗口中最小的响应工夫
func (b *BBR) minRT() int64 {minRtCache := b.minRtCache.Load()
    if minRtCache != nil {rc := minRtCache.(*CounterCache)
        if b.timespan(rc.time) < 1 {return rc.val}
    }

    rawMinRt := int64(math.Ceil(b.rtStat.Reduce(func(iterator metric.Iterator) float64 {
        var res = math.MaxFloat64

        for i := 1; iterator.Next() && i < b.conf.WinBucket; i++ {bucket := iterator.Bucket()
            if len(bucket.Points) == 0 {continue}

            total := 0.0
            for _, point := range bucket.Points {total += point}
            avg := total / float64(bucket.Count)
            res = math.Min(res, avg)
        }

        return res

    })))

    if rawMinRt <= 0 {rawMinRt = 1}

    b.minRtCache.Store(&CounterCache{
        val:  rawMinRt,
        time: time.Now(),})

    return rawMinRt
}

3) maxFlight

// current window max flight
func (b *BBR) maxFlight() int64 {
    // winBucketPerSec: 每秒内的采样数量,
    // 计算形式:
    // int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)),
    // conf.Window 默认值 10s, conf.WinBucket 默认值 100.
    // 简化下公式: 1/(10/100) = 10, 所以每秒内的采样数就是 10
    // maxQPS = maxPass * winBucketPerSec
    // minRT = min average response time in milliseconds
    // maxQPS * minRT / milliseconds_per_second
    return int64(
        math.Floor(
            float64(b.maxPass()*b.minRT()*b.winBucketPerSec)/1e3 + 0.5,
        ),
    )

}

4) shouldDrop

// Cooling time: 1s
func (b *BBR) shouldDrop() bool {
    // not overload
    if b.cpu() < b.conf.CPUThreshold {preDropTime, _ := b.preDrop.Load().(time.Duration)
        // didn't drop before
        if preDropTime == 0 {return false}

        // in cooling time duration, 1s
        // should not drop
        if time.Since(initTime)-preDropTime <= time.Second {inFlight := atomic.LoadInt64(&b.inFlight)
            return inFlight > 1 && inFlight > b.maxFlight()}

        // store this drop time as pre drop time
        b.preDrop.Store(time.Duration(0))
        return false
    }

    // overload case
    inFlight := atomic.LoadInt64(&b.inFlight)
    shouldDrop := inFlight > 1 && inFlight > b.maxFlight()

    if shouldDrop {preDropTime, _ := b.preDrop.Load().(time.Duration)
        if preDropTime != 0 {return shouldDrop}
        b.preDrop.Store(time.Since(initTime))
    }

    return shouldDrop
}

5) rollingCounter

窗口统计, 外围数据结构为:

// Bucket contains multiple float64 points.
// 环形链表
type Bucket struct {Points []float64 // all of the points
    Count  int64     // this bucket point length
    next   *Bucket
}

// Window contains multiple buckets.
Window struct {buckets []Bucket
    size    int
}

4) 源码地址

https://github.com/sado0823/go-bbr-ratelimit

5) 参考资料

  • alibaba-sentinel
  • kratos-bbr
  • go-zero-shedding
  • EMA algorithm
  • cgroup-/proc/stat
  • cgroup-cpu
退出移动版