kubernetes client-go提供了Informer,让客户端能够监听kubernetes资源对象的变更。
Informer中有一个Resync机制,依照Resync工夫定期的对资源进行同步。
一.Informer的Resync
Informer中的Reflector会List/Watch apiserver中的资源变动(event),将其放入DeltaFIFO中,同时会更新Indexer本地缓存;
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync)}
Resync是定期将Indexer缓存中的事件同步到DeltaFIFO中。
Informer的事件监听函数,在处理事件时,可能存在解决失败的状况,定期的Resync让这些事件有个从新OnUpdate解决的机会。
二.Resync的源码
// k8s.io/client-go/tools/cache/delta_fifo.go// 从新同步一次 Indexer 缓存数据到 Delta FIFO 队列中func (f *DeltaFIFO) Resync() error { ... // 遍历 indexer 中的 key,传入 syncKeyLocked 中解决 keys := f.knownObjects.ListKeys() for _, k := range keys { if err := f.syncKeyLocked(k); err != nil { return err } } return nil}
sync的过程:
func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) ... id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // 如果发现 FIFO 队列中曾经有雷同 key 的 event 进来了,阐明该资源对象有了新的 event, // 在 Indexer 中旧的缓存应该生效,因而不做 Resync 解决间接返回 nil if len(f.items[id]) > 0 { return nil } // 否则,从新放入 FIFO 队列中 if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf("couldn't queue object: %v", err) } return nil}
Informer对于sync事件的解决:
- 会生成一个OnUpdate()事件;
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { ... // from oldest to newest for _, d := range obj.(Deltas) { // 判断事件类型,看事件是通过新增、更新、替换、删除还是 Resync 从新同步产生的 switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // 如果是通过 Resync 从新同步失去的事件则做个标记 isSync = true case d.Type == Replaced: ... } // 如果是通过 Resync 从新同步失去的事件,则触发 onUpdate 回调 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil}
参考:
1.Resync机制的引入:https://github.com/cloudnativeto/sig-kubernetes/issues/11