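Overall architecture of an Operator:
It consists of three main components:
- Informer: watches resource objects for changes, turns each change into an event and puts it into the WorkQueue;
- WorkQueue: holds the change events;
- Control Loop: consumes events from the WorkQueue and responds to them;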
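The three-stage flow above can be sketched with plain Go. This is a minimal sketch under simplifying assumptions: the Event type and runPipeline are hypothetical, and an unbuffered channel stands in for the WorkQueue, which the real operator implements with client-go's workqueue instead.

```go
package main

import "fmt"

// Event is a simplified change notification: what happened and to which
// object key. Hypothetical type for this sketch.
type Event struct {
	Type string // "add", "update" or "delete"
	Key  string // "namespace/name"
}

// runPipeline connects a toy Informer to a toy Control Loop through a
// channel that stands in for the WorkQueue, and returns what the Control
// Loop handled, in order.
func runPipeline(changes []Event) []string {
	queue := make(chan Event) // stand-in for the WorkQueue

	// Informer: turn each observed change into an event on the queue.
	go func() {
		defer close(queue)
		for _, ev := range changes {
			queue <- ev
		}
	}()

	// Control Loop: consume events until the queue is closed.
	var handled []string
	for ev := range queue {
		handled = append(handled, ev.Type+":"+ev.Key)
	}
	return handled
}

func main() {
	out := runPipeline([]Event{
		{"add", "monitoring/k8s"},
		{"update", "monitoring/k8s"},
	})
	fmt.Println(out) // [add:monitoring/k8s update:monitoring/k8s]
}
```

A plain channel delivers every event, so an object that changes five times is handled five times. client-go's workqueue stores keys and collapses duplicates that are still waiting, which is one reason the operator's handlers enqueue keys rather than objects.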
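The Informer itself is relatively simple and is made up of:
- Reflector: calls the apiserver API and uses List&Watch to monitor resource objects of a given type;
- DeltaFIFO: a delta queue holding the changed objects observed by the Reflector;
- LocalStorage: the Informer's local cache, used to look up resource objects of a given type, which reduces query load on the apiserver;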
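How the Reflector's output flows through the DeltaFIFO into the local cache can be sketched as follows. This is a simplified, single-goroutine sketch: Delta, deltaFIFO and drain are hypothetical names. client-go's real DeltaFIFO is thread-safe, its Pop blocks until an item is available and passes the deltas to a processing callback, and the local store is a thread-safe indexer rather than a plain map.

```go
package main

import "fmt"

// Delta is one observed change to an object (hypothetical, simplified).
type Delta struct {
	Type string // "Added", "Updated" or "Deleted"
	Key  string // namespace/name
	Obj  string // object payload, simplified to a string
}

// deltaFIFO keeps, per key, the deltas seen since the key was last popped,
// and pops keys in first-arrival order.
type deltaFIFO struct {
	order []string
	items map[string][]Delta
}

func newDeltaFIFO() *deltaFIFO { return &deltaFIFO{items: map[string][]Delta{}} }

// Add appends a delta; a key already waiting keeps its position in the queue.
func (f *deltaFIFO) Add(d Delta) {
	if _, queued := f.items[d.Key]; !queued {
		f.order = append(f.order, d.Key)
	}
	f.items[d.Key] = append(f.items[d.Key], d)
}

// Pop removes the oldest key and returns all of its accumulated deltas.
func (f *deltaFIFO) Pop() ([]Delta, bool) {
	if len(f.order) == 0 {
		return nil, false
	}
	key := f.order[0]
	f.order = f.order[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return deltas, true
}

// drain pops everything and applies each delta to the local store, the
// cache that later reads are served from instead of the apiserver.
func drain(f *deltaFIFO, store map[string]string) {
	for {
		deltas, ok := f.Pop()
		if !ok {
			return
		}
		for _, d := range deltas {
			if d.Type == "Deleted" {
				delete(store, d.Key)
			} else {
				store[d.Key] = d.Obj
			}
		}
	}
}

func main() {
	fifo := newDeltaFIFO()
	// Pretend these changes came from the Reflector's List & Watch.
	fifo.Add(Delta{"Added", "monitoring/k8s", "v1"})
	fifo.Add(Delta{"Added", "monitoring/tmp", "v1"})
	fifo.Add(Delta{"Updated", "monitoring/k8s", "v2"})
	fifo.Add(Delta{"Deleted", "monitoring/tmp", ""})

	store := map[string]string{}
	drain(fifo, store)
	fmt.Println(store) // map[monitoring/k8s:v2]
}
```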
1. Informer source code
One Informer is created for each resource type to be watched; take the Prometheus CRD as an example:
```go
// Code entry point
// cmd/operator/main.go
func Main() int {
	// .......
	r := prometheus.NewRegistry()
	po, err := prometheuscontroller.New(cfg, log.With(logger, "component", "prometheusoperator"), r)
	if err != nil {
		fmt.Fprint(os.Stderr, "instantiating prometheus controller failed: ", err)
		return 1
	}
	// .......
}
```
Here prometheuscontroller.New() creates the corresponding operator, which is what the common formula operator = CRD + Controller refers to:
```go
// pkg/prometheus/operator.go

// New creates a new controller.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
	// ....
	c.promInf = cache.NewSharedIndexInformer(
		c.metrics.NewInstrumentedListerWatcher(
			listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
				return &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						options.LabelSelector = c.config.PromSelector
						return mclient.MonitoringV1().Prometheuses(namespace).List(context.TODO(), options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						options.LabelSelector = c.config.PromSelector
						return mclient.MonitoringV1().Prometheuses(namespace).Watch(context.TODO(), options)
					},
				}
			}),
		),
		&monitoringv1.Prometheus{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	// .....
}
```
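The ListFunc/WatchFunc closures above inject c.config.PromSelector into the options before calling the API, and MultiNamespaceListerWatcher builds one lister-watcher per namespace. The sketch below imitates only the List half against an in-memory fake. The names object, ListOptions, ListFunc, fakeAPIServer and listAllowed are hypothetical, the label selector is reduced to a single exact `key=value` match, and the deny-list handling and watch merging of the real code are left out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// object is a toy stand-in for a Prometheus custom resource; Labels is
// reduced to a single "key=value" string.
type object struct {
	Namespace, Name, Labels string
}

// ListOptions keeps only the field this sketch needs.
type ListOptions struct {
	LabelSelector string
}

// ListFunc lists the objects of one namespace.
type ListFunc func(namespace string, opts ListOptions) []object

// fakeAPIServer serves a fixed object set; an empty selector matches all.
func fakeAPIServer(all []object) ListFunc {
	return func(namespace string, opts ListOptions) []object {
		var out []object
		for _, o := range all {
			if o.Namespace == namespace && (opts.LabelSelector == "" || o.Labels == opts.LabelSelector) {
				out = append(out, o)
			}
		}
		return out
	}
}

// listAllowed lists every allowed namespace with the controller's selector
// injected into the options, as the ListFunc closure does with
// c.config.PromSelector, and merges the results into sorted keys.
func listAllowed(namespaces []string, selector string, list ListFunc) []string {
	var keys []string
	for _, ns := range namespaces {
		for _, o := range list(ns, ListOptions{LabelSelector: selector}) {
			keys = append(keys, o.Namespace+"/"+o.Name)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	api := fakeAPIServer([]object{
		{"monitoring", "k8s", "team=infra"},
		{"monitoring", "scratch", "team=dev"},
		{"apps", "app-prom", "team=infra"},
		{"kube-system", "sys", "team=infra"},
	})
	keys := listAllowed([]string{"monitoring", "apps"}, "team=infra", api)
	fmt.Println(strings.Join(keys, ",")) // apps/app-prom,monitoring/k8s
}
```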
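This creates the Prometheus Informer, c.promInf. As the code shows, it watches for object changes via List&Watch, restricted to the configured label selector (c.config.PromSelector) and to the allowed namespaces;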
Once created, the Informer is started by calling Run:
```go
// pkg/prometheus/operator.go

// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
	// ......
	go c.promInf.Run(stopc)
	// ......
}
```
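The informer goroutine runs until stopc is closed, and before registering handlers Run also calls c.waitForCacheSync(stopc). A minimal sketch of that stop-channel and wait-for-sync pattern follows, assuming a hypothetical toyInformer that reports itself synced once its initial list finishes; client-go offers the real counterpart as cache.WaitForCacheSync together with each informer's HasSynced. atomic.Bool needs Go 1.19 or later.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// toyInformer is hypothetical: it flips synced once its initial "List" is
// done, then idles in its "Watch" phase until stopc is closed.
type toyInformer struct {
	synced atomic.Bool
}

func (i *toyInformer) Run(stopc <-chan struct{}) {
	time.Sleep(10 * time.Millisecond) // pretend the initial List takes a while
	i.synced.Store(true)
	<-stopc // keep running until the controller shuts down
}

// waitForSync polls hasSynced until it reports true, giving up early if
// stopc is closed, similar in spirit to cache.WaitForCacheSync.
func waitForSync(stopc <-chan struct{}, hasSynced func() bool) error {
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for !hasSynced() {
		select {
		case <-stopc:
			return errors.New("stopped before caches synced")
		case <-ticker.C:
		}
	}
	return nil
}

func main() {
	stopc := make(chan struct{})
	defer close(stopc) // stops the informer goroutine on exit

	inf := &toyInformer{}
	go inf.Run(stopc)

	if err := waitForSync(stopc, inf.synced.Load); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("cache synced; safe to add handlers and start workers")
}
```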
Event handlers for Add, Delete and Update are also registered on the Informer, once its cache has synced:
```go
// pkg/prometheus/operator.go

// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
	// ......
	if err := c.waitForCacheSync(stopc); err != nil {
		return err
	}
	c.addHandlers()
	// ......
}

// addHandlers adds the eventhandlers to the informers.
func (c *Operator) addHandlers() {
	c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.handlePrometheusAdd,
		DeleteFunc: c.handlePrometheusDelete,
		UpdateFunc: c.handlePrometheusUpdate,
	})
	// .......
}
```
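cache.ResourceEventHandlerFuncs lets a controller fill in only the callbacks it needs, and an informer fans every notification out to all registered handlers. The sketch below imitates that dispatch with hypothetical HandlerFuncs and toyInformer types, using strings instead of Kubernetes objects; it mirrors the idea, not client-go's exact method signatures.

```go
package main

import "fmt"

// HandlerFuncs follows the idea of client-go's cache.ResourceEventHandlerFuncs:
// fill in only the callbacks you need; nil ones are skipped. Objects are
// plain strings, and both types here are hypothetical.
type HandlerFuncs struct {
	AddFunc    func(obj string)
	UpdateFunc func(oldObj, newObj string)
	DeleteFunc func(obj string)
}

// toyInformer fans each notification out to every registered handler.
type toyInformer struct {
	handlers []HandlerFuncs
}

func (i *toyInformer) AddEventHandler(h HandlerFuncs) {
	i.handlers = append(i.handlers, h)
}

func (i *toyInformer) notifyAdd(obj string) {
	for _, h := range i.handlers {
		if h.AddFunc != nil {
			h.AddFunc(obj)
		}
	}
}

func (i *toyInformer) notifyUpdate(oldObj, newObj string) {
	for _, h := range i.handlers {
		if h.UpdateFunc != nil {
			h.UpdateFunc(oldObj, newObj)
		}
	}
}

func (i *toyInformer) notifyDelete(obj string) {
	for _, h := range i.handlers {
		if h.DeleteFunc != nil {
			h.DeleteFunc(obj)
		}
	}
}

func main() {
	var events []string
	inf := &toyInformer{}
	inf.AddEventHandler(HandlerFuncs{
		AddFunc:    func(obj string) { events = append(events, "add "+obj) },
		UpdateFunc: func(_, newObj string) { events = append(events, "update "+newObj) },
		// DeleteFunc left nil, so delete notifications are ignored.
	})
	inf.notifyAdd("monitoring/k8s")
	inf.notifyUpdate("monitoring/k8s", "monitoring/k8s")
	inf.notifyDelete("monitoring/k8s")
	fmt.Println(events) // [add monitoring/k8s update monitoring/k8s]
}
```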
2. WorkQueue source code
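When the Informer detects a change to a watched object, it calls the corresponding handler, which puts the changed object's key into the WorkQueue.
Here is the handler for the Prometheus Add event: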
```go
// pkg/prometheus/operator.go
func (c *Operator) handlePrometheusAdd(obj interface{}) {
	key, ok := c.keyFunc(obj)
	if !ok {
		return
	}

	level.Debug(c.logger).Log("msg", "Prometheus added", "key", key)
	c.metrics.TriggerByCounter(monitoringv1.PrometheusesKind, "add").Inc()
	checkPrometheusSpecDeprecation(key, obj.(*monitoringv1.Prometheus), c.logger)
	c.enqueue(key)
}
```
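The handler queues only the string key produced by c.keyFunc, not the object itself; when the key is processed, the worker looks the object up again in the informer cache and so works on the latest cached state. The sketch below assumes keys use the `namespace/name` format that client-go's cache.MetaNamespaceKeyFunc produces (just `name` for cluster-scoped objects); meta, keyFunc, splitKey and handleAdd are hypothetical helpers, not the operator's code.

```go
package main

import (
	"fmt"
	"strings"
)

// meta is a hypothetical stand-in for an object's namespace and name.
type meta struct{ Namespace, Name string }

// keyFunc formats keys the way client-go's cache.MetaNamespaceKeyFunc does:
// "namespace/name", or just "name" for cluster-scoped objects. ok=false lets
// the caller drop objects it cannot key, like the early return in
// handlePrometheusAdd.
func keyFunc(obj interface{}) (key string, ok bool) {
	m, isMeta := obj.(meta)
	if !isMeta || m.Name == "" {
		return "", false
	}
	if m.Namespace == "" {
		return m.Name, true
	}
	return m.Namespace + "/" + m.Name, true
}

// splitKey reverses keyFunc, in the spirit of cache.SplitMetaNamespaceKey.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// handleAdd has the shape of handlePrometheusAdd: compute the key, bail
// out if there is none, and enqueue only the key.
func handleAdd(obj interface{}, enqueue func(string)) {
	key, ok := keyFunc(obj)
	if !ok {
		return
	}
	enqueue(key)
}

func main() {
	var queue []string
	enqueue := func(key string) { queue = append(queue, key) }

	handleAdd(meta{"monitoring", "k8s"}, enqueue)
	handleAdd("not a Kubernetes object", enqueue) // dropped: no key

	fmt.Println(queue) // [monitoring/k8s]
	ns, name, _ := splitKey(queue[0])
	fmt.Println(ns, name) // monitoring k8s
}
```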
3. Control Loop source code
The operator starts one worker goroutine to consume events from the WorkQueue:
```go
// pkg/prometheus/operator.go

// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
	// .......
	go c.worker()
}
```
```go
// pkg/prometheus/operator.go

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (c *Operator) worker() {
	for c.processNextWorkItem() {
	}
}
```
Here is the actual consume step:
```go
// pkg/prometheus/operator.go
func (c *Operator) processNextWorkItem() bool {
	key, quit := c.queue.Get() // take the next key from the queue
	if quit {
		return false
	}
	defer c.queue.Done(key)

	err := c.sync(key.(string)) // reconcile the object behind this key
	if err == nil {
		c.queue.Forget(key) // success: clear the key's retry history
		return true
	}

	c.metrics.ReconcileErrorsCounter().Inc()
	utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("Sync %q failed", key)))
	c.queue.AddRateLimited(key) // failure: requeue the key with backoff
	return true
}
```
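Event handling is done in c.sync(). Taking the Prometheus StatefulSet as an example:
- First check, in the informer's local cache, whether the StatefulSet exists; if it does not, create it and return;
- Otherwise, update the existing StatefulSet;
- If the update is rejected as invalid (HTTP 422 with reason Invalid, e.g. because it changes StatefulSet spec fields that Kubernetes does not allow to be updated), delete the StatefulSet so that a later sync can recreate it;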
```go
// pkg/prometheus/operator.go
func (c *Operator) sync(key string) error {
	// ......
	ssetClient := c.kclient.AppsV1().StatefulSets(p.Namespace)
	// Ensure we have a StatefulSet running Prometheus deployed.
	obj, exists, err = c.ssetInf.GetIndexer().GetByKey(prometheusKeyToStatefulSetKey(key))
	sset, err := makeStatefulSet(*p, &c.config, ruleConfigMapNames, newSSetInputHash)

	// Not found: create it.
	if !exists {
		level.Debug(c.logger).Log("msg", "no current Prometheus statefulset found")
		level.Debug(c.logger).Log("msg", "creating Prometheus statefulset")
		if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {
			return errors.Wrap(err, "creating statefulset failed")
		}
		return nil
	}
	// ......

	// Otherwise, update it.
	_, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})
	sErr, ok := err.(*apierrors.StatusError)

	// The update was rejected as invalid (422): delete the StatefulSet so it can be recreated.
	if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
		level.Info(c.logger).Log("msg", "resolving illegal update of Prometheus StatefulSet", "details", sErr.ErrStatus.Details)
		if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
			return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
		}
		return nil
	}
	// ......
}
```
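The create / update / delete-on-422 decision in sync can be exercised in isolation with a fake client. This is a sketch under assumptions: statusError stands in for apierrors.StatusError, fakeClient and reconcile are hypothetical, and "Invalid" is the string value of metav1.StatusReasonInvalid. It uses errors.As rather than the excerpt's direct type assertion, which also matches wrapped errors.

```go
package main

import (
	"errors"
	"fmt"
)

// statusError is a hypothetical stand-in for apierrors.StatusError that
// keeps only the HTTP code and reason.
type statusError struct {
	Code   int
	Reason string
}

func (e *statusError) Error() string { return fmt.Sprintf("%d %s", e.Code, e.Reason) }

// fakeClient records each call; Update returns updateErr.
type fakeClient struct {
	calls     []string
	updateErr error
}

func (c *fakeClient) Create(name string) error {
	c.calls = append(c.calls, "create "+name)
	return nil
}

func (c *fakeClient) Update(name string) error {
	c.calls = append(c.calls, "update "+name)
	return c.updateErr
}

func (c *fakeClient) Delete(name string) error {
	c.calls = append(c.calls, "delete "+name)
	return nil
}

// reconcile follows the decision flow of c.sync: create when missing,
// otherwise update, and delete when the update is rejected as 422 Invalid
// so that a later sync can recreate the object.
func reconcile(c *fakeClient, name string, exists bool) error {
	if !exists {
		return c.Create(name)
	}
	err := c.Update(name)
	var sErr *statusError
	if errors.As(err, &sErr) && sErr.Code == 422 && sErr.Reason == "Invalid" {
		return c.Delete(name)
	}
	return err
}

func main() {
	missing := &fakeClient{}
	_ = reconcile(missing, "prometheus-k8s", false)

	rejected := &fakeClient{updateErr: &statusError{Code: 422, Reason: "Invalid"}}
	_ = reconcile(rejected, "prometheus-k8s", true)

	fmt.Println(missing.calls)  // [create prometheus-k8s]
	fmt.Println(rejected.calls) // [update prometheus-k8s delete prometheus-k8s]
}
```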
A quick look at how the StatefulSet is built: it essentially constructs the spec from the CRD's configuration, and sync then creates the StatefulSet with ssetClient.Create:
```go
// pkg/prometheus/statefulset.go
func makeStatefulSet(
	p monitoringv1.Prometheus,
	config *Config,
	ruleConfigMapNames []string,
	inputHash string,
) (*appsv1.StatefulSet, error) {
	// ......
	spec, err := makeStatefulSetSpec(p, config, ruleConfigMapNames)
	// ......
}
```
```go
func makeStatefulSetSpec(p monitoringv1.Prometheus, c *Config, ruleConfigMapNames []string) (*appsv1.StatefulSetSpec, error) {
	// ......
	promArgs := []string{
		"-web.console.templates=/etc/prometheus/consoles",
		"-web.console.libraries=/etc/prometheus/console_libraries",
	}

	switch version.Major {
	case 1:
		// ......
	case 2:
		retentionTimeFlag := "-storage.tsdb.retention="
		if version.Minor >= 7 {
			retentionTimeFlag = "-storage.tsdb.retention.time="
			if p.Spec.RetentionSize != "" {
				promArgs = append(promArgs,
					fmt.Sprintf("-storage.tsdb.retention.size=%s", p.Spec.RetentionSize),
				)
			}
		}

		promArgs = append(promArgs,
			fmt.Sprintf("-config.file=%s", path.Join(confOutDir, configEnvsubstFilename)),
			fmt.Sprintf("-storage.tsdb.path=%s", storageDir),
			retentionTimeFlag+p.Spec.Retention,
			"-web.enable-lifecycle",
			"-storage.tsdb.no-lockfile",
		)
		// ......
	}
	// ......
}
```
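The Prometheus 2.x branch above picks the retention flag by minor version: from 2.7 on it uses the `retention.time` flag and adds a `retention.size` flag when a size is configured. The sketch below isolates that choice. retentionArgs and parseMajorMinor are hypothetical helpers (the excerpt's version.Major/version.Minor come from an already parsed version), and the single-dash flag spelling simply follows the excerpt.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseMajorMinor is a hypothetical helper that reads "v2.7.1" or "2.7.1".
func parseMajorMinor(v string) (major, minor int, err error) {
	parts := strings.SplitN(strings.TrimPrefix(v, "v"), ".", 3)
	if len(parts) < 2 {
		return 0, 0, fmt.Errorf("invalid version %q", v)
	}
	if major, err = strconv.Atoi(parts[0]); err != nil {
		return 0, 0, err
	}
	if minor, err = strconv.Atoi(parts[1]); err != nil {
		return 0, 0, err
	}
	return major, minor, nil
}

// retentionArgs mirrors the 2.x branch of makeStatefulSetSpec: from 2.7 on
// the time flag is -storage.tsdb.retention.time and a size flag is added
// when a retention size is set; earlier 2.x versions use
// -storage.tsdb.retention and ignore the size.
func retentionArgs(minor int, retention, retentionSize string) []string {
	timeFlag := "-storage.tsdb.retention="
	var args []string
	if minor >= 7 {
		timeFlag = "-storage.tsdb.retention.time="
		if retentionSize != "" {
			args = append(args, "-storage.tsdb.retention.size="+retentionSize)
		}
	}
	return append(args, timeFlag+retention)
}

func main() {
	for _, v := range []string{"v2.6.0", "v2.22.1"} {
		major, minor, err := parseMajorMinor(v)
		if err != nil || major != 2 {
			continue
		}
		fmt.Println(v, retentionArgs(minor, "24h", "10GB"))
	}
	// v2.6.0 [-storage.tsdb.retention=24h]
	// v2.22.1 [-storage.tsdb.retention.size=10GB -storage.tsdb.retention.time=24h]
}
```

Comparing only the minor number is safe here because the switch has already matched major version 2.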