云原生社区活动---Kubernetes源码分析第一期第三周作业, 也是最初一周作业.
本文次要讲述下client-go中workqueue, 看一下client-go的一个整体数据走向.如下图:
而workqueue次要是在listener这里援用,listener应用chan获取到数据之后将数据放入到工作队列进行解决。次要是因为chan过于简略,曾经无奈满足K8S的场景,所以衍生出了workqueue,
个性
- 有序
- 去重
- 并发
- 提早解决
- 限速
以后有三种workqueue
- 根本队列
- 提早队列
- 限速队列
其中提早队列是基于根本队列实现的,而限流队列基于提早队列实现
根本队列
看一下根本队列的接口
// client-go源码门路util/workqueue/queue.gotype Interface interface { //新增元素 能够是任意对象 Add(item interface{}) //获取以后队列的长度 Len() int // 阻塞获取头部元素(先入先出) 返回元素以及队列是否敞开 Get() (item interface{}, shutdown bool) // 显示标记实现元素的解决 Done(item interface{}) //敞开队列 ShutDown() //队列是否处于敞开状态 ShuttingDown() bool}
看一下根本队列的数据结构,只看三个重点解决的,其余的没有展现进去
type Type struct { //含有所有元素的元素的队列 保障有序 queue []t //所有须要解决的元素 set是基于map以value为空struct实现的构造,保障去重 dirty set //以后正在解决中的元素 processing set ...}type empty struct{}type t interface{}type set map[t]empty
根本队列的hello world也很简略
wq := workqueue.New() wq.Add("hello") v, _ := wq.Get()
根本队列Add
func (q *Type) Add(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() //如果以后处于敞开状态,则不再新增元素 if q.shuttingDown { return } //如果元素曾经在期待解决中,则不再新增 if q.dirty.has(item) { return } //增加到metrics q.metrics.add(item) //退出期待解决中 q.dirty.insert(item) //如果目前正在解决该元素 就不将元素增加到队列 if q.processing.has(item) { return } q.queue = append(q.queue, item) q.cond.Signal()}
根本队列Get
func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock() defer q.cond.L.Unlock() //如果以后没有元素并且不处于敞开状态,则阻塞 for len(q.queue) == 0 && !q.shuttingDown { q.cond.Wait() } ... item, q.queue = q.queue[0], q.queue[1:] q.metrics.get(item) //把元素增加到正在解决队列中 q.processing.insert(item) //把队列从期待解决队列中删除 q.dirty.delete(item) return item, false}
根本队列实例化
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type { t := &Type{ clock: c, dirty: set{}, processing: set{}, cond: sync.NewCond(&sync.Mutex{}), metrics: metrics, unfinishedWorkUpdatePeriod: updatePeriod, } //启动一个协程 定时更新metrics go t.updateUnfinishedWorkLoop() return t}func (q *Type) updateUnfinishedWorkLoop() { t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) defer t.Stop() for range t.C() { if !func() bool { q.cond.L.Lock() defer q.cond.L.Unlock() if !q.shuttingDown { q.metrics.updateUnfinishedWork() return true } return false }() { return } }}
提早队列
提早队列的实现思路次要是应用优先队列寄存须要提早增加的元素,每次判断最小提早的元素书否曾经达到了退出队列的要求(提早的工夫到了),如果是则判断下一个元素,直到没有元素或者元素还须要提早为止。
看一下提早队列的数据结构
type delayingType struct { Interface ... //搁置提早增加的元素 waitingForAddCh chan *waitFor ...}
次要是应用chan来保留提早增加的元素,而具体实现是通过一个实现了一个AddAfter办法,看一下具体的内容
//提早队列的接口type DelayingInterface interface { Interface // AddAfter adds an item to the workqueue after the indicated duration has passed AddAfter(item interface{}, duration time.Duration)}func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { ... //如果提早实现小于等于0 间接增加到队列 if duration <= 0 { q.Add(item) return } select { case <-q.stopCh: //增加到chan,上面会讲一下这个chan的解决 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: }}
提早元素的解决
func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash() never := make(<-chan time.Time) var nextReadyAtTimer clock.Timer waitingForQueue := &waitForPriorityQueue{} //这里是初始化一个优先队列 具体实现有趣味的同学能够钻研下 heap.Init(waitingForQueue) waitingEntryByData := map[t]*waitFor{} for { if q.Interface.ShuttingDown() { return } now := q.clock.Now() // Add ready entries for waitingForQueue.Len() > 0 { entry := waitingForQueue.Peek().(*waitFor) //看一下第一个元素是否曾经达到提早的工夫了 if entry.readyAt.After(now) { break } //工夫到了,将元素增加到工作的队列,并且从提早的元素中移除 entry = heap.Pop(waitingForQueue).(*waitFor) q.Add(entry.data) delete(waitingEntryByData, entry.data) } // Set up a wait for the first item's readyAt (if one exists) nextReadyAt := never if waitingForQueue.Len() > 0 { if nextReadyAtTimer != nil { nextReadyAtTimer.Stop() } //如果还有须要提早的元素,计算第一个元素的延迟时间(最小提早的元素) entry := waitingForQueue.Peek().(*waitFor) nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) nextReadyAt = nextReadyAtTimer.C() } select { case <-q.stopCh: return case <-q.heartbeat.C(): //定时查看下是否有元素达到提早的工夫 case <-nextReadyAt: //这里是下面计算出来的工夫,工夫到了,解决达到延迟时间的元素 case waitEntry := <-q.waitingForAddCh: //查看是否须要提早,如果须要提早就退出到提早期待 if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { //如果不须要提早就间接增加到队列 q.Add(waitEntry.data) } drained := false for !drained { select { case waitEntry := <-q.waitingForAddCh:
下面waitingLoop 是在实例化提早队列的时候调用的,看一下实例化时候的逻辑
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface { //实例化一个数据结构 ret := &delayingType{ Interface: NewNamed(name), clock: clock, heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(name), } //放到一个协程中解决提早元素 go ret.waitingLoop() return ret}
限速队列
以后限速队列反对4中限速模式
- 令牌桶算法限速
- 排队指数限速
- 计数器模式
- 混合模式(多种限速算法同时应用)
限速队列的底层实际上还是通过提早队列来进行限速,通过计算出元素的限速工夫作为延迟时间
来看一下限速接口
type RateLimiter interface { // When(item interface{}) time.Duration // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing // or for success, we'll stop tracking it Forget(item interface{}) // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int}
看一下限速队列的数据结构
// RateLimitingInterface is an interface that rate limits items being added to the queue.type RateLimitingInterface interface { DelayingInterface //实际上底层还是调用的提早队列,通过计算出元素的延迟时间 进行限速 AddRateLimited(item interface{}) // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you // still have to call `Done` on the queue. Forget(item interface{}) // NumRequeues returns back how many times the item was requeued NumRequeues(item interface{}) int}func (q *rateLimitingType) AddRateLimited(item interface{}) { //通过when办法计算提早退出队列的工夫 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))}
令牌桶算法
client-go中的令牌桶限速是通过 golang.org/x/time/rat包来实现的
能够通过 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 来应用令牌桶限速算法,其中第一个参数qps示意每秒补充多少token,burst示意总token下限为多少。
排队指数算法
排队指数能够通过 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 来应用。
这个算法有两个参数:
- baseDelay 根底限速工夫
- maxDelay 最大限速工夫
举个例子来了解一下这个算法,例如疾速插入5个雷同元素,baseDelay设置为1秒,maxDelay设置为10秒,都在同一个限速期内。第一个元素会在1秒后退出到队列,第二个元素会在2秒后退出到队列,第三个元素会在4秒后退出到队列,第四个元素会在8秒后退出到队列,第五个元素会在10秒后退出到队列(指数计算的后果为16,然而最大值设置了10秒)。
来看一下源码的计算
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() //第一次为0 exp := r.failures[item] //累加1 r.failures[item] = r.failures[item] + 1 //通过以后计数和baseDelay计算指数后果 baseDelay*(2的exp次方) backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) if backoff > math.MaxInt64 { return r.maxDelay } calculated := time.Duration(backoff) if calculated > r.maxDelay { return r.maxDelay } return calculated}
计数器模式
计数器模式能够通过 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int)来应用,有三个参数
- fastDelay 快限速工夫
- slowDelay 慢限速工夫
- maxFastAttempts 快限速元素个数
原理是这样的,假如fastDelay设置为1秒,slowDelay设置为10秒,maxFastAttempts设置为3,同样在一个限速周期内疾速插入5个雷同的元素。前三个元素都是以1秒的限速工夫退出到队列,增加第四个元素时开始应用slowDelay限速工夫,也就是10秒后退出到队列,前面的元素都将以10秒的限速工夫退出到队列,直到限速周期完结。
来看一下源码
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() //增加一次就计数一次 r.failures[item] = r.failures[item] + 1 //计数小于maxFastAttempts都以fastDelay为限速工夫,否则以slowDelay为限速工夫 if r.failures[item] <= r.maxFastAttempts { return r.fastDelay } return r.slowDelay}
混合模式
最初一种是混合模式,能够组合应用不同的限速算法实例化限速队列
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { return &MaxOfRateLimiter{limiters: limiters}}
总结
在k8s-client-go的源码中能够看到,大量的接口组合使用,将各种性能拆分成各个细小的库,是一种十分值得学习的代码格调以及思路。
始发于 四颗咖啡豆,转载请申明出处.
关注公粽号->[四颗咖啡豆] 获取最新内容