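Prometheus does not provide remote storage itself, but it does define an interface for remote storage:

  • Any remote storage that implements this interface can store and read Prometheus data;
  • Only remote-write is analyzed here.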

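In my setup, Prometheus is deployed in Kubernetes by prometheus-operator. Kubernetes manages the configuration through the Prometheus CRD; when prometheus-operator observes a configuration change, it applies the new configuration to the Prometheus pod.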

The remote-write configuration in the Prometheus CRD:

remoteWrite:
  - url: "https://1.2.3.4/api/monitor/v1/prom/write"
    tlsConfig:
      insecureSkipVerify: true

After the apply, Prometheus ends up with the following generated configuration:

remote_write:
- url: https://1.2.3.4/api/monitor/v1/prom/write
  remote_timeout: 30s
  tls_config:
    insecure_skip_verify: true
  queue_config:
    capacity: 500
    max_shards: 1000
    min_shards: 1
    max_samples_per_send: 100
    batch_send_deadline: 5s
    min_backoff: 30ms
    max_backoff: 100ms

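As you can see, the generated configuration adds queue_config, the settings for the queue used during transmission.
Each remoteStorage uses one queue for sending:

  • The queue starts with min_shards shards and can grow to at most max_shards shards;
  • Each shard can hold capacity samples;
  • Data is sent to the remoteStorage over HTTP. If a send fails, the sender backs off for min_backoff; if it fails again, it backs off for 2*min_backoff, and so on, up to max_backoff.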
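The backoff rule described above can be sketched in a few lines of Go. This is only an illustration of the doubling-with-cap behavior, not the code Prometheus actually uses; the function names nextBackoff and backoffSchedule are hypothetical, and the values 30ms and 100ms are taken from the generated queue_config shown earlier.

```go
package main

import (
	"fmt"
	"time"
)

// nextBackoff doubles the current delay and caps it at maxDelay.
// Hypothetical helper, written only to illustrate the rule in the text.
func nextBackoff(cur, maxDelay time.Duration) time.Duration {
	next := cur * 2
	if next > maxDelay {
		return maxDelay
	}
	return next
}

// backoffSchedule returns the delays waited before each of the first n retries,
// starting at minDelay and doubling until maxDelay is reached.
func backoffSchedule(minDelay, maxDelay time.Duration, n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	d := minDelay
	for i := 0; i < n; i++ {
		delays = append(delays, d)
		d = nextBackoff(d, maxDelay)
	}
	return delays
}

func main() {
	// min_backoff: 30ms, max_backoff: 100ms, as in the generated configuration.
	fmt.Println(backoffSchedule(30*time.Millisecond, 100*time.Millisecond, 4))
	// prints: [30ms 60ms 100ms 100ms]
}
```

With the values above, the third retry would already be 120ms if left uncapped, so every delay from the third retry onward stays at max_backoff.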

The Prometheus remote-write data protocol

Prometheus samples are serialized with protobuf, then compressed with snappy, and finally sent to the remoteStorage over HTTP.

The corresponding source code:

// prometheus/storage/remote/queue_manager.go
func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, error) {
    var highest int64
    for _, ts := range samples {
        // At the moment we only ever append a TimeSeries with a single sample in it.
        if ts.Samples[0].Timestamp > highest {
            highest = ts.Samples[0].Timestamp
        }
    }
    req := &prompb.WriteRequest{
        Timeseries: samples,
    }
    data, err := proto.Marshal(req)
    if err != nil {
        return nil, highest, err
    }
    // snappy uses len() to see if it needs to allocate a new slice. Make the
    // buffer as long as possible.
    if buf != nil {
        buf = buf[0:cap(buf)]
    }
    compressed := snappy.Encode(buf, data)
    return compressed, highest, nil
}
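The last step, sending the compressed bytes over HTTP, might look roughly like the sketch below. It uses only the Go standard library and assumes the payload has already been protobuf-serialized and snappy-compressed (for example by a function like buildWriteRequest). The helper name newRemoteWriteRequest is hypothetical; the three headers are the ones the Prometheus remote-write client sends, and the URL is the one from the configuration above.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newRemoteWriteRequest wraps an already snappy-compressed protobuf payload
// in an HTTP POST request. Hypothetical helper for illustration only.
func newRemoteWriteRequest(url string, compressed []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	// Tell the receiver how the body is encoded.
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	return req, nil
}

func main() {
	// Placeholder bytes; a real payload would be snappy(protobuf(WriteRequest)).
	payload := []byte("placeholder for compressed WriteRequest bytes")
	req, err := newRemoteWriteRequest("https://1.2.3.4/api/monitor/v1/prom/write", payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("Content-Encoding"))
}
```

The request is only built here, not sent; in a real sender it would be passed to an http.Client configured with the remote_timeout and tls_config settings.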

How a remoteStorage implements the remote-write protocol interface

The remoteStorage has to implement the HTTP endpoint defined in remoteConfigs; the discussion here mainly follows the influxdb implementation.
The HTTP endpoint:

// Implement an API like the following
Route{
    "prometheus-write", // Prometheus remote write
    "POST", "/api/v1/prom/write", false, true, h.servePromWrite,
},

The implementation of the HTTP endpoint:

func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
    ......
    var bs []byte
    if r.ContentLength > 0 {
        bs = make([]byte, 0, r.ContentLength)
    }
    body := r.Body
    buf := bytes.NewBuffer(bs)
    // Read the request body (the error check for this read is not shown in this excerpt)
    _, err := buf.ReadFrom(body)
    // Decompress with snappy
    reqBuf, err := snappy.Decode(nil, buf.Bytes())
    if err != nil {
        h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Convert the Prometheus remote write request to Influx Points
    var req remote.WriteRequest
    // Deserialize the protobuf message
    if err := proto.Unmarshal(reqBuf, &req); err != nil {
        h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    }
    ......
}

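This is exactly the reverse of what Prometheus does on the sending side: the handler first decompresses the body with snappy and then deserializes the protobuf message to recover the actual data.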