共计 2115 个字符,预计需要花费 6 分钟才能阅读完成。
kubernetes client-go 提供了 Informer,让客户端能够监听 kubernetes 资源对象的变更。
Informer 中有一个 Resync 机制,依照 Resync 工夫定期的对资源进行同步。
一.Informer 的 Resync
Informer 中的 Reflector 会 List/Watch apiserver 中的资源变动 (event),将其放入 DeltaFIFO 中,同时会更新 Indexer 本地缓存;
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
Resync 是定期将 Indexer 缓存中的事件同步到 DeltaFIFO 中。
Informer 的事件监听函数,在处理事件时,可能存在解决失败的状况,定期的 Resync 让这些事件有个从新 OnUpdate 解决的机会。
二.Resync 的源码
// k8s.io/client-go/tools/cache/delta_fifo.go
// 从新同步一次 Indexer 缓存数据到 Delta FIFO 队列中
func (f *DeltaFIFO) Resync() error {
...
// 遍历 indexer 中的 key,传入 syncKeyLocked 中解决
keys := f.knownObjects.ListKeys()
for _, k := range keys {if err := f.syncKeyLocked(k); err != nil {return err}
}
return nil
}
sync 的过程:
func (f *DeltaFIFO) syncKeyLocked(key string) error {obj, exists, err := f.knownObjects.GetByKey(key)
...
id, err := f.KeyOf(obj)
if err != nil {return KeyError{obj, err}
}
// 如果发现 FIFO 队列中曾经有雷同 key 的 event 进来了,阐明该资源对象有了新的 event,// 在 Indexer 中旧的缓存应该生效,因而不做 Resync 解决间接返回 nil
if len(f.items[id]) > 0 {return nil}
// 否则,从新放入 FIFO 队列中
if err := f.queueActionLocked(Sync, obj); err != nil {return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
Informer 对于 sync 事件的解决:
- 会生成一个 OnUpdate() 事件;
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
...
// from oldest to newest
for _, d := range obj.(Deltas) {
// 判断事件类型,看事件是通过新增、更新、替换、删除还是 Resync 从新同步产生的
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}
isSync := false
switch {
case d.Type == Sync:
// 如果是通过 Resync 从新同步失去的事件则做个标记
isSync = true
case d.Type == Replaced:
...
}
// 如果是通过 Resync 从新同步失去的事件,则触发 onUpdate 回调
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {if err := s.indexer.Add(d.Object); err != nil {return err}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {return err}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
参考:
1.Resync 机制的引入:https://github.com/cloudnativeto/sig-kubernetes/issues/11
正文完
发表至: kubernetes
2023-04-09