乐趣区

关于golang:k8sclientgo源码剖析三

云原生社区活动 —Kubernetes 源码分析第一期第三周作业, 也是最初一周作业.

本文次要讲述下 client-go 中 workqueue, 看一下 client-go 的一个整体数据走向. 如下图:

而 workqueue 次要是在 listener 这里援用,listener 应用 chan 获取到数据之后将数据放入到工作队列进行解决。次要是因为 chan 过于简略,曾经无奈满足 K8S 的场景,所以衍生出了 workqueue,

个性


  1. 有序
  2. 去重
  3. 并发
  4. 提早解决
  5. 限速

以后有三种 workqueue


  1. 根本队列
  2. 提早队列
  3. 限速队列

其中提早队列是基于根本队列实现的,而限流队列基于提早队列实现

根本队列


看一下根本队列的接口

// client-go 源码门路 util/workqueue/queue.go
type Interface interface {
    // 新增元素 能够是任意对象
    Add(item interface{})
    // 获取以后队列的长度
    Len() int
    // 阻塞获取头部元素 (先入先出)  返回元素以及队列是否敞开
    Get() (item interface{}, shutdown bool)
    // 显示标记实现元素的解决
    Done(item interface{})
    // 敞开队列
    ShutDown()
    // 队列是否处于敞开状态
    ShuttingDown() bool}

看一下根本队列的数据结构, 只看三个重点解决的, 其余的没有展现进去

type Type struct {
    // 含有所有元素的元素的队列 保障有序
    queue []t
    // 所有须要解决的元素 set 是基于 map 以 value 为空 struct 实现的构造,保障去重
    dirty set
    // 以后正在解决中的元素
    processing set
    ...
}

type empty struct{}
type t interface{}
type set map[t]empty

根本队列的 hello world 也很简略

 wq := workqueue.New()
    wq.Add("hello")
    v, _ := wq.Get()

根本队列 Add


func (q *Type) Add(item interface{}) {q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // 如果以后处于敞开状态, 则不再新增元素
    if q.shuttingDown {return}
    // 如果元素曾经在期待解决中, 则不再新增
    if q.dirty.has(item) {return}
    // 增加到 metrics
    q.metrics.add(item)
    // 退出期待解决中
    q.dirty.insert(item)
    // 如果目前正在解决该元素 就不将元素增加到队列
    if q.processing.has(item) {return}
    q.queue = append(q.queue, item)
    q.cond.Signal()}

根本队列 Get


func (q *Type) Get() (item interface{}, shutdown bool) {q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // 如果以后没有元素并且不处于敞开状态, 则阻塞
    for len(q.queue) == 0 && !q.shuttingDown {q.cond.Wait()
    }
    ...
    item, q.queue = q.queue[0], q.queue[1:]
    q.metrics.get(item)
    // 把元素增加到正在解决队列中
    q.processing.insert(item)
    // 把队列从期待解决队列中删除
    q.dirty.delete(item)
    return item, false
}

根本队列实例化


func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
    t := &Type{
        clock:                      c,
        dirty:                      set{},
        processing:                 set{},
        cond:                       sync.NewCond(&sync.Mutex{}),
        metrics:                    metrics,
        unfinishedWorkUpdatePeriod: updatePeriod,
    }
        // 启动一个协程 定时更新 metrics
    go t.updateUnfinishedWorkLoop()
    return t
}

func (q *Type) updateUnfinishedWorkLoop() {t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
    defer t.Stop()
    for range t.C() {if !func() bool {q.cond.L.Lock()
            defer q.cond.L.Unlock()
            if !q.shuttingDown {q.metrics.updateUnfinishedWork()
                return true
            }
            return false

        }() {return}
    }
}

提早队列


提早队列的实现思路次要是应用优先队列寄存须要提早增加的元素, 每次判断最小提早的元素书否曾经达到了退出队列的要求 (提早的工夫到了), 如果是则判断下一个元素, 直到没有元素或者元素还须要提早为止。

看一下提早队列的数据结构

type delayingType struct {
    Interface
        ...
    // 搁置提早增加的元素
    waitingForAddCh chan *waitFor
       ...
}

次要是应用 chan 来保留提早增加的元素, 而具体实现是通过一个实现了一个 AddAfter 办法,看一下具体的内容

// 提早队列的接口
type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    ...
    // 如果提早实现小于等于 0 间接增加到队列
    if duration <= 0 {q.Add(item)
        return
    }
    select {
    case <-q.stopCh:
    // 增加到 chan, 上面会讲一下这个 chan 的解决
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
    }
}

提早元素的解决

func (q *delayingType) waitingLoop() {defer utilruntime.HandleCrash()

    never := make(<-chan time.Time)

    var nextReadyAtTimer clock.Timer

    waitingForQueue := &waitForPriorityQueue{}
    // 这里是初始化一个优先队列 具体实现有趣味的同学能够钻研下
    heap.Init(waitingForQueue)

    waitingEntryByData := map[t]*waitFor{}

    for {if q.Interface.ShuttingDown() {return}

        now := q.clock.Now()

        // Add ready entries
        for waitingForQueue.Len() > 0 {entry := waitingForQueue.Peek().(*waitFor)
            // 看一下第一个元素是否曾经达到提早的工夫了
            if entry.readyAt.After(now) {break}

            // 工夫到了, 将元素增加到工作的队列, 并且从提早的元素中移除
            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }

        // Set up a wait for the first item's readyAt (if one exists)
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            if nextReadyAtTimer != nil {nextReadyAtTimer.Stop()
            }
            // 如果还有须要提早的元素, 计算第一个元素的延迟时间 (最小提早的元素)
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            nextReadyAt = nextReadyAtTimer.C()}

        select {
        case <-q.stopCh:
            return
        case <-q.heartbeat.C():
            // 定时查看下是否有元素达到提早的工夫
        case <-nextReadyAt:
            // 这里是下面计算出来的工夫, 工夫到了, 解决达到延迟时间的元素
        case waitEntry := <-q.waitingForAddCh:
            // 查看是否须要提早, 如果须要提早就退出到提早期待
            if waitEntry.readyAt.After(q.clock.Now()) {insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                // 如果不须要提早就间接增加到队列
                q.Add(waitEntry.data)
            }

            drained := false
            for !drained {
                select {case waitEntry := <-q.waitingForAddCh: 

下面 waitingLoop 是在实例化提早队列的时候调用的,看一下实例化时候的逻辑

func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
    // 实例化一个数据结构
    ret := &delayingType{Interface:       NewNamed(name),
        clock:           clock,
        heartbeat:       clock.NewTicker(maxWait),
        stopCh:          make(chan struct{}),
        waitingForAddCh: make(chan *waitFor, 1000),
        metrics:         newRetryMetrics(name),
    }

    // 放到一个协程中解决提早元素
    go ret.waitingLoop()

    return ret
}

限速队列


以后限速队列反对 4 中限速模式

  1. 令牌桶算法限速
  2. 排队指数限速
  3. 计数器模式
  4. 混合模式 (多种限速算法同时应用)

限速队列的底层实际上还是通过提早队列来进行限速, 通过计算出元素的限速工夫作为延迟时间

来看一下限速接口

type RateLimiter interface {
    //
    When(item interface{}) time.Duration
    // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
    // or for success, we'll stop tracking it
    Forget(item interface{})
    // NumRequeues returns back how many failures the item has had
    NumRequeues(item interface{}) int
}

看一下限速队列的数据结构

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
    DelayingInterface

    // 实际上底层还是调用的提早队列, 通过计算出元素的延迟时间 进行限速
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

func (q *rateLimitingType) AddRateLimited(item interface{}) {
         // 通过 when 办法计算提早退出队列的工夫
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

令牌桶算法


client-go 中的令牌桶限速是通过 golang.org/x/time/rat 包来实现的

能够通过 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 来应用令牌桶限速算法,其中第一个参数 qps 示意每秒补充多少 token,burst 示意总 token 下限为多少。

排队指数算法


排队指数能够通过 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 来应用。

这个算法有两个参数:

  1. baseDelay 根底限速工夫
  2. maxDelay 最大限速工夫

举个例子来了解一下这个算法,例如疾速插入 5 个雷同元素,baseDelay 设置为 1 秒,maxDelay 设置为 10 秒,都在同一个限速期内。第一个元素会在 1 秒后退出到队列,第二个元素会在 2 秒后退出到队列,第三个元素会在 4 秒后退出到队列,第四个元素会在 8 秒后退出到队列,第五个元素会在 10 秒后退出到队列 (指数计算的后果为 16,然而最大值设置了 10 秒)。

来看一下源码的计算

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    // 第一次为 0
    exp := r.failures[item]
    // 累加 1
    r.failures[item] = r.failures[item] + 1

    // 通过以后计数和 baseDelay 计算指数后果  baseDelay*(2 的 exp 次方)
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {return r.maxDelay}

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {return r.maxDelay}

    return calculated
}

计数器模式


计数器模式能够通过 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) 来应用,有三个参数

  1. fastDelay 快限速工夫
  2. slowDelay 慢限速工夫
  3. maxFastAttempts 快限速元素个数

原理是这样的,假如 fastDelay 设置为 1 秒,slowDelay 设置为 10 秒,maxFastAttempts 设置为 3,同样在一个限速周期内疾速插入 5 个雷同的元素。前三个元素都是以 1 秒的限速工夫退出到队列,增加第四个元素时开始应用 slowDelay 限速工夫,也就是 10 秒后退出到队列,前面的元素都将以 10 秒的限速工夫退出到队列,直到限速周期完结。

来看一下源码

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
    // 增加一次就计数一次
    r.failures[item] = r.failures[item] + 1
    // 计数小于 maxFastAttempts 都以 fastDelay 为限速工夫,否则以 slowDelay 为限速工夫
    if r.failures[item] <= r.maxFastAttempts {return r.fastDelay}
    return r.slowDelay
}

混合模式


最初一种是混合模式,能够组合应用不同的限速算法实例化限速队列

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {return &MaxOfRateLimiter{limiters: limiters}
}

总结


在 k8s-client-go 的源码中能够看到,大量的接口组合使用,将各种性能拆分成各个细小的库,是一种十分值得学习的代码格调以及思路。

始发于 四颗咖啡豆, 转载请申明出处.
关注公粽号 ->[四颗咖啡豆] 获取最新内容

退出移动版