共计 7119 个字符,预计需要花费 18 分钟才能阅读完成。
k8s 教程阐明
- k8s 底层原理和源码解说之精髓篇
- k8s 底层原理和源码解说之进阶篇
- k8s 纯源码解读课程,助力你变成 k8s 专家
- k8s-operator 和 crd 实战开发 助你成为 k8s 专家
- tekton 全流水线实战和 pipeline 运行原理源码解读
prometheus 全组件的教程
- 01_prometheus 全组件配置应用、底层原理解析、高可用实战
- 02_prometheus-thanos 应用和源码解读
- 03_kube-prometheus 和 prometheus-operator 实战和原理介绍
- 04_prometheus 源码解说和二次开发
go 语言课程
- golang 根底课程
- golang 运维平台实战,服务树, 日志监控,工作执行,分布式探测
问题形容
- 比方对于 1 个构建的流水线指标 pipeline_step_duration,会设置 1 个标签是 step
-
每次流水线蕴含的 step 可能不雷同
# 比方 流水线 a 第 1 次的 step 蕴含 clone 和 build pipeline_step_duration{step="clone"} pipeline_step_duration{step="build"} # 第 2 次 的 step 蕴含 build 和 push pipeline_step_duration{step="build"} pipeline_step_duration{step="push"}
- 那么问题来了:第 2 次的 pipeline_step_duration{step=”build”} 要不要删掉?
- 其实在这个场景外面是要删掉的,因为曾经不蕴含 clone 了
问题能够总结成:之前采集的标签曾经不存在了,数据要及时清理掉 – 问题是如何清理?
探讨这个问题前做个试验:比照两种常见的自打点形式对于不沉闷指标的删除解决
试验伎俩:prometheus client-go sdk
- 启动 1 个 rand_metrics
-
蕴含 rand_key,每次 key 都不一样,测试申请 metrics 接口的后果
var ( T1 = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "rand_metrics", Help: "rand_metrics", }, []string{"rand_key"}) )
实现形式 01 业务代码中间接实现打点:不实现 Collector 接口
-
代码如下,模仿极其状况,每 0.1 秒生成随机 key 和 value 设置 metrics
package main import ( "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "math/rand" "net/http" "time" ) var ( T1 = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "rand_metrics", Help: "rand_metrics", }, []string{"rand_key"}) ) func init() {prometheus.DefaultRegisterer.MustRegister(T1) } func RandStr(length int) string { str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" bytes := []byte(str) result := []byte{} rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100))) for i := 0; i < length; i++ {result = append(result, bytes[rand.Intn(len(bytes))]) } return string(result) } func push() { for {randKey := RandStr(10) rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100))) T1.With(prometheus.Labels{"rand_key": randKey}).Set(rand.Float64()) time.Sleep(100 * time.Millisecond) } } func main() {go push() addr := ":8081" http.Handle("/metrics", promhttp.Handler()) srv := http.Server{Addr: addr} err := srv.ListenAndServe() fmt.Println(err) }
-
启动服务之后申请 :8081/metrics 接口发现 过期的 rand_key 还会保留,不会清理
# HELP rand_metrics rand_metrics # TYPE rand_metrics gauge rand_metrics{rand_key="00DsYGkd6x"} 0.02229735291486387 rand_metrics{rand_key="017UBn8S2T"} 0.7192676436571013 rand_metrics{rand_key="01Ar4ca3i1"} 0.24131184816722678 rand_metrics{rand_key="02Ay5kqsDH"} 0.11462075954697458 rand_metrics{rand_key="02JZNZvMng"} 0.9874169937518104 rand_metrics{rand_key="02arsU5qNT"} 0.8552103362564516 rand_metrics{rand_key="02nMy3thfh"} 0.039571420204118024 rand_metrics{rand_key="032cyHjRhP"} 0.14576779289125183 rand_metrics{rand_key="03DPDckbfs"} 0.6106184905871918 rand_metrics{rand_key="03lbtLwFUO"} 0.936911945555629 rand_metrics{rand_key="03wqYiguP2"} 0.20167059771916385 rand_metrics{rand_key="04uG2s3X0C"} 0.3324314184499403
实现形式 02 实现 Collector 接口
- 实现 prometheus sdk 中的 collect 接口:也就是给 1 个构造体 绑定 Collect 和 Describe 办法
- 在 Collect 中 实现设置标签和赋值办法
-
在 Describe 中 传入 desc
package main import ( "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "log" "math/rand" "net/http" "time" ) var ( T1 = prometheus.NewDesc( "rand_metrics", "rand_metrics", []string{"rand_key"}, nil) ) type MyCollector struct {Name string} func (mc *MyCollector) Collect(ch chan<- prometheus.Metric) {log.Printf("MyCollector.collect.called") ch <- prometheus.MustNewConstMetric(T1, prometheus.GaugeValue, rand.Float64(), RandStr(10)) } func (mc *MyCollector) Describe(ch chan<- *prometheus.Desc) {log.Printf("MyCollector.Describe.called") ch <- T1 } func RandStr(length int) string { str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" bytes := []byte(str) result := []byte{} rand.Seed(time.Now().UnixNano() + int64(rand.Intn(100))) for i := 0; i < length; i++ {result = append(result, bytes[rand.Intn(len(bytes))]) } return string(result) } func main() {//go push() mc := &MyCollector{Name: "abc"} prometheus.MustRegister(mc) addr := ":8082" http.Handle("/metrics", promhttp.Handler()) srv := http.Server{Addr: addr} err := srv.ListenAndServe() fmt.Println(err) }
-
metrics 成果测试:申请:8082/metrics 接口发现 rand_metrics 总是只有 1 个值
# HELP rand_metrics rand_metrics # TYPE rand_metrics gauge rand_metrics{rand_key="e1JU185kE4"} 0.12268247569586412
-
并且查看日志发现,每次咱们申请 /metrics 接口时 MyCollector.collect.called 会调用
2022/06/21 11:46:40 MyCollector.Describe.called 2022/06/21 11:46:44 MyCollector.collect.called 2022/06/21 11:46:47 MyCollector.collect.called 2022/06/21 11:46:47 MyCollector.collect.called 2022/06/21 11:46:47 MyCollector.collect.called 2022/06/21 11:46:47 MyCollector.collect.called
景象总结
- 实现 Collector 接口的形式 能满足过期指标清理的需要,并且打点函数是随同 /metrics 接口申请触发的
- 不实现 Collector 接口的形式 不能满足过期指标清理的需要,指标会随着业务打点沉积
源码解读相干起因
01 两种形式都是从 web 申请获取的指标,所以得先从 /metrics 接口看
- 入口就是 http.Handle(“/metrics”, promhttp.Handler())
- 追踪后发现是 D:\go_path\pkg\mod\github.com\prometheus\client_golang@v1.12.2\prometheus\promhttp\http.go
-
次要逻辑为:
- 调用 reg 的 Gather 办法 获取 MetricFamily 数组
- 而后编码,写到 http 的 resp 中
-
伪代码如下
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {mfs, err := reg.Gather() for _, mf := range mfs {if handleError(enc.Encode(mf)) {return} } }
reg.Gather:遍历 reg 中已注册的 collector 调用他们的 collect 办法
-
先调用他们的 collect 办法获取 metrics 后果
collectWorker := func() { for { select { case collector := <-checkedCollectors: collector.Collect(checkedMetricChan) case collector := <-uncheckedCollectors: collector.Collect(uncheckedMetricChan) default: return } wg.Done()} }
-
而后生产 chan 中的数据,解决 metrics
cmc := checkedMetricChan umc := uncheckedMetricChan for { select { case metric, ok := <-cmc: if !ok { cmc = nil break } errs.Append(processMetric( metric, metricFamiliesByName, metricHashes, registeredDescIDs, )) case metric, ok := <-umc: if !ok { umc = nil break } errs.Append(processMetric( metric, metricFamiliesByName, metricHashes, nil, ))
processMetric 解决办法统一,所以形式 12 的不同就在 collect 办法
02 不实现 Collector 接口的形式的 collect 办法追踪
- 因为咱们往 reg 中注册的是 prometheus.NewGaugeVec 生成的 *GaugeVec 指针
- 所以执行的是 *GaugeVec 的 collect 办法
-
而 GaugeVec 又继承了 MetricVec
type GaugeVec struct {*MetricVec}
-
而 MetricVec 中有个 metricMap 对象,所以最终是 metricMap 的 collect 办法
type MetricVec struct { *metricMap curry []curriedLabelValue // hashAdd and hashAddByte can be replaced for testing collision handling. hashAdd func(h uint64, s string) uint64 hashAddByte func(h uint64, b byte) uint64 }
察看 metricMap 构造体和办法
- metricMap 有个 metrics 的 map
-
而它的 Collect 办法就是遍历这个 map 内层的所有 metricWithLabelValues 接口,塞入 ch 中解决
// metricVecs. type metricMap struct { mtx sync.RWMutex // Protects metrics. metrics map[uint64][]metricWithLabelValues desc *Desc newMetric func(labelValues ...string) Metric } // Describe implements Collector. It will send exactly one Desc to the provided // channel. func (m *metricMap) Describe(ch chan<- *Desc) {ch <- m.desc} // Collect implements Collector. func (m *metricMap) Collect(ch chan<- Metric) {m.mtx.RLock() defer m.mtx.RUnlock() for _, metrics := range m.metrics { for _, metric := range metrics {ch <- metric.metric} } }
- 看到这里就很清晰了,只有 metrics map 中的元素不被显示的删除,那么数据就会始终存在
- 有一些 exporter 是采纳这种显式删除的流派的,比方 event_expoter
03 实现 Collector 接口的形式的 collect 办法追踪
- 因为咱们的 collector 实现了 collect 办法
-
所以间接申请 Gather 会调用咱们的 collect 办法 获取后果
func (mc *MyCollector) Collect(ch chan<- prometheus.Metric) {log.Printf("MyCollector.collect.called") ch <- prometheus.MustNewConstMetric(T1, prometheus.GaugeValue, rand.Float64(), RandStr(10)) }
- 所以它不会往 metricsMap 中写入,所以只有 1 个值
总结
- 两种打点形式的 collect 办法是不一样的
-
其实支流的 exporter 的成果也是不沉闷的指标会删掉:
- 比方 process-exporter 监控过程,过程不存在指标曲线就会隐没:从 grafana 图上看就是断点:不然采集一次会始终存在
- 比方 node-exporter 监控挂载点等,当挂载点隐没相干曲线也会隐没
- 因为支流的 exporter 采纳都是 实现 collect 办法的形式:
- 还有 k8s 中 kube-state-metrics 采纳的是 metrics-store 作为 informer 的 store 去 watch etcd 的 delete 事件:pod 删除的时候相干的曲线也会隐没
- 或者能够显示的调用 delete 办法,将过期的 series 从 map 中删掉,不过须要 hold 中上一次的和这一次的 diff
- 总之两个流派:map 显式删除 VS 实现 collector 接口
正文完
发表至: prometheus
2022-06-21