prometheus中remote-write和remote-read的配置:

# store data to influxdbremote_write:  - url: "http://10.21.1.74:8086/api/v1/prom/write?db=prometheus"# read data from influxdbremote_read:  - url: "http://10.21.1.74:8086/api/v1/prom/read?db=prometheus"

remote-read能够让prometheus读取近程存储上的时序数据,扩大了本地存储。

prometheus在应答/query查问申请时,由fanoutStorage解决;

  • fanoutStorage蕴含localStorage(本地TSDB)和remoteStorage(近程存储),它们均实现了查问接口;
  • localStorage执行本地查问;
  • remoteStorage通过HTTP执行近程查问;
  • 将上述2个查问后果进行合并,返回给client;

demo演示

  1. prometheus配置remote-write和remote-read;
  2. 运行一段时间后:
进行prometheus: stop prometheus;
删除本地数据:delete prometheus/data目录;
启动prometheus: start promethesu;

上述操作模仿:本地存储宕机,应用近程存储的场景。

  1. 在prometheus UI上执行查问,能够失去历史数据(近程存储);

remote-read的代码

执行近程查问的入口代码:生成query,而后发送HTTP近程查问

// storage/remote/read.gofunc (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {    if len(q.requiredMatchers) > 0 {        requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...)        for _, m := range matchers {            for i, r := range requiredMatchers {                if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {                    // Requirement matched.                    requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...)                    break                }            }        }            }    // 加label    m, added := q.addExternalLabels(matchers)    // 生成查问    query, err := ToQuery(q.mint, q.maxt, m, hints)    // HTTP client发动近程查问    res, err := q.client.Read(q.ctx, query)    return newSeriesSetFilter(FromQueryResult(sortSeries, res), added)}

client对象的结构:每个remote有一个client,应用其配置的URL/HttpConfig结构

//storage/remote/storage.gofunc (s *Storage) ApplyConfig(conf *config.Config) error {    for _, rrConf := range conf.RemoteReadConfigs {        c, err := newReadClient(name, &ClientConfig{            URL:              rrConf.URL,            Timeout:          rrConf.RemoteTimeout,            HTTPClientConfig: rrConf.HTTPClientConfig,        })        queryables = append(queryables, NewSampleAndChunkQueryableClient(            c,            conf.GlobalConfig.ExternalLabels,            labelsToEqualityMatchers(rrConf.RequiredMatchers),            rrConf.ReadRecent,            s.localStartTimeCallback,        ))        ......    }    ......}

发动HTTP近程查问申请:

  • HTTP request: 先用protobuf序列化,再用snappy压缩;
  • HTTP response: 先用snappy解压缩,而后再用protobuf反序列化;
//storage/remote/client.go// Read reads from a remote endpoint.func (c *client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {    req := &prompb.ReadRequest{        Queries: []*prompb.Query{            query,        },    }    // protobuf序列化    data, err := proto.Marshal(req)    // snappy压缩    compressed := snappy.Encode(nil, data)    // 发送HTTP POST    httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))        httpReq.Header.Add("Content-Encoding", "snappy")    httpReq.Header.Add("Accept-Encoding", "snappy")    httpReq.Header.Set("Content-Type", "application/x-protobuf")    httpReq.Header.Set("User-Agent", userAgent)    httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")    ctx, cancel := context.WithTimeout(ctx, c.timeout)    defer cancel()    httpReq = httpReq.WithContext(ctx)    // 发送request    httpResp, err := c.client.Do(httpReq)    compressed, err = ioutil.ReadAll(httpResp.Body)        //返回的后果,先snappy解压缩    uncompressed, err := snappy.Decode(nil, compressed)    var resp prompb.ReadResponse    // 再protobuf反序列化    err = proto.Unmarshal(uncompressed, &resp)    return resp.Results[0], nil}

参考:
1.https://yunlzheng.gitbook.io/...