关于prometheus:prometheusgosdk不活跃指标清理问题

3次阅读

共计 7119 个字符,预计需要花费 18 分钟才能阅读完成。

k8s 教程阐明

  • k8s 底层原理和源码解说之精髓篇
  • k8s 底层原理和源码解说之进阶篇
  • k8s 纯源码解读课程,助力你变成 k8s 专家
  • k8s-operator 和 crd 实战开发 助你成为 k8s 专家
  • tekton 全流水线实战和 pipeline 运行原理源码解读

prometheus 全组件的教程

  • 01_prometheus 全组件配置应用、底层原理解析、高可用实战
  • 02_prometheus-thanos 应用和源码解读
  • 03_kube-prometheus 和 prometheus-operator 实战和原理介绍
  • 04_prometheus 源码解说和二次开发

go 语言课程

  • golang 根底课程
  • golang 运维平台实战,服务树, 日志监控,工作执行,分布式探测

问题形容

  • 比方对于 1 个构建的流水线指标 pipeline_step_duration,会设置 1 个标签是 step
  • 每次流水线蕴含的 step 可能不雷同

    # 比方 流水线 a 第 1 次的 step 蕴含 clone 和 build
    pipeline_step_duration{step="clone"}
    pipeline_step_duration{step="build"}
    # 第 2 次 的 step 蕴含 build 和 push
    pipeline_step_duration{step="build"}
    pipeline_step_duration{step="push"}
  • 那么问题来了:第 2 次的 pipeline_step_duration{step=”build”} 要不要删掉?
  • 其实在这个场景外面是要删掉的,因为曾经不蕴含 clone 了

问题能够总结成:之前采集的标签曾经不存在了,数据要及时清理掉 – 问题是如何清理?

探讨这个问题前做个试验:比照两种常见的自打点形式对于不沉闷指标的删除解决

试验伎俩:prometheus client-go sdk

  • 启动 1 个 rand_metrics
  • 蕴含 rand_key,每次 key 都不一样,测试申请 metrics 接口的后果

    var (
      T1 = prometheus.NewGaugeVec(prometheus.GaugeOpts{
          Name: "rand_metrics",
          Help: "rand_metrics",
      }, []string{"rand_key"})
    )

实现形式 01 业务代码中间接实现打点:不实现 Collector 接口

  • 代码如下,模仿极其状况,每 0.1 秒生成随机 key 和 value 设置 metrics

    package main
    
    import (
      "fmt"
      "github.com/prometheus/client_golang/prometheus"
      "github.com/prometheus/client_golang/prometheus/promhttp"
      "math/rand"
      "net/http"
      "time"
    )
    
    var (
      T1 = prometheus.NewGaugeVec(prometheus.GaugeOpts{
          Name: "rand_metrics",
          Help: "rand_metrics",
      }, []string{"rand_key"})
    )
    
    func init() {prometheus.DefaultRegisterer.MustRegister(T1)
    }
    func RandStr(length int) string {
      str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
      bytes := []byte(str)
      result := []byte{}
      rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100)))
      for i := 0; i < length; i++ {result = append(result, bytes[rand.Intn(len(bytes))])
      }
      return string(result)
    }
    
    func push() {
      for {randKey := RandStr(10)
          rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100)))
          T1.With(prometheus.Labels{"rand_key": randKey}).Set(rand.Float64())
          time.Sleep(100 * time.Millisecond)
    
      }
    }
    
    func main() {go push()
      addr := ":8081"
      http.Handle("/metrics", promhttp.Handler())
      srv := http.Server{Addr: addr}
      err := srv.ListenAndServe()
      fmt.Println(err)
    }
    
  • 启动服务之后申请 :8081/metrics 接口发现 过期的 rand_key 还会保留,不会清理

    # HELP rand_metrics rand_metrics
    # TYPE rand_metrics gauge
    rand_metrics{rand_key="00DsYGkd6x"} 0.02229735291486387
    rand_metrics{rand_key="017UBn8S2T"} 0.7192676436571013
    rand_metrics{rand_key="01Ar4ca3i1"} 0.24131184816722678
    rand_metrics{rand_key="02Ay5kqsDH"} 0.11462075954697458
    rand_metrics{rand_key="02JZNZvMng"} 0.9874169937518104
    rand_metrics{rand_key="02arsU5qNT"} 0.8552103362564516
    rand_metrics{rand_key="02nMy3thfh"} 0.039571420204118024
    rand_metrics{rand_key="032cyHjRhP"} 0.14576779289125183
    rand_metrics{rand_key="03DPDckbfs"} 0.6106184905871918
    rand_metrics{rand_key="03lbtLwFUO"} 0.936911945555629
    rand_metrics{rand_key="03wqYiguP2"} 0.20167059771916385
    rand_metrics{rand_key="04uG2s3X0C"} 0.3324314184499403

