关于kubernetes:kubernetes-clientgo的Informer实现分析

28次阅读

共计 9437 个字符,预计需要花费 24 分钟才能阅读完成。

client-go 提供了 Informer 机制,client 能够指定监听的资源对象类型,而后 Informer:

  • 首先,全量 list 所有的该类型的对象;
  • 而后,监听该类型对象的变动 (Add/Update/Delete),并调用用户注册的 Handler;

一. 实现机制

由 sharedIndexInformer 类型实现,整体实现机制如下图所示:

  • Reflector 负责向 apiserver list&watch 资源对象;
  • Reflector list&watch 到的对象及其变动,被放入 DeltaFIFO 队列;
  • Informer 负责 pop DeltaFIFO 队列的变动事件,而后:

    • 更新对象到 Indexer;
    • 调用用户注册的 ResourceEventHandler;

二. 源码框架

整体对外接口:sharedInformerFactory,通过 map 让各种 resourceType 的 informer 能够共享;

客户端调用创立进去的 informer 对象:sharedIndexInformer,其中包含:

  • indexer Indexer: 索引 & 存储;
  • controller Controller:

    • 调用 Reflector&FIFO&indexer 实现事件的散发、LocalStore 的保护;
    • 将 Reflector&FIFO 的逻辑串联起来;
  • processor *sharedProcessor: 利用散发的事件,调用 client 的 ResourceEventHandler;
  • listwatcher ListerWatcher: 被 reflector.Run() 调用,进行资源对象的 List&Watch;
  • objectType runtime.Object: 监听的对象类型;

三. 源码:sharedInformerFactory 如何 share informer

client 创立 Informer 的过程:

  • 先创立 sharedInformerFactory;
  • 再用 factory 创立 informer;
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 10*time.Second, informers.WithNamespace(""))
factory.Core().V1().Pods().Informer()

sharedInformerFactory 的构造:

  • informers:保留了所有已创立的 sharedIndexInformer;
  • startedInformers: 保留了所有已启动的 sharedIndexInformer:
// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
    ...
    informers map[reflect.Type]cache.SharedIndexInformer
    startedInformers map[reflect.Type]bool
    ...
}

底层 client-go 创立 podInformer:

// staging/src/k8s.io/client-go/informers/core/v1/interface.go
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// staging/src/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

InformerFor() 执行创立,并将 podInformer 增加到 factory:

  • 若 objectType 的 informer 曾经存在,则间接返回;
  • 否则,创立 informer,并增加到 factory.informers;
// staging/src/k8s.io/client-go/informer/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    .......
    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    //obj 的 informer 曾经存在
    if exists {return informer}
    .....
    // 创立 informer 并增加到 informers
    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer
    return informer
}

三. 源码:sharedIndexInformer.Run() 流程

Run() 做初始化并调用其它组件 run:

  • 创立 fifo,它对应 delta 存储;
  • 创立 controller;
  • 调用 process.run(): 执行 client 的 ResourceEventHandler;
  • 调用 controller.Run(): 执行 list&watch–> 更新 fifo–>[ 更新 indexer,散发 notification];
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    ...
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{        // 创立 fifo
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        Process: s.HandleDeltas,
    }

    func() {
        // 创立 controller
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    wg.StartWithChannel(processorStopCh, s.processor.run)  // process.run(),执行 ResourceEventHandler

    s.controller.Run(stopCh)        // controller.Run(): 主逻辑}

四. 源码:controller.Run() 流程

controller 整合了 reflector/fifo/indexer 等各大组件:

  • 应用 reflector.Run() 进行 list&watcher,将变更存入 fifo;
  • 应用 processLoop() 进行 fifo.Pop(),解决变更:

    • 应用 indexer 进行对象的更新;
    • 应用 processor 进行变更告诉的散发;
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
    ....
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    c.reflector = r

    var wg wait.Group
    defer wg.Wait()
    // 执行 reflector.Run():进行 List&watch
    wg.StartWithChannel(stopCh, r.Run)
    // 执行 processLoop(): 更新 indexer + 散发事件告诉
    wait.Until(c.processLoop, time.Second, stopCh)
}

1. reflector.Run()

调用 ListAndWatch() 监听 apiserver 而后进行 fifo 的更新;

  • 首先,进行全量的 List(),而后将后果全量更新至 fifo(fifo.Replace());
  • 而后,启动 resync 的定时器,定期进行 resync,行将 LocalStore 的数据再放入 DeltaFIFO 进行解决;

    • resync 的起因:deltaFIFO 中的事件回调,可能存在解决失败的状况,定时的 resync 机制让失败的事件有了从新 onUpdate 解决的机会;
  • 最初,进行 watch 监听,而后通过 watchHandler() 解决 watch 到的事件 ( 即进行 fifo.Add/Update/Delete);
// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {utilruntime.HandleError(err)
        }
    }, r.backoffManager, true, stopCh)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    ...
    // 1)list 所有的
    if err := func() error {
        ...
        var list runtime.Object
        listCh := make(chan struct{}, 1)        
        go func() {
            ......
            // 按页 List
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)
            }))
            ...
            list, paginatedResult, err = pager.List(context.Background(), options)
            close(listCh)
        }()
        select {
        // 期待 list 实现
        case <-listCh:
        }
        // 提取 list 中的 items
        listMetaInterface, err := meta.ListAccessor(list)
        resourceVersion = listMetaInterface.GetResourceVersion()
        items, err := meta.ExtractList(list)
        // 进行全量的 sync,最终调用 -->fifo.Replace() 全量更新
        if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        r.setLastSyncResourceVersion(resourceVersion)
        return nil
    }(); err != nil {return err}

    // 2) 定期进行 resync
    resyncerrc := make(chan error, 1)
    go func() {resyncCh, cleanup := r.resyncChan()
        for {
            select {case <-resyncCh:    // 定时器到}
            if r.ShouldResync == nil || r.ShouldResync() {
                // 执行 resync
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()}
    }()

    // 3) 监听 watch
    for {timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            TimeoutSeconds: &timeoutSeconds,
        }
        // 执行 watch
        w, err := r.listerWatcher.Watch(options)
        // 执行 watch 对象的 handler
        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {....}
    }
}

重点看一下 watcherHandler(): 监听到事件后,进行 store 的增删改;

// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0
loop:
    for {
        select {
        // 监听到事件
        case event, ok := <-w.ResultChan():        
            meta, err := meta.Accessor(event.Object)        
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {case watch.Added:                        //fifo.Add()  
                err := r.store.Add(event.Object)                  
            case watch.Modified:                     //fifo.Update() 
                err := r.store.Update(event.Object)                
            case watch.Deleted:                      //fifo.Delete()
                err := r.store.Delete(event.Object)  
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }
    ...
    return nil
}

2. controller.processLoop()

processLoop() 的工作:

  • Pop fifo 中的事件;
  • 事件被 c.config.Process 函数解决;
  • 处理结果呈现 RetryOnError 时,将事件从新入队;
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
    for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))  // pop 进去的事件被 Process 函数解决
        if err != nil {
            if err == ErrFIFOClosed {return}
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

事件处理函数:c.config.Process = informer.HandleDeleta()

  • 解决从 fifo pop 进去的变更:

    • 一是更新 indexer;
    • 二是向 processor 散发 notification;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {    
    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        // 事件类型 =Sync/Replaced/Add/Updated
        case Sync, Replaced, Added, Updated:            
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                // 若 LocalStore 中已存在,则更新;if err := s.indexer.Update(d.Object); err != nil {return err}
                isSync := false
                switch {
                case d.Type == Sync:
                    // Sync events are only propagated to listeners that requested resync
                    isSync = true                
                }
                // 向 processor 散发 UPDATE notification
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                // 若 LocalStore 中不存在,则增加;if err := s.indexer.Add(d.Object); err != nil {return err}
                // 向 processor 散发 ADD notification
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        // 事件类型 =Deleted
        case Deleted:
            // 删除 LocalStore 中的对象
            if err := s.indexer.Delete(d.Object); err != nil {return err}
            // 向 processor 散发 delete notification
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

五. 源码:processor.run() 流程

processor 的类型是 sharedProcessor;

processor.run() 次要工作:

  • 接管从 fifo.Pop()–>informer.HandleDelta() 散发进去的 notification;
  • 调用 ResourceEventHandler 进行 notification 的解决;

sharedProcessor 中蕴含 N 个 processorListner,而 processorListener 封装了 ResourceEventHandler;
也就是说,sharedProcessor 中包含 N 个 ResourceEventHandler 的解决;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
    ...
    listeners        []*processorListener
    syncingListeners []*processorListener}
type processorListener struct {nextCh chan interface{}
    addCh  chan interface{}
    handler ResourceEventHandler
    ...
}

process.run() 调用每个 processorListener.run()/pop():

  • 由 processorListener.pop() 进行 notification 的接管;
  • 由 processorListener.run() 调用 ResourceEventHandler 进行 notification 的解决;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {func() {p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {p.wg.Start(listener.run)        // 调用 ResourceEventHandler 进行 notification 的解决
            p.wg.Start(listener.pop)        // 进行 notification 的接管
        }
        p.listenersStarted = true
    }()
    <-stopCh
    ...
}

processorListener.run()

  • 生产 nextCh 中的 notification,调用 ResourceEventHandler 实现事件的解决;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {stopCh := make(chan struct{})
    wait.Until(func() {
        for next := range p.nextCh {            // 生产 notification
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}

正文完
 0