关于prometheus:prometheus-remoteread使用与源码解读

67次阅读

共计 2962 个字符,预计需要花费 8 分钟才能阅读完成。

prometheus 中 remote-write 和 remote-read 的配置:

# store data to influxdb
remote_write:
  - url: "http://10.21.1.74:8086/api/v1/prom/write?db=prometheus"
# read data from influxdb
remote_read:
  - url: "http://10.21.1.74:8086/api/v1/prom/read?db=prometheus"

remote-read 能够让 prometheus 读取近程存储上的时序数据,扩大了本地存储。

prometheus 在应答 /query 查问申请时,由 fanoutStorage 解决;

  • fanoutStorage 蕴含 localStorage(本地 TSDB) 和 remoteStorage(近程存储),它们均实现了查问接口;
  • localStorage 执行本地查问;
  • remoteStorage 通过 HTTP 执行近程查问;
  • 将上述 2 个查问后果进行合并,返回给 client;

demo 演示

  1. prometheus 配置 remote-write 和 remote-read;
  2. 运行一段时间后:

进行 prometheus: stop prometheus;
删除本地数据:delete prometheus/data 目录;
启动 prometheus: start promethesu;

上述操作模仿:本地存储宕机,应用近程存储的场景。

  1. 在 prometheus UI 上执行查问,能够失去历史数据 (近程存储);

remote-read 的代码

执行近程查问的入口代码:生成 query,而后发送 HTTP 近程查问

// storage/remote/read.go
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {if len(q.requiredMatchers) > 0 {requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...)
        for _, m := range matchers {
            for i, r := range requiredMatchers {
                if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {
                    // Requirement matched.
                    requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...)
                    break
                }
            }
        }        
    }
    // 加 label
    m, added := q.addExternalLabels(matchers)
    // 生成查问
    query, err := ToQuery(q.mint, q.maxt, m, hints)
    // HTTP client 发动近程查问
    res, err := q.client.Read(q.ctx, query)
    return newSeriesSetFilter(FromQueryResult(sortSeries, res), added)
}

client 对象的结构:每个 remote 有一个 client,应用其配置的 URL/HttpConfig 结构

//storage/remote/storage.go
func (s *Storage) ApplyConfig(conf *config.Config) error {
    for _, rrConf := range conf.RemoteReadConfigs {
        c, err := newReadClient(name, &ClientConfig{
            URL:              rrConf.URL,
            Timeout:          rrConf.RemoteTimeout,
            HTTPClientConfig: rrConf.HTTPClientConfig,
        })
        queryables = append(queryables, NewSampleAndChunkQueryableClient(
            c,
            conf.GlobalConfig.ExternalLabels,
            labelsToEqualityMatchers(rrConf.RequiredMatchers),
            rrConf.ReadRecent,
            s.localStartTimeCallback,
        ))
        ......
    }
    ......
}

发动 HTTP 近程查问申请:

  • HTTP request:先用 protobuf 序列化,再用 snappy 压缩;
  • HTTP response: 先用 snappy 解压缩,而后再用 protobuf 反序列化;
//storage/remote/client.go
// Read reads from a remote endpoint.
func (c *client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
    req := &prompb.ReadRequest{Queries: []*prompb.Query{query,},
    }
    // protobuf 序列化
    data, err := proto.Marshal(req)
    // snappy 压缩
    compressed := snappy.Encode(nil, data)
    // 发送 HTTP POST
    httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
    
    httpReq.Header.Add("Content-Encoding", "snappy")
    httpReq.Header.Add("Accept-Encoding", "snappy")
    httpReq.Header.Set("Content-Type", "application/x-protobuf")
    httpReq.Header.Set("User-Agent", userAgent)
    httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

    ctx, cancel := context.WithTimeout(ctx, c.timeout)
    defer cancel()

    httpReq = httpReq.WithContext(ctx)
    // 发送 request
    httpResp, err := c.client.Do(httpReq)
    compressed, err = ioutil.ReadAll(httpResp.Body)
    
    // 返回的后果,先 snappy 解压缩
    uncompressed, err := snappy.Decode(nil, compressed)
    var resp prompb.ReadResponse
    // 再 protobuf 反序列化
    err = proto.Unmarshal(uncompressed, &resp)
    return resp.Results[0], nil
}

参考:
1.https://yunlzheng.gitbook.io/…

正文完
 0