实现形式 02 实现 Collector 接口

  • 实现 prometheus sdk 中的 collect 接口:也就是给 1 个构造体 绑定 Collect 和 Describe 办法
  • 在 Collect 中 实现设置标签和赋值办法
  • 在 Describe 中 传入 desc

    package main
    
    import (
      "fmt"
      "github.com/prometheus/client_golang/prometheus"
      "github.com/prometheus/client_golang/prometheus/promhttp"
      "log"
      "math/rand"
      "net/http"
      "time"
    )
    
    var (
      T1 = prometheus.NewDesc(
          "rand_metrics",
          "rand_metrics",
          []string{"rand_key"},
          nil)
    )
    
    type MyCollector struct {Name string}
    
    func (mc *MyCollector) Collect(ch chan<- prometheus.Metric) {log.Printf("MyCollector.collect.called")
      ch <- prometheus.MustNewConstMetric(T1,
          prometheus.GaugeValue, rand.Float64(), RandStr(10))
    }
    func (mc *MyCollector) Describe(ch chan<- *prometheus.Desc) {log.Printf("MyCollector.Describe.called")
      ch <- T1
    }
    
    func RandStr(length int) string {
      str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
      bytes := []byte(str)
      result := []byte{}
      rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100)))
      for i := 0; i < length; i++ {result = append(result, bytes[rand.Intn(len(bytes))])
      }
      return string(result)
    }
    
    func main() {//go push()
      mc := &MyCollector{Name: "abc"}
      prometheus.MustRegister(mc)
      addr := ":8082"
      http.Handle("/metrics", promhttp.Handler())
      srv := http.Server{Addr: addr}
      err := srv.ListenAndServe()
      fmt.Println(err)
    }
    
  • metrics 成果测试:申请:8082/metrics 接口发现 rand_metrics 总是只有 1 个值

    # HELP rand_metrics rand_metrics
    # TYPE rand_metrics gauge
    rand_metrics{rand_key="e1JU185kE4"} 0.12268247569586412
  • 并且查看日志发现,每次咱们申请 /metrics 接口时 MyCollector.collect.called 会调用

    2022/06/21 11:46:40 MyCollector.Describe.called
    2022/06/21 11:46:44 MyCollector.collect.called
    2022/06/21 11:46:47 MyCollector.collect.called
    2022/06/21 11:46:47 MyCollector.collect.called
    2022/06/21 11:46:47 MyCollector.collect.called
    2022/06/21 11:46:47 MyCollector.collect.called
    

景象总结

  • 实现 Collector 接口的形式 能满足过期指标清理的需要,并且打点函数是随同 /metrics 接口申请触发的
  • 不实现 Collector 接口的形式 不能满足过期指标清理的需要,指标会随着业务打点沉积

源码解读相干起因

01 两种形式都是从 web 申请获取的指标,所以得先从 /metrics 接口看

  • 入口就是 http.Handle(“/metrics”, promhttp.Handler())
  • 追踪后发现是 D:\go_path\pkg\mod\github.com\prometheus\client_golang@v1.12.2\prometheus\promhttp\http.go
  • 次要逻辑为:

    • 调用 reg 的 Gather 办法 获取 MetricFamily 数组
    • 而后编码,写到 http 的 resp 中
  • 伪代码如下

    func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {mfs, err := reg.Gather()
      for _, mf := range mfs {if handleError(enc.Encode(mf)) {return}
    }
    }

