I. Startup Parameters

Startup arguments of the prometheus-adapter pod, the key ones being:

  • metrics-relist-interval:

    • the interval at which series are re-listed; the adapter keeps the cluster's current series in memory and periodically queries Prometheus to refresh them;
  • prometheus-url: the URL used to query Prometheus;
```yaml
containers:
- args:
  - --cert-dir=/var/run/serving-cert
  - --config=/etc/adapter/config.yaml
  - --logtostderr=true
  - --metrics-relist-interval=1m
  - --prometheus-url=http://prometheus-k8s.monitoring.svc.cluster.local:9090/
  - --secure-port=6443
  image: bitnami.io/prometheus-adapter:v0.7.0
  ...
```

The prometheus-adapter configuration file config.yaml mainly contains:

  • rules: used for custom.metrics.k8s.io, i.e. custom metrics;
  • resourceRules: used for metrics.k8s.io, i.e. the cpu/mem metrics behind kubectl top;
```yaml
rules:
- seriesQuery: 'http_requests_total'
  resources:
    template: <<.Resource>>
  name:
    matches: 'http_requests_total'
    as: "http_requests"
  metricsQuery: sum(rate(http_requests_total{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)
"resourceRules":
  "cpu":
    "containerLabel": "container"
    "containerQuery": "sum(irate(container_cpu_usage_seconds_total{<<.LabelMatchers>>,container!=\"POD\",container!=\"\",pod!=\"\"}[5m])) by (<<.GroupBy>>)"
    "nodeQuery": "sum(1 - irate(node_cpu_seconds_total{mode=\"idle\"}[5m]) * on(namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{<<.LabelMatchers>>}) by (<<.GroupBy>>)"
    "resources":
      "overrides":
        "namespace":
          "resource": "namespace"
        "node":
          "resource": "node"
        "pod":
          "resource": "pod"
  "memory":
    "containerLabel": "container"
    "containerQuery": "sum(container_memory_working_set_bytes{<<.LabelMatchers>>,container!=\"POD\",container!=\"\",pod!=\"\"}) by (<<.GroupBy>>)"
    "nodeQuery": "sum(node_memory_MemTotal_bytes{job=\"node-exporter\",<<.LabelMatchers>>} - node_memory_MemAvailable_bytes{job=\"node-exporter\",<<.LabelMatchers>>}) by (<<.GroupBy>>)"
    "resources":
      "overrides":
        "instance":
          "resource": "node"
        "namespace":
          "resource": "namespace"
        "pod":
          "resource": "pod"
```

II. resourceRules and the resourceProvider

resourceRules serve metrics.k8s.io: they let kubectl top nodes/pods show the cpu/mem usage of resources, the same functionality that metrics-server provides.

Parsing and querying for resourceRules are implemented by the resourceProvider:

Code entry point:

```go
// cmd/adapter/adapter.go
func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error {
    ...
    mapper, err := cmd.RESTMapper()
    ...
    // create the resourceProvider
    provider, err := resprov.NewProvider(promClient, mapper, cmd.metricsConfig.ResourceRules)
    ...
    informers, err := cmd.Informers()
    ...
    server, err := cmd.Server()
    // serve requests to /apis/metrics.k8s.io/v1beta1
    if err := api.Install(provider, informers.Core().V1(), server.GenericAPIServer); err != nil {
        return err
    }
    return nil
}
```

The resourceProvider implements PodMetricsGetter and NodeMetricsGetter, which are used to query node and pod metrics:

```go
// vendor/sigs.k8s.io/metrics-server/pkg/api/interface.go
type PodMetricsGetter interface {
    GetContainerMetrics(pods ...apitypes.NamespacedName) ([]TimeInfo, [][]metrics.ContainerMetrics, error)
}
type NodeMetricsGetter interface {
    GetNodeMetrics(nodes ...string) ([]TimeInfo, []corev1.ResourceList, error)
}
```

How the resourceProvider queries nodes:

```go
// pkg/resourceprovider/provider.go
func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]api.TimeInfo, []corev1.ResourceList, error) {
    now := pmodel.Now()
    // query the nodes' cpu/mem through promClient
    qRes := p.queryBoth(now, nodeResource, "", nodes...)

    resTimes := make([]api.TimeInfo, len(nodes))
    resMetrics := make([]corev1.ResourceList, len(nodes))
    // organize the results
    for i, nodeName := range nodes {
        rawCPUs, gotResult := qRes.cpu[nodeName]
        rawMems, gotResult := qRes.mem[nodeName]

        rawMem := rawMems[0]
        rawCPU := rawCPUs[0]
        // store the results
        resMetrics[i] = corev1.ResourceList{
            corev1.ResourceCPU:    *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI),
            corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI),
        }
        // use the earliest timestamp available (in order to be conservative
        // when determining if metrics are tainted by startup)
        if rawMem.Timestamp.Before(rawCPU.Timestamp) {
            resTimes[i] = api.TimeInfo{
                Timestamp: rawMem.Timestamp.Time(),
                Window:    p.window,
            }
        } else {
            resTimes[i] = api.TimeInfo{
                Timestamp: rawCPU.Timestamp.Time(),
                Window:    1 * time.Minute,
            }
        }
    }
    return resTimes, resMetrics, nil
}
```
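The conversion above packs the raw sample values into milli-quantities: NewMilliQuantity stores thousandths of a unit, so a raw CPU reading of 0.25 cores is reported as 250m. A minimal sketch of that arithmetic without the k8s.io/apimachinery dependency (toMilli is a hypothetical helper, not adapter code):

```go
package main

import "fmt"

// toMilli mirrors resource.NewMilliQuantity(int64(raw*1000.0), ...):
// the value is stored in thousandths, so 0.25 cores -> 250m.
func toMilli(raw float64) int64 {
    return int64(raw * 1000.0)
}

func main() {
    cpuCores := 0.25              // raw value of the CPU query, in cores
    memBytes := 1.5 * 1024 * 1024 // raw value of the memory query, in bytes

    fmt.Printf("cpu: %dm\n", toMilli(cpuCores)) // cpu: 250m
    fmt.Printf("mem: %d\n", toMilli(memBytes))  // milli-bytes, as in the code above
}
```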

When querying node metrics:

  • queryBoth(): starts 2 goroutines that query in parallel;
  • one goroutine queries the cpu metrics of all nodes;
  • one goroutine queries the mem metrics of all nodes;
```go
// pkg/resourceprovider/provider.go
func (p *resourceProvider) queryBoth(now pmodel.Time, resource schema.GroupResource, namespace string, names ...string) nsQueryResults {
    ...
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        cpuRes, cpuErr = p.runQuery(now, p.cpu, resource, namespace, names...) // query CPU
    }()
    go func() {
        defer wg.Done()
        memRes, memErr = p.runQuery(now, p.mem, resource, namespace, names...) // query memory
    }()
    wg.Wait()
    ...
    return nsQueryResults{
        namespace: namespace,
        cpu:       cpuRes,
        mem:       memRes,
    }
}
```
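The fan-out in queryBoth() can be sketched with the standard library alone; runQuery below is a hypothetical stand-in for the real Prometheus call:

```go
package main

import (
    "fmt"
    "sync"
)

// queryBoth-style fan-out: run the cpu and mem queries in parallel and
// join the results with a WaitGroup. runQuery stands in for the real
// Prometheus query.
func queryBoth(runQuery func(kind string) (float64, error)) (cpu, mem float64, err error) {
    var wg sync.WaitGroup
    var cpuErr, memErr error
    wg.Add(2)
    go func() {
        defer wg.Done()
        cpu, cpuErr = runQuery("cpu") // query CPU
    }()
    go func() {
        defer wg.Done()
        mem, memErr = runQuery("mem") // query memory
    }()
    wg.Wait() // block until both queries return
    if cpuErr != nil {
        return 0, 0, cpuErr
    }
    if memErr != nil {
        return 0, 0, memErr
    }
    return cpu, mem, nil
}

func main() {
    cpu, mem, err := queryBoth(func(kind string) (float64, error) {
        if kind == "cpu" {
            return 0.5, nil // pretend: 0.5 cores
        }
        return 2048, nil // pretend: 2048 bytes
    })
    fmt.Println(cpu, mem, err) // 0.5 2048 <nil>
}
```

Writing each result from its own goroutine is race-free here because wg.Wait() establishes a happens-before edge with both wg.Done() calls.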

III. rules and the prometheusProvider

rules serve custom.metrics.k8s.io and are commonly used for HPA scaling on custom metrics.

rules are handled and queried through the prometheusProvider:

1. Initialization of the prometheusProvider

The prometheusProvider initialization flow:

  • parse the rules into namers, resolving seriesQuery/metricsQuery and the other fields of each rule;
  • create the prometheusProvider instance;
  • start a goroutine that lists series;
```go
// cmd/adapter/adapter.go
func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
    ...
    // grab the mapper and dynamic client
    mapper, err := cmd.RESTMapper()
    dynClient, err := cmd.DynamicClient()
    ...
    // parse the rules into namers
    namers, err := naming.NamersFromConfig(cmd.metricsConfig.Rules, mapper)
    ...
    // create the prometheusProvider
    cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge)
    // start the cachingMetricsLister, which lists series and keeps them in memory
    runner.RunUntil(stopCh)
    return cmProvider, nil
}
```

Polling for series is implemented by the cachingMetricsLister:

  • the polling interval updateInterval equals the startup argument metrics-relist-interval (1m);
```go
// pkg/custom-provider/provider.go
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
    go wait.Until(func() {
        if err := l.updateMetrics(); err != nil {
            utilruntime.HandleError(err)
        }
    }, l.updateInterval, stopChan)
}
```

2. cachingMetricsLister lists series periodically

Each rule is converted into a metricNamer structure and kept in l.namers:

  • iterate over each metricNamer and use its seriesQuery expression to send GET /api/v1/series to Prometheus, fetching the matching series;
  • run the results through FilterSeries to apply the configured filters;
  • finally, cache the resulting series;
```go
// pkg/custom-provider/provider.go
func (l *cachingMetricsLister) updateMetrics() error {
    startTime := pmodel.Now().Add(-1 * l.maxAge)
    seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
    ...
    selectors := make(map[prom.Selector]struct{})
    selectorSeriesChan := make(chan selectorSeries, len(l.namers))
    for _, namer := range l.namers {
        sel := namer.Selector() // sel = rule.seriesQuery
        if _, ok := selectors[sel]; ok {
            errs <- nil
            selectorSeriesChan <- selectorSeries{}
            continue
        }
        selectors[sel] = struct{}{}
        go func() {
            // call Prometheus' GET /api/v1/series to fetch the series matching rule.seriesQuery
            series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
            if err != nil {
                errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
                return
            }
            errs <- nil
            selectorSeriesChan <- selectorSeries{
                selector: sel,
                series:   series,
            }
        }()
    }
    // iterate through, blocking until we've got all results
    for range l.namers {
        if err := <-errs; err != nil {
            return fmt.Errorf("unable to update list of all metrics: %v", err)
        }
        if ss := <-selectorSeriesChan; ss.series != nil {
            seriesCacheByQuery[ss.selector] = ss.series
        }
    }
    close(errs)
    newSeries := make([][]prom.Series, len(l.namers))
    for i, namer := range l.namers {
        series, cached := seriesCacheByQuery[namer.Selector()]
        ...
        // apply the seriesFilters configuration
        newSeries[i] = namer.FilterSeries(series)
    }
    ...
    // cache the series
    return l.SetSeries(newSeries, l.namers)
}
```

For example, given the following configuration:

```yaml
- seriesQuery: '{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
  seriesFilters:
  - isNot: ^container_.*_seconds_total$
  resources:
    overrides:
      namespace:
        resource: namespace
      pod:
        resource: pod
  name:
    matches: ^container_(.*)_total$
    as: ""
  metricsQuery: sum(rate(<<.Series>>{<<.LabelMatchers>>,container!="POD"}[5m]))
    by (<<.GroupBy>>)
```

the URL used to query Prometheus for series is:

```shell
# curl -g 'http://192.168.101:9090/api/v1/series?' \
    --data-urlencode 'match[]={__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
```
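The same request URL can be assembled in Go: the seriesQuery selector goes into a match[] query parameter, URL-encoded. A sketch with a placeholder host (seriesURL is a hypothetical helper, not the adapter's client code):

```go
package main

import (
    "fmt"
    "net/url"
)

// seriesURL builds a /api/v1/series request URL: the selector is passed
// as the "match[]" parameter and URL-encoded by net/url.
func seriesURL(base, selector string) string {
    v := url.Values{}
    v.Set("match[]", selector)
    return base + "/api/v1/series?" + v.Encode()
}

func main() {
    u := seriesURL("http://prometheus-k8s.monitoring.svc:9090",
        `{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}`)
    fmt.Println(u)
}
```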

IV. Parsing the rules

Each rule is converted into a metricNamer, which is responsible for parsing and handling the metric.

A typical rule definition:

```yaml
- seriesQuery: 'http_requests_total'
  resources:
    template: <<.Resource>>
  name:
    matches: 'http_requests_total'
    as: "http_requests"
  metricsQuery: sum(rate(http_requests_total{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)
```

How a rule is converted into a metricNamer:

```go
// pkg/naming/metric_namer.go
// NamersFromConfig produces a MetricNamer for each rule in the given config.
func NamersFromConfig(cfg []config.DiscoveryRule, mapper apimeta.RESTMapper) ([]MetricNamer, error) {
    namers := make([]MetricNamer, len(cfg))
    for i, rule := range cfg {
        resConv, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper)
        ...
        metricsQuery, err := NewMetricsQuery(rule.MetricsQuery, resConv)
        ...
        seriesMatchers := make([]*ReMatcher, len(rule.SeriesFilters))
        for i, filterRaw := range rule.SeriesFilters {
            matcher, err := NewReMatcher(filterRaw)
            ...
            seriesMatchers[i] = matcher
        }
        if rule.Name.Matches != "" {
            matcher, err := NewReMatcher(config.RegexFilter{Is: rule.Name.Matches})
            ...
            seriesMatchers = append(seriesMatchers, matcher)
        }
        var nameMatches *regexp.Regexp
        if rule.Name.Matches != "" {
            nameMatches, err = regexp.Compile(rule.Name.Matches)
            ...
        } else {
            // this will always succeed
            nameMatches = regexp.MustCompile(".*")
        }
        nameAs := rule.Name.As
        if nameAs == "" {
            // check if we have an obvious default
            subexpNames := nameMatches.SubexpNames()
            if len(subexpNames) == 1 {
                // no capture groups, use the whole thing
                nameAs = "$0"
            } else if len(subexpNames) == 2 {
                // one capture group, use that
                nameAs = "$1"
            } else {
                return nil, fmt.Errorf("must specify an 'as' value for name matcher %q associated with series query %q", rule.Name.Matches, rule.SeriesQuery)
            }
        }
        namer := &metricNamer{ // each rule is parsed into one metricNamer
            seriesQuery:       prom.Selector(rule.SeriesQuery),
            metricsQuery:      metricsQuery,
            nameMatches:       nameMatches,
            nameAs:            nameAs,
            seriesMatchers:    seriesMatchers,
            ResourceConverter: resConv,
        }
        namers[i] = namer
    }
    return namers, nil
}
```
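The nameAs defaulting above can be exercised in isolation: with `matches: ^container_(.*)_total$` and an empty `as`, the regexp has one capture group, so the default becomes "$1" and the metric name is rewritten to the captured part. A self-contained sketch (metricName is a hypothetical helper mirroring that logic):

```go
package main

import (
    "fmt"
    "regexp"
)

// metricName mirrors the nameAs defaulting in NamersFromConfig: with no
// capture groups keep the whole name ("$0"); with one group use it ("$1").
func metricName(matches, as, series string) string {
    re := regexp.MustCompile(matches)
    if as == "" {
        switch len(re.SubexpNames()) {
        case 1:
            as = "$0" // no capture groups: use the whole thing
        case 2:
            as = "$1" // one capture group: use that
        }
    }
    return re.ReplaceAllString(series, as)
}

func main() {
    // from the example rule: matches ^container_(.*)_total$ with as: ""
    fmt.Println(metricName(`^container_(.*)_total$`, "", "container_fs_reads_total")) // fs_reads
    // from the http_requests rule: an explicit as wins
    fmt.Println(metricName(`http_requests_total`, "http_requests", "http_requests_total")) // http_requests
}
```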

V. How the prometheusProvider handles query requests

Queries against custom.metrics.k8s.io/v1beta1 are handled by the prometheusProvider.

1. API registration

API registration is done through the AdapterBase embedded in the PrometheusAdapter:

```go
// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/builder.go
func (b *AdapterBase) Server() (*apiserver.CustomMetricsAdapterServer, error) {
    if b.server == nil {
        config, err := b.Config()
        ...
        server, err := config.Complete(b.informers).New(b.Name, b.cmProvider, b.emProvider)
        ...
        b.server = server
    }
    return b.server, nil
}
```

The API is registered under custom.metrics.k8s.io:

```go
// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/apiserver/apiserver.go
func (c completedConfig) New(name string, customMetricsProvider provider.CustomMetricsProvider, externalMetricsProvider provider.ExternalMetricsProvider) (*CustomMetricsAdapterServer, error) {
    genericServer, err := c.CompletedConfig.New(name, genericapiserver.NewEmptyDelegate()) // completion is done in Complete, no need for a second time
    ...
    s := &CustomMetricsAdapterServer{
        GenericAPIServer:        genericServer,
        customMetricsProvider:   customMetricsProvider,
        externalMetricsProvider: externalMetricsProvider,
    }
    if customMetricsProvider != nil {
        // register the API
        if err := s.InstallCustomMetricsAPI(); err != nil {
            return nil, err
        }
    }
    ...
    return s, nil
}

// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/apiserver/cmapis.go
func (s *CustomMetricsAdapterServer) InstallCustomMetricsAPI() error {
    // GroupName = "custom.metrics.k8s.io"
    groupInfo := genericapiserver.NewDefaultAPIGroupInfo(custom_metrics.GroupName, Scheme, runtime.NewParameterCodec(Scheme), Codecs)
    container := s.GenericAPIServer.Handler.GoRestfulContainer
    ...
    return nil
}
```

2. Request handling

The prometheusProvider implements the CustomMetricsProvider interface and queries metrics and their values through:

  • GetMetricByName()
  • GetMetricBySelector()

Take GetMetricByName as an example:

```go
// pkg/custom-provider/provider.go
func (p *prometheusProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
    // construct a query
    queryResults, err := p.buildQuery(info, name.Namespace, metricSelector, name.Name)
    ...
}
```

p.buildQuery is responsible for:

  • building the query expression, i.e. the PromQL;
  • using promClient to execute it via Prometheus' GET /api/v1/query;
```go
// pkg/custom-provider/provider.go
func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, metricSelector labels.Selector, names ...string) (pmodel.Vector, error) {
    // build the PromQL expression
    query, found := p.QueryForMetric(info, namespace, metricSelector, names...)
    ...
    // run the PromQL query
    queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), query)
    ...
    return *queryResults.Vector, nil
}
```

Now look closely at how the PromQL query expression is built:

  • first, use the metricInfo from the request to look up the cached seriesInfo;

    • this cache holds the series refreshed by the cachingMetricsLister every 1m (metrics-relist-interval);
  • then, pass seriesName, namespace and the other parameters to the metricNamer, which expands the rule.metricsQuery expression into the final PromQL;
```go
// pkg/custom-provider/series_registry.go
func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInfo, namespace string, metricSelector labels.Selector, resourceNames ...string) (prom.Selector, bool) {
    ...
    metricInfo, _, err := metricInfo.Normalized(r.mapper)
    ...
    // look up the cached series info
    info, infoFound := r.info[metricInfo]
    ...
    // expand rule.metricsQuery with seriesName/namespace etc. to get the PromQL expression
    query, err := info.namer.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, metricSelector, resourceNames...)
    ...
    return query, true
}
```
```go
// pkg/naming/metric_namer.go
func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, metricSelector labels.Selector, names ...string) (prom.Selector, error) {
    // the metricNamer expands the metricsQuery template into PromQL
    return n.metricsQuery.Build(series, resource, namespace, nil, metricSelector, names...)
}
```
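prometheus-adapter expands metricsQuery with Go's text/template, using << and >> as delimiters. The expansion step can be sketched in isolation; queryArgs below covers only a few of the real template's fields, and buildPromQL is a hypothetical helper:

```go
package main

import (
    "bytes"
    "fmt"
    "text/template"
)

// queryArgs holds a subset of the values substituted into metricsQuery.
type queryArgs struct {
    Series        string
    LabelMatchers string
    GroupBy       string
}

// buildPromQL parses metricsQuery as a text/template with << >> delimiters
// and executes it, producing the final PromQL string.
func buildPromQL(metricsQuery string, args queryArgs) (string, error) {
    tmpl, err := template.New("metricsQuery").Delims("<<", ">>").Parse(metricsQuery)
    if err != nil {
        return "", err
    }
    var buf bytes.Buffer
    if err := tmpl.Execute(&buf, args); err != nil {
        return "", err
    }
    return buf.String(), nil
}

func main() {
    q, _ := buildPromQL(
        `sum(rate(<<.Series>>{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)`,
        queryArgs{
            Series:        "http_requests_total",
            LabelMatchers: `namespace="default",pod=~"web-0|web-1"`,
            GroupBy:       "pod",
        })
    fmt.Println(q)
    // sum(rate(http_requests_total{namespace="default",pod=~"web-0|web-1"}[5m])) by (pod)
}
```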