关于golang:k8sclientgo源码剖析二

8次阅读

共计 8893 个字符,预计需要花费 23 分钟才能阅读完成。

简介:云原生社区活动 —Kubernetes 源码分析第一期第二周

本周是 K8S 源码研习社第一期第二周,学习内容是学习 Informer 机制,本文以这个课题进行开展。

本周研习社社长挺忙的,将本次课程推延到下一周完结,任何事件都是这样,打算总有可能会被其余事件突破,但最终只有可能回归到对应的主线上,就不是什么问题。就像参加开源一样,最开始的凋谢源代码只是开始,须要的是可能坚持下去,而这一点往往是很重要的。

好了,开始注释。

本文主题:


  1. Informer 机制架构设计总览
  2. Reflector 了解
  3. DeltaFIFO 了解
  4. Indexer 了解

如果波及到资源的内容,本文以 Deployment 资源进行相干内容讲述。

Informer 机制架构设计总览

上面是我依据了解画的一个数据流转图, 从全局视角看一下数据的整体走向是怎么样的。

其中虚线的示意的是代码中的办法。

首先讲一个论断:

通过 Informer 机制获取数据的状况下,在初始化的时候会从 Kubernetes API Server 获取对应 Resource 的全副 Object,后续只会通过 Watch 机制接管 API Server 推送过去的数据,不会再被动从 API Server 拉取数据,间接应用本地缓存中的数据以缩小 API Server 的压力。

Watch 机制基于 HTTP 的 Chunk 实现,保护一个长连贯,这是一个优化点,缩小申请的数据量。第二个优化点是 SharedInformer, 它能够让同一种资源应用的是同一个 Informer,例如 v1 版本的 Deployment 和 v1beta1 版本的 Deployment 同时存在的时候,共享一个 Informer。

下面图中能够看到 Informer 分为三个局部,能够了解为三大逻辑。

其中 Reflector 次要是把从 API Server 数据获取到的数据放到 DeltaFIFO 队列中,充当生产者角色。

SharedInformer 次要是从 DeltaFIFIO 队列中获取数据并散发数据,充当消费者角色。

最初 Indexer 是作为本地缓存的存储组件存在。

Reflector 了解

Reflector 中次要看 Run、ListAndWatch、watchHandler 三个中央就足够了。

源码地位是 tools/cache/reflector.go

// Ruvn starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
// 开始时执行 Run,上一层调用的中央是 controller.go 中的 Run 办法
func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.Until(func() {
         // 启动后执行一次 ListAndWatch
        if err := r.ListAndWatch(stopCh); err != nil {utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

...

// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {

// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {

// 这里是调用了各个资源中的 ListFunc 函数, 例如如果 v1 版本的 Deployment
// 则调用的是 informers/apps/v1/deployment.go 中的 ListFunc
                             return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {pager.Pa1geSize = r.WatchListPageSize}
            // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
            list, err = pager.List(context.Background(), options)
            close(listCh)
...
// 这一部分次要是从 API SERVER 申请一次数据 获取资源的全副 Object
if err != nil {return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
        }
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(li
st)
        if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
...

// 解决 Watch 中的数据并且将数据搁置到 DeltaFIFO 当中
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {case apierrs.IsResourceExpired(err):
                    klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
...
}

数据的生产就完结了,就两点:

  1. 初始化时从 API Server 申请数据
  2. 监听后续从 Watch 推送来的数据

DeltaFIFO 了解

先看一下数据结构:

type DeltaFIFO struct {
...
    items map[string]Deltas
    queue []string
...
}

type Delta struct {
    Type   DeltaType
    Object interface{}}

type Deltas []Delta


type DeltaType string

// Change type definition
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    Sync DeltaType = "Sync"
)

其中 queue 存储的是 Object 的 id, 而 items 存储的是以 ObjectID 为 key 的这个 Object 的事件列表,

能够设想到是这样的一个数据结构, 右边是 Key, 左边是一个数组对象, 其中每个元素都是由 type 和 obj 组成.

DeltaFIFO 顾名思义寄存 Delta 数据的先入先出队列,相当于一个数据的中转站,将数据从一个中央转移另一个中央。

次要看的内容是 queueActionLocked、Pop、Resync

queueActionLocked 办法:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
...
    newDeltas := append(f.items[id], Delta{actionType, obj})
      // 去重解决
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        ... 
               //pop 音讯
          
        f.cond.Broadcast()
    ...
    return nil
}

Pop 办法:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()
    defer f.lock.Unlock()
    for {for len(f.queue) == 0 {// 阻塞 直到调用了 f.cond.Broadcast()
            f.cond.Wait()}
// 取出第一个元素
        id := f.queue[0]
        f.queue = f.queue[1:]
        ...
        item, ok := f.items[id]
...
                delete(f.items, id)
        // 这个 process 能够在 controller.go 中的 processLoop() 找到
        // 初始化是在 shared_informer.go 的 Run
        // 最终执行到 shared_informer.go 的 HandleDeltas 办法
        err := process(item)
        // 如果解决出错了从新放回队列中
        if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)
            err = e.Err
        }
         ...
    }
}

Resync 机制:

小总结:每次从本地缓存 Indexer 中获取数据从新放到 DeltaFIFO 中执行工作逻辑。

启动的 Resync 中央是 reflector.go 的 resyncChan() 办法,在 reflector.go 的 ListAndWatch 办法中的调用开始定时执行。

go func() {
               // 启动定时工作
        resyncCh, cleanup := r.resyncChan()
        defer func() {cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
                        // 定时执行   调用会执行到 delta_fifo.go 的 Resync() 办法
            if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()}
    }()

func (f *DeltaFIFO) Resync() error {
    ...
// 从缓存中获取到所有的 key
    keys := f.knownObjects.ListKeys()
    for _, k := range keys {if err := f.syncKeyLocked(k); err != nil {return err}
    }
    return nil

}


func (f *DeltaFIFO) syncKeyLocked(key string) error {
           // 获缓存拿到对应的 Object
        obj, exists, err := f.knownObjects.GetByKey(key)
    ...
         // 放入到队列中执行工作逻辑
    if err := f.queueActionLocked(Sync, obj); err != nil {return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}

SharedInformer 生产音讯了解

次要看 HandleDeltas 办法就好,生产音讯而后散发数据并且存储数据到缓存的中央

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        
        switch d.Type {
        case Sync, Added, Updated:
            ...
            // 查一下是否在 Indexer 缓存中 如果在缓存中就更新缓存中的对象
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}
                // 把数据散发到 Listener
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                // 没有在 Indexer 缓存中 把对象插入到缓存中
                if err := s.indexer.Add(d.Object); err != nil {return err}
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        ...
        }
    }
    return nil
}

Indexer 了解

这块不会讲述太多内容,因为我认为 Informer 机制最次要的还是后面数据的流转,当然这并不代表数据存储不重要,而是先理分明整体的思路,后续再具体更新存储的局部。

Indexer 应用的是 threadsafe_store.go 中的 threadSafeMap 存储数据,是一个线程平安并且带有索引性能的 map, 数据只会寄存在内存中,每次波及操作都会进行加锁。

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}
    indexers Indexers
    indices Indices
}

Indexer 还有一个索引相干的内容就临时不开展讲述。

Example 代码

-------------

package main

import (
    "flag"
    "fmt"
    "path/filepath"
    "time"

    v1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var err error
    var config *rest.Config

    var kubeconfig *string

    if home := homedir.HomeDir(); home != "" {kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径")
    } else {kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "kubeconfig 绝对路径")
    }
    // 初始化 rest.Config 对象
    if config, err = rest.InClusterConfig(); err != nil {if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {panic(err.Error())
        }
    }
    // 创立 Clientset 对象
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {panic(err.Error())
    }
    // 初始化一个 SharedInformerFactory 设置 resync 为 60 秒一次,会触发 UpdateFunc
    informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*60)
    // 对 Deployment 监听
    // 这里如果获取 v1betav1 的 deployment 的资源
    // informerFactory.Apps().V1beta1().Deployments()
    deployInformer := informerFactory.Apps().V1().Deployments()
    // 创立 Informer(相当于注册到工厂中去,这样上面启动的时候就会去 List & Watch 对应的资源)informer := deployInformer.Informer()
    // 创立 deployment 的 Lister
    deployLister := deployInformer.Lister()
    // 注册事件处理程序 处理事件数据
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: onUpdate,
        DeleteFunc: onDelete,
    })

    stopper := make(chan struct{})
    defer close(stopper)

    informerFactory.Start(stopper)
    informerFactory.WaitForCacheSync(stopper)

    // 从本地缓存中获取 default 命名空间中的所有 deployment 列表
    deployments, err := deployLister.Deployments("default").List(labels.Everything())
    if err != nil {panic(err)
    }
    for idx, deploy := range deployments {fmt.Printf("%d -> %sn", idx+1, deploy.Name)
    }

    <-stopper
}

func onAdd(obj interface{}) {deploy := obj.(*v1.Deployment)
    fmt.Println("add a deployment:", deploy.Name)
}

func onUpdate(old, new interface{}) {oldDeploy := old.(*v1.Deployment)
    newDeploy := new.(*v1.Deployment)
    fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}

func onDelete(obj interface{}) {deploy := obj.(*v1.Deployment)
    fmt.Println("delete a deployment:", deploy.Name)
} 

以上示例代码中程序启动后会拉取一次 Deployment 数据,并且拉取数据实现后从本地缓存中 List 一次 default 命名空间的 Deployment 资源并打印,而后每 60 秒 Resync 一次 Deployment 资源。

QA


为什么须要 Resync?

在本周有同学提出一个,我看到这个问题后也感觉挺奇怪的,因为 Resync 是从本地缓存的数据缓存到本地缓存 (从开始到完结来说是这样), 为什么须要把数据拿进去又走一遍流程呢?过后钻牛角尖也是想不明确,起初换个角度想就晓得了。

数据从 API Server 过去并且通过解决后放到缓存中,但数据并不一定就能够失常解决,也就是说可能报错了,而这个 Resync 相当于一个重试的机制。

能够尝试实际一下: 部署有状态服务,存储应用 LocalPV(也能够换成本人相熟的), 这时候 pod 会因为存储目录不存在而启动失败. 而后在 pod 启动失败后再创立好对应的目录,过一会 pod 就启动胜利了。这是我了解的一种状况。

总结:


Informer 机制在 K8S 中是各个组件通信的基石,了解透彻是十分无益的,我也还在进一步了解的过程中,欢送一起交换。

前置浏览:


  • k8s-client-go 源码分析 (一)

始发于 四颗咖啡豆, 转载请申明出处.
关注公粽号 ->[四颗咖啡豆] 获取最新内容

正文完
 0