关于prometheus:VictoriaMetrics集群原理

40次阅读

共计 7066 个字符,预计需要花费 18 分钟才能阅读完成。

一.VictoriaMetrics 概述

VictoriaMetrics 是一个疾速、高效和可扩大的时序数据库,可作为 prometheus 的长期存储 (long-term storage)。

VictoriaMetrics 反对 PromQL 查询语言,也反对 Influxdb 行协定,对以后支流的时序协定反对比拟好。

本文重点关注其集群架构。

单实例的 victoriametric 只有一个过程。

集群版的 victoriametrics 有 3 类过程,即 3 类微服务组成:

  • vmstorage: 数据存储节点,负责存储时序数据;
  • vmselect: 数据查问节点,负责接管用户查问申请,向 vmstorage 查问时序数据;
  • vminsert: 数据插入节点,负责接管用户插入申请,向 vmstorage 写入时序数据;

在部署时能够依照需要,不同的微服务部署不同的正本,以应答业务需要:

  • 若数据量比拟大,部署较多的 vmstorage 正本;
  • 若查问申请比拟多,部署较多的 vmselect 正本;
  • 若插入申请比拟多,部署较多的 vminsert 正本;

二.VictoriaMetrics 集群实现

集群中 vmselect、vminsert 节点都是无状态的,惟一有状态的是 vmstorage。

vmstorage 的多节点采纳 shared noting architecture,各节点间不共享数据,也不晓得彼此的存在。

vmstorage nodes don’t know about each other, don’t communicate with each other and don’t share any data. This is shared nothing architecture. It increases cluster availability, simplifies cluster maintenance and cluster scaling.

为保障时序数据的可用性,采纳复制的办法,即每份数据存入 N 个不同的节点,在查问时,同时查问多个节点,去重后返回给 client。

若设置 –replicationFactor=2,即数据正本 =2:

在写入数据时:

  • 对输出数据进行一致性 hash 计算,将写入 storageIndex 节点;
  • 因为配置写入 2 正本,它同时要写入 storageIndex+ 1 节点;

在查问数据时:

  • 向所有的 vmstorage 发动查问;
  • 将查问后果合并、去重后,返回给 client;
  • 依据 vmstorage 正确响应的节点和 replica 的个数,判断返回数据 isPartial/OK;

三.VictoriaMetrics 写入数据

以 InfluxDB 行协定插入数据为例,联合源码剖析 victoriametrics 集群版本写入数据的流程。

写入数据在 vminsert 服务中解决,假如 –replicationFactor=2,即数据正本 =2:

插入时:

  • 对每条时序数据,依照其 label 计算一致性 hash 值,作为它存入的指标节点,假如为 storageNodeA;
  • 对于该条时序数据,因为正本 =2,它将被写入 storageNodeA 和 storageNodeA + 1 两个 vmstorage 节点;

1)API 入口

API: influx/write

// app/vminsert/main.go
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
    ......
    switch p.Suffix {
    case "influx/write", "influx/api/v2/write":
        if err := influx.InsertHandlerForHTTP(at, r); err != nil {httpserver.Errorf(w, r, "%s", err)
            return true
        }
        w.WriteHeader(http.StatusNoContent)
        return true
    }
    ......
}

API handler:

// app/vminsert/influx/request_handler.go
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {extraLabels, err := parserCommon.GetExtraLabels(req)
    if err != nil {return err}
    // 限流执行
    return writeconcurrencylimiter.Do(func() error {isGzipped := req.Header.Get("Content-Encoding") == "gzip"
        q := req.URL.Query()
        precision := q.Get("precision")
        // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
        db := q.Get("db")
        return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {return insertRows(at, db, rows, extraLabels, false)
        })
    })
}

能够看到,API handler 中做了限流。

2)一致性 hash

遍历每条时序数据,依据其 label,计算一致性 hash 值,作为存入的指标节点:

// app/vminsert/influx/request_handler.go
func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label, mayOverrideAccountProjectID bool) error {
    ic := &ctx.Common
    atCopy := *at
    for i := range rows {r := &rows[i]           
        ......
        ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels)    
        for j := range r.Fields {f := &r.Fields[j]    
            ...        
            metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
            ic.Labels = ic.Labels[:labelsLen]        
            ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[len(ic.Labels)-1])
            // 一致性 hash: 计算 storageIdx
            storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels)
            // 每个时序数据写入对应的 ctx.bufRowss[storageIdx]
            if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {return err}
        }
    }
    ......
}

一致性 hash 计算 storageIdx 的代码:jump_hash 算法

// app/vminsert/netstorage/insert_ctx.go
func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
    ......
    buf := ctx.labelsBuf[:0]
    buf = encoding.MarshalUint32(buf, at.AccountID)
    buf = encoding.MarshalUint32(buf, at.ProjectID)
    for i := range labels {label := &labels[i]
        buf = marshalBytesFast(buf, label.Name)
        buf = marshalBytesFast(buf, label.Value)
    }
    h := xxhash.Sum64(buf)    // 计算 hash 值
    ctx.labelsBuf = buf
    idx := int(jump.Hash(h, int32(len(storageNodes))))    // 失去 storageIdx
    return idx
}

3) 写入多节点

写入流程,若配置多个正本 (replica),则将数据发送至多个 storageNode;

  • 对单个 storageNode,通过 sn.esendBufRowsNonBlocking() 发送:
  • 若发送胜利,则跳出内层 for;
  • 否则,尝试发送至 idx++ 节点;
  • 若尝试了所有节点后,则间接返回;
// app/vminsert/netstorage/netstorage.go
func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {usedStorageNodes := make(map[*storageNode]bool, replicas)
    for i := 0; i < replicas; i++ {    // 多个正本,发送至多个 storageNode
        idx := snIdx + i
        attempts := 0
        for {
            attempts++
            if attempts > len(storageNodes) {    // 已尝试了所有节点
                if i == 0 {return false    // 第一个正本就失败了,返回 false}
                return true
            }
            sn := storageNodes[idx]
            idx++
            // 发送
            if !sn.sendBufRowsNonblocking(br) {continue    // 若失败,则尝试下一个节点 idx++}
            break    // 胜利就返回
        }
    }
    return true
}

四.VictoriaMetrics 查问数据

以 PromQL 查问数据为例,联合源码剖析 victoriametrics 集群版本查问数据数据的流程。

查问数据在 vmselect 服务中解决,假如 –replicationFactor=2,即数据正本 =2:

查问时:

  • 向 N 个 vmstorage 节点查问时序数据 (N 为 vminsert 启动时配置的 vmstorage 参数);
  • 数据返回后,通过合并去重,返回给 client;

1)API 入口

// app/vmselect/main.go
func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool {
    ......
    switch p.Suffix {
    ......
    case "prometheus/api/v1/query_range":
        if err := prometheus.QueryRangeHandler(startTime, at, w, r); err != nil {sendPrometheusError(w, r, err)
            return true
        }
        return true
    }
    ......
}

2)并发向 storageNode 查问

并发的向所有 storageNode 发动查问,为每个 storageNode 调配 1 个 goroutine:

// app/vmselect/netstorage/netstorage.go
func startStorageNodesRequest(denyPartialResponse bool, f func(idx int, sn *storageNode) interface{}) *storageNodesRequest {resultsCh := make(chan interface{}, len(storageNodes))
    for idx, sn := range storageNodes {
        // 每个 storageNode 调配一个 goroutine
        go func(idx int, sn *storageNode) {result := f(idx, sn)
            resultsCh <- result    // 将后果放入 chan
        }(idx, sn)
    }
    return &storageNodesRequest{
        denyPartialResponse: denyPartialResponse,
        resultsCh:           resultsCh,
    }
}

3)返回查问后果

失常状况下,同一个 series 的指标数据,应用 jumphash 会固定存储在同一个节点上;

然而,因为网络抖动和接口出错等起因,本来存储在 storageIdx 上的数据,可能会被存储到 storageIdx+ 1 节点上。

基于以上起因,如果只有一部分 storageNode 返回数据,那后果可能是 isPartial 的数据。

最终后果的判断准则:

  • 最完满的状况:
    所有节点都失常返回且没有谬误,则认为后果时残缺且没有谬误的:
  • 若失常返回的后果 > len(storageNode) – replicaFactor:
    则被断定为数据残缺且没有谬误,间接返回;
    比方 3 个节点,replica=2,只有 >= 2 个节点失常返回,则认为后果数据是残缺的
  • 若所有节点都出错了:
    则认为后果是谬误的,并返回第 1 个谬误
  • 若局部节点返回谬误:
    则认为后果是不残缺的:
// app/vmselect/netstorage/netstorage.go
func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) {var errors []error
    resultsCollected := 0
    for i := 0; i < len(storageNodes); i++ {
        result := <-snr.resultsCh    // 返回后果
        if err := f(result); err != nil {errors = append(errors, err)
            continue
        }
        resultsCollected++
        // 断定为数据残缺且没有谬误,间接返回
        if resultsCollected > len(storageNodes)-*replicationFactor {return false, nil}
    }
    isPartial := false
    if len(errors) > 0 {
        // 所有后果都出错了,返回第一个谬误
        if len(errors) == len(storageNodes) {return false, errors[0]
        }
        isPartial = true
    }
    return isPartial, nil
}

五.VictoriaMetrics VS Influxdb

1)测试方法

VictoriaMetrics 和 Influxdb 均反对 remote-write 协定,将 kubernetes 集群的指标通过 remote-write 同时写入 victoriametrics 和 Influxdb,比照其:

  • cpu 使用率;
  • 内存使用量;
  • 存储使用量;

VictoriaMetrics 和 Influxdb 均应用单机版部署,运行工夫:18hour 左右。

2)比照 CPU 使用率

  • influxdb 的 cpu 使用率,大略在 35% 左右;
  • VictoriaMetrics 的 cpu 使用率,大略在 12% 左右;
  • 整体上,influxdb 的 cpu 使用率,高于 VictoriaMetrics;

3)比照 Mem 使用量

  • Influxdb 的 mem 使用量,大略在 1.2G 左右;
  • VictoriaMetrics 的 mem 使用量,大略在 320M 左右;

4)对此 Disk 使用量

比照存储时序数据后,其 Disk 使用量:

  • Influxdb 的磁盘占用总量≈1.2G;
  • VictoriaMetrics 的磁盘占用总量≈300M;
## influxdb
# du -sh /var/lib/influxdb/*
1.2G    /var/lib/influxdb/data
4.0K    /var/lib/influxdb/influxd.pid
4.0K    /var/lib/influxdb/meta
12M     /var/lib/influxdb/wal

## victoriametrics
# du -sh ./victoria-metrics-data/*
219M    ./victoria-metrics-data/data
0       ./victoria-metrics-data/flock.lock
40M     ./victoria-metrics-data/indexdb
4.0K    ./victoria-metrics-data/metadata
0       ./victoria-metrics-data/snapshots
0       ./victoria-metrics-data/tmp

参考

  1. victoriametrics 集群 doc: https://docs.victoriametrics….
  2. Influxdb 行协定:https://docs.influxdata.com/i…
  3. victoriametircs 与 Influxdb 性能比照:https://medium.com/@valyala/i…

正文完
 0