共计 6485 个字符,预计需要花费 17 分钟才能阅读完成。
一. 存储格局
下图是向 VictoriaMetrics 写入 prometheus 协定数据的示例:
VM 在收到写入申请时,会对申请中蕴含的时序数据做转换解决:
- 首先,依据 metrics+labels 组成的 MetricName,生成一个惟一标识 TSID;
-
而后:
- metric(指标名称__name__) + labels + TSID 作为索引 index;
- TSID + timestamp + value 作为数据 data;
- 最初,索引 index 和数据 data 别离进行存储和检索;
因而,VM 的数据整体上分为索引和数据 2 个局部:
- 索引局部,用以反对依照 label 或 tag 进行多维检索,失去 TSID;
- 数据局部,用以反对依照 TSID 失去 tv 数据;
二. 整体流程
VictoriaMetrics 在写入原始的 rows 数据时,写入过程分为两个局部:
- 写 index;
- 写 tv;
写入流程:
- 对于原始的 rows 数据,依据其 metricsName 从 cache 和内存索引中,查找其对应的 TSID;
- 若 TSID 找到,则写入 tv 数据,返回 client;
-
否则:
-
写 index:
- 结构 TSID,结构新的 index items,而后将其写入内存 shard;
- 内存 shard 被异步的 goroutine 压缩并保留到磁盘;
- 写 tv 数据;
- 返回 client;
-
三. 写入代码
1. 入口代码
vmstorage 监听 tcp 端口,收到 vminsert 的插入申请后,进行解决:
// app/vmstorage/servers/vminsert.go
func (s *VMInsertServer) run() {
...
for {c, err := s.ln.Accept()
...
go func() {bc, err := handshake.VMInsertServer(c, compressionLevel)
...
err = clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits)) // 入口代码
}, s.storage.IsReadOnly)
...
}()}
}
写入时,1 次最多写 8K 个 rows:
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
....
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 {
mrsBlock := mrs
// 一次最多写 8K,maxBlockLen=8000
if len(mrs) > maxBlockLen {mrsBlock = mrs[:maxBlockLen]
mrs = mrs[maxBlockLen:]
} else {mrs = nil}
// 写入 8K rows 的数据
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {firstErr = err}
continue
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
....
}
2. 写入流程的代码
写入过程次要分 2 步:
-
首先,为 row 查找或构建 TSID;
- 若该 row 的 metricNameRaw 与 prevMetricNameRaw,则应用 prevTSID;
- 若 cache 中有缓存的 metricNameRaw,则应用缓存的 metricNameRaw 对应的 TSID;
-
若上述都不满足,则去内存索引中查找,或者创立一个新的 TSID;
- 这一步是最耗时的;
- 而后,构建 TSID 结束后,插入 tv 数据;
// lib/storage/storage.go
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
...
// 1. 结构 r.TSID
// 若跟 prevMetricNameRaw 雷同,则应用 pervTSID;
// 若 cache 中有 metricNameRaw,则应用 cache.TSID;for i := range mrs {mr := &mrs[i]
...
dstMrs[j] = mr
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) { // 应用 prevTSID
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r.TSID = prevTSID
continue
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { // 应用缓存的 TSID
...
r.TSID = genTSID.TSID
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
...
continue
}
...
}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
prevMetricNameRaw = nil
var slowInsertsCount uint64
for i := range pendingMetricRows {
...
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
// 尝试去 index 找查找,或者创立
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
...
continue
}
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
// 放回 cache
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
}
}
...
dstMrs = dstMrs[:j]
rows = rows[:j]
err := s.updatePerDateData(rows, dstMrs)
if err != nil {err = fmt.Errorf("cannot update per-date data: %w", err)
} else {
// TSID 结构结束,开始插入数据
err = s.tb.AddRows(rows)
...
}
...
return nil
}
3. 写 index
写 index 是 slow path,重点看一下:
- 首先,去内存索引中找 TSID,若找到,则返回;
- 否则,创立一个新的 TSID;
// lib/storage/index_db.go
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
// 1. 首先尝试在 index 中查找
if is.tsidByNameMisses < 100 {err := is.getTSIDByMetricName(dst, metricName)
// 在 index 中找到了
if err == nil {
// Fast path - the TSID for the given metricName has been found in the index.
is.tsidByNameMisses = 0
if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {return err}
return nil
}
is.tsidByNameMisses++
} else {
is.tsidByNameSkips++
if is.tsidByNameSkips > 10000 {
is.tsidByNameSkips = 0
is.tsidByNameMisses = 0
}
}
// 2. 没有找到,那么创立一个
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {userReadableMetricName := getUserReadableMetricName(metricNameRaw)
return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
}
return nil
}
4. 生成 TSID
具体生成 TSID 的逻辑:
- MetricGroupID: 由 metricGroup hash 而来;
- JobID:由 tags[0].Value hash 而来;
- InstanceID:由 tags[1].Value hash 而来;
// lib/storage/index_db.go
func generateTSID(dst *TSID, mn *MetricName) {
dst.AccountID = mn.AccountID
dst.ProjectID = mn.ProjectID
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
if len(mn.Tags) > 0 {dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
}
if len(mn.Tags) > 1 {dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
}
dst.MetricID = generateUniqueMetricID()}
而 TSID 中的 metricID 是由启动时的工夫戳 + 1 产生:
// Returns local unique MetricID.
func generateUniqueMetricID() uint64 {return atomic.AddUint64(&nextUniqueMetricID, 1)
}
var nextUniqueMetricID = uint64(time.Now().UnixNano())
5. 创立 index items
- 创立 MetricName -> TSID index;
- 创立 MetricID -> MetricName index;
- 创立 MetricID -> TSID index;
- 创立 tag -> MetricID 和 MetricGroup+tag -> MetricID index;
- 最初,将 index items 存入内存 shards;
// lib/storage/index_db.go
func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
// The order of index items is important.
// It guarantees index consistency.
ii := getIndexItems()
defer putIndexItems(ii)
// Create MetricName -> TSID index.
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create MetricID -> MetricName index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = mn.Marshal(ii.B)
ii.Next()
// Create MetricID -> TSID index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = tsid.Marshal(ii.B)
ii.Next()
prefix := kbPool.Get()
prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
kbPool.Put(prefix)
is.db.tb.AddItems(ii.Items) // 将 items 存入内存 shards
}
6. index items 存入内存 shards
Index items 结构实现后,被写入内存的 shards,会有异步的 goroutine 将其压缩写入 disk。
写内存 shards 的办法: roundRobin
- 内存中有若干个 index shards;
- 写入时,轮转写入:idx++ % shards
// lib/mergeset/table.go
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
shards := riss.shards
shardsLen := uint32(len(shards))
for len(items) > 0 {n := atomic.AddUint32(&riss.shardIdx, 1)
idx := n % shardsLen
items = shards[idx].addItems(tb, items)
}
}
内存中 shards 总数,跟 cpu 核数有关系:
- shards 总数 = (cpu*cpu + 1) / 2
- 对于 4C 的机器,有 8 个 shards;
// lib/mergeset/table.go
/ The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawItemsShardsPerTable = func() int {cpus := cgroup.AvailableCPUs()
multiplier := cpus
if multiplier > 16 {multiplier = 16}
return (cpus*multiplier + 1) / 2
}()
正文完