云原生社区活动 —Kubernetes 源码分析第一期第三周作业, 也是最初一周作业.
本文次要讲述下 client-go 中 workqueue, 看一下 client-go 的一个整体数据走向. 如下图:
而 workqueue 次要是在 listener 这里援用,listener 应用 chan 获取到数据之后将数据放入到工作队列进行解决。次要是因为 chan 过于简略,曾经无奈满足 K8S 的场景,所以衍生出了 workqueue,
个性
- 有序
- 去重
- 并发
- 提早解决
- 限速
以后有三种 workqueue
- 根本队列
- 提早队列
- 限速队列
其中提早队列是基于根本队列实现的,而限流队列基于提早队列实现
根本队列
看一下根本队列的接口
// client-go 源码门路 util/workqueue/queue.go
type Interface interface {
// 新增元素 能够是任意对象
Add(item interface{})
// 获取以后队列的长度
Len() int
// 阻塞获取头部元素 (先入先出) 返回元素以及队列是否敞开
Get() (item interface{}, shutdown bool)
// 显示标记实现元素的解决
Done(item interface{})
// 敞开队列
ShutDown()
// 队列是否处于敞开状态
ShuttingDown() bool}
看一下根本队列的数据结构, 只看三个重点解决的, 其余的没有展现进去
type Type struct {
// 含有所有元素的元素的队列 保障有序
queue []t
// 所有须要解决的元素 set 是基于 map 以 value 为空 struct 实现的构造,保障去重
dirty set
// 以后正在解决中的元素
processing set
...
}
type empty struct{}
type t interface{}
type set map[t]empty
根本队列的 hello world 也很简略
wq := workqueue.New()
wq.Add("hello")
v, _ := wq.Get()
根本队列 Add
func (q *Type) Add(item interface{}) {q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果以后处于敞开状态, 则不再新增元素
if q.shuttingDown {return}
// 如果元素曾经在期待解决中, 则不再新增
if q.dirty.has(item) {return}
// 增加到 metrics
q.metrics.add(item)
// 退出期待解决中
q.dirty.insert(item)
// 如果目前正在解决该元素 就不将元素增加到队列
if q.processing.has(item) {return}
q.queue = append(q.queue, item)
q.cond.Signal()}
根本队列 Get
func (q *Type) Get() (item interface{}, shutdown bool) {q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果以后没有元素并且不处于敞开状态, 则阻塞
for len(q.queue) == 0 && !q.shuttingDown {q.cond.Wait()
}
...
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
// 把元素增加到正在解决队列中
q.processing.insert(item)
// 把队列从期待解决队列中删除
q.dirty.delete(item)
return item, false
}
根本队列实例化
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{
clock: c,
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
}
// 启动一个协程 定时更新 metrics
go t.updateUnfinishedWorkLoop()
return t
}
func (q *Type) updateUnfinishedWorkLoop() {t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C() {if !func() bool {q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.shuttingDown {q.metrics.updateUnfinishedWork()
return true
}
return false
}() {return}
}
}
提早队列
提早队列的实现思路次要是应用优先队列寄存须要提早增加的元素, 每次判断最小提早的元素书否曾经达到了退出队列的要求 (提早的工夫到了), 如果是则判断下一个元素, 直到没有元素或者元素还须要提早为止。
看一下提早队列的数据结构
type delayingType struct {
Interface
...
// 搁置提早增加的元素
waitingForAddCh chan *waitFor
...
}
次要是应用 chan 来保留提早增加的元素, 而具体实现是通过一个实现了一个 AddAfter 办法,看一下具体的内容
// 提早队列的接口
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
...
// 如果提早实现小于等于 0 间接增加到队列
if duration <= 0 {q.Add(item)
return
}
select {
case <-q.stopCh:
// 增加到 chan, 上面会讲一下这个 chan 的解决
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
提早元素的解决
func (q *delayingType) waitingLoop() {defer utilruntime.HandleCrash()
never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
waitingForQueue := &waitForPriorityQueue{}
// 这里是初始化一个优先队列 具体实现有趣味的同学能够钻研下
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {if q.Interface.ShuttingDown() {return}
now := q.clock.Now()
// Add ready entries
for waitingForQueue.Len() > 0 {entry := waitingForQueue.Peek().(*waitFor)
// 看一下第一个元素是否曾经达到提早的工夫了
if entry.readyAt.After(now) {break}
// 工夫到了, 将元素增加到工作的队列, 并且从提早的元素中移除
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {nextReadyAtTimer.Stop()
}
// 如果还有须要提早的元素, 计算第一个元素的延迟时间 (最小提早的元素)
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
// 定时查看下是否有元素达到提早的工夫
case <-nextReadyAt:
// 这里是下面计算出来的工夫, 工夫到了, 解决达到延迟时间的元素
case waitEntry := <-q.waitingForAddCh:
// 查看是否须要提早, 如果须要提早就退出到提早期待
if waitEntry.readyAt.After(q.clock.Now()) {insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
// 如果不须要提早就间接增加到队列
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {case waitEntry := <-q.waitingForAddCh:
下面 waitingLoop 是在实例化提早队列的时候调用的,看一下实例化时候的逻辑
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
// 实例化一个数据结构
ret := &delayingType{Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
// 放到一个协程中解决提早元素
go ret.waitingLoop()
return ret
}
限速队列
以后限速队列反对 4 中限速模式
- 令牌桶算法限速
- 排队指数限速
- 计数器模式
- 混合模式 (多种限速算法同时应用)
限速队列的底层实际上还是通过提早队列来进行限速, 通过计算出元素的限速工夫作为延迟时间
来看一下限速接口
type RateLimiter interface {
//
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
看一下限速队列的数据结构
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// 实际上底层还是调用的提早队列, 通过计算出元素的延迟时间 进行限速
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
func (q *rateLimitingType) AddRateLimited(item interface{}) {
// 通过 when 办法计算提早退出队列的工夫
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
令牌桶算法
client-go 中的令牌桶限速是通过 golang.org/x/time/rat 包来实现的
能够通过 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 来应用令牌桶限速算法,其中第一个参数 qps 示意每秒补充多少 token,burst 示意总 token 下限为多少。
排队指数算法
排队指数能够通过 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 来应用。
这个算法有两个参数:
- baseDelay 根底限速工夫
- maxDelay 最大限速工夫
举个例子来了解一下这个算法,例如疾速插入 5 个雷同元素,baseDelay 设置为 1 秒,maxDelay 设置为 10 秒,都在同一个限速期内。第一个元素会在 1 秒后退出到队列,第二个元素会在 2 秒后退出到队列,第三个元素会在 4 秒后退出到队列,第四个元素会在 8 秒后退出到队列,第五个元素会在 10 秒后退出到队列 (指数计算的后果为 16,然而最大值设置了 10 秒)。
来看一下源码的计算
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 第一次为 0
exp := r.failures[item]
// 累加 1
r.failures[item] = r.failures[item] + 1
// 通过以后计数和 baseDelay 计算指数后果 baseDelay*(2 的 exp 次方)
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {return r.maxDelay}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {return r.maxDelay}
return calculated
}
计数器模式
计数器模式能够通过 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) 来应用,有三个参数
- fastDelay 快限速工夫
- slowDelay 慢限速工夫
- maxFastAttempts 快限速元素个数
原理是这样的,假如 fastDelay 设置为 1 秒,slowDelay 设置为 10 秒,maxFastAttempts 设置为 3,同样在一个限速周期内疾速插入 5 个雷同的元素。前三个元素都是以 1 秒的限速工夫退出到队列,增加第四个元素时开始应用 slowDelay 限速工夫,也就是 10 秒后退出到队列,前面的元素都将以 10 秒的限速工夫退出到队列,直到限速周期完结。
来看一下源码
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 增加一次就计数一次
r.failures[item] = r.failures[item] + 1
// 计数小于 maxFastAttempts 都以 fastDelay 为限速工夫,否则以 slowDelay 为限速工夫
if r.failures[item] <= r.maxFastAttempts {return r.fastDelay}
return r.slowDelay
}
混合模式
最初一种是混合模式,能够组合应用不同的限速算法实例化限速队列
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {return &MaxOfRateLimiter{limiters: limiters}
}
总结
在 k8s-client-go 的源码中能够看到,大量的接口组合使用,将各种性能拆分成各个细小的库,是一种十分值得学习的代码格调以及思路。
始发于 四颗咖啡豆, 转载请申明出处.
关注公粽号 ->[四颗咖啡豆] 获取最新内容