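client-go provides the Informer mechanism: a client specifies the resource type it wants to watch, and the Informer then:
- first, lists all objects of that type;
- then, watches for changes to objects of that type (Add/Update/Delete) and calls the handlers registered by the user.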
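I. Implementation mechanism
The mechanism is implemented by the sharedIndexInformer type. The overall flow, illustrated by a figure in the original post, is:
- The Reflector lists and watches resource objects on the apiserver;
- The objects and changes obtained by the Reflector are put into the DeltaFIFO queue;
- The Informer pops change events from the DeltaFIFO queue, and then:
  - updates the object in the Indexer;
  - calls the ResourceEventHandler registered by the user;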
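The flow above can be modelled in a few lines of plain Go. This is only a sketch and does not use client-go: the `delta` type, the `runPipeline` function, and the string keys are hypothetical stand-ins, with a goroutine playing the Reflector, a channel playing DeltaFIFO, and a map playing the Indexer.

```go
package main

import "fmt"

// delta is a hypothetical stand-in for one entry popped from DeltaFIFO.
type delta struct {
	kind string // "Added", "Updated" or "Deleted"
	key  string
	obj  string
}

// runPipeline models the three roles: the producer goroutine plays the
// Reflector, the channel plays DeltaFIFO, and the loop plays the informer.
func runPipeline(events []delta, handler func(d delta)) map[string]string {
	queue := make(chan delta) // stand-in for DeltaFIFO
	go func() {               // stand-in for the Reflector
		defer close(queue)
		for _, e := range events {
			queue <- e
		}
	}()

	indexer := map[string]string{} // stand-in for the Indexer (local store)
	for d := range queue {
		// Step 1: update the local store.
		if d.kind == "Deleted" {
			delete(indexer, d.key)
		} else {
			indexer[d.key] = d.obj
		}
		// Step 2: invoke the user's handler.
		handler(d)
	}
	return indexer
}

func main() {
	events := []delta{
		{"Added", "default/a", "v1"},
		{"Updated", "default/a", "v2"},
		{"Added", "default/b", "v1"},
		{"Deleted", "default/b", "v1"},
	}
	store := runPipeline(events, func(d delta) {
		fmt.Println(d.kind, d.key)
	})
	fmt.Println(len(store), store["default/a"])
}
```

The store is updated before the handler runs, so a handler that reads the local store sees the state that includes the change it is being told about.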
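II. Source code structure
The overall external interface is sharedInformerFactory, which uses a map so that the informers for the various resource types can be shared;
The informer object that the client creates and calls is sharedIndexInformer, which contains:
- indexer Indexer: indexing and storage;
- controller Controller:
  - uses the Reflector, FIFO and indexer to distribute events and maintain the LocalStore;
  - ties the Reflector and FIFO logic together;
- processor *sharedProcessor: takes the distributed events and calls the client's ResourceEventHandler;
- listerWatcher ListerWatcher: called by reflector.Run() to List & Watch the resource objects;
- objectType runtime.Object: the type of object being watched;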
III. Source: how sharedInformerFactory shares informers
How a client creates an Informer:
- first create a sharedInformerFactory;
- then use the factory to create the informer;
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 10*time.Second, informers.WithNamespace(""))
factory.Core().V1().Pods().Informer()
Structure of sharedInformerFactory:
- informers: holds every sharedIndexInformer that has been created;
- startedInformers: records which sharedIndexInformers have been started:
// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
	...
	informers map[reflect.Type]cache.SharedIndexInformer
	startedInformers map[reflect.Type]bool
	...
}
Under the hood, client-go creates the podInformer:
// staging/src/k8s.io/client-go/informers/core/v1/interface.go
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
	return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// staging/src/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
InformerFor() performs the creation and adds the podInformer to the factory:
- if an informer for the objectType already exists, it is returned directly;
- otherwise, an informer is created and added to factory.informers;
// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	.......
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	// an informer for obj already exists
	if exists {
		return informer
	}
	.....
	// create the informer and add it to informers
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer
	return informer
}
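The sharing behaviour of InformerFor() comes down to a map keyed by reflect.Type. The sketch below reproduces only that idea with the standard library; `toyFactory`, `toyInformer`, `informerFor`, and the `pod`/`node` types are hypothetical names for illustration, not client-go APIs.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// toyInformer is a hypothetical placeholder for cache.SharedIndexInformer.
type toyInformer struct{ kind string }

// toyFactory mimics only the caching behaviour of sharedInformerFactory.
type toyFactory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*toyInformer
	created   int // how many times a constructor actually ran
}

func newToyFactory() *toyFactory {
	return &toyFactory{informers: map[reflect.Type]*toyInformer{}}
}

// informerFor returns the cached informer for obj's type, creating it once.
func (f *toyFactory) informerFor(obj interface{}, newFunc func() *toyInformer) *toyInformer {
	f.mu.Lock()
	defer f.mu.Unlock()

	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf // already created: share it
	}
	inf := newFunc()
	f.created++
	f.informers[t] = inf
	return inf
}

type pod struct{}
type node struct{}

func main() {
	f := newToyFactory()
	newPodInformer := func() *toyInformer { return &toyInformer{kind: "pod"} }
	a := f.informerFor(&pod{}, newPodInformer)
	b := f.informerFor(&pod{}, newPodInformer)
	c := f.informerFor(&node{}, func() *toyInformer { return &toyInformer{kind: "node"} })
	fmt.Println(a == b, a == c, f.created)
}
```

Two requests for the same type return the same pointer, so several components in one process can register handlers on a single informer instead of each opening its own list and watch.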
IV. Source: the sharedIndexInformer.Run() flow
Run() performs initialization and starts the other components:
- creates the fifo, which stores the deltas;
- creates the controller;
- calls processor.run(): runs the client's ResourceEventHandler;
- calls controller.Run(): runs list & watch -> update fifo -> [update indexer, distribute notifications];
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	...
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ // create the fifo
		KnownObjects: s.indexer,
		EmitDeltaTypeReplaced: true,
	})
	cfg := &Config{
		Queue: fifo,
		ListerWatcher: s.listerWatcher,
		ObjectType: s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError: false,
		ShouldResync: s.processor.shouldResync,
		Process: s.HandleDeltas,
	}
	func() {
		// create the controller
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
	wg.StartWithChannel(processorStopCh, s.processor.run) // processor.run(): runs the ResourceEventHandler
	s.controller.Run(stopCh) // controller.Run(): the main logic
}
V. Source: the controller.Run() flow
The controller ties together the major components (reflector, fifo, indexer):
- it uses reflector.Run() to list & watch and store the changes in the fifo;
- it uses processLoop() to call fifo.Pop() and handle the changes:
  - the indexer is used to update the objects;
  - the processor is used to distribute the change notifications;
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
	....
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	c.reflector = r
	var wg wait.Group
	defer wg.Wait()
	// run reflector.Run(): list & watch
	wg.StartWithChannel(stopCh, r.Run)
	// run processLoop(): update the indexer and distribute event notifications
	wait.Until(c.processLoop, time.Second, stopCh)
}
1. reflector.Run()
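It calls ListAndWatch() to watch the apiserver and then update the fifo:
- first, it performs a full List() and pushes the complete result into the fifo (fifo.Replace());
- then, it starts the resync timer and resyncs periodically, that is, it puts the LocalStore data back into the DeltaFIFO for processing;
  - why resync: the event callbacks driven by the DeltaFIFO may fail, and the periodic resync gives failed events another chance to be handled through onUpdate;
- finally, it starts the watch and handles the watched events through watchHandler() (that is, fifo.Add/Update/Delete);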
// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
	// 1) list everything
	if err := func() error {
		...
		var list runtime.Object
		listCh := make(chan struct{}, 1)
		go func() {
			......
			// list page by page
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			...
			list, paginatedResult, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		select {
		// wait for the list to complete
		case <-listCh:
		}
		// extract the items from the list
		listMetaInterface, err := meta.ListAccessor(list)
		resourceVersion = listMetaInterface.GetResourceVersion()
		items, err := meta.ExtractList(list)
		// full sync, which ends up calling fifo.Replace() to replace everything
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		r.setLastSyncResourceVersion(resourceVersion)
		return nil
	}(); err != nil {
		return err
	}
	// 2) resync periodically
	resyncerrc := make(chan error, 1)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		for {
			select {
			case <-resyncCh: // the timer fired
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				// perform the resync
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()
	// 3) watch
	for {
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			TimeoutSeconds: &timeoutSeconds,
		}
		// start the watch
		w, err := r.listerWatcher.Watch(options)
		// handle the events delivered by the watch
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			....
		}
	}
}
Take a closer look at watchHandler(): when an event arrives, it adds, updates, or deletes the object in the store;
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0
loop:
	for {
		select {
		// an event arrived
		case event, ok := <-w.ResultChan():
			meta, err := meta.Accessor(event.Object)
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added: // fifo.Add()
				err := r.store.Add(event.Object)
			case watch.Modified: // fifo.Update()
				err := r.store.Update(event.Object)
			case watch.Deleted: // fifo.Delete()
				err := r.store.Delete(event.Object)
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}
	...
	return nil
}
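The list-then-watch sequence and the resourceVersion bookkeeping can be sketched without a cluster. Everything here is a hypothetical model (`event`, `toyStore`, `listAndWatch` are illustrative names, and plain strings replace real objects); it only mirrors the order of store operations described above.

```go
package main

import "fmt"

// event is a hypothetical stand-in for watch.Event.
type event struct {
	kind string // "ADDED", "MODIFIED" or "DELETED"
	key  string
	obj  string
	rv   string // resourceVersion carried by the object
}

// toyStore records what a Reflector would push into its store.
type toyStore struct {
	items map[string]string
	ops   []string
}

// replace swaps in a full snapshot, like the Replace() after List().
func (s *toyStore) replace(items map[string]string) {
	s.items = map[string]string{}
	for k, v := range items {
		s.items[k] = v
	}
	s.ops = append(s.ops, "Replace")
}

// apply maps one watch event to a store operation, like watchHandler().
func (s *toyStore) apply(e event) {
	switch e.kind {
	case "ADDED":
		s.items[e.key] = e.obj
		s.ops = append(s.ops, "Add")
	case "MODIFIED":
		s.items[e.key] = e.obj
		s.ops = append(s.ops, "Update")
	case "DELETED":
		delete(s.items, e.key)
		s.ops = append(s.ops, "Delete")
	}
}

// listAndWatch seeds the store from a list snapshot, then applies watch
// events, returning the last resourceVersion it observed.
func listAndWatch(s *toyStore, listed map[string]string, listRV string, events []event) string {
	s.replace(listed)
	rv := listRV
	for _, e := range events {
		s.apply(e)
		rv = e.rv // a restarted watch would resume from here
	}
	return rv
}

func main() {
	s := &toyStore{}
	rv := listAndWatch(s, map[string]string{"a": "v1"}, "100", []event{
		{"MODIFIED", "a", "v2", "101"},
		{"ADDED", "b", "v1", "102"},
		{"DELETED", "a", "v2", "103"},
	})
	fmt.Println(s.ops, s.items, rv)
}
```

Tracking the latest resourceVersion after every event is what lets the watch loop reconnect after a timeout without repeating the full list.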
2. controller.processLoop()
What processLoop() does:
- pops events from the fifo;
- hands each event to the c.config.Process function;
- if processing returns an error and RetryOnError is true, re-enqueues the event (sharedIndexInformer.Run() sets RetryOnError to false);
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // the popped event is handled by the Process function
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
The event handling function: c.config.Process = informer.HandleDeltas()
It handles the changes popped from the fifo in two steps:
- first, it updates the indexer;
- second, it distributes a notification to the processor;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		// delta type = Sync/Replaced/Added/Updated
		case Sync, Replaced, Added, Updated:
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				// already in the LocalStore: update it
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				}
				// distribute an UPDATE notification to the processor
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				// not in the LocalStore yet: add it
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				// distribute an ADD notification to the processor
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		// delta type = Deleted
		case Deleted:
			// delete the object from the LocalStore
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			// distribute a DELETE notification to the processor
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
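One detail of HandleDeltas() is easy to miss: whether the listener sees an add or an update depends on the local store, not on the delta type. The sketch below isolates that decision using a plain map; `handleDeltas`, `delta`, and the notification strings are hypothetical simplifications, and the resync-only filtering of Sync notifications is left out.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
	synced  deltaType = "Sync"
)

// delta is a hypothetical, string-based stand-in for cache.Delta.
type delta struct {
	typ deltaType
	key string
	obj string
}

// handleDeltas applies deltas to store (oldest first) and returns the
// notifications that would be handed to the processor.
func handleDeltas(store map[string]string, deltas []delta) []string {
	var out []string
	for _, d := range deltas {
		switch d.typ {
		case added, updated, synced:
			if old, ok := store[d.key]; ok {
				// Known object: always an update, whatever the delta type.
				store[d.key] = d.obj
				out = append(out, fmt.Sprintf("update %s: %s -> %s", d.key, old, d.obj))
			} else {
				// Unknown object: always an add.
				store[d.key] = d.obj
				out = append(out, fmt.Sprintf("add %s: %s", d.key, d.obj))
			}
		case deleted:
			delete(store, d.key)
			out = append(out, fmt.Sprintf("delete %s", d.key))
		}
	}
	return out
}

func main() {
	store := map[string]string{"a": "v1"}
	notes := handleDeltas(store, []delta{
		{added, "a", "v2"},   // already stored, so reported as an update
		{updated, "b", "v1"}, // not stored yet, so reported as an add
		{deleted, "a", "v2"},
	})
	for _, n := range notes {
		fmt.Println(n)
	}
}
```

Because of this, a handler's OnAdd can fire for an object that the apiserver reported as modified, for example when the object was not yet in the local store.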
VI. Source: the processor.run() flow
The processor is of type sharedProcessor;
The main work of processor.run():
- receive the notifications distributed via fifo.Pop() -> informer.HandleDeltas();
- call the ResourceEventHandler to process each notification;
A sharedProcessor contains N processorListeners, and each processorListener wraps a ResourceEventHandler;
In other words, a sharedProcessor drives the processing of N ResourceEventHandlers;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
	...
	listeners []*processorListener
	syncingListeners []*processorListener
}
type processorListener struct {
	nextCh chan interface{}
	addCh chan interface{}
	handler ResourceEventHandler
	...
}
processor.run() starts run() and pop() for every processorListener:
- processorListener.pop() receives the notifications;
- processorListener.run() calls the ResourceEventHandler to process them;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run) // calls the ResourceEventHandler to process notifications
			p.wg.Start(listener.pop) // receives notifications
		}
		p.listenersStarted = true
	}()
	<-stopCh
	...
}
processorListener.run()
- consumes the notifications in nextCh and calls the ResourceEventHandler to handle each event;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh { // consume notifications
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
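The post shows run() but not pop(), the goroutine that sits between addCh and nextCh. A rough model of that pairing is sketched below, assuming an in-memory slice as the pending buffer so that a slow handler does not block whoever distributes notifications. `toyListener` and `deliver` are hypothetical names, and this sketch drains pending items on shutdown for determinism, which may differ from how client-go handles shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// toyListener is a hypothetical, simplified processorListener.
type toyListener struct {
	addCh  chan string
	nextCh chan string
}

func newToyListener() *toyListener {
	return &toyListener{addCh: make(chan string), nextCh: make(chan string)}
}

// pop moves items from addCh to nextCh, parking them in a slice whenever
// the consumer is slower than the producer.
// Assumption of this sketch: pending items are drained after addCh closes.
func (l *toyListener) pop() {
	defer close(l.nextCh) // tells run() to stop
	var pending []string
	addCh := l.addCh
	for addCh != nil || len(pending) > 0 {
		var out chan string // a nil channel disables the send case
		var head string
		if len(pending) > 0 {
			out, head = l.nextCh, pending[0]
		}
		select {
		case out <- head:
			pending = pending[1:]
		case n, ok := <-addCh:
			if !ok {
				addCh = nil // producer is done; keep draining pending
				continue
			}
			pending = append(pending, n)
		}
	}
}

// run consumes nextCh and invokes the handler, like processorListener.run().
func (l *toyListener) run(handle func(string)) {
	for n := range l.nextCh {
		handle(n)
	}
}

// deliver pushes notifications through pop and run and returns what the
// handler saw, in order.
func deliver(notifications []string) []string {
	l := newToyListener()
	var got []string
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); l.pop() }()
	go func() { defer wg.Done(); l.run(func(n string) { got = append(got, n) }) }()
	for _, n := range notifications {
		l.addCh <- n // pop is always ready to receive while addCh is open
	}
	close(l.addCh)
	wg.Wait()
	return got
}

func main() {
	fmt.Println(deliver([]string{"add", "update", "delete"}))
}
```

Splitting the work across two goroutines keeps ordering intact while decoupling the speed of the distributor from the speed of each handler, at the cost of an unbounded buffer if a handler stays slow.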