关于prometheus:prometheus-remotewrite解析一-使用

11次阅读

共计 2343 个字符,预计需要花费 6 分钟才能阅读完成。

prometheus 没有提供近程存储,但提供了近程存储的接口:

  • 近程存储只有实现这一接口,即可存储和读取 prometheus 的数据;
  • 这里仅剖析 remote-write:

笔者的 prometheus 被 prometheus-operator 部署在 kubernetes 中,kubernetes 应用 prometheus 这个 CRD 治理配置,prometheus-operator 监听到配置变动,将新配置 apply 到 prometheus POD 上。

prometheus CRD 中的 remote-write 配置:

remoteWrite:
  - url: "https://1.2.3.4/api/monitor/v1/prom/write"
    tlsConfig:
      insecureSkipVerify: true

apply 当前,prometheus 生成如下的配置:

remote_write:
- url: https://1.2.3.4/api/monitor/v1/prom/write
  remote_timeout: 30s
  tls_config:
    insecure_skip_verify: true
  queue_config:
    capacity: 500
    max_shards: 1000
    min_shards: 1
    max_samples_per_send: 100
    batch_send_deadline: 5s
    min_backoff: 30ms
    max_backoff: 100ms

能够看到,它减少了 queue_config,即传输过程中的队列配置。
假如每个 remoteStorage 应用 1 个 queue 进行传输:

  • queue 中的初始 shards 数 =min_shards,最大 shards 数 =max_shards;
  • 每个 shard 的容量 =capacity 个 sample;
  • 通过 HTTP 向 remoteStorage 发送数据时,若发送失败,则回退 min_backoff;再次失败,则回退 2 *min_backoff,直到 max_backoff;

prometheus 的 remote-write 数据协定

prometheus 的 samples,通过 protobuf 的序列化,而后再通过 snappy 压缩,最初通过 HTTP 发送给 remoteStorage;

对应的源代码:

// prometheus/storage/remote/queue_manager.go
func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, error) {
    var highest int64
    for _, ts := range samples {
        // At the moment we only ever append a TimeSeries with a single sample in it.
        if ts.Samples[0].Timestamp > highest {highest = ts.Samples[0].Timestamp
        }
    }
    req := &prompb.WriteRequest{Timeseries: samples,}

    data, err := proto.Marshal(req)
    if err != nil {return nil, highest, err}

    // snappy uses len() to see if it needs to allocate a new slice. Make the
    // buffer as long as possible.
    if buf != nil {buf = buf[0:cap(buf)]
    }
    compressed := snappy.Encode(buf, data)
    return compressed, highest, nil
}

remoteStorage 如何实现 remote-write 协定接口

remoteStorage 要实现 remoteConfigs 中定义的 HTTP 接口,这里次要参考 influxdb 的实现。
HTTP 接口:

// 实现如下的 API
Route{
            "prometheus-write", // Prometheus remote write
            "POST", "/api/v1/prom/write", false, true, h.servePromWrite,
        },

HTTP 接口的实现:

func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
    ......
    var bs []byte
    if r.ContentLength > 0 {bs = make([]byte, 0, r.ContentLength)
    }
    body := r.Body
    buf := bytes.NewBuffer(bs)
    // 读 request body
    _, err := buf.ReadFrom(body)
    // snappy 解压缩
    reqBuf, err := snappy.Decode(nil, buf.Bytes())
    if err != nil {h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Convert the Prometheus remote write request to Influx Points
    var req remote.WriteRequest
    // protobuf 反序列化
    if err := proto.Unmarshal(reqBuf, &req); err != nil {h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    }
    ......
}

跟 prometheus 做的事件正好相同,这里先进行 sappy 的解压缩,而后再 protobuf 反序列化,失去实在的数据。

正文完
 0