reg.Gather:遍历 reg 中已注册的 collector 调用他们的 collect 办法

  • 先调用他们的 collect 办法获取 metrics 后果

      collectWorker := func() {
          for {
              select {
              case collector := <-checkedCollectors:
                  collector.Collect(checkedMetricChan)
              case collector := <-uncheckedCollectors:
                  collector.Collect(uncheckedMetricChan)
              default:
                  return
              }
              wg.Done()}
      }
    
  • 而后生产 chan 中的数据,解决 metrics

      cmc := checkedMetricChan
      umc := uncheckedMetricChan
    
      for {
          select {
          case metric, ok := <-cmc:
              if !ok {
                  cmc = nil
                  break
              }
              errs.Append(processMetric(
                  metric, metricFamiliesByName,
                  metricHashes,
                  registeredDescIDs,
              ))
          case metric, ok := <-umc:
              if !ok {
                  umc = nil
                  break
              }
              errs.Append(processMetric(
                  metric, metricFamiliesByName,
                  metricHashes,
                  nil,
              ))

processMetric 解决办法统一,所以形式 12 的不同就在 collect 办法

02 不实现 Collector 接口的形式的 collect 办法追踪

  • 因为咱们往 reg 中注册的是 prometheus.NewGaugeVec 生成的 *GaugeVec 指针
  • 所以执行的是 *GaugeVec 的 collect 办法
  • 而 GaugeVec 又继承了 MetricVec

    type GaugeVec struct {*MetricVec}
  • 而 MetricVec 中有个 metricMap 对象,所以最终是 metricMap 的 collect 办法

    type MetricVec struct {
      *metricMap
    
      curry []curriedLabelValue
    
      // hashAdd and hashAddByte can be replaced for testing collision handling.
      hashAdd     func(h uint64, s string) uint64
      hashAddByte func(h uint64, b byte) uint64
    }

    察看 metricMap 构造体和办法

  • metricMap 有个 metrics 的 map
  • 而它的 Collect 办法就是遍历这个 map 内层的所有 metricWithLabelValues 接口,塞入 ch 中解决

    // metricVecs.
    type metricMap struct {
      mtx       sync.RWMutex // Protects metrics.
      metrics   map[uint64][]metricWithLabelValues
      desc      *Desc
      newMetric func(labelValues ...string) Metric
    }
    
    // Describe implements Collector. It will send exactly one Desc to the provided
    // channel.
    func (m *metricMap) Describe(ch chan<- *Desc) {ch <- m.desc}
    
    // Collect implements Collector.
    func (m *metricMap) Collect(ch chan<- Metric) {m.mtx.RLock()
      defer m.mtx.RUnlock()
    
      for _, metrics := range m.metrics {
          for _, metric := range metrics {ch <- metric.metric}
      }
    }
    
  • 看到这里就很清晰了,只有 metrics map 中的元素不被显示的删除,那么数据就会始终存在
  • 有一些 exporter 是采纳这种显式删除的流派的,比方 event_expoter

03 实现 Collector 接口的形式的 collect 办法追踪

  • 因为咱们的 collector 实现了 collect 办法
  • 所以间接申请 Gather 会调用咱们的 collect 办法 获取后果

    func (mc *MyCollector) Collect(ch chan<- prometheus.Metric) {log.Printf("MyCollector.collect.called")
      ch <- prometheus.MustNewConstMetric(T1,
          prometheus.GaugeValue, rand.Float64(), RandStr(10))
    }
  • 所以它不会往 metricsMap 中写入,所以只有 1 个值

总结

  • 两种打点形式的 collect 办法是不一样的
  • 其实支流的 exporter 的成果也是不沉闷的指标会删掉:

    • 比方 process-exporter 监控过程,过程不存在指标曲线就会隐没:从 grafana 图上看就是断点:不然采集一次会始终存在
    • 比方 node-exporter 监控挂载点等,当挂载点隐没相干曲线也会隐没
  • 因为支流的 exporter 采纳都是 实现 collect 办法的形式:
  • 还有 k8s 中 kube-state-metrics 采纳的是 metrics-store 作为 informer 的 store 去 watch etcd 的 delete 事件:pod 删除的时候相干的曲线也会隐没
  • 或者能够显示的调用 delete 办法,将过期的 series 从 map 中删掉,不过须要 hold 中上一次的和这一次的 diff
  • 总之两个流派:map 显式删除 VS 实现 collector 接口
正文完
 0