kubernetes client-go提供了Informer,让客户端能够监听kubernetes资源对象的变更。

Informer中有一个Resync机制,依照Resync工夫定期的对资源进行同步。

一.Informer的Resync

Informer中的Reflector会List/Watch apiserver中的资源变动(event),将其放入DeltaFIFO中,同时会更新Indexer本地缓存;

func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {        return NewSharedInformerFactoryWithOptions(client, defaultResync)}

Resync是定期将Indexer缓存中的事件同步到DeltaFIFO中。

Informer的事件监听函数,在处理事件时,可能存在解决失败的状况,定期的Resync让这些事件有个从新OnUpdate解决的机会。

二.Resync的源码

// k8s.io/client-go/tools/cache/delta_fifo.go// 从新同步一次 Indexer 缓存数据到 Delta FIFO 队列中func (f *DeltaFIFO) Resync() error {    ...    // 遍历 indexer 中的 key,传入 syncKeyLocked 中解决    keys := f.knownObjects.ListKeys()    for _, k := range keys {        if err := f.syncKeyLocked(k); err != nil {            return err        }    }    return nil}

sync的过程:

func (f *DeltaFIFO) syncKeyLocked(key string) error {    obj, exists, err := f.knownObjects.GetByKey(key)    ...    id, err := f.KeyOf(obj)    if err != nil {        return KeyError{obj, err}    }    // 如果发现 FIFO 队列中曾经有雷同 key 的 event 进来了,阐明该资源对象有了新的 event,    // 在 Indexer 中旧的缓存应该生效,因而不做 Resync 解决间接返回 nil    if len(f.items[id]) > 0 {        return nil    }    // 否则,从新放入 FIFO 队列中    if err := f.queueActionLocked(Sync, obj); err != nil {        return fmt.Errorf("couldn't queue object: %v", err)    }    return nil}

Informer对于sync事件的解决:

  • 会生成一个OnUpdate()事件;
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {    ...    // from oldest to newest    for _, d := range obj.(Deltas) {        // 判断事件类型,看事件是通过新增、更新、替换、删除还是 Resync 从新同步产生的        switch d.Type {        case Sync, Replaced, Added, Updated:            s.cacheMutationDetector.AddObject(d.Object)            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {                if err := s.indexer.Update(d.Object); err != nil {                    return err                }                isSync := false                switch {                case d.Type == Sync:                    // 如果是通过 Resync 从新同步失去的事件则做个标记                    isSync = true                case d.Type == Replaced:                    ...                }                // 如果是通过 Resync 从新同步失去的事件,则触发 onUpdate 回调                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)            } else {                if err := s.indexer.Add(d.Object); err != nil {                    return err                }                s.processor.distribute(addNotification{newObj: d.Object}, false)            }        case Deleted:            if err := s.indexer.Delete(d.Object); err != nil {                return err            }            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)        }    }    return nil}

参考:

1.Resync机制的引入:https://github.com/cloudnativeto/sig-kubernetes/issues/11