k8s教程阐明

  • k8s底层原理和源码解说之精髓篇
  • k8s底层原理和源码解说之进阶篇
  • k8s纯源码解读课程,助力你变成k8s专家
  • k8s-operator和crd实战开发 助你成为k8s专家
  • tekton全流水线实战和pipeline运行原理源码解读

prometheus全组件的教程

  • 01_prometheus全组件配置应用、底层原理解析、高可用实战
  • 02_prometheus-thanos应用和源码解读
  • 03_kube-prometheus和prometheus-operator实战和原理介绍
  • 04_prometheus源码解说和二次开发

go语言课程

  • golang根底课程
  • golang运维平台实战,服务树,日志监控,工作执行,分布式探测

问题形容

  • 比方对于1个构建的流水线指标 pipeline_step_duration ,会设置1个标签是step
  • 每次流水线蕴含的step可能不雷同

    # 比方 流水线a 第1次的step 蕴含clone 和buildpipeline_step_duration{step="clone"}pipeline_step_duration{step="build"}# 第2次 的step 蕴含 build 和pushpipeline_step_duration{step="build"}pipeline_step_duration{step="push"}
  • 那么问题来了:第2次的pipeline_step_duration{step="build"} 要不要删掉?
  • 其实在这个场景外面是要删掉的,因为曾经不蕴含clone了

问题能够总结成:之前采集的标签曾经不存在了,数据要及时清理掉 --问题是如何清理?

探讨这个问题前做个试验:比照两种常见的自打点形式对于不沉闷指标的删除解决

试验伎俩:prometheus client-go sdk

  • 启动1个rand_metrics
  • 蕴含rand_key,每次key都不一样,测试申请metrics接口的后果

    var (  T1 = prometheus.NewGaugeVec(prometheus.GaugeOpts{      Name: "rand_metrics",      Help: "rand_metrics",  }, []string{"rand_key"}))

实现形式01 业务代码中间接实现打点:不实现Collector接口

  • 代码如下,模仿极其状况,每0.1秒生成随机key 和value设置metrics

    package mainimport (  "fmt"  "github.com/prometheus/client_golang/prometheus"  "github.com/prometheus/client_golang/prometheus/promhttp"  "math/rand"  "net/http"  "time")var (  T1 = prometheus.NewGaugeVec(prometheus.GaugeOpts{      Name: "rand_metrics",      Help: "rand_metrics",  }, []string{"rand_key"}))func init() {  prometheus.DefaultRegisterer.MustRegister(T1)}func RandStr(length int) string {  str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"  bytes := []byte(str)  result := []byte{}  rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100)))  for i := 0; i < length; i++ {      result = append(result, bytes[rand.Intn(len(bytes))])  }  return string(result)}func push() {  for {      randKey := RandStr(10)      rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100)))      T1.With(prometheus.Labels{"rand_key": randKey}).Set(rand.Float64())      time.Sleep(100 * time.Millisecond)  }}func main() {  go push()  addr := ":8081"  http.Handle("/metrics", promhttp.Handler())  srv := http.Server{Addr: addr}  err := srv.ListenAndServe()  fmt.Println(err)}
  • 启动服务之后申请 :8081/metrics接口发现 过期的 rand_key还会保留,不会清理

    # HELP rand_metrics rand_metrics# TYPE rand_metrics gaugerand_metrics{rand_key="00DsYGkd6x"} 0.02229735291486387rand_metrics{rand_key="017UBn8S2T"} 0.7192676436571013rand_metrics{rand_key="01Ar4ca3i1"} 0.24131184816722678rand_metrics{rand_key="02Ay5kqsDH"} 0.11462075954697458rand_metrics{rand_key="02JZNZvMng"} 0.9874169937518104rand_metrics{rand_key="02arsU5qNT"} 0.8552103362564516rand_metrics{rand_key="02nMy3thfh"} 0.039571420204118024rand_metrics{rand_key="032cyHjRhP"} 0.14576779289125183rand_metrics{rand_key="03DPDckbfs"} 0.6106184905871918rand_metrics{rand_key="03lbtLwFUO"} 0.936911945555629rand_metrics{rand_key="03wqYiguP2"} 0.20167059771916385rand_metrics{rand_key="04uG2s3X0C"} 0.3324314184499403

实现形式02 实现Collector接口

  • 实现prometheus sdk中的collect 接口 :也就是给1个构造体 绑定Collect和Describe办法
  • 在Collect中 实现设置标签和赋值办法
  • 在Describe中 传入desc

    package mainimport (  "fmt"  "github.com/prometheus/client_golang/prometheus"  "github.com/prometheus/client_golang/prometheus/promhttp"  "log"  "math/rand"  "net/http"  "time")var (  T1 = prometheus.NewDesc(      "rand_metrics",      "rand_metrics",      []string{"rand_key"},      nil))type MyCollector struct {  Name string}func (mc *MyCollector) Collect(ch chan<- prometheus.Metric) {  log.Printf("MyCollector.collect.called")  ch <- prometheus.MustNewConstMetric(T1,      prometheus.GaugeValue, rand.Float64(), RandStr(10))}func (mc *MyCollector) Describe(ch chan<- *prometheus.Desc) {  log.Printf("MyCollector.Describe.called")  ch <- T1}func RandStr(length int) string {  str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"  bytes := []byte(str)  result := []byte{}  rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100)))  for i := 0; i < length; i++ {      result = append(result, bytes[rand.Intn(len(bytes))])  }  return string(result)}func main() {  //go push()  mc := &MyCollector{Name: "abc"}  prometheus.MustRegister(mc)  addr := ":8082"  http.Handle("/metrics", promhttp.Handler())  srv := http.Server{Addr: addr}  err := srv.ListenAndServe()  fmt.Println(err)}
  • metrics成果测试 :申请:8082/metrics接口发现 rand_metrics总是只有1个值

    # HELP rand_metrics rand_metrics# TYPE rand_metrics gaugerand_metrics{rand_key="e1JU185kE4"} 0.12268247569586412
  • 并且查看日志发现,每次咱们申请/metrics接口时 MyCollector.collect.called会调用

    2022/06/21 11:46:40 MyCollector.Describe.called2022/06/21 11:46:44 MyCollector.collect.called2022/06/21 11:46:47 MyCollector.collect.called2022/06/21 11:46:47 MyCollector.collect.called2022/06/21 11:46:47 MyCollector.collect.called2022/06/21 11:46:47 MyCollector.collect.called

景象总结

  • 实现Collector接口的形式 能满足过期指标清理的需要,并且打点函数是随同/metrics接口申请触发的
  • 不实现Collector接口的形式 不能满足过期指标清理的需要,指标会随着业务打点沉积

源码解读相干起因

01 两种形式都是从web申请获取的指标,所以得先从 /metrics接口看

  • 入口就是 http.Handle("/metrics", promhttp.Handler())
  • 追踪后发现是 D:\go_path\pkg\mod\github.com\prometheus\client_golang@v1.12.2\prometheus\promhttp\http.go
  • 次要逻辑为:

    • 调用reg的Gather办法 获取 MetricFamily数组
    • 而后编码,写到http的resp中
  • 伪代码如下

    func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {  mfs, err := reg.Gather()  for _, mf := range mfs {      if handleError(enc.Encode(mf)) {      return  }}}

reg.Gather :遍历reg中已注册的collector 调用他们的collect办法

  • 先调用他们的collect办法获取metrics后果

      collectWorker := func() {      for {          select {          case collector := <-checkedCollectors:              collector.Collect(checkedMetricChan)          case collector := <-uncheckedCollectors:              collector.Collect(uncheckedMetricChan)          default:              return          }          wg.Done()      }  }
  • 而后生产chan中的数据,解决metrics

      cmc := checkedMetricChan  umc := uncheckedMetricChan  for {      select {      case metric, ok := <-cmc:          if !ok {              cmc = nil              break          }          errs.Append(processMetric(              metric, metricFamiliesByName,              metricHashes,              registeredDescIDs,          ))      case metric, ok := <-umc:          if !ok {              umc = nil              break          }          errs.Append(processMetric(              metric, metricFamiliesByName,              metricHashes,              nil,          ))

processMetric解决办法统一,所以形式12的不同就在 collect办法

02 不实现Collector接口的形式的collect办法追踪

  • 因为咱们往reg中注册的是 prometheus.NewGaugeVec生成的*GaugeVec指针
  • 所以执行的是*GaugeVec的collect办法
  • 而GaugeVec 又继承了MetricVec

    type GaugeVec struct {  *MetricVec}
  • 而MetricVec中有个metricMap对象, 所以最终是metricMap的collect办法

    type MetricVec struct {  *metricMap  curry []curriedLabelValue  // hashAdd and hashAddByte can be replaced for testing collision handling.  hashAdd     func(h uint64, s string) uint64  hashAddByte func(h uint64, b byte) uint64}

    察看metricMap构造体和办法

  • metricMap有个metrics的map
  • 而它的Collect办法就是遍历这个map内层的所有metricWithLabelValues接口,塞入ch中解决

    // metricVecs.type metricMap struct {  mtx       sync.RWMutex // Protects metrics.  metrics   map[uint64][]metricWithLabelValues  desc      *Desc  newMetric func(labelValues ...string) Metric}// Describe implements Collector. It will send exactly one Desc to the provided// channel.func (m *metricMap) Describe(ch chan<- *Desc) {  ch <- m.desc}// Collect implements Collector.func (m *metricMap) Collect(ch chan<- Metric) {  m.mtx.RLock()  defer m.mtx.RUnlock()  for _, metrics := range m.metrics {      for _, metric := range metrics {          ch <- metric.metric      }  }}
  • 看到这里就很清晰了,只有metrics map中的元素不被显示的删除,那么数据就会始终存在
  • 有一些exporter是采纳这种显式删除的流派的,比方event_expoter

03 实现Collector接口的形式的collect办法追踪

  • 因为咱们的collector 实现了collect办法
  • 所以间接申请Gather会调用咱们的collect办法 获取后果

    func (mc *MyCollector) Collect(ch chan<- prometheus.Metric) {  log.Printf("MyCollector.collect.called")  ch <- prometheus.MustNewConstMetric(T1,      prometheus.GaugeValue, rand.Float64(), RandStr(10))}
  • 所以它不会往metricsMap中写入,所以只有1个值

总结

  • 两种打点形式的collect办法是不一样的
  • 其实支流的exporter的成果也是不沉闷的指标会删掉:

    • 比方 process-exporter监控过程,过程不存在指标曲线就会隐没:从grafana图上看就是断点:不然采集一次会始终存在
    • 比方 node-exporter 监控挂载点等,当挂载点隐没相干曲线也会隐没
  • 因为支流的exporter采纳都是 实现collect办法的形式:
  • 还有k8s中kube-state-metrics采纳的是 metrics-store作为informer的store 去watch etcd的delete 事件: pod删除的时候相干的曲线也会隐没
  • 或者能够显示的调用delete 办法,将过期的series从map中删掉,不过须要hold中上一次的和这一次的diff
  • 总之两个流派:map显式删除VS实现collector接口