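client-go provides the Informer mechanism: a client specifies the resource type to watch, and the Informer then:
- first performs a full list of all objects of that type;
- then watches for changes to objects of that type (Add/Update/Delete) and calls the handlers registered by the user.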
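I. Implementation mechanism
The mechanism is implemented by the sharedIndexInformer type; the overall flow is shown in the figure below:
- The Reflector lists and watches resource objects from the apiserver;
- The objects and changes that the Reflector lists and watches are put into the DeltaFIFO queue;
- The Informer pops change events from the DeltaFIFO queue, and then:
  - updates the object in the Indexer;
  - calls the ResourceEventHandler registered by the user.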
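II. Source code structure
Overall external interface: sharedInformerFactory, which uses a map so that informers for the various resource types can be shared.
The informer object created for the client is a sharedIndexInformer, which contains:
- indexer Indexer: indexing and storage;
- controller Controller:
  - calls the Reflector, FIFO, and indexer to distribute events and maintain the LocalStore;
  - ties the Reflector and FIFO logic together;
- processor *sharedProcessor: uses the distributed events to call the client's ResourceEventHandler;
- listerWatcher ListerWatcher: called by reflector.Run() to List&Watch resource objects;
- objectType runtime.Object: the object type being watched.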
III. Source: how sharedInformerFactory shares informers
How a client creates an Informer:
- first, create a sharedInformerFactory;
- then, use the factory to create the informer:
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 10*time.Second, informers.WithNamespace(""))
factory.Core().V1().Pods().Informer()
The sharedInformerFactory struct:
- informers: stores every sharedIndexInformer that has been created;
- startedInformers: records which sharedIndexInformers have been started:
// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
	...
	informers        map[reflect.Type]cache.SharedIndexInformer
	startedInformers map[reflect.Type]bool
	...
}
Underneath, client-go creates the podInformer:
// staging/src/k8s.io/client-go/informers/core/v1/interface.go
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
	return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// staging/src/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
InformerFor() performs the creation and adds the podInformer to the factory:
- if an informer for the objectType already exists, it is returned directly;
- otherwise, an informer is created and added to factory.informers.
// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	.......
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	// an informer for obj already exists
	if exists {
		return informer
	}
	.....
	// create the informer and add it to informers
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer
	return informer
}
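The sharing logic above can be sketched with the standard library alone. This is a simplified model, not the client-go implementation: toyFactory, toyInformer, informerFor, pod, and service are hypothetical names, and the mutex is an assumption of the sketch. It only shows that keying the map by reflect.Type yields one informer per object type.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// toyInformer is a hypothetical stand-in for cache.SharedIndexInformer.
type toyInformer struct{ kind string }

// toyFactory models the sharing logic: at most one informer per object type.
type toyFactory struct {
	mu        sync.Mutex // assumption of this sketch: guards the map
	informers map[reflect.Type]*toyInformer
}

func newToyFactory() *toyFactory {
	return &toyFactory{informers: make(map[reflect.Type]*toyInformer)}
}

// informerFor returns the informer already registered for obj's type,
// or creates one with newFunc and registers it.
func (f *toyFactory) informerFor(obj interface{}, newFunc func() *toyInformer) *toyInformer {
	f.mu.Lock()
	defer f.mu.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// pod and service are hypothetical object types used only as map keys.
type pod struct{}
type service struct{}

func main() {
	f := newToyFactory()
	created := 0
	newPodInformer := func() *toyInformer {
		created++
		return &toyInformer{kind: "pod"}
	}
	a := f.informerFor(&pod{}, newPodInformer)
	b := f.informerFor(&pod{}, newPodInformer) // reuses a
	c := f.informerFor(&service{}, func() *toyInformer {
		return &toyInformer{kind: "service"}
	})
	fmt.Println(a == b, a == c, created) // prints "true false 1"
}
```

The second request for a pod informer never calls the constructor, which is why several consumers of the same factory end up sharing a single list&watch per resource type.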
IV. Source: the sharedIndexInformer.Run() flow
Run() does the initialization and then runs the other components:
- creates the fifo, which stores the deltas;
- creates the controller;
- calls processor.run(): executes the client's ResourceEventHandler;
- calls controller.Run(): runs list&watch --> update fifo --> [update indexer, distribute notifications].
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	...
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ // create the fifo
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
		Process:          s.HandleDeltas,
	}
	func() {
		// create the controller
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
	wg.StartWithChannel(processorStopCh, s.processor.run) // processor.run(): executes the ResourceEventHandler
	s.controller.Run(stopCh)                              // controller.Run(): main logic
}
V. Source: the controller.Run() flow
controller integrates the major components (reflector, fifo, indexer):
- uses reflector.Run() to list&watch and store the changes in the fifo;
- uses processLoop() to call fifo.Pop() and handle the changes:
  - uses the indexer to update objects;
  - uses the processor to distribute change notifications.
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
	....
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	c.reflector = r
	var wg wait.Group
	defer wg.Wait()
	// run reflector.Run(): List&Watch
	wg.StartWithChannel(stopCh, r.Run)
	// run processLoop(): update the indexer and distribute event notifications
	wait.Until(c.processLoop, time.Second, stopCh)
}
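1. reflector.Run()
Calls ListAndWatch() to watch the apiserver and then update the fifo:
- first, it performs a full List() and replaces the fifo contents with the result (fifo.Replace());
- next, it starts the resync timer and resyncs periodically, i.e. puts the LocalStore data back into the DeltaFIFO for processing;
  - why resync: the event callbacks driven by the DeltaFIFO may fail, and the periodic resync gives failed events another chance to be handled through OnUpdate;
- finally, it starts the watch and handles the watched events through watchHandler() (i.e. fifo.Add/Update/Delete).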
// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
	// 1) list everything
	if err := func() error {
		...
		var list runtime.Object
		listCh := make(chan struct{}, 1)
		go func() {
			......
			// list page by page
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			...
			list, paginatedResult, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		select {
		// wait for the list to finish
		case <-listCh:
		}
		// extract the items from the list
		listMetaInterface, err := meta.ListAccessor(list)
		resourceVersion = listMetaInterface.GetResourceVersion()
		items, err := meta.ExtractList(list)
		// full sync; this ends up calling fifo.Replace()
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		r.setLastSyncResourceVersion(resourceVersion)
		return nil
	}(); err != nil {
		return err
	}
	// 2) resync periodically
	resyncerrc := make(chan error, 1)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		for {
			select {
			case <-resyncCh: // the timer fired
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				// run the resync
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()
	// 3) watch
	for {
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			TimeoutSeconds:  &timeoutSeconds,
		}
		// start the watch
		w, err := r.listerWatcher.Watch(options)
		// handle the events delivered by the watch
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {....}
	}
}
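The resync step can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not the DeltaFIFO code: toyQueue, delta, and resync are hypothetical names, and skipping keys that already have queued deltas is a simplification chosen for this example. It shows the idea described above, that every object in the local store is put back into the queue as a Sync delta.

```go
package main

import (
	"fmt"
	"sort"
)

// delta is a hypothetical, simplified change record.
type delta struct {
	Type string
	Key  string
}

// toyQueue models a delta queue that knows about a local store.
type toyQueue struct {
	known   map[string]string // local store: key -> object
	pending map[string]bool   // keys that already have queued deltas
	queue   []delta
}

// resync re-enqueues every object in the local store as a Sync delta,
// skipping keys that already have a delta waiting to be processed.
func (q *toyQueue) resync() {
	keys := make([]string, 0, len(q.known))
	for k := range q.known {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order, for the example only
	for _, k := range keys {
		if q.pending[k] {
			continue
		}
		q.queue = append(q.queue, delta{Type: "Sync", Key: k})
		q.pending[k] = true
	}
}

func main() {
	q := &toyQueue{
		known:   map[string]string{"default/a": "pod-a", "default/b": "pod-b"},
		pending: map[string]bool{"default/b": true},
		queue:   []delta{{Type: "Updated", Key: "default/b"}},
	}
	q.resync()
	fmt.Println(q.queue) // prints "[{Updated default/b} {Sync default/a}]"
}
```

Because a Sync delta for an object that is already in the store is later delivered as an update, a handler that failed on an earlier event gets another attempt on the next resync.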
Let's look more closely at watchHandler(): when an event arrives, it adds, updates, or deletes the object in the store.
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0
loop:
	for {
		select {
		// an event arrived
		case event, ok := <-w.ResultChan():
			meta, err := meta.Accessor(event.Object)
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added: // fifo.Add()
				err := r.store.Add(event.Object)
			case watch.Modified: // fifo.Update()
				err := r.store.Update(event.Object)
			case watch.Deleted: // fifo.Delete()
				err := r.store.Delete(event.Object)
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}
	...
	return nil
}
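As a rough illustration of the dispatch in watchHandler(), the sketch below applies a stream of events to a store and keeps track of the latest resourceVersion. All names here (event, object, applyEvents) are hypothetical, and a plain map stands in for r.store, which in the real code is the DeltaFIFO.

```go
package main

import "fmt"

type eventType string

const (
	added    eventType = "ADDED"
	modified eventType = "MODIFIED"
	deleted  eventType = "DELETED"
)

// object is a hypothetical, simplified resource object.
type object struct {
	Key             string
	ResourceVersion string
	Data            string
}

type event struct {
	Type   eventType
	Object object
}

// applyEvents drains ch, applies each event to store, and returns the
// resourceVersion of the last event seen (or rv if there were none).
func applyEvents(ch <-chan event, store map[string]object, rv string) string {
	for ev := range ch {
		switch ev.Type {
		case added, modified:
			store[ev.Object.Key] = ev.Object
		case deleted:
			delete(store, ev.Object.Key)
		}
		// Remember where we are, so a later watch could resume from here.
		rv = ev.Object.ResourceVersion
	}
	return rv
}

func main() {
	ch := make(chan event, 4)
	ch <- event{Type: added, Object: object{Key: "default/a", ResourceVersion: "101", Data: "v1"}}
	ch <- event{Type: modified, Object: object{Key: "default/a", ResourceVersion: "102", Data: "v2"}}
	ch <- event{Type: added, Object: object{Key: "default/b", ResourceVersion: "103", Data: "v1"}}
	ch <- event{Type: deleted, Object: object{Key: "default/a", ResourceVersion: "104", Data: "v2"}}
	close(ch)

	store := map[string]object{}
	rv := applyEvents(ch, store, "100")
	fmt.Println(rv, len(store)) // prints "104 1"
}
```

Tracking the resourceVersion after every event is what lets the next Watch call in the loop above start from the point where the previous one stopped.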
2. controller.processLoop()
What processLoop() does:
- pops events from the fifo;
- hands each event to the c.config.Process function;
- if handling returns an error and RetryOnError is set, re-enqueues the event.
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
	for {
		// the popped event is handled by the Process function
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
The event handling function: c.config.Process = informer.HandleDeltas()
- it handles the changes popped from the fifo:
  - first, it updates the indexer;
  - second, it distributes notifications to the processor.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		// delta type is Sync/Replaced/Added/Updated
		case Sync, Replaced, Added, Updated:
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				// the object is already in the LocalStore: update it
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				}
				// distribute an UPDATE notification to the processor
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				// the object is not in the LocalStore: add it
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				// distribute an ADD notification to the processor
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		// delta type is Deleted
		case Deleted:
			// delete the object from the LocalStore
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			// distribute a DELETE notification to the processor
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
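The decision made in HandleDeltas(), whether a delta becomes an add, update, or delete notification, depends on what the local store already holds rather than on the delta type alone. The sketch below models that with a map as the indexer; delta, handleDeltas, and the string notifications are hypothetical simplifications.

```go
package main

import "fmt"

type deltaType string

const (
	dAdded    deltaType = "Added"
	dUpdated  deltaType = "Updated"
	dDeleted  deltaType = "Deleted"
	dReplaced deltaType = "Replaced"
	dSync     deltaType = "Sync"
)

// delta is a hypothetical, simplified change record.
type delta struct {
	Type deltaType
	Key  string
	Obj  string
}

// handleDeltas applies deltas to the indexer in order and returns the
// notifications that would be handed to the listeners.
func handleDeltas(indexer map[string]string, deltas []delta) []string {
	var notes []string
	for _, d := range deltas {
		switch d.Type {
		case dSync, dReplaced, dAdded, dUpdated:
			if _, exists := indexer[d.Key]; exists {
				indexer[d.Key] = d.Obj
				notes = append(notes, "update:"+d.Key)
			} else {
				indexer[d.Key] = d.Obj
				notes = append(notes, "add:"+d.Key)
			}
		case dDeleted:
			delete(indexer, d.Key)
			notes = append(notes, "delete:"+d.Key)
		}
	}
	return notes
}

func main() {
	indexer := map[string]string{}
	notes := handleDeltas(indexer, []delta{
		{Type: dAdded, Key: "default/a", Obj: "v1"},
		{Type: dUpdated, Key: "default/a", Obj: "v2"},
		{Type: dSync, Key: "default/a", Obj: "v2"},
		{Type: dUpdated, Key: "default/b", Obj: "v1"}, // unknown key: becomes an add
		{Type: dDeleted, Key: "default/a", Obj: "v2"},
	})
	fmt.Println(notes, len(indexer))
	// prints "[add:default/a update:default/a update:default/a add:default/b delete:default/a] 1"
}
```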
VI. Source: the processor.run() flow
processor is of type sharedProcessor.
The main work of processor.run():
- receives the notifications distributed from fifo.Pop() --> informer.HandleDeltas();
- calls the ResourceEventHandler to handle the notifications.
sharedProcessor contains N processorListeners, and each processorListener wraps a ResourceEventHandler;
in other words, sharedProcessor drives the handling for N ResourceEventHandlers.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
	...
	listeners        []*processorListener
	syncingListeners []*processorListener
}
type processorListener struct {
	nextCh  chan interface{}
	addCh   chan interface{}
	handler ResourceEventHandler
	...
}
processor.run() starts run() and pop() for every processorListener:
- processorListener.pop() receives the notifications;
- processorListener.run() calls the ResourceEventHandler to handle the notifications.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run) // calls the ResourceEventHandler to handle notifications
			p.wg.Start(listener.pop) // receives notifications
		}
		p.listenersStarted = true
	}()
	<-stopCh
	...
}
processorListener.run()
- consumes the notifications in nextCh and calls the ResourceEventHandler to handle each event.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh { // consume notifications
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
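The text mentions processorListener.pop() without showing it. The sketch below is one way such a pump between addCh and nextCh can be written so that a slow handler never blocks the producer; it is a simplified model rather than the client-go source. The listener type, the slice used as the pending buffer, the deliver helper, and the choice to flush buffered notifications when addCh is closed are all assumptions of this example.

```go
package main

import "fmt"

// listener is a hypothetical, simplified processorListener.
type listener struct {
	addCh  chan string // producer side
	nextCh chan string // consumer side, read by run
}

// pop moves notifications from addCh to nextCh, buffering them in memory
// while the consumer is busy. A nil send channel disables its select case.
func (l *listener) pop() {
	defer close(l.nextCh) // tells run to stop
	var pending []string
	var nextCh chan string
	var head string
	for {
		select {
		case nextCh <- head:
			if len(pending) > 0 {
				head, pending = pending[0], pending[1:]
			} else {
				nextCh = nil // nothing left to send
			}
		case n, ok := <-l.addCh:
			if !ok {
				// Sketch-only choice: flush what is buffered before exiting.
				if nextCh != nil {
					l.nextCh <- head
					for _, p := range pending {
						l.nextCh <- p
					}
				}
				return
			}
			if nextCh == nil {
				head, nextCh = n, l.nextCh
			} else {
				pending = append(pending, n)
			}
		}
	}
}

// run consumes nextCh and calls the handler for each notification.
func (l *listener) run(handle func(string)) {
	for n := range l.nextCh {
		handle(n)
	}
}

// deliver pushes notes through a listener and returns what the handler saw.
func deliver(notes []string) []string {
	l := &listener{addCh: make(chan string), nextCh: make(chan string)}
	var got []string
	done := make(chan struct{})
	go l.pop()
	go func() {
		defer close(done)
		l.run(func(n string) { got = append(got, n) })
	}()
	for _, n := range notes {
		l.addCh <- n
	}
	close(l.addCh)
	<-done
	return got
}

func main() {
	fmt.Println(deliver([]string{"add", "update", "delete"})) // prints "[add update delete]"
}
```

Splitting the work into a pump goroutine and a handler goroutine keeps notifications in order while decoupling the speed of the event source from the speed of the user's handler.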