关于prometheus:prometheus-pushgateway使用及源码分析

30次阅读

共计 6267 个字符,预计需要花费 16 分钟才能阅读完成。

一.Pushgateway 是什么

pushgatway 是 prometheus 社区推出的一个推送指标的组件,次要利用在:

  • 短生命周期 (short-lived) 或者批工作 (batch jobs) 的资源 / 作业的指标;
  • prometheus 无奈拉取到 (网络起因) 的 target 的指标;

作业工作能够将指标通过 HTTP API 推送给 pushgateway,而后由 prometheus 拉取 pushgateway 的指标。

二.Pushgateway 如何应用

1. 装置启动 pushgateway

# wget https://github.com/prometheus/pushgateway/releases/download/v1.2.0/pushgateway-1.2.0.linux-amd64.tar.gz
# ./pushgateway --web.listen-address=":9099"

2. 配置 pushgateway 被 prometheus 拉取

scrape_configs:- job_name: 'pushgateway'
    static_configs:
    - targets: ['127.0.0.1:9099']

3. 向 pushgateway 发送数据

这里通过 shell,调用 pushgateway 的 HTTP 接口,发送数据:

#!/bin/bash
instance_name=`hostname -f | cut -d'.' -f1`

if [$instance_name == "localhost"];then
  echo "Must FQDN hostname"
  exit 1
fi

# For waitting connections
label="count_netstat_wait_connections"
count_netstat_wait_connections=`netstat -an | grep -i wait | wc -l`

cat <<EOF | curl --data-binary @- http://127.0.0.1:9099/metrics/job/pushgateway/instance/$instance_name
# TYPE $label gauge
# HELP $label current connection in wait state
$label $count_netstat_wait_connections
EOF

查问本机中处于 wait 状态的网络连接数,而后发送给 pushgateway:

# ./net_exporter_shell.sh

4.prometheus UI 验证数据正确接管 & 拉取

首先,看一下 pushgateway 的 /metrics 是否有咱们定义的指标:

# curl http://127.0.0.1:9099/metrics
# HELP count_netstat_wait_connections current connection in wait state
# TYPE count_netstat_wait_connections gauge
count_netstat_wait_connections{instance="dev",job="pushgateway"} 0
...

而后,再看 prometheus UI 上是否能够查问到该指标:

三.Pushgateway 的源码剖析

pushgateway 的源码:https://github.com/prometheus…

1. 指标推送的 API:

// pushgateway/main.go
func main() {
    ...
    // Handlers for pushing and deleting metrics.
    pushAPIPath := *routePrefix + "/metrics"
    for _, suffix := range []string{"", handler.Base64Suffix} {
        jobBase64Encoded := suffix == handler.Base64Suffix
        // URL 中的 labels 被解析为 jobname,instance
        r.Post(pushAPIPath+"/job"+suffix+"/:job/*labels", handler.Push(ms, false, !*pushUnchecked, jobBase64Encoded, logger))
        ...
    }
    ...
}

推送的逻辑在 handler.Push(…)

// pushgateway/handler/push.go
func Push(
    ms storage.MetricStore,
    replace, check, jobBase64Encoded bool,
    logger log.Logger,
) func(http.ResponseWriter, *http.Request) {handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {job := route.Param(r.Context(), "job")
        // 解析 URL 中 labels
        labelsString := route.Param(r.Context(), "labels")
        labels, err := splitLabels(labelsString)
        labels["job"] = job
        // 解析 request body 中的 text,解析成 prom 格局的 metric
        var parser expfmt.TextParser
        metricFamilies, err = parser.TextToMetricFamilies(r.Body)
        
        if !check {
            // 将指标存入到 storage.MetricStore
            ms.SubmitWriteRequest(storage.WriteRequest{
                Labels:         labels,
                Timestamp:      now,
                MetricFamilies: metricFamilies,
                Replace:        replace,
            })
            w.WriteHeader(http.StatusAccepted)
            return
        }
    }
    instrumentedHandler := promhttp.InstrumentHandlerRequestSize(
        httpPushSize, promhttp.InstrumentHandlerDuration(httpPushDuration, InstrumentWithCounter("push", handler),
        ))

    return func(w http.ResponseWriter, r *http.Request) {mtx.Lock()
        instrumentedHandler.ServeHTTP(w, r)
    }
}

看一下 storage.MetricStore 存储指标的逻辑:

// pushgateway/storage/diskmetricstore.go
func (dms *DiskMetricStore) SubmitWriteRequest(req WriteRequest) {dms.writeQueue <- req    // 写入 channel}

dms 中有一个 loop 解决 channel 中的数据:

// pushgateway/storage/diskmetricstore.go
func (dms *DiskMetricStore) loop(persistenceInterval time.Duration) {
    ...
    for {
        select {
        case wr := <-dms.writeQueue:
            lastWrite = time.Now()
            if dms.checkWriteRequest(wr) {dms.processWriteRequest(wr)
            } 
        ...
    }
}
// pushgateway/storage/diskmetricstore.go
func (dms *DiskMetricStore) processWriteRequest(wr WriteRequest) {key := groupingKeyFor(wr.Labels)
    group, ok := dms.metricGroups[key]
    if !ok {
        group = MetricGroup{
            Labels:  wr.Labels,
            Metrics: NameToTimestampedMetricFamilyMap{},}
        dms.metricGroups[key] = group
    } 
    ...
}

能够看到,指标最终被写入 dms.metricGroups 中,它是一个 map 构造:

// pushgateway/storage/diskmetricstore.go
type DiskMetricStore struct {
    ...
    metricGroups    GroupingKeyToMetricGroup
}
// 内存的 map 构造
type GroupingKeyToMetricGroup map[string]MetricGroup

2. 指标查问的 API:/metrics

// pushgateway/main.go
func main() {
    ...
    r.Get("/metrics", wrap("api/v1/metrics", api.metrics))    
}

API 的 handler 解决:

  • 从 metricStorage 中获取所有的指标;
  • 将指标组装后返回 client;
// pushgateway/api/v1/api.go
func (api *API) metrics(w http.ResponseWriter, r *http.Request) {
    // 从 storage.MetricStorage 中获取所有的
    familyMaps := api.MetricStore.GetMetricFamiliesMap()
    res := []interface{}{}
    for _, v := range familyMaps {metricResponse := map[string]interface{}{}
        for name, metricValues := range v.Metrics {....}
        res = append(res, metricResponse)
    }
    api.respond(w, res)        // 返回 client
}

查 DiskMetricStore 的时候,查问的是外面的 metricGroups 内容,也是上一步中咱们 push 指标的目的地:

// pushgateway/storage/diskmetricstore.go
func (dms *DiskMetricStore) GetMetricFamiliesMap() GroupingKeyToMetricGroup {
    ...
    groupsCopy := make(GroupingKeyToMetricGroup, len(dms.metricGroups))
    for k, g := range dms.metricGroups {...}
    return groupsCopy
}

3. 总结

  • 推送指标:最新的指标被存入 DiskMetricStore.metricGroup;
  • 查问指标:查问 DiskMetricStore.metricGroup 中最新的值;

四.Pushgateway 的最佳实际

pushgateway 官网强调的是,不能应用 pushgateway 将 prometheus 变成一个 push 模型:

First of all, the Pushgateway is not capable of turning Prometheus into a push-based monitoring system.

pushgateway 官网认为,pushgateway 的最佳用处是:抓取服务层的批工作的指标

  • 服务层的批工作:sevice-level batch job,意味着它跟具体 instance/job 都无关;

Usually, the only valid use case for the Pushgateway is for capturing the outcome of a service-level batch job. A “service-level” batch job is one which is not semantically related to a specific machine or job instance (for example, a batch job that deletes a number of users for an entire service).

此外,pushgateway 在应用时,有一些弱点。

1. pushgateway 的应用弱点一

当作业服务不再向 pushgateway 推送指标时,依然能够从 pushgateway 的 /metrics 接口中,查问到过期的数据。

比方:

  • targetA 在 12:01:00 向 pushgateway 推送 metricA 指标;
  • targetA 在 12:01:10 服务宕机,不再推送指标;
  • 通过 pushgateway 的 /metrics 接口:

    • 在 12:01:10 之后,依然能够查到 metricA 指标,始终不会过期;
    • 即 12:02:00、12:03:00、…、12:30:00 都能够查问到 metricA 指标;

社区对此问题的解释:

A while ago, we decided to not implement a“timeout”or TTL for pushed metrics because almost all proposed use cases turned out to be anti-patterns we strongly discourage. You can follow a more recent discussion on the prometheus-developers mailing list.

解决该问题的一个办法是,能够通过 pushgateway 的 Delete 接口,被动删除该 target 的指标,这样 /metrics 接口就查不到了:

curl -X DELETE http://127.0.0.1:9099/metrics/job/some_job/instance/some_instance

2. pushgateway 的应用弱点二

假如 target 上报指标的工夫 =t1,prometheus 拉取 pushgatway 的工夫 =t2,无奈保障 t1 和 t2 在同一个拉取周期中,也就是 无奈保障 prometheus 能够拉取到最新的数据

比方:

  • target 的上报周期 =30s,最近一次在 12:00:20 上报,下一次在 12:00:50 上报;
  • prometheus 的拉取周期 =30s,最近一次在 12:00:10 拉取,下一次在 12:00:40 拉取;
  • 也就是说,prometheus 拉取的总是 target 上个周期的数据;

社区对该问题的解释:

As there aren’t any use cases where it would make sense to attach a different timestamp, and many users attempting to incorrectly do so (despite no client library supporting this), the Pushgateway rejects any pushes with timestamps.

If you think you need to push a timestamp, please see When To Use The Pushgateway.

参考

1.https://prometheus.io/docs/pr…
2.https://github.com/prometheus…

正文完
 0