关于golang:k8sclientgo源码剖析三

云原生社区活动—Kubernetes源码分析第一期第三周作业, 也是最初一周作业.

本文次要讲述下client-go中workqueue, 看一下client-go的一个整体数据走向.如下图:

而workqueue次要是在listener这里援用,listener应用chan获取到数据之后将数据放入到工作队列进行解决。次要是因为chan过于简略,曾经无奈满足K8S的场景,所以衍生出了workqueue,

个性


  1. 有序
  2. 去重
  3. 并发
  4. 提早解决
  5. 限速

以后有三种workqueue


  1. 根本队列
  2. 提早队列
  3. 限速队列

其中提早队列是基于根本队列实现的,而限流队列基于提早队列实现

根本队列


看一下根本队列的接口

// client-go源码门路util/workqueue/queue.go
type Interface interface {
    //新增元素 能够是任意对象
    Add(item interface{})
    //获取以后队列的长度
    Len() int
    // 阻塞获取头部元素(先入先出)  返回元素以及队列是否敞开
    Get() (item interface{}, shutdown bool)
    // 显示标记实现元素的解决
    Done(item interface{})
    //敞开队列
    ShutDown()
    //队列是否处于敞开状态
    ShuttingDown() bool
}

看一下根本队列的数据结构,只看三个重点解决的,其余的没有展现进去

type Type struct {
    //含有所有元素的元素的队列 保障有序
    queue []t
    //所有须要解决的元素 set是基于map以value为空struct实现的构造,保障去重
    dirty set
    //以后正在解决中的元素
    processing set
    ...
}

type empty struct{}
type t interface{}
type set map[t]empty

根本队列的hello world也很简略

 wq := workqueue.New()
    wq.Add("hello")
    v, _ := wq.Get()

根本队列Add


func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    //如果以后处于敞开状态,则不再新增元素
    if q.shuttingDown {
        return
    }
    //如果元素曾经在期待解决中,则不再新增
    if q.dirty.has(item) {
        return
    }
    //增加到metrics
    q.metrics.add(item)
    //退出期待解决中
    q.dirty.insert(item)
    //如果目前正在解决该元素 就不将元素增加到队列
    if q.processing.has(item) {
        return
    }
    q.queue = append(q.queue, item)
    q.cond.Signal()
}

根本队列Get


func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    //如果以后没有元素并且不处于敞开状态,则阻塞
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    ...
    item, q.queue = q.queue[0], q.queue[1:]
    q.metrics.get(item)
    //把元素增加到正在解决队列中
    q.processing.insert(item)
    //把队列从期待解决队列中删除
    q.dirty.delete(item)
    return item, false
}

根本队列实例化


func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
    t := &Type{
        clock:                      c,
        dirty:                      set{},
        processing:                 set{},
        cond:                       sync.NewCond(&sync.Mutex{}),
        metrics:                    metrics,
        unfinishedWorkUpdatePeriod: updatePeriod,
    }
        //启动一个协程 定时更新metrics
    go t.updateUnfinishedWorkLoop()
    return t
}

func (q *Type) updateUnfinishedWorkLoop() {
    t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
    defer t.Stop()
    for range t.C() {
        if !func() bool {
            q.cond.L.Lock()
            defer q.cond.L.Unlock()
            if !q.shuttingDown {
                q.metrics.updateUnfinishedWork()
                return true
            }
            return false

        }() {
            return
        }
    }
}

提早队列


提早队列的实现思路次要是应用优先队列寄存须要提早增加的元素,每次判断最小提早的元素书否曾经达到了退出队列的要求(提早的工夫到了),如果是则判断下一个元素,直到没有元素或者元素还须要提早为止。

看一下提早队列的数据结构

type delayingType struct {
    Interface
        ...
    //搁置提早增加的元素
    waitingForAddCh chan *waitFor
       ...
}

次要是应用chan来保留提早增加的元素,而具体实现是通过一个实现了一个AddAfter办法,看一下具体的内容

//提早队列的接口
type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    ...
    //如果提早实现小于等于0 间接增加到队列
    if duration <= 0 {
        q.Add(item)
        return
    }
    select {
    case <-q.stopCh:
    //增加到chan,上面会讲一下这个chan的解决
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
    }
}

提早元素的解决

func (q *delayingType) waitingLoop() {

    defer utilruntime.HandleCrash()

    never := make(<-chan time.Time)

    var nextReadyAtTimer clock.Timer

    waitingForQueue := &waitForPriorityQueue{}
    //这里是初始化一个优先队列 具体实现有趣味的同学能够钻研下
    heap.Init(waitingForQueue)

    waitingEntryByData := map[t]*waitFor{}

    for {
        if q.Interface.ShuttingDown() {
            return
        }

        now := q.clock.Now()

        // Add ready entries
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            //看一下第一个元素是否曾经达到提早的工夫了
            if entry.readyAt.After(now) {
                break
            }

            //工夫到了,将元素增加到工作的队列,并且从提早的元素中移除
            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }

        // Set up a wait for the first item's readyAt (if one exists)
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            //如果还有须要提早的元素,计算第一个元素的延迟时间(最小提早的元素)
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            nextReadyAt = nextReadyAtTimer.C()
        }

        select {
        case <-q.stopCh:
            return
        case <-q.heartbeat.C():
            //定时查看下是否有元素达到提早的工夫
        case <-nextReadyAt:
            //这里是下面计算出来的工夫,工夫到了,解决达到延迟时间的元素
        case waitEntry := <-q.waitingForAddCh:
            //查看是否须要提早,如果须要提早就退出到提早期待
            if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                //如果不须要提早就间接增加到队列
                q.Add(waitEntry.data)
            }

            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh: 

下面waitingLoop 是在实例化提早队列的时候调用的,看一下实例化时候的逻辑

func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
    //实例化一个数据结构
    ret := &delayingType{
        Interface:       NewNamed(name),
        clock:           clock,
        heartbeat:       clock.NewTicker(maxWait),
        stopCh:          make(chan struct{}),
        waitingForAddCh: make(chan *waitFor, 1000),
        metrics:         newRetryMetrics(name),
    }

    //放到一个协程中解决提早元素
    go ret.waitingLoop()

    return ret
}

限速队列


以后限速队列反对4中限速模式

  1. 令牌桶算法限速
  2. 排队指数限速
  3. 计数器模式
  4. 混合模式(多种限速算法同时应用)

限速队列的底层实际上还是通过提早队列来进行限速,通过计算出元素的限速工夫作为延迟时间

来看一下限速接口

type RateLimiter interface {
    //
    When(item interface{}) time.Duration
    // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
    // or for success, we'll stop tracking it
    Forget(item interface{})
    // NumRequeues returns back how many failures the item has had
    NumRequeues(item interface{}) int
}

看一下限速队列的数据结构

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
    DelayingInterface

    //实际上底层还是调用的提早队列,通过计算出元素的延迟时间 进行限速
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

func (q *rateLimitingType) AddRateLimited(item interface{}) {
         //通过when办法计算提早退出队列的工夫
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

令牌桶算法


client-go中的令牌桶限速是通过 golang.org/x/time/rat包来实现的

能够通过 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 来应用令牌桶限速算法,其中第一个参数qps示意每秒补充多少token,burst示意总token下限为多少。

排队指数算法


排队指数能够通过 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 来应用。

这个算法有两个参数:

  1. baseDelay 根底限速工夫
  2. maxDelay 最大限速工夫

举个例子来了解一下这个算法,例如疾速插入5个雷同元素,baseDelay设置为1秒,maxDelay设置为10秒,都在同一个限速期内。第一个元素会在1秒后退出到队列,第二个元素会在2秒后退出到队列,第三个元素会在4秒后退出到队列,第四个元素会在8秒后退出到队列,第五个元素会在10秒后退出到队列(指数计算的后果为16,然而最大值设置了10秒)。

来看一下源码的计算

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    //第一次为0
    exp := r.failures[item]
    //累加1
    r.failures[item] = r.failures[item] + 1

    //通过以后计数和baseDelay计算指数后果  baseDelay*(2的exp次方)
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay
    }

    return calculated
}

计数器模式


计数器模式能够通过 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int)来应用,有三个参数

  1. fastDelay 快限速工夫
  2. slowDelay 慢限速工夫
  3. maxFastAttempts 快限速元素个数

原理是这样的,假如fastDelay设置为1秒,slowDelay设置为10秒,maxFastAttempts设置为3,同样在一个限速周期内疾速插入5个雷同的元素。前三个元素都是以1秒的限速工夫退出到队列,增加第四个元素时开始应用slowDelay限速工夫,也就是10秒后退出到队列,前面的元素都将以10秒的限速工夫退出到队列,直到限速周期完结。

来看一下源码

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
    //增加一次就计数一次
    r.failures[item] = r.failures[item] + 1
    //计数小于maxFastAttempts都以fastDelay为限速工夫,否则以slowDelay为限速工夫
    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }
    return r.slowDelay
}

混合模式


最初一种是混合模式,能够组合应用不同的限速算法实例化限速队列

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
    return &MaxOfRateLimiter{limiters: limiters}
}

总结


在k8s-client-go的源码中能够看到,大量的接口组合使用,将各种性能拆分成各个细小的库,是一种十分值得学习的代码格调以及思路。

始发于 四颗咖啡豆,转载请申明出处.
关注公粽号->[四颗咖啡豆] 获取最新内容

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理