关于kubernetes:kubernetes-Informer的Resync机制

54次阅读

共计 2115 个字符,预计需要花费 6 分钟才能阅读完成。

kubernetes client-go 提供了 Informer,让客户端能够监听 kubernetes 资源对象的变更。

Informer 中有一个 Resync 机制,依照 Resync 工夫定期的对资源进行同步。

一.Informer 的 Resync

Informer 中的 Reflector 会 List/Watch apiserver 中的资源变动 (event),将其放入 DeltaFIFO 中,同时会更新 Indexer 本地缓存;

func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

Resync 是定期将 Indexer 缓存中的事件同步到 DeltaFIFO 中。

Informer 的事件监听函数,在处理事件时,可能存在解决失败的状况,定期的 Resync 让这些事件有个从新 OnUpdate 解决的机会。

二.Resync 的源码

// k8s.io/client-go/tools/cache/delta_fifo.go
// 从新同步一次 Indexer 缓存数据到 Delta FIFO 队列中
func (f *DeltaFIFO) Resync() error {
    ...
    // 遍历 indexer 中的 key,传入 syncKeyLocked 中解决
    keys := f.knownObjects.ListKeys()
    for _, k := range keys {if err := f.syncKeyLocked(k); err != nil {return err}
    }
    return nil
}

sync 的过程:

func (f *DeltaFIFO) syncKeyLocked(key string) error {obj, exists, err := f.knownObjects.GetByKey(key)
    ...
    id, err := f.KeyOf(obj)
    if err != nil {return KeyError{obj, err}
    }
    // 如果发现 FIFO 队列中曾经有雷同 key 的 event 进来了,阐明该资源对象有了新的 event,// 在 Indexer 中旧的缓存应该生效,因而不做 Resync 解决间接返回 nil
    if len(f.items[id]) > 0 {return nil}
    // 否则,从新放入 FIFO 队列中
    if err := f.queueActionLocked(Sync, obj); err != nil {return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}

Informer 对于 sync 事件的解决:

  • 会生成一个 OnUpdate() 事件;
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    ...
    // from oldest to newest
    for _, d := range obj.(Deltas) {
        // 判断事件类型,看事件是通过新增、更新、替换、删除还是 Resync 从新同步产生的
        switch d.Type {
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}
                isSync := false
                switch {
                case d.Type == Sync:
                    // 如果是通过 Resync 从新同步失去的事件则做个标记
                    isSync = true
                case d.Type == Replaced:
                    ...
                }
                // 如果是通过 Resync 从新同步失去的事件,则触发 onUpdate 回调
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {if err := s.indexer.Add(d.Object); err != nil {return err}
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {return err}
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

参考:

1.Resync 机制的引入:https://github.com/cloudnativeto/sig-kubernetes/issues/11

正文完
 0