Operator 的整体架构:
主要包含 3 大组件:
- Informer: 监听资源对象的变动,将变动转成事件放入 WorkQueue;
- WorkQueue: 保存变动的事件;
- Control Loop: 消费 WorkQueue 中的事件,对事件做响应;
其中,Informer 较为复杂:
- Reflector: 调用 apiserver 接口,使用 List&Watch 对指定类型的资源对象进行监控;
- DeltaFIFO: 增量队列,保存 Reflector 监控到的 change 对象;
- LocalStorage: informer 的本地 cache,用以查询特定类型的资源对象,以减轻 apiserver 的查询压力;
1. Informer 源码
对于要监控的资源类型,每种类型创建一个 Informer,比如 prometheus CRD:
// 代码入口
// cmd/operator/main.go
func Main() int {
r := prometheus.NewRegistry()
po, err := prometheuscontroller.New(cfg, log.With(logger, "component", "prometheusoperator"), r)
if err != nil {fmt.Fprint(os.Stderr, "instantiating prometheus controller failed:", err)
return 1
这里可以看到,用 prometheuscontroller.New() 创建对应的 operator,这也是常说的 operator=CRD+Controller:
// pkg/prometheus/operator.go
// New creates a new controller.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
c.promInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = c.config.PromSelector
return mclient.MonitoringV1().Prometheuses(namespace).List(context.TODO(), options)
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = c.config.PromSelector
return mclient.MonitoringV1().Prometheuses(namespace).Watch(context.TODO(), options)
&monitoringv1.Prometheus{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
创建了 prometheus 的 Informer: c.promInf,可以看到,它监听对象变更使用的是 List&Watch;
创建 ok 后,将该 Informer Run 起来:
// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
go c.promInf.Run(stopc)
同时为该 Informer 添加 handler,包括 Add/Delete/Update:
// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
if err := c.waitForCacheSync(stopc); err != nil {return err}
// addHandlers adds the eventhandlers to the informers.
func (c *Operator) addHandlers() {
AddFunc: c.handlePrometheusAdd,
DeleteFunc: c.handlePrometheusDelete,
UpdateFunc: c.handlePrometheusUpdate,
2. WorkQueue 源码
Informer 发现监听的对象变更,调用 handler,handler 会将变更对象放入 WorkQueue:
下面是 Add Prometheus 的事件:
// pkg/prometheus/operator.go
func (c *Operator) handlePrometheusAdd(obj interface{}) {key, ok := c.keyFunc(obj)
if !ok {return}
level.Debug(c.logger).Log("msg", "Prometheus added", "key", key)
c.metrics.TriggerByCounter(monitoringv1.PrometheusesKind, "add").Inc()
checkPrometheusSpecDeprecation(key, obj.(*monitoringv1.Prometheus), c.logger)
3. Control Loop 源码
operator 会启动 1 个 worker,来消费 workQueue 中的事件:
// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
go c.worker()}
// pkg/prometheus/operator.go
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (c *Operator) worker() {for c.processNextWorkItem() {}}
// pkg/prometheus/operator.go
func (c *Operator) processNextWorkItem() bool {key, quit := c.queue.Get() // 取队列元素
if quit {return false}
defer c.queue.Done(key)
err := c.sync(key.(string)) // 进行事件操作
if err == nil {c.queue.Forget(key) // 处理完毕,Forget
return true
utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("Sync %q failed", key)))
return true
事件处理在 c.sync() 中,以 prometheus statefulset 为例:
- 首先检查资源对象是否存在,若不存在则直接创建,返回;
- 否则,更新 statefulset 对象;
- 若更新被 apiserver 以 422 Invalid 拒绝,则删除 statefulset 对象;
// pkg/prometheus/operator.go
func (c *Operator) sync(key string) error {
ssetClient := c.kclient.AppsV1().StatefulSets(p.Namespace)
// Ensure we have a StatefulSet running Prometheus deployed.
obj, exists, err = c.ssetInf.GetIndexer().GetByKey(prometheusKeyToStatefulSetKey(key))
sset, err := makeStatefulSet(*p, &c.config, ruleConfigMapNames, newSSetInputHash)
// 不存在,就创建
if !exists {level.Debug(c.logger).Log("msg", "no current Prometheus statefulset found")
level.Debug(c.logger).Log("msg", "creating Prometheus statefulset")
if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {return errors.Wrap(err, "creating statefulset failed")
return nil
// 否则,就更新
_, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})
// 更新被拒绝(422 Invalid),则删除
if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {level.Info(c.logger).Log("msg", "resolving illegal update of Prometheus StatefulSet", "details", sErr.ErrStatus.Details)
if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
return nil
简单看下创建 statefulset 做的事情,无外乎根据 CRD 的配置,构造 spec 对象,然后使用 ssetClient.Create 创建 statefulset:
// pkg/prometheus/statefulset.go
func makeStatefulSet(
p monitoringv1.Prometheus,
config *Config,
ruleConfigMapNames []string,
inputHash string,
) (*appsv1.StatefulSet, error) {
spec, err := makeStatefulSetSpec(p, config, ruleConfigMapNames)
func makeStatefulSetSpec(p monitoringv1.Prometheus, c *Config, ruleConfigMapNames []string) (*appsv1.StatefulSetSpec, error) {
promArgs := []string{
switch version.Major {
case 1:
case 2:
retentionTimeFlag := "-storage.tsdb.retention="
if version.Minor >= 7 {
retentionTimeFlag = "-storage.tsdb.retention.time="
if p.Spec.RetentionSize != "" {
promArgs = append(promArgs,
fmt.Sprintf("-storage.tsdb.retention.size=%s", p.Spec.RetentionSize),
promArgs = append(promArgs,
fmt.Sprintf("-config.file=%s", path.Join(confOutDir, configEnvsubstFilename)),
fmt.Sprintf("-storage.tsdb.path=%s", storageDir),
