Prometheus: prometheus-adapter Source Code Analysis


I. Startup parameters

Key startup arguments of the prometheus-adapter pod:

  • metrics-relist-interval:

    • the interval at which series are re-listed; the adapter keeps the cluster's current set of series in memory and periodically queries Prometheus to refresh it;
  • prometheus-url: the URL used to query Prometheus;
containers:
  - args:
    - --cert-dir=/var/run/serving-cert
    - --config=/etc/adapter/config.yaml
    - --logtostderr=true
    - --metrics-relist-interval=1m
    - --prometheus-url=http://prometheus-k8s.monitoring.svc.cluster.local:9090/
    - --secure-port=6443
    image: bitnami.io/prometheus-adapter:v0.7.0
....

The prometheus-adapter configuration file config.yaml mainly contains:

  • rules: used for custom.metrics.k8s.io, i.e. custom metrics;
  • resourceRules: used for metrics.k8s.io, i.e. the cpu/mem metrics behind kubectl top;
rules:
- seriesQuery: 'http_requests_total'
  resources:
    template: <<.Resource>>
  name:
    matches: 'http_requests_total'
    as: "http_requests"
  metricsQuery: sum(rate(http_requests_total{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)
"resourceRules":
  "cpu":
    "containerLabel": "container"
    "containerQuery": "sum(irate(container_cpu_usage_seconds_total{<<.LabelMatchers>>,container!=\"POD\",container!=\"\",pod!=\"\"}[5m])) by (<<.GroupBy>>)"
    "nodeQuery": "sum(1 - irate(node_cpu_seconds_total{mode=\"idle\"}[5m]) * on(namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{<<.LabelMatchers>>}) by (<<.GroupBy>>)"
    "resources":
      "overrides":
        "namespace":
          "resource": "namespace"
        "node":
          "resource": "node"
        "pod":
          "resource": "pod"
  "memory":
    "containerLabel": "container"
    "containerQuery": "sum(container_memory_working_set_bytes{<<.LabelMatchers>>,container!=\"POD\",container!=\"\",pod!=\"\"}) by (<<.GroupBy>>)"
    "nodeQuery": "sum(node_memory_MemTotal_bytes{job=\"node-exporter\",<<.LabelMatchers>>} - node_memory_MemAvailable_bytes{job=\"node-exporter\",<<.LabelMatchers>>}) by (<<.GroupBy>>)"
    "resources":
      "overrides":
        "instance":
          "resource": "node"
        "namespace":
          "resource": "namespace"
        "pod":
          "resource": "pod"

II. resourceRules and resourceProvider

resourceRules serve metrics.k8s.io: through kubectl top nodes/pods you can view the cpu/mem of resources; this functionality is the same as what metrics-server provides.

Parsing and querying of resourceRules are implemented by resourceProvider:

Code entry point:

// cmd/adapter/adapter.go
func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error {
    ...
    mapper, err := cmd.RESTMapper()
    ...
    // create the resourceProvider
    provider, err := resprov.NewProvider(promClient, mapper, cmd.metricsConfig.ResourceRules)
    ...
    informers, err := cmd.Informers()
    ...
    server, err := cmd.Server()
    // serve requests to /apis/metrics.k8s.io/v1beta1
    if err := api.Install(provider, informers.Core().V1(), server.GenericAPIServer); err != nil {
        return err
    }
    return nil
}

resourceProvider implements PodMetricsGetter and NodeMetricsGetter, which are used to query node and pod metrics:

// vendor/sigs.k8s.io/metrics-server/pkg/api/interface.go
type PodMetricsGetter interface {
    GetContainerMetrics(pods ...apitypes.NamespacedName) ([]TimeInfo, [][]metrics.ContainerMetrics, error)
}

type NodeMetricsGetter interface {
    GetNodeMetrics(nodes ...string) ([]TimeInfo, []corev1.ResourceList, error)
}

How resourceProvider queries nodes:

// pkg/resourceprovider/provider.go
func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]api.TimeInfo, []corev1.ResourceList, error) {
    now := pmodel.Now()

    // query the nodes' cpu/mem via promClient
    qRes := p.queryBoth(now, nodeResource, "", nodes...)

    resTimes := make([]api.TimeInfo, len(nodes))
    resMetrics := make([]corev1.ResourceList, len(nodes))

    // organize the results
    for i, nodeName := range nodes {
        rawCPUs, gotResult := qRes.cpu[nodeName]
        rawMems, gotResult := qRes.mem[nodeName]

        rawMem := rawMems[0]
        rawCPU := rawCPUs[0]

        // store the results
        resMetrics[i] = corev1.ResourceList{
            corev1.ResourceCPU:    *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI),
            corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI),
        }
        // use the earliest timestamp available (in order to be conservative
        // when determining if metrics are tainted by startup)
        if rawMem.Timestamp.Before(rawCPU.Timestamp) {
            resTimes[i] = api.TimeInfo{
                Timestamp: rawMem.Timestamp.Time(),
                Window:    p.window,
            }
        } else {
            resTimes[i] = api.TimeInfo{
                Timestamp: rawCPU.Timestamp.Time(),
                Window:    1 * time.Minute,
            }
        }
    }
    return resTimes, resMetrics, nil
}
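
Note the conversion above: raw float samples are multiplied by 1000 and stored as milli-quantities. A small self-contained illustration (the sample value is made up):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // a raw CPU sample of 0.25 cores becomes the Kubernetes quantity "250m"
    cpu := resource.NewMilliQuantity(int64(0.25*1000.0), resource.DecimalSI)
    fmt.Println(cpu.String()) // 250m
}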

When querying node metrics:

  • queryBoth(): launches 2 goroutines that query in parallel;
  • one goroutine queries the cpu metric for all nodes;
  • one goroutine queries the mem metric for all nodes;
// pkg/resourceprovider/provider.go
func (p *resourceProvider) queryBoth(now pmodel.Time, resource schema.GroupResource, namespace string, names ...string) nsQueryResults {
    ...
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        cpuRes, cpuErr = p.runQuery(now, p.cpu, resource, namespace, names...)    // query CPU
    }()
    go func() {
        defer wg.Done()
        memRes, memErr = p.runQuery(now, p.mem, resource, namespace, names...)    // query memory
    }()
    wg.Wait()
    ...
    return nsQueryResults{
        namespace: namespace,
        cpu:       cpuRes,
        mem:       memRes,
    }
}

III. rules and prometheusProvider

rules serve custom.metrics.k8s.io and are commonly used for HPA scaling on custom metrics.

Parsing and querying of rules are implemented by prometheusProvider:

1. prometheusProvider initialization

The prometheusProvider initialization flow:

  • parse the rules into namers, resolving the seriesQuery/metricsQuery and other properties in the config;
  • create the prometheusProvider instance;
  • start a goroutine that queries series;
// cmd/adapter/adapter.go
func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
    ...
    // grab the mapper and dynamic client
    mapper, err := cmd.RESTMapper()
    dynClient, err := cmd.DynamicClient()
    ...
    // parse the rules into namers
    namers, err := naming.NamersFromConfig(cmd.metricsConfig.Rules, mapper)
    ...
    // create the prometheusProvider
    cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge)
    // start the cachingMetricsLister: query series and keep them in memory
    runner.RunUntil(stopCh)

    return cmProvider, nil
}

Series polling is implemented by cachingMetricsLister:

  • the polling interval updateInterval = the startup flag metrics-relist-interval = 1m
// pkg/custom-provider/provider.go
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
    go wait.Until(func() {
        if err := l.updateMetrics(); err != nil {
            utilruntime.HandleError(err)
        }
    }, l.updateInterval, stopChan)
}
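
wait.Until (from k8s.io/apimachinery/pkg/util/wait) runs the given function once per period until the stop channel is closed. A minimal self-contained demo of those semantics (not adapter code):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    go wait.Until(func() {
        fmt.Println("relist tick:", time.Now().Format(time.RFC3339))
    }, time.Second, stopCh)

    time.Sleep(3 * time.Second)
    close(stopCh) // closing the channel stops the loop
    time.Sleep(time.Second)
}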

2. cachingMetricsLister periodically queries series

Each rule is converted into a metricNamer structure and stored in l.namers:

  • for each metricNamer, its seriesQuery expression is sent to Prometheus via GET /api/v1/series to find the matching series;
  • the results are then filtered with FilterSeries;
  • finally, the resulting series are cached;
// pkg/custom-provider/provider.go
func (l *cachingMetricsLister) updateMetrics() error {
    startTime := pmodel.Now().Add(-1 * l.maxAge)
    seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
    ...
    selectors := make(map[prom.Selector]struct{})
    selectorSeriesChan := make(chan selectorSeries, len(l.namers))
    for _, namer := range l.namers {
        sel := namer.Selector()    // sel = rule.seriesQuery
        if _, ok := selectors[sel]; ok {
            errs <- nil
            selectorSeriesChan <- selectorSeries{}
            continue
        }
        selectors[sel] = struct{}{}
        go func() {
            // call Prometheus GET /api/v1/series to list the series matching rule.seriesQuery
            series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
            if err != nil {
                errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
                return
            }
            errs <- nil
            selectorSeriesChan <- selectorSeries{
                selector: sel,
                series:   series,
            }
        }()
    }
    // iterate through, blocking until we've got all results
    for range l.namers {
        if err := <-errs; err != nil {
            return fmt.Errorf("unable to update list of all metrics: %v", err)
        }
        if ss := <-selectorSeriesChan; ss.series != nil {
            seriesCacheByQuery[ss.selector] = ss.series
        }
    }
    close(errs)

    newSeries := make([][]prom.Series, len(l.namers))
    for i, namer := range l.namers {
        series, cached := seriesCacheByQuery[namer.Selector()]
        ...
        // apply the SeriesFilters config
        newSeries[i] = namer.FilterSeries(series)
    }
    ...
    // cache the series
    return l.SetSeries(newSeries, l.namers)
}

For example, given the following configuration:

    - seriesQuery: '{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
      seriesFilters:
      - isNot: ^container_.*_seconds_total$
      resources:
        overrides:
          namespace:
            resource: namespace
          pod:
            resource: pod
      name:
        matches: ^container_(.*)_total$
        as: ""metricsQuery: sum(rate(<<.Series>>{<<.LabelMatchers>>,container!="POD"}[5m]))
        by (<<.GroupBy>>)

the corresponding URL for querying series from Prometheus is:

# curl -g 'http://192.168.101:9090/api/v1/series?'  --data-urlencode 'match[]={__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
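
The /api/v1/series endpoint returns one label set per matching series, which is what the lister caches (illustrative response; series names and label values are made up):

{
  "status": "success",
  "data": [
    {"__name__": "container_memory_working_set_bytes", "namespace": "default", "pod": "demo-6f8c9", "container": "demo"},
    {"__name__": "container_fs_reads_total", "namespace": "default", "pod": "demo-6f8c9", "container": "demo"}
  ]
}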

IV. Parsing the rules

Each rule is converted into a metricNamer, which is responsible for parsing and handling the metric.

A typical rule definition:

- seriesQuery: 'http_requests_total'
  resources:
    template: <<.Resource>>
  name:
    matches: 'http_requests_total'
    as: "http_requests"
  metricsQuery: sum(rate(http_requests_total{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)

How a rule is converted into a metricNamer:

// pkg/naming/metric_namer.go
// NamersFromConfig produces a MetricNamer for each rule in the given config.
func NamersFromConfig(cfg []config.DiscoveryRule, mapper apimeta.RESTMapper) ([]MetricNamer, error) {
    namers := make([]MetricNamer, len(cfg))

    for i, rule := range cfg {
        resConv, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper)
        ...
        metricsQuery, err := NewMetricsQuery(rule.MetricsQuery, resConv)
        ...
        seriesMatchers := make([]*ReMatcher, len(rule.SeriesFilters))
        for i, filterRaw := range rule.SeriesFilters {
            matcher, err := NewReMatcher(filterRaw)
            ...
            seriesMatchers[i] = matcher
        }
        if rule.Name.Matches != "" {
            matcher, err := NewReMatcher(config.RegexFilter{Is: rule.Name.Matches})
            ...
            seriesMatchers = append(seriesMatchers, matcher)
        }

        var nameMatches *regexp.Regexp
        if rule.Name.Matches != "" {
            nameMatches, err = regexp.Compile(rule.Name.Matches)
            ...
        } else {
            // this will always succeed
            nameMatches = regexp.MustCompile(".*")
        }
        nameAs := rule.Name.As
        if nameAs == "" {
            // check if we have an obvious default
            subexpNames := nameMatches.SubexpNames()
            if len(subexpNames) == 1 {
                // no capture groups, use the whole thing
                nameAs = "$0"
            } else if len(subexpNames) == 2 {
                // one capture group, use that
                nameAs = "$1"
            } else {
                return nil, fmt.Errorf("must specify an 'as' value for name matcher %q associated with series query %q", rule.Name.Matches, rule.SeriesQuery)
            }
        }
        namer := &metricNamer{    // each rule is parsed into one metricNamer
            seriesQuery:       prom.Selector(rule.SeriesQuery),
            metricsQuery:      metricsQuery,
            nameMatches:       nameMatches,
            nameAs:            nameAs,
            seriesMatchers:    seriesMatchers,
            ResourceConverter: resConv,
        }
        namers[i] = namer
    }
    return namers, nil
}
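
To see what the nameMatches/nameAs pair does to a series name, take matches: ^container_(.*)_total$ with as defaulted to "$1" (one capture group). A minimal sketch of the same regex expansion (not the adapter's exact code path):

package main

import (
    "fmt"
    "regexp"
)

func main() {
    nameMatches := regexp.MustCompile(`^container_(.*)_total$`)
    // one capture group, so nameAs defaults to "$1" per the logic above
    metricName := nameMatches.ReplaceAllString("container_fs_reads_total", "$1")
    fmt.Println(metricName) // fs_reads
}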

V. prometheusProvider handling query requests

Query requests against custom.metrics.k8s.io/v1beta1 are handled by prometheusProvider.

1. API registration

API registration is implemented via the AdapterBase embedded in the PrometheusAdapter instance:

// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/builder.go
func (b *AdapterBase) Server() (*apiserver.CustomMetricsAdapterServer, error) {
    if b.server == nil {
        config, err := b.Config()
        ...
        server, err := config.Complete(b.informers).New(b.Name, b.cmProvider, b.emProvider)
        ...
        b.server = server
    }
    return b.server, nil
}

The APIs are registered under custom.metrics.k8s.io:

// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/apiserver/apiserver.go
func (c completedConfig) New(name string, customMetricsProvider provider.CustomMetricsProvider, externalMetricsProvider provider.ExternalMetricsProvider) (*CustomMetricsAdapterServer, error) {
    genericServer, err := c.CompletedConfig.New(name, genericapiserver.NewEmptyDelegate()) // completion is done in Complete, no need for a second time
    ...
    s := &CustomMetricsAdapterServer{
        GenericAPIServer:        genericServer,
        customMetricsProvider:   customMetricsProvider,
        externalMetricsProvider: externalMetricsProvider,
    }
    if customMetricsProvider != nil {
        // register the API
        if err := s.InstallCustomMetricsAPI(); err != nil {
            return nil, err
        }
    }
    ...
    return s, nil
}

// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/apiserver/cmapis.go
func (s *CustomMetricsAdapterServer) InstallCustomMetricsAPI() error {
    // GroupName = custom.metrics.k8s.io
    groupInfo := genericapiserver.NewDefaultAPIGroupInfo(custom_metrics.GroupName, Scheme, runtime.NewParameterCodec(Scheme), Codecs)
    container := s.GenericAPIServer.Handler.GoRestfulContainer
    ...
    return nil
}
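
For kube-apiserver to actually route these requests to the adapter, an APIService object must also exist in the cluster. A typical manifest (the service name and namespace are assumptions; adjust them to your deployment):

apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  name: v1beta1.custom.metrics.k8s.io
spec:
  group: custom.metrics.k8s.io
  version: v1beta1
  groupPriorityMinimum: 100
  versionPriority: 100
  insecureSkipTLSVerify: true
  service:
    name: prometheus-adapter    # assumed service name
    namespace: monitoring       # assumed namespace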

2. Request handling

prometheusProvider implements the CustomMetricsProvider interface; metrics and their values are queried through:

  • GetMetricByName()
  • GetMetricBySelector()

The mapping from request URL to these methods is shown below.
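
(pod and metric names here are illustrative)

# named resource -> GetMetricByName()
GET /apis/custom.metrics.k8s.io/v1beta1/namespaces/default/pods/demo-6f8c9/http_requests
# wildcard plus label selector -> GetMetricBySelector()
GET /apis/custom.metrics.k8s.io/v1beta1/namespaces/default/pods/*/http_requests?labelSelector=app%3Ddemo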

Take GetMetricByName as an example:

// pkg/custom-provider/provider.go
func (p *prometheusProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
    // construct a query
    queryResults, err := p.buildQuery(info, name.Namespace, metricSelector, name.Name)
    ...
}

p.buildQuery is responsible for:

  • constructing the query expression, i.e. a PromQL expression;
  • executing the query with promClient via Prometheus GET /api/v1/query;
// pkg/custom-provider/provider.go
func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, metricSelector labels.Selector, names ...string) (pmodel.Vector, error) {
    // build the PromQL expression
    query, found := p.QueryForMetric(info, namespace, metricSelector, names...)
    ...
    // execute the PromQL query
    queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), query)
    ...
    return *queryResults.Vector, nil
}
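
The resulting HTTP call is equivalent to something like the following (address and query are illustrative):

# curl -g 'http://<prometheus-host>:9090/api/v1/query' --data-urlencode 'query=sum(rate(http_requests_total{namespace="default",pod=~"demo-6f8c9"}[5m])) by (pod)'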

Let's look at how the PromQL query expression is constructed:

  • first, use the metricInfo from the request to look up the cached series info;

    • this cache holds the series that cachingMetricsLister refreshes every 1m (metrics-relist-interval);
  • then, use the metricNamer with the seriesName, namespace, and other parameters to render the rule.metricsQuery template into the final PromQL expression;
// pkg/custom-provider/series_registry.go
func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInfo, namespace string, metricSelector labels.Selector, resourceNames ...string) (prom.Selector, bool) {
    ...
    metricInfo, _, err := metricInfo.Normalized(r.mapper)
    ...
    // look up the series info in the cache
    info, infoFound := r.info[metricInfo]
    ...
    // render rule.metricsQuery with seriesName/namespace and the other parameters to obtain the PromQL expression
    query, err := info.namer.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, metricSelector, resourceNames...)
    ...
    return query, true
}

// pkg/naming/metric_namer.go
func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, metricSelector labels.Selector, names ...string) (prom.Selector, error) {
    // the metricNamer renders the metricsQuery template
    return n.metricsQuery.Build(series, resource, namespace, nil, metricSelector, names...)
}
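
Putting it together for the http_requests rule above: a query for pods pod-a and pod-b in namespace default would render the metricsQuery template roughly into (an illustration of the template semantics, not output captured from the adapter):

sum(rate(http_requests_total{namespace="default",pod=~"pod-a|pod-b"}[5m])) by (pod)

Here <<.LabelMatchers>> expanded to the namespace matcher plus a regex matcher over the requested pod names, and <<.GroupBy>> to pod.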
