I. Startup parameters
Among the prometheus-adapter pod's startup parameters, two matter most:
- metrics-relist-interval: the interval at which series are re-listed; the adapter keeps the cluster's current set of series in memory and periodically queries prometheus to refresh it;
- prometheus-url: the URL the adapter uses to query prometheus;
containers:
- args:
  - --cert-dir=/var/run/serving-cert
  - --config=/etc/adapter/config.yaml
  - --logtostderr=true
  - --metrics-relist-interval=1m
  - --prometheus-url=http://prometheus-k8s.monitoring.svc.cluster.local:9090/
  - --secure-port=6443
  image: bitnami.io/prometheus-adapter:v0.7.0
  ...
The prometheus-adapter configuration file, config.yaml, mainly contains:
- rules: for custom.metrics.k8s.io, i.e. custom metrics;
- resourceRules: for metrics.k8s.io, i.e. the cpu/mem metrics behind kubectl top;
rules:
- seriesQuery: 'http_requests_total'
  resources:
    template: <<.Resource>>
  name:
    matches: 'http_requests_total'
    as: "http_requests"
  metricsQuery: sum(rate(http_requests_total{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)
resourceRules:
  cpu:
    containerLabel: container
    containerQuery: sum(irate(container_cpu_usage_seconds_total{<<.LabelMatchers>>,container!="POD",container!="",pod!=""}[5m])) by (<<.GroupBy>>)
    nodeQuery: sum(1 - irate(node_cpu_seconds_total{mode="idle"}[5m]) * on(namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{<<.LabelMatchers>>}) by (<<.GroupBy>>)
    resources:
      overrides:
        namespace:
          resource: namespace
        node:
          resource: node
        pod:
          resource: pod
  memory:
    containerLabel: container
    containerQuery: sum(container_memory_working_set_bytes{<<.LabelMatchers>>,container!="POD",container!="",pod!=""}) by (<<.GroupBy>>)
    nodeQuery: sum(node_memory_MemTotal_bytes{job="node-exporter",<<.LabelMatchers>>} - node_memory_MemAvailable_bytes{job="node-exporter",<<.LabelMatchers>>}) by (<<.GroupBy>>)
    resources:
      overrides:
        instance:
          resource: node
        namespace:
          resource: namespace
        pod:
          resource: pod
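Once the adapter is running, both API groups can be queried like any other aggregated API. Below is a minimal Go sketch using client-go's raw REST access; it assumes in-cluster configuration, a "default" namespace, and the http_requests metric name from the rule above.

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// assumes the program runs inside the cluster
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// resource metrics: the same data kubectl top pods reads
	raw, err := clientset.CoreV1().RESTClient().Get().
		AbsPath("/apis/metrics.k8s.io/v1beta1/namespaces/default/pods").
		DoRaw(context.TODO())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	// custom metrics: served from the rules section above
	raw, err = clientset.CoreV1().RESTClient().Get().
		AbsPath("/apis/custom.metrics.k8s.io/v1beta1/namespaces/default/pods/*/http_requests").
		DoRaw(context.TODO())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}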
II. resourceRules and resourceProvider
resourceRules serve metrics.k8s.io: they back kubectl top nodes/pods for viewing resource cpu/mem, the same functionality metrics-server provides.
resourceRules are parsed and queried by resourceProvider.
Code entry point:
// cmd/adapter/adapter.go
func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error {
	...
	mapper, err := cmd.RESTMapper()
	...
	// create the resourceProvider
	provider, err := resprov.NewProvider(promClient, mapper, cmd.metricsConfig.ResourceRules)
	...
	informers, err := cmd.Informers()
	...
	server, err := cmd.Server()
	// serve requests for /apis/metrics.k8s.io/v1beta1
	if err := api.Install(provider, informers.Core().V1(), server.GenericAPIServer); err != nil {
		return err
	}
	return nil
}
resourceProvider implements PodMetricsGetter and NodeMetricsGetter, which are used to query node and pod metrics:
// vendor/sigs.k8s.io/metrics-server/pkg/api/interface.go
type PodMetricsGetter interface {
	GetContainerMetrics(pods ...apitypes.NamespacedName) ([]TimeInfo, [][]metrics.ContainerMetrics, error)
}

type NodeMetricsGetter interface {
	GetNodeMetrics(nodes ...string) ([]TimeInfo, []corev1.ResourceList, error)
}
How resourceProvider queries nodes:
// pkg/resourceprovider/provider.go
func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]api.TimeInfo, []corev1.ResourceList, error) {
	now := pmodel.Now()

	// query cpu/mem for all nodes via promClient
	qRes := p.queryBoth(now, nodeResource, "", nodes...)

	resTimes := make([]api.TimeInfo, len(nodes))
	resMetrics := make([]corev1.ResourceList, len(nodes))

	// organize the results
	for i, nodeName := range nodes {
		rawCPUs, gotResult := qRes.cpu[nodeName]
		if !gotResult {
			continue // skip nodes with missing data
		}
		rawMems, gotResult := qRes.mem[nodeName]
		if !gotResult {
			continue
		}
		rawMem := rawMems[0]
		rawCPU := rawCPUs[0]

		// store the results
		resMetrics[i] = corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI),
		}

		// use the earliest timestamp available (in order to be conservative
		// when determining if metrics are tainted by startup)
		if rawMem.Timestamp.Before(rawCPU.Timestamp) {
			resTimes[i] = api.TimeInfo{
				Timestamp: rawMem.Timestamp.Time(),
				Window:    p.window,
			}
		} else {
			resTimes[i] = api.TimeInfo{
				Timestamp: rawCPU.Timestamp.Time(),
				Window:    1 * time.Minute,
			}
		}
	}
	return resTimes, resMetrics, nil
}
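Note how a raw sample value becomes a Kubernetes quantity: the float is multiplied by 1000 and stored as a milli-quantity, so 0.25 cores is reported as 250m. A small illustration with hypothetical sample values:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// hypothetical raw CPU sample of 0.25 cores, as a PromQL result value
	rawCPU := 0.25
	cpuQty := resource.NewMilliQuantity(int64(rawCPU*1000.0), resource.DecimalSI)
	fmt.Println(cpuQty.String()) // 250m

	// hypothetical raw memory sample of 128 MiB, in bytes
	rawMem := 134217728.0
	memQty := resource.NewMilliQuantity(int64(rawMem*1000.0), resource.BinarySI)
	fmt.Println(memQty.String()) // 128Mi
}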
When querying node metrics:
- queryBoth() starts 2 goroutines that query in parallel;
- one goroutine queries the cpu metric for all nodes;
- the other queries the mem metric for all nodes;
// pkg/resourceprovider/provider.go
func (p *resourceProvider) queryBoth(now pmodel.Time, resource schema.GroupResource, namespace string, names ...string) nsQueryResults {
	...
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		cpuRes, cpuErr = p.runQuery(now, p.cpu, resource, namespace, names...) // query CPU
	}()
	go func() {
		defer wg.Done()
		memRes, memErr = p.runQuery(now, p.mem, resource, namespace, names...) // query memory
	}()
	wg.Wait()
	...
	return nsQueryResults{
		namespace: namespace,
		cpu:       cpuRes,
		mem:       memRes,
	}
}
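The error handling elided above follows the usual fan-out pattern: each goroutine writes its result and error into closure-captured variables, and the caller checks both only after wg.Wait(). A self-contained sketch of the same pattern, with placeholder query functions standing in for p.runQuery:

package main

import (
	"fmt"
	"sync"
)

// placeholders standing in for p.runQuery
func queryCPU() (string, error) { return "cpu-results", nil }
func queryMem() (string, error) { return "mem-results", nil }

func main() {
	var (
		wg             sync.WaitGroup
		cpuRes, memRes string
		cpuErr, memErr error
	)
	wg.Add(2)
	go func() {
		defer wg.Done()
		cpuRes, cpuErr = queryCPU()
	}()
	go func() {
		defer wg.Done()
		memRes, memErr = queryMem()
	}()
	wg.Wait()
	// only inspect results after both goroutines are done
	if cpuErr != nil || memErr != nil {
		fmt.Println("at least one query failed:", cpuErr, memErr)
		return
	}
	fmt.Println(cpuRes, memRes)
}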
III. rules and prometheusProvider
rules serve custom.metrics.k8s.io and are most often used for HPA scaling on custom metrics.
Rules are parsed and queried by prometheusProvider:
1. Initializing prometheusProvider
The prometheusProvider initialization flow:
- parse the rules into namers, resolving properties such as seriesQuery/metricsQuery;
- create the prometheusProvider instance;
- start a goroutine that queries series;
// cmd/adapter/adapter.go
func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
	...
	// grab the mapper and dynamic client
	mapper, err := cmd.RESTMapper()
	dynClient, err := cmd.DynamicClient()
	...
	// parse the rules into namers
	namers, err := naming.NamersFromConfig(cmd.metricsConfig.Rules, mapper)
	...
	// create the prometheusProvider
	cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge)
	// start the cachingMetricsLister, which queries series and keeps them in memory
	runner.RunUntil(stopCh)
	return cmProvider, nil
}
Series polling is implemented by cachingMetricsLister:
- the polling interval updateInterval = the startup parameter metrics-relist-interval = 1m
// pkg/custom-provider/provider.go
func (l *cachingMetricsLister) RunUntil(stopChan <-chan struct{}) {
	go wait.Until(func() {
		if err := l.updateMetrics(); err != nil {
			utilruntime.HandleError(err)
		}
	}, l.updateInterval, stopChan)
}
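wait.Until (from k8s.io/apimachinery/pkg/util/wait) runs the function immediately and then once per period until the stop channel is closed. A minimal standalone example; it uses a 1-second period so it finishes quickly, whereas the adapter passes updateInterval = 1m:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// close the stop channel after ~3 iterations
	go func() {
		time.Sleep(2500 * time.Millisecond)
		close(stopCh)
	}()

	// runs immediately, then every second, until stopCh is closed
	wait.Until(func() {
		fmt.Println("relisting series at", time.Now().Format(time.RFC3339))
	}, 1*time.Second, stopCh)
}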
2. cachingMetricsLister periodically queries series
Each rule is converted into a metricNamer structure, stored in l.namers:
- for each metricNamer, its seriesQuery expression is sent to prometheus as GET /api/v1/series to find the matching series;
- the query results are then filtered with FilterSeries;
- finally, the resulting series are cached;
// pkg/custom-provider/provider.go
func (l *cachingMetricsLister) updateMetrics() error {
	startTime := pmodel.Now().Add(-1 * l.maxAge)
	seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
	...
	selectors := make(map[prom.Selector]struct{})
	selectorSeriesChan := make(chan selectorSeries, len(l.namers))
	for _, namer := range l.namers {
		sel := namer.Selector() // sel = rule.seriesQuery
		if _, ok := selectors[sel]; ok {
			errs <- nil
			selectorSeriesChan <- selectorSeries{}
			continue
		}
		selectors[sel] = struct{}{}
		go func() {
			// call prometheus' GET /api/v1/series to find the series matching rule.seriesQuery
			series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
			if err != nil {
				errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
				return
			}
			errs <- nil
			selectorSeriesChan <- selectorSeries{
				selector: sel,
				series:   series,
			}
		}()
	}
	// iterate through, blocking until we've got all results
	for range l.namers {
		if err := <-errs; err != nil {
			return fmt.Errorf("unable to update list of all metrics: %v", err)
		}
		if ss := <-selectorSeriesChan; ss.series != nil {
			seriesCacheByQuery[ss.selector] = ss.series
		}
	}
	close(errs)

	newSeries := make([][]prom.Series, len(l.namers))
	for i, namer := range l.namers {
		series, cached := seriesCacheByQuery[namer.Selector()]
		...
		// apply the seriesFilters configuration
		newSeries[i] = namer.FilterSeries(series)
	}
	...
	// cache the series
	return l.SetSeries(newSeries, l.namers)
}
For example, with the following configuration:
- seriesQuery: '{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
  seriesFilters:
  - isNot: ^container_.*_seconds_total$
  resources:
    overrides:
      namespace:
        resource: namespace
      pod:
        resource: pod
  name:
    matches: ^container_(.*)_total$
    as: ""
  metricsQuery: sum(rate(<<.Series>>{<<.LabelMatchers>>,container!="POD"}[5m])) by (<<.GroupBy>>)
the series query sent to prometheus is:
# curl -g 'http://192.168.101:9090/api/v1/series?' --data-urlencode 'match[]={__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
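The same request can be issued from Go with just the standard library. A sketch, reusing the placeholder prometheus address and the match[] selector from the curl above:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// same match[] selector as the curl example above
	params := url.Values{}
	params.Set("match[]", `{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}`)

	// placeholder prometheus address, as in the curl example
	resp, err := http.Get("http://192.168.101:9090/api/v1/series?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}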
IV. Parsing the rules
Each rule is converted into a metricNamer, which is responsible for parsing and handling the metric.
A typical rule definition:
- seriesQuery: 'http_requests_total'
  resources:
    template: <<.Resource>>
  name:
    matches: 'http_requests_total'
    as: "http_requests"
  metricsQuery: sum(rate(http_requests_total{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)
How a rule is converted into a metricNamer:
// pkg/naming/metric_namer.go
// NamersFromConfig produces a MetricNamer for each rule in the given config.
func NamersFromConfig(cfg []config.DiscoveryRule, mapper apimeta.RESTMapper) ([]MetricNamer, error) {
	namers := make([]MetricNamer, len(cfg))
	for i, rule := range cfg {
		resConv, err := NewResourceConverter(rule.Resources.Template, rule.Resources.Overrides, mapper)
		...
		metricsQuery, err := NewMetricsQuery(rule.MetricsQuery, resConv)
		...
		seriesMatchers := make([]*ReMatcher, len(rule.SeriesFilters))
		for i, filterRaw := range rule.SeriesFilters {
			matcher, err := NewReMatcher(filterRaw)
			...
			seriesMatchers[i] = matcher
		}
		if rule.Name.Matches != "" {
			matcher, err := NewReMatcher(config.RegexFilter{Is: rule.Name.Matches})
			...
			seriesMatchers = append(seriesMatchers, matcher)
		}

		var nameMatches *regexp.Regexp
		if rule.Name.Matches != "" {
			nameMatches, err = regexp.Compile(rule.Name.Matches)
			...
		} else {
			// this will always succeed
			nameMatches = regexp.MustCompile(".*")
		}
		nameAs := rule.Name.As
		if nameAs == "" {
			// check if we have an obvious default
			subexpNames := nameMatches.SubexpNames()
			if len(subexpNames) == 1 {
				// no capture groups, use the whole thing
				nameAs = "$0"
			} else if len(subexpNames) == 2 {
				// one capture group, use that
				nameAs = "$1"
			} else {
				return nil, fmt.Errorf("must specify an 'as' value for name matcher %q associated with series query %q", rule.Name.Matches, rule.SeriesQuery)
			}
		}

		// each rule is parsed into one metricNamer structure
		namer := &metricNamer{
			seriesQuery:       prom.Selector(rule.SeriesQuery),
			metricsQuery:      metricsQuery,
			nameMatches:       nameMatches,
			nameAs:            nameAs,
			seriesMatchers:    seriesMatchers,
			ResourceConverter: resConv,
		}
		namers[i] = namer
	}
	return namers, nil
}
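The name.matches/name.as pair is essentially a regexp rewrite: matches captures part of the series name, and as (defaulting to $0 or $1 as above) produces the exposed metric name. For the container rule shown earlier, ^container_(.*)_total$ with an empty as exposes container_fs_reads_total as fs_reads. A small illustration of that rewrite:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// name.matches from the container rule above
	nameMatches := regexp.MustCompile(`^container_(.*)_total$`)

	// as was empty and the pattern has one capture group, so it defaults to "$1"
	nameAs := "$1"

	series := "container_fs_reads_total"
	metricName := nameMatches.ReplaceAllString(series, nameAs)
	fmt.Println(metricName) // fs_reads
}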
V. How prometheusProvider handles query requests
Queries against custom.metrics.k8s.io/v1beta1 are handled by prometheusProvider.
1. API registration
The API is registered through the AdapterBase embedded in the PrometheusAdapter instance:
// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/builder.go
func (b *AdapterBase) Server() (*apiserver.CustomMetricsAdapterServer, error) {
	if b.server == nil {
		config, err := b.Config()
		...
		server, err := config.Complete(b.informers).New(b.Name, b.cmProvider, b.emProvider)
		...
		b.server = server
	}
	return b.server, nil
}
which registers the API under custom.metrics.k8s.io:
// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/apiserver/apiserver.go
func (c completedConfig) New(name string, customMetricsProvider provider.CustomMetricsProvider, externalMetricsProvider provider.ExternalMetricsProvider) (*CustomMetricsAdapterServer, error) {
	genericServer, err := c.CompletedConfig.New(name, genericapiserver.NewEmptyDelegate()) // completion is done in Complete, no need for a second time
	...
	s := &CustomMetricsAdapterServer{
		GenericAPIServer:        genericServer,
		customMetricsProvider:   customMetricsProvider,
		externalMetricsProvider: externalMetricsProvider,
	}
	if customMetricsProvider != nil {
		// register the API
		if err := s.InstallCustomMetricsAPI(); err != nil {
			return nil, err
		}
	}
	...
	return s, nil
}

// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/apiserver/cmapis.go
func (s *CustomMetricsAdapterServer) InstallCustomMetricsAPI() error {
	// GroupName = "custom.metrics.k8s.io"
	groupInfo := genericapiserver.NewDefaultAPIGroupInfo(custom_metrics.GroupName, Scheme, runtime.NewParameterCodec(Scheme), Codecs)
	container := s.GenericAPIServer.Handler.GoRestfulContainer
	...
	return nil
}
2. Request handling
prometheusProvider implements the CustomMetricsProvider interface; metrics and their values are queried through:
- GetMetricByName()
- GetMetricBySelector()
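For reference, this is roughly what the CustomMetricsProvider interface looks like in the vendored custom-metrics-apiserver; the signatures below are reconstructed from the calls shown in this article, so treat them as approximate:

// vendor/github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider/interfaces.go (approximate)
type CustomMetricsProvider interface {
	// fetch a metric value for a single object by name
	GetMetricByName(name types.NamespacedName, info CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error)
	// fetch metric values for all objects matching the label selector
	GetMetricBySelector(namespace string, selector labels.Selector, info CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error)
	// list all metrics the provider can serve
	ListAllMetrics() []CustomMetricInfo
}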
Take GetMetricByName as an example:
// pkg/custom-provider/provider.go
func (p *prometheusProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
	// construct a query
	queryResults, err := p.buildQuery(info, name.Namespace, metricSelector, name.Name)
	...
}
p.buildQuery is responsible for:
- constructing the query expression, i.e. the PromQL;
- executing the query against prometheus' GET /api/v1/query via promClient;
// pkg/custom-provider/provider.go
func (p *prometheusProvider) buildQuery(info provider.CustomMetricInfo, namespace string, metricSelector labels.Selector, names ...string) (pmodel.Vector, error) {
	// construct the PromQL expression
	query, found := p.QueryForMetric(info, namespace, metricSelector, names...)
	...
	// execute the PromQL query
	queryResults, err := p.promClient.Query(context.TODO(), pmodel.Now(), query)
	...
	return *queryResults.Vector, nil
}
Let's focus on how the PromQL query expression is constructed:
- first, the metricInfo from the request is used to look up the seriesInfo in the cache;
- this cache holds the series that cachingMetricsLister refreshes every 1m (metrics-relist-interval);
- then, the metricNamer resolves the rule.metricsQuery expression using the seriesName, namespace and other parameters, yielding the final PromQL expression;
// pkg/custom-provider/series_registry.go
func (r *basicSeriesRegistry) QueryForMetric(metricInfo provider.CustomMetricInfo, namespace string, metricSelector labels.Selector, resourceNames ...string) (prom.Selector, bool) {
	...
	metricInfo, _, err := metricInfo.Normalized(r.mapper)
	...
	// look up the series info in the cache
	info, infoFound := r.info[metricInfo]
	...
	// resolve the rule.metricsQuery expression with seriesName/namespace etc., yielding the PromQL expression
	query, err := info.namer.QueryForSeries(info.seriesName, metricInfo.GroupResource, namespace, metricSelector, resourceNames...)
	...
	return query, true
}
// pkg/naming/metric_namer.go
func (n *metricNamer) QueryForSeries(series string, resource schema.GroupResource, namespace string, metricSelector labels.Selector, names ...string) (prom.Selector, error) {
	// the metricNamer resolves the metricsQuery expression
	return n.metricsQuery.Build(series, resource, namespace, nil, metricSelector, names...)
}
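metricsQuery is an ordinary Go text/template in which <<.Series>>, <<.LabelMatchers>> and <<.GroupBy>> are filled in from the cached series name and the request. Below is a simplified illustration of the expansion, driving text/template directly with the adapter's << >> delimiters and hypothetical request parameters; the real Build also maps resources through the ResourceConverter:

package main

import (
	"fmt"
	"os"
	"text/template"
)

// the fields referenced by the metricsQuery template
type queryParams struct {
	Series        string
	LabelMatchers string
	GroupBy       string
}

func main() {
	// metricsQuery from the http_requests rule above
	tmpl := template.Must(template.New("metricsQuery").
		Delims("<<", ">>").
		Parse(`sum(rate(<<.Series>>{<<.LabelMatchers>>}[5m])) by (<<.GroupBy>>)`))

	// hypothetical request: pods pod-a and pod-b in namespace default
	params := queryParams{
		Series:        "http_requests_total",
		LabelMatchers: `namespace="default",pod=~"pod-a|pod-b"`,
		GroupBy:       "pod",
	}
	if err := tmpl.Execute(os.Stdout, params); err != nil {
		panic(err)
	}
	fmt.Println()
	// output:
	// sum(rate(http_requests_total{namespace="default",pod=~"pod-a|pod-b"}[5m])) by (pod)
}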