kubernetes delta_fifo源码解析
1.介绍
kubernetes delta_fifo是一个先入先出队列,相较于fifo,有两点不同:

• 与key相关联的不间接是obj,而是Deltas,它是一个切片,Delta不仅蕴含了obj,还蕴含了DeltaType
• 当Deltas最初一个元素Delta.DeltaType曾经是Deleted类型时,再增加一个Deleted类型的Delta,Deltas不再新增 delta_fifo的API与fifo类型,不再具体分析
2.应用
参考TestDeltaFIFO_ReplaceMakesDeletions[1] `go // 取testFifoObject中name作为key func testFifoObjectKeyFunc(obj interface{}) (string, error) { return obj.(testFifoObject).name, nil }
type testFifoObject struct { name string val interface{} }

func mkFifoObj(name string, val interface{}) testFifoObject { return testFifoObject{name: name, val: val} }

// literalListerGetter实现了KeyListerGetter接口 type literalListerGetter func() []testFifoObject

var _ KeyListerGetter = literalListerGetter(nil)

func (kl literalListerGetter) ListKeys() []string { result := []string{} for _, fifoObj := range kl() { result = append(result, fifoObj.name) } return result }

func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) { for _, v := range kl() { if v.name == key { return v, true, nil } } return nil, false, nil }

func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, KnownObjects: literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), }) // 删除 f.Delete(mkFifoObj("baz", 10)) // 替换,f.emitDeltaTypeReplaced为false时action为Sync,否则action为Replace f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") // 冀望的列表 expectedList := []Deltas{ {{Deleted, mkFifoObj("baz", 10)}}, {{Sync, mkFifoObj("foo", 5)}}, {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, }

for _, expected := range expectedList {

cur := Pop(f).(Deltas)if e, a := expected, cur; !reflect.DeepEqual(e, a) {    t.Errorf("Expected %#v, got %#v", e, a)}

}

f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {

    return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}}),

})
f.Add(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")

expectedList = []Deltas{

{{Added, mkFifoObj("baz", 10)},{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},{{Sync, mkFifoObj("foo", 5)}},{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},

}

for _, expected := range expectedList {

cur := Pop(f).(Deltas)if e, a := expected, cur; !reflect.DeepEqual(e, a) {    t.Errorf("Expected %#v, got %#v", e, a)}

}

f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")

expectedList = []Deltas{

{{Added, mkFifoObj("baz", 10)},{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},{{Sync, mkFifoObj("foo", 5)}},

}

for _, expected := range expectedList {

cur := Pop(f).(Deltas)if e, a := expected, cur; !reflect.DeepEqual(e, a) {    t.Errorf("Expected %#v, got %#v", e, a)}

}
}

3.源码解析

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {    if opts.KeyFunction == nil {        opts.KeyFunction = MetaNamespaceKeyFunc    }        f := &DeltaFIFO{        items:        map[string]Deltas{},        queue:        []string{},        keyFunc:      opts.KeyFunction,        knownObjects: opts.KnownObjects,                emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,    }    f.cond.L = &f.lock    return f}// 计算obj对应的keyfunc (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {    // 如果obj为Deltas类型    if d, ok := obj.(Deltas); ok {        // 如果没有值,抛err        if len(d) == 0 {            return "", KeyError{obj, ErrZeroLengthDeltasObject}        }        // 取最新的obj        obj = d.Newest().Object    }    // 如果obj为DeletedFinalStateUnknown类型,则间接返回DeletedFinalStateUnknown.Key    if d, ok := obj.(DeletedFinalStateUnknown); ok {        return d.Key, nil    }    // 否则,应用keyFunc    return f.keyFunc(obj)}func (d Deltas) Newest() *Delta {    if n := len(d); n > 0 {        return &d[n-1]    }    return nil}// Delete办法增加Deleted类型的Delta,如果f.knownObjects为nil并且obj不存在时,不做解决;如果f.knownObjects不为nil,且f.knownObjects.GetByKey(id)不存在并且f.items[id]不存在,不做解决func (f *DeltaFIFO) Delete(obj interface{}) error {    // 计算obj对应的key    id, err := f.KeyOf(obj)    if err != nil {        return KeyError{obj, err}    }    f.lock.Lock()    defer f.lock.Unlock()    f.populated = true    if f.knownObjects == nil {        // 如果f.items不存在则不解决        if _, exists := f.items[id]; !exists {            return nil        }    } else {        _, exists, err := f.knownObjects.GetByKey(id)        _, itemsExist := f.items[id]        // 如果f.knownObjects.GetByKey(id)和f.items[id]都不存在,则不解决        if err == nil && !exists && !itemsExist {            return nil        }    }    // Deleted类型入队    return f.queueActionLocked(Deleted, obj)}func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {    // 计算obj对应的key    id, err := f.KeyOf(obj)    if err != nil {        return KeyError{obj, err}    }    oldDeltas := f.items[id]    newDeltas := append(oldDeltas, Delta{actionType, obj})    // delete类型是否反复了    newDeltas = dedupDeltas(newDeltas)        if len(newDeltas) > 0 {        if _, exists := f.items[id]; !exists {            f.queue = append(f.queue, id)        }        f.items[id] = newDeltas        f.cond.Broadcast()    } else {        // 失常状况,不应该走到这个分支        if oldDeltas == nil {            klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)            return nil        }        klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)        f.items[id] = newDeltas        return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)    }    return nil}func dedupDeltas(deltas Deltas) Deltas {    n := len(deltas)    if n < 2 {        return deltas    }    a := &deltas[n-1]    b := &deltas[n-2]    if out := isDup(a, b); out != nil {        deltas[n-2] = *out        return deltas[:n-1]    }    return deltas}func isDup(a, b *Delta) *Delta {    // 是否删除类型反复    if out := isDeletionDup(a, b); out != nil {        return out    }    return nil}func isDeletionDup(a, b *Delta) *Delta {    if b.Type != Deleted || a.Type != Deleted {        return nil    }    // 都为delete类型,并且b.Object是DeletedFinalStateUnknown类型,则保留a,否则保留b    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {        return a    }    return b}// Replace逻辑如下: (1) 增加Sync或Replace Delta类型对象// (2) 删除操作:对于每个曾经存在的keys,但不存在于list中的对象,增加Delete(DeletedFinalStateUnknown{K, O})对象,其中O是K关联的对象;// 如果f.knownObjects为空, 曾经存在的keys是f.items,O是K关联的Deltas.Newest();// 如果f.knownObjects不为空,曾经存在的keys是f.knownObjects,O是f.knownObjects.GetByKey(K)的返回值func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {    f.lock.Lock()    defer f.lock.Unlock()    keys := make(sets.String, len(list))        // 兼容老版本的客户端    action := Sync    if f.emitDeltaTypeReplaced {        action = Replaced    }        for _, item := range list {        key, err := f.KeyOf(item)        if err != nil {            return KeyError{item, err}        }        keys.Insert(key)        // 每个list中的item增加Sync/Replaced类型        if err := f.queueActionLocked(action, item); err != nil {            return fmt.Errorf("couldn't enqueue object: %v", err)        }    }        if f.knownObjects == nil {        // Do deletion detection against our own list.        queuedDeletions := 0        for k, oldItem := range f.items {            if keys.Has(k) {                continue            }            var deletedObj interface{}            // 取最新的一个obj            if n := oldItem.Newest(); n != nil {                deletedObj = n.Object            }            queuedDeletions++            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {                return err            }        }            if !f.populated {            f.populated = true            f.initialPopulationCount = keys.Len() + queuedDeletions        }                return nil    }        knownKeys := f.knownObjects.ListKeys()    queuedDeletions := 0    for _, k := range knownKeys {        if keys.Has(k) {        continue    }    // 取f.knownObjects.GetByKey的返回值    deletedObj, exists, err := f.knownObjects.GetByKey(k)    if err != nil {        deletedObj = nil        klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)    } else if !exists {        deletedObj = nil        klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)    }    queuedDeletions++    if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {        return err        }    }        if !f.populated {        f.populated = true        f.initialPopulationCount = keys.Len() + queuedDeletions    }        return nil}// Add 增加一个Added类型的objfunc (f *DeltaFIFO) Add(obj interface{}) error {    f.lock.Lock()    defer f.lock.Unlock()    f.populated = true    return f.queueActionLocked(Added, obj)}// Pop按added/updated程序返回一个Deltas,如果队列为空则阻塞func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {    f.lock.Lock()    defer f.lock.Unlock()    for {        for len(f.queue) == 0 {            // 当队列为空时,除了入队以外,也能够调用Close()退出循环            if f.closed {                return nil, ErrFIFOClosed            }            f.cond.Wait()        }        // 取队头元素,先入先出        id := f.queue[0]        f.queue = f.queue[1:]        depth := len(f.queue)        if f.initialPopulationCount > 0 {            f.initialPopulationCount--        }        item, ok := f.items[id]        if !ok {            // 不应该不存在f.items中            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)            continue        }        delete(f.items, id)        // 当队列深度大于10的时候,关上trace日志        if depth > 10 {            trace := utiltrace.New("DeltaFIFO Pop Process",                utiltrace.Field{Key: "ID", Value: id},                utiltrace.Field{Key: "Depth", Value: depth},                utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})            defer trace.LogIfLong(100 * time.Millisecond)        }        // 调用PopProcessFunc函数解决item,返回ErrRequeue时,重入队        err := process(item)        if e, ok := err.(ErrRequeue); ok {            f.addIfNotPresent(id, item)            err = e.Err        }        // 间接返回item,不进行深拷贝,将item的所有权转移给调用者        return item, err    }}4.总结kubernetes delta_fifo在实现先入先出队列思路上与kubernetes fifo相似,但其反对与key相关联事件入队,保留多个事件,是informer机制的根底援用链接