关于kubernetes:kubernetes-deltafifo源码解析

39次阅读

共计 8319 个字符,预计需要花费 21 分钟才能阅读完成。

kubernetes delta_fifo 源码解析
1. 介绍
kubernetes delta_fifo 是一个先入先出队列,相较于 fifo,有两点不同:

• 与 key 相关联的不直接是 obj,而是 Deltas,它是一个切片,其中每个 Delta 不仅包含了 obj,还包含了 DeltaType
• 当 Deltas 最后一个元素的 Delta.Type 已经是 Deleted 类型时,再添加一个 Deleted 类型的 Delta,Deltas 不会再新增元素(重复的删除会被合并)

delta_fifo 的 API 与 fifo 类似,不再具体分析
2. 使用
参考 TestDeltaFIFO_ReplaceMakesDeletions[1] `go // 取 testFifoObject 中 name 作为 key func testFifoObjectKeyFunc(obj interface{}) (string, error) {return obj.(testFifoObject).name, nil }
// testFifoObject is the fixture object stored in the queue by the example
// test: a name, which the example's key function uses as the key, plus an
// arbitrary payload.
type testFifoObject struct {
	name string
	val  interface{}
}

// mkFifoObj returns a testFifoObject with the given name and payload.
func mkFifoObj(name string, val interface{}) testFifoObject {
	return testFifoObject{name: name, val: val}
}

// literalListerGetter 实现了 KeyListerGetter 接口 type literalListerGetter func() []testFifoObject

var _ KeyListerGetter = literalListerGetter(nil)

func (kl literalListerGetter) ListKeys() []string {result := []string{} for _, fifoObj := range kl() {result = append(result, fifoObj.name) } return result }

func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) {for _, v := range kl() {if v.name == key { return v, true, nil} } return nil, false, nil }

func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, KnownObjects: literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj(“foo”, 5), mkFifoObj(“bar”, 6), mkFifoObj(“baz”, 7)} }), }) // 删除 f.Delete(mkFifoObj(“baz”, 10)) // 替换,f.emitDeltaTypeReplaced 为 false 时 action 为 Sync,否则 action 为 Replace f.Replace([]interface{}{mkFifoObj(“foo”, 5)}, “0”) // 冀望的列表 expectedList := []Deltas{ {{Deleted, mkFifoObj(“baz”, 10)}}, {{Sync, mkFifoObj(“foo”, 5)}}, {{Deleted, DeletedFinalStateUnknown{Key: “bar”, Obj: mkFifoObj(“bar”, 6)}}}, }

for _, expected := range expectedList {

cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {t.Errorf("Expected %#v, got %#v", e, a)
}

}

f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {

    return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),

})
f.Add(mkFifoObj(“baz”, 10))
f.Replace([]interface{}{mkFifoObj(“foo”, 5)}, “0”)

expectedList = []Deltas{

{{Added, mkFifoObj("baz", 10)},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
{{Sync, mkFifoObj("foo", 5)}},
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},

}

for _, expected := range expectedList {

cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {t.Errorf("Expected %#v, got %#v", e, a)
}

}

f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj(“baz”, 10))
f.Replace([]interface{}{mkFifoObj(“foo”, 5)}, “0”)

expectedList = []Deltas{

{{Added, mkFifoObj("baz", 10)},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
{{Sync, mkFifoObj("foo", 5)}},

}

for _, expected := range expectedList {

cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {t.Errorf("Expected %#v, got %#v", e, a)
}

}
}

3. 源码解析

// NewDeltaFIFOWithOptions builds a DeltaFIFO from opts.
//
// When opts.KeyFunction is nil, MetaNamespaceKeyFunc is used to compute keys.
// KnownObjects and EmitDeltaTypeReplaced are copied onto the queue unchanged.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	keyFunc := opts.KeyFunction
	if keyFunc == nil {
		keyFunc = MetaNamespaceKeyFunc
	}

	fifo := &DeltaFIFO{
		items:                 map[string]Deltas{},
		queue:                 []string{},
		keyFunc:               keyFunc,
		knownObjects:          opts.KnownObjects,
		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	// The condition variable uses the queue's own lock as its locker.
	fifo.cond.L = &fifo.lock
	return fifo
}
// KeyOf returns the queue key for obj.
//
// A Deltas value is keyed by the object of its newest Delta; an empty Deltas
// yields a KeyError carrying ErrZeroLengthDeltasObject. A
// DeletedFinalStateUnknown is keyed by its recorded Key field. Anything else
// is handed to the queue's key function.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
	// Unwrap a Deltas down to its most recent object first.
	if deltas, isDeltas := obj.(Deltas); isDeltas {
		if len(deltas) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = deltas.Newest().Object
	}

	switch v := obj.(type) {
	case DeletedFinalStateUnknown:
		// The tombstone already carries its key.
		return v.Key, nil
	default:
		return f.keyFunc(obj)
	}
}

// Newest returns a pointer to the last Delta in d, or nil when d is empty.
func (d Deltas) Newest() *Delta {
	if len(d) == 0 {
		return nil
	}
	return &d[len(d)-1]
}

// Delete queues a Deleted delta for obj and marks the queue as populated.
//
// The deletion is skipped (nil is returned) when nothing is known about the
// key:
//   - without knownObjects, when the key has no entry in f.items;
//   - with knownObjects, when knownObjects.GetByKey succeeds but does not find
//     the key and the key has no entry in f.items either.
//
// A KeyError is returned if the key of obj cannot be computed.
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true

	// With no knownObjects store, known and lookupErr keep their zero values,
	// so the check below depends on f.items alone.
	var (
		known     bool
		lookupErr error
	)
	if f.knownObjects != nil {
		_, known, lookupErr = f.knownObjects.GetByKey(id)
	}
	_, queued := f.items[id]

	if lookupErr == nil && !known && !queued {
		return nil
	}
	return f.queueActionLocked(Deleted, obj)
}

// queueActionLocked appends a Delta of the given type for obj to the Deltas
// stored under obj's key, collapsing a duplicate trailing deletion.
//
// If the key had no entry in f.items, it is also appended to f.queue. Waiters
// on f.cond are woken after a successful enqueue. The callers visible in this
// file (Add, Delete, Replace) all hold f.lock while calling it.
//
// A KeyError is returned if the key of obj cannot be computed. A plain error
// is returned if deduplication unexpectedly leaves an empty Deltas behind for
// a key that already had entries.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	previous := f.items[id]
	// Append the new delta, then collapse duplicate deletions.
	merged := dedupDeltas(append(previous, Delta{actionType, obj}))

	if len(merged) > 0 {
		if _, queued := f.items[id]; !queued {
			f.queue = append(f.queue, id)
		}
		f.items[id] = merged
		f.cond.Broadcast()
		return nil
	}

	// This path should not be reached under normal circumstances.
	if previous == nil {
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, previous, obj)
		return nil
	}
	klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, previous, obj)
	f.items[id] = merged
	return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, previous, obj)
}

// dedupDeltas merges the last two entries of deltas when isDup reports them
// as duplicates: the surviving Delta is written into the second-to-last slot
// and the slice is shortened by one. Slices with fewer than two entries, or
// whose last two entries are not duplicates, are returned unchanged.
func dedupDeltas(deltas Deltas) Deltas {
	count := len(deltas)
	if count < 2 {
		return deltas
	}

	last, prev := &deltas[count-1], &deltas[count-2]
	kept := isDup(last, prev)
	if kept == nil {
		return deltas
	}

	deltas[count-2] = *kept
	return deltas[:count-1]
}

// isDup returns the Delta to keep when a and b are duplicates of each other,
// or nil when they are not. Duplicate deletions are the only kind checked.
func isDup(a, b *Delta) *Delta {
	return isDeletionDup(a, b)
}

// isDeletionDup returns nil unless both a and b are Deleted deltas. When both
// are, it returns a if b's object is a DeletedFinalStateUnknown, and b
// otherwise.
func isDeletionDup(a, b *Delta) *Delta {
	bothDeleted := b.Type == Deleted && a.Type == Deleted
	if !bothDeleted {
		return nil
	}

	switch b.Object.(type) {
	case DeletedFinalStateUnknown:
		return a
	default:
		return b
	}
}
// Replace enqueues every item in list and then queues deletions for existing
// keys that list does not contain.
//
//  1. Each item in list is queued with a Sync delta, or with a Replaced delta
//     when emitDeltaTypeReplaced is set.
//  2. Each existing key K that is absent from list gets a
//     Deleted(DeletedFinalStateUnknown{K, O}) delta. Without knownObjects, the
//     existing keys are those of f.items and O is the object of the newest
//     Delta queued for K. With knownObjects, the existing keys come from
//     knownObjects.ListKeys() and O is the object returned by
//     knownObjects.GetByKey(K), or nil if that lookup fails or finds nothing.
//
// If the queue has not been marked populated yet, it is marked so and
// initialPopulationCount is set to the number of distinct keys in list plus
// the number of deletions queued. The string parameter is ignored.
//
// A KeyError is returned if an item's key cannot be computed; enqueue errors
// are returned as well.
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Sync remains the default to stay compatible with older clients.
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// Queue a Sync/Replaced delta for every item in list, recording its key.
	keys := make(sets.String, len(list))
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}

	queuedDeletions := 0
	if f.knownObjects == nil {
		// Do deletion detection against our own list.
		for key, queuedDeltas := range f.items {
			if keys.Has(key) {
				continue
			}

			// Carry the most recently queued object in the tombstone.
			var deletedObj interface{}
			if newest := queuedDeltas.Newest(); newest != nil {
				deletedObj = newest.Object
			}
			queuedDeletions++
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{key, deletedObj}); err != nil {
				return err
			}
		}
	} else {
		// Do deletion detection against the known-objects store.
		for _, k := range f.knownObjects.ListKeys() {
			if keys.Has(k) {
				continue
			}

			// Carry whatever the store returns for the key in the tombstone.
			deletedObj, exists, err := f.knownObjects.GetByKey(k)
			switch {
			case err != nil:
				deletedObj = nil
				klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
			case !exists:
				deletedObj = nil
				klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
			}
			queuedDeletions++
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}
	}

	if !f.populated {
		f.populated = true
		f.initialPopulationCount = keys.Len() + queuedDeletions
	}
	return nil
}
// Add marks the queue as populated and enqueues obj with an Added delta.
// It returns whatever error the enqueue reports.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.populated = true
	return f.queueActionLocked(Added, obj)
}
// Pop removes the key at the head of the queue, hands its Deltas to process,
// and returns those Deltas together with the resulting error. It blocks while
// the queue is empty and returns ErrFIFOClosed once the queue is closed.
//
// If process returns an ErrRequeue, the Deltas are put back through
// addIfNotPresent and the Err wrapped in the ErrRequeue is returned instead.
// process runs while f.lock is held.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for {
		for len(f.queue) == 0 {
			// While empty, wait for an enqueue; a closed queue (f.closed)
			// also ends the wait.
			if f.closed {
				return nil, ErrFIFOClosed
			}
			f.cond.Wait()
		}

		// Take the head of the queue: first in, first out.
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}

		deltas, found := f.items[id]
		if !found {
			// A queued key should always have an entry in f.items.
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)

		// Enable trace logging once more than 10 keys remain queued. No
		// further loop iteration follows this point, so the deferred call
		// runs once, when Pop returns.
		if depth := len(f.queue); depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}

		// Let the caller process the Deltas; ErrRequeue puts them back.
		err := process(deltas)
		if requeue, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, deltas)
			err = requeue.Err
		}
		// Return the Deltas without a deep copy: ownership passes to the caller.
		return deltas, err
	}
}
4. 总结
kubernetes delta_fifo 在实现先入先出队列的思路上与 kubernetes fifo 类似,但它支持将与 key 相关联的事件(Delta)入队,并为同一个 key 保留多个事件,是 informer 机制的基础

引用链接

正文完
 0