一.VictoriaMetrics概述
VictoriaMetrics是一个疾速、高效和可扩大的时序数据库,可作为prometheus的长期存储(long-term storage)。
VictoriaMetrics反对PromQL查询语言,也反对Influxdb行协定,对以后支流的时序协定反对比拟好。
本文重点关注其集群架构。
单实例的victoriametric只有一个过程。
集群版的victoriametrics有3类过程,即3类微服务组成:
- vmstorage: 数据存储节点,负责存储时序数据;
- vmselect: 数据查问节点,负责接管用户查问申请,向vmstorage查问时序数据;
- vminsert: 数据插入节点,负责接管用户插入申请,向vmstorage写入时序数据;
在部署时能够依照需要,不同的微服务部署不同的正本,以应答业务需要:
- 若数据量比拟大,部署较多的vmstorage正本;
- 若查问申请比拟多,部署较多的vmselect正本;
- 若插入申请比拟多,部署较多的vminsert正本;
二.VictoriaMetrics集群实现
集群中vmselect、vminsert节点都是无状态的,惟一有状态的是vmstorage。
vmstorage的多节点采纳shared noting architecture,各节点间不共享数据,也不晓得彼此的存在。
vmstorage
nodes don't know about each other, don't communicate with each other and don't share any data. This is shared nothing architecture. It increases cluster availability, simplifies cluster maintenance and cluster scaling.
为保障时序数据的可用性,采纳复制的办法,即每份数据存入N个不同的节点,在查问时,同时查问多个节点,去重后返回给client。
若设置--replicationFactor=2,即数据正本=2:
在写入数据时:
- 对输出数据进行一致性hash计算,将写入storageIndex节点;
- 因为配置写入2正本,它同时要写入storageIndex+1节点;
在查问数据时:
- 向所有的vmstorage发动查问;
- 将查问后果合并、去重后,返回给client;
- 依据vmstorage正确响应的节点和replica的个数,判断返回数据isPartial/OK;
三.VictoriaMetrics写入数据
以InfluxDB行协定插入数据为例,联合源码剖析victoriametrics集群版本写入数据的流程。
写入数据在vminsert服务中解决,假如--replicationFactor=2,即数据正本=2:
插入时:
- 对每条时序数据,依照其label计算一致性hash值,作为它存入的指标节点,假如为storageNodeA;
- 对于该条时序数据,因为正本=2,它将被写入storageNodeA和storageNodeA +1两个vmstorage节点;
1)API入口
API: influx/write
// app/vminsert/main.gofunc requestHandler(w http.ResponseWriter, r *http.Request) bool { ...... switch p.Suffix { case "influx/write", "influx/api/v2/write": if err := influx.InsertHandlerForHTTP(at, r); err != nil { httpserver.Errorf(w, r, "%s", err) return true } w.WriteHeader(http.StatusNoContent) return true } ......}
API handler:
// app/vminsert/influx/request_handler.gofunc InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } //限流执行 return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" q := req.URL.Query() precision := q.Get("precision") // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { return insertRows(at, db, rows, extraLabels, false) }) })}
能够看到,API handler中做了限流。
2)一致性hash
遍历每条时序数据,依据其label,计算一致性hash值,作为存入的指标节点:
// app/vminsert/influx/request_handler.gofunc insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label, mayOverrideAccountProjectID bool) error { ic := &ctx.Common atCopy := *at for i := range rows { r := &rows[i] ...... ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels) for j := range r.Fields { f := &r.Fields[j] ... metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf) ic.Labels = ic.Labels[:labelsLen] ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[len(ic.Labels)-1]) // 一致性hash: 计算storageIdx storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels) // 每个时序数据写入对应的ctx.bufRowss[storageIdx] if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { return err } } } ......}
一致性hash计算storageIdx的代码:jump_hash算法
// app/vminsert/netstorage/insert_ctx.gofunc (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int { ...... buf := ctx.labelsBuf[:0] buf = encoding.MarshalUint32(buf, at.AccountID) buf = encoding.MarshalUint32(buf, at.ProjectID) for i := range labels { label := &labels[i] buf = marshalBytesFast(buf, label.Name) buf = marshalBytesFast(buf, label.Value) } h := xxhash.Sum64(buf) //计算hash值 ctx.labelsBuf = buf idx := int(jump.Hash(h, int32(len(storageNodes)))) //失去storageIdx return idx}
3) 写入多节点
写入流程,若配置多个正本(replica),则将数据发送至多个storageNode;
- 对单个storageNode,通过sn.esendBufRowsNonBlocking()发送:
- 若发送胜利,则跳出内层for;
- 否则,尝试发送至idx++节点;
- 若尝试了所有节点后,则间接返回;
// app/vminsert/netstorage/netstorage.gofunc sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool { usedStorageNodes := make(map[*storageNode]bool, replicas) for i := 0; i < replicas; i++ { //多个正本,发送至多个storageNode idx := snIdx + i attempts := 0 for { attempts++ if attempts > len(storageNodes) { //已尝试了所有节点 if i == 0 { return false //第一个正本就失败了,返回false } return true } sn := storageNodes[idx] idx++ //发送 if !sn.sendBufRowsNonblocking(br) { continue //若失败,则尝试下一个节点idx++ } break //胜利就返回 } } return true}
四.VictoriaMetrics查问数据
以PromQL查问数据为例,联合源码剖析victoriametrics集群版本查问数据数据的流程。
查问数据在vmselect服务中解决,假如--replicationFactor=2,即数据正本=2:
查问时:
- 向N个vmstorage节点查问时序数据(N为vminsert启动时配置的vmstorage参数);
- 数据返回后,通过合并去重,返回给client;
1)API入口
// app/vmselect/main.gofunc selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool { ...... switch p.Suffix { ...... case "prometheus/api/v1/query_range": if err := prometheus.QueryRangeHandler(startTime, at, w, r); err != nil { sendPrometheusError(w, r, err) return true } return true } ......}
2)并发向storageNode查问
并发的向所有storageNode发动查问,为每个storageNode调配1个goroutine:
// app/vmselect/netstorage/netstorage.gofunc startStorageNodesRequest(denyPartialResponse bool, f func(idx int, sn *storageNode) interface{}) *storageNodesRequest { resultsCh := make(chan interface{}, len(storageNodes)) for idx, sn := range storageNodes { // 每个storageNode调配一个goroutine go func(idx int, sn *storageNode) { result := f(idx, sn) resultsCh <- result //将后果放入chan }(idx, sn) } return &storageNodesRequest{ denyPartialResponse: denyPartialResponse, resultsCh: resultsCh, }}
3)返回查问后果
失常状况下,同一个series的指标数据,应用jumphash会固定存储在同一个节点上;
然而,因为网络抖动和接口出错等起因,本来存储在storageIdx上的数据,可能会被存储到storageIdx+1节点上。
基于以上起因,如果只有一部分storageNode返回数据,那后果可能是isPartial的数据。
最终后果的判断准则:
- 最完满的状况:
所有节点都失常返回且没有谬误,则认为后果时残缺且没有谬误的: - 若失常返回的后果 > len(storageNode) - replicaFactor:
则被断定为数据残缺且没有谬误,间接返回;
比方3个节点,replica=2,只有>=2个节点失常返回,则认为后果数据是残缺的 - 若所有节点都出错了:
则认为后果是谬误的,并返回第1个谬误 - 若局部节点返回谬误:
则认为后果是不残缺的:
// app/vmselect/netstorage/netstorage.gofunc (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) { var errors []error resultsCollected := 0 for i := 0; i < len(storageNodes); i++ { result := <-snr.resultsCh //返回后果 if err := f(result); err != nil { errors = append(errors, err) continue } resultsCollected++ // 断定为数据残缺且没有谬误,间接返回 if resultsCollected > len(storageNodes)-*replicationFactor { return false, nil } } isPartial := false if len(errors) > 0 { // 所有后果都出错了,返回第一个谬误 if len(errors) == len(storageNodes) { return false, errors[0] } isPartial = true } return isPartial, nil}
五.VictoriaMetrics VS Influxdb
1)测试方法
VictoriaMetrics和Influxdb均反对remote-write协定,将kubernetes集群的指标通过remote-write同时写入victoriametrics和Influxdb,比照其:
- cpu使用率;
- 内存使用量;
- 存储使用量;
VictoriaMetrics和Influxdb均应用单机版部署,运行工夫:18hour左右。
2)比照CPU使用率
- influxdb的cpu使用率,大略在35%左右;
- VictoriaMetrics的cpu使用率,大略在12%左右;
- 整体上,influxdb的cpu使用率,高于VictoriaMetrics;
3)比照Mem使用量
- Influxdb的mem使用量,大略在1.2G左右;
- VictoriaMetrics的mem使用量,大略在320M左右;
4)对此Disk使用量
比照存储时序数据后,其Disk使用量:
- Influxdb的磁盘占用总量≈1.2G;
- VictoriaMetrics的磁盘占用总量≈300M;
## influxdb# du -sh /var/lib/influxdb/*1.2G /var/lib/influxdb/data4.0K /var/lib/influxdb/influxd.pid4.0K /var/lib/influxdb/meta12M /var/lib/influxdb/wal## victoriametrics# du -sh ./victoria-metrics-data/*219M ./victoria-metrics-data/data0 ./victoria-metrics-data/flock.lock40M ./victoria-metrics-data/indexdb4.0K ./victoria-metrics-data/metadata0 ./victoria-metrics-data/snapshots0 ./victoria-metrics-data/tmp
参考
- victoriametrics集群doc: https://docs.victoriametrics....
- Influxdb行协定:https://docs.influxdata.com/i...
- victoriametircs与Influxdb性能比照:https://medium.com/@valyala/i...