关于时序数据库:VictoriaMetrics源码tv写入流程

3次阅读

共计 4423 个字符,预计需要花费 12 分钟才能阅读完成。

一. 整体流程

  • client 写入时,会首先写入内存的 shard,若 shard 写入胜利,则间接返回 client;
  • 若写入后 shard 已满,则将 shard 内的数据压缩后保留为 inmemoryPart(依然在内存中),而后返回 client:
  • inmemoryPart 的数据,被后盾的 goroutine,定期的 merge 为 part 构造,保留到 disk 中;

时序数据在磁盘中的存储:

  • partition: 一个月 1 个目录,比方 2023_05:
  • part: 作为 partition 下的子目录,外面蕴含 tv 和索引信息;
# ls 2023_05/
117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD

# # ls 2023_05/117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD/
index.bin  metaindex.bin  min_dedup_interval  timestamps.bin  values.bin

二. 内存 shard

shards 的个数:

  • shards 个数跟 CPU 核数无关;
  • 对 4C4G 的机器来说,就 1 个 shard;
// The number of shards for rawRow entries per partition.
//
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 3) / 4

每个 shard 保留的 rows 个数:

  • 范畴 =1w~50w;
  • unsafe.Sizeof(rawRow{})=50
  • 对 4C4G 机器来说:

    • memory.Allowed() = 4G×60%;
    • rowRowsShardsPerPartition=1;
    • 每个 shard 保留的 rows 个数:后果 =(4G×60% / 1 / 256 / 50) = 18w;
// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerShard() int {maxRawRowsPerPartitionOnce.Do(func() {n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{}))
        if n < 1e4 {n = 1e4}
        if n > 500e3 {n = 500e3}
        maxRawRowsPerPartition = n
    })
    return maxRawRowsPerPartition
}

若存在 N 个 shards,写入时保留到哪个 shard?

  • 按程序轮转写:roundRobin

    • 第一次写第一个,下一次写第二个 …
func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {n := atomic.AddUint32(&rrss.shardIdx, 1)
    shards := rrss.shards
    idx := n % uint32(len(shards))
    shard := &shards[idx]
    shard.addRows(pt, rows)
}

三. 写入的代码

首先,依据 rows 工夫找指标 partition,为简化剖析,仅思考一个 partition 写入的状况:

  • 一个 partition 就是一个 month;
// lib/storage/table.go
func (tb *table) AddRows(rows []rawRow) error {if len(rows) == 0 {return nil}
    // Verify whether all the rows may be added to a single partition.
    ptwsX := getPartitionWrappers()
    defer putPartitionWrappers(ptwsX)

    ptwsX.a = tb.GetPartitions(ptwsX.a[:0])
    ptws := ptwsX.a
    for i, ptw := range ptws {
        singlePt := true
        for j := range rows {if !ptw.pt.HasTimestamp(rows[j].Timestamp) {
                singlePt = false
                break
            }
        }
        if !singlePt {continue}
        ...
        // 所有 rows 都在一个 partition
        // Fast path - add all the rows into the ptw.
        ptw.pt.AddRows(rows)
        tb.PutPartitions(ptws)
        return nil
    }
    ...
}

确定 partition 后,前面的流程即在 partion 内写入:

  • rows 会被先写入 partition 内的 []shard;
// lib/storage/partition.go
// AddRows adds the given rows to the partition pt.
func (pt *partition) AddRows(rows []rawRow) {if len(rows) == 0 {return}
    // Validate all the rows.
    ...
    // 写入到 []shard
    pt.rawRows.addRows(pt, rows)
}

shard 内的写入:

  • 若 shard 内有足够的空位写入 rows,则写入 shard 并返回 client 了;
  • 否则,将 shard 内已有的 rows 和新 rows 保留在 rowsToFlush 中;而后将 rowsToFlush 中的数据写入 inmemoryPart;
// lib/storage/partition.go
func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {var rowsToFlush []rawRow

    rrs.mu.Lock()
    if cap(rrs.rows) == 0 {n := getMaxRawRowsPerShard()
        rrs.rows = make([]rawRow, 0, n)
    }
    maxRowsCount := cap(rrs.rows)
    capacity := maxRowsCount - len(rrs.rows)
    // 还有空位
    if capacity >= len(rows) {
        // Fast path - rows fit capacity.
        rrs.rows = append(rrs.rows, rows...)
    } else {
        // shard 中没有空位了
        // 将 shard 中的 rows 和新 rows 保留在 rowsToFlush 中
        // Slow path - rows don't fit capacity.
        // Put rrs.rows and rows to rowsToFlush and convert it to a part.
        rowsToFlush = append(rowsToFlush, rrs.rows...)
       rowsToFlush = append(rowsToFlush, rows...)
        rrs.rows = rrs.rows[:0]
        rrs.lastFlushTime = fasttime.UnixTimestamp()}
    rrs.mu.Unlock()
    // 将 rowsToFlush 内的 rows 写入 inmemoryPart
    pt.flushRowsToParts(rowsToFlush)
}

将 rowsToFlush 的数据写入 inmemoryPart 的过程:

  • 这里能够看到,若 rowsToFlush 为空的话,函数就间接返回了;
  • 具体工作由 pt.addRowsPart(rowsPart) 执行;
// lib/storage/partition.go
func (pt *partition) flushRowsToParts(rows []rawRow) {maxRows := getMaxRawRowsPerShard()
    wg := getWaitGroup()
    for len(rows) > 0 {
        n := maxRows
        if n > len(rows) {n = len(rows)
        }
        wg.Add(1)
        go func(rowsPart []rawRow) {defer wg.Done()
            pt.addRowsPart(rowsPart)    // 执行工作的函数
        }(rows[:n])
        rows = rows[n:]
    }
    wg.Wait()
    putWaitGroup(wg)
}

将 rows 内容写入 inmemoryPart,而后结构一个 partWrapper,将 partWrapper 保留到 partition.smallParts 中;

与此同时,判断 pt.smallParts 是否超过 256 个,若 <=256 则间接返回;否则帮忙执行 merge part;

// lib/storage/partition.go
func (pt *partition) addRowsPart(rows []rawRow) {
    // 将 rows 的内容写入 inmemoryPart
    mp := getInmemoryPart()
    mp.InitFromRows(rows)    // 将 rows 写入 inmemoryPart 时会对 rows 数据进行压缩
    ...
    // 结构一个 partWrapper
    p, err := mp.NewPart()
    pw := &partWrapper{
        p:        p,
        mp:       mp,
        refCount: 1,
    }
    // 将新的 partWrapper 保留在 partition 的 smallParts 中
    pt.smallParts = append(pt.smallParts, pw)
    // 断定是否超过 256
    ok := len(pt.smallParts) <= maxSmallPartsPerPartition
    pt.partsLock.Unlock()
    if ok {return    // smallParts <= 256,间接返回}
    // 若 smallParts > 256,则帮忙执行 merge
    // The added part exceeds available limit. Help merging parts.
    ...
    err = pt.mergeSmallParts(false)
    if err == nil {atomic.AddUint64(&pt.smallAssistedMerges, 1)
        return
    }
    ....
}

四. 总结

  • 最快的流程:

    • rows 写入到 partition 写的某个 shard,而后返回;
    • shard 内的 rows 被 goroutine 定期的保留到 inmemoryPart;
  • 次快的流程:

    • rows 写入的指标 shard 满了,将 shard rows 和新 rows 存入 inmemoryPart,保留到 partition.smallParts 中;
    • inmemoryPart 内的 rows 被 goroutine 定期的 merge 到磁盘,保留为 part 目录;
  • 最慢的流程:

    • 在上一个流程的根底上,发现 pt.smallParts 超过 256 个,帮忙执行 merge;
正文完
 0