乐趣区

关于时序数据库:VictoriaMetrics源码tv的压缩

写入 VictoriaMetrics 的数据,首先被保留在内存 shard,内存 shard 的数据定期的被压缩到 inmemoryPart(内存中),inmemoryPart 的数据被定期的合并到磁盘 part 中。

tv 的压缩,产生在内存 shard 转换为 inmemoryPart 的过程中。

一. 入口代码

shard 中寄存原始数据,通过压缩后,数据被寄存到 inmemoryPart:

  • rows 被压缩到 inmemoryPart 的入口:mp.InitFromRows(rows);
// lib/storage/partition.go
func (pt *partition) addRowsPart(rows []rawRow) {
    ...
    mp := getInmemoryPart()
    mp.InitFromRows(rows)        // 这里:将 rows 数据压缩后保留到 inmemoryPart 中
    ...
    p, err := mp.NewPart()    
    pw := &partWrapper{
        p:        p,
        mp:       mp,
        refCount: 1,
    }
    pt.partsLock.Lock()
    pt.smallParts = append(pt.smallParts, pw)
    ok := len(pt.smallParts) <= maxSmallPartsPerPartition    // 常量 =256
    pt.partsLock.Unlock()
    if ok {return        // 以后 len(smallParts)<=256,间接返回
    }
    // The added part exceeds available limit. Help merging parts.
    //
    // Prioritize assisted merges over searches.
    ...
}

二. 整体流程

shard 的 rows 按 (tsid,timestamp) 排序,而后将其分为 N 个 block,每个 block 最多 8k rows;

对每个 block 中进行压缩,压缩后 block 中蕴含:

  • tsid: 即 seriesId;
  • timestamps: []int64
  • values: 原为[]float64,转为[]int64 和 scale(保留在 header 中);

因为 block 中的 values 和 times 均为[]int64,故可应用雷同的压缩算法:

对于[]int64 的压缩,VictoriaMetrics 针对不同的场景做了优化:

  • 若[]int64 内都是雷同的值,则保留第一个值即可;
  • 若[]int64 内是 delta 雷同的等差数列,则保留第一个值和 delta 值即可;
  • 若[]int64 内是 Gauge 类型值,则先计算 value 的 delta,而后用 zigzag 压缩;
  • 若[]int64 内是 Counter 类型值,则先计算 value 的 delta of delta,而后用 zigzag 压缩;

三. 源码剖析

1. 代码入口

由 RawRowsMarshaller 执行序列化和压缩:

// lib/storage/inmemory_part.go
// InitFromRows initializes mp from the given rows.
func (mp *inmemoryPart) InitFromRows(rows []rawRow) {if len(rows) == 0 {logger.Panicf("BUG: Inmemory.InitFromRows must accept at least one row")
    }
    mp.Reset()
    rrm := getRawRowsMarshaler()
    rrm.marshalToInmemoryPart(mp, rows)
    putRawRowsMarshaler(rrm)
    mp.creationTime = fasttime.UnixTimestamp()}

2. 划分 block,每个 block 别离执行

  • 首先,对所有的 rows 依照 tsid,timestamp 进行排序;
  • 而后,将 rows 依照 metricID 和 maxRowsPerBlock(8K),划分为 1 个个的 Block;
  • 而后,Block 内的 values 值由[]float64 转为[]int64+scale;

    • scale 保留在 block.header 中;
  • 最初,执行 block 的压缩;
// lib/storage/raw_row.go
func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawRow) {
    ...
    rrm.bsw.InitFromInmemoryPart(mp)
    ph := &mp.ph
    ph.Reset()
    // 1. 对所有的 rows 执行 sort(按 tsid,timestamp)
    // Sort rows by (TSID, Timestamp) if they aren't sorted yet.
    rrs := rawRowsSort(rows)
    if !sort.IsSorted(&rrs) {sort.Sort(&rrs)
    }
    // 2. 按 block 进行划分
    // Group rows into blocks.
    var scale int16
    var rowsMerged uint64
    r := &rows[0]
    tsid := &r.TSID
    precisionBits := r.PrecisionBits
    tmpBlock := getBlock()
    defer putBlock(tmpBlock)
    for i := range rows {r = &rows[i]
        if r.TSID.MetricID == tsid.MetricID && len(rrm.auxTimestamps) < maxRowsPerBlock {    // maxRowsPerBlock 常量 =8*1024
            rrm.auxTimestamps = append(rrm.auxTimestamps, r.Timestamp)
            rrm.auxFloatValues = append(rrm.auxFloatValues, r.Value)
            continue
        }
        // 3.1. 将[]float64 转为[]int64 和 scale
        rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
        tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
        // 3.2. 执行序列化和压缩
        rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)

        tsid = &r.TSID
        precisionBits = r.PrecisionBits
        rrm.auxTimestamps = append(rrm.auxTimestamps[:0], r.Timestamp)
        rrm.auxFloatValues = append(rrm.auxFloatValues[:0], r.Value)
    }
    // 3.3. 最初一个 block 的压缩
    rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
    tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
    rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
    ...
    rrm.bsw.MustClose()}

[]float64 转[]int64+scale,由 decimal.AppendFloatToDecimal 实现:

// lib/decimal/decimal.go
// AppendFloatToDecimal converts each item in src to v*10^e and appends
// each v to dst returning it as va.
//
// It tries minimizing each item in dst.
func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
    ...
    // 比方,输出{1.2, 3.4, 5, 6, 7.8}
    // 输入:{12 34 50 60 78},scale=-1
    ...
}

3.block 内的压缩

  • 首先,将 block 内的 timestamps 和 values 进行压缩,压缩为[]byte;
  • 而后,将 timestampsData 的[]byte 写入;
  • 最初,将 valuesData 的[]byte 写入;
// lib/storage/block_stream_writer.go
// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
    ...
    // 1. 将 block 内的 timestamp/values 进行压缩
    headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
    ...
    // 2. 写入 timestampsData
    fs.MustWriteData(bsw.timestampsWriter, timestampsData)
    ...
    // 3. 写入 valuesData
    fs.MustWriteData(bsw.valuesWriter, valuesData)
    ...
    // 更新 partHeader
    updatePartHeader(b, ph)
}

重点看一下 block 内数据的压缩过程:

  • 顺次压缩 values 和 timestamps;
  • PrecisionBits 默认 =64,即无损压缩;
// lib/storage/block.go
// MarshalData marshals the block into binary representation.
func (b *Block) MarshalData(timestampsBlockOffset, valuesBlockOffset uint64) ([]byte, []byte, []byte) {
    ...
    timestamps := b.timestamps[b.nextIdx:]
    values := b.values[b.nextIdx:]
    ...
    //1. 压缩 values, PrecisionBits 默认 =64,即无损压缩
    b.valuesData, b.bh.ValuesMarshalType, b.bh.FirstValue = encoding.MarshalValues(b.valuesData[:0], values, b.bh.PrecisionBits)
    b.bh.ValuesBlockOffset = valuesBlockOffset
    b.bh.ValuesBlockSize = uint32(len(b.valuesData))
    b.values = b.values[:0]
    
    // 2. 压缩 timestamps, PrecisionBits 默认 =64,即无损压缩
    b.timestampsData, b.bh.TimestampsMarshalType, b.bh.MinTimestamp = encoding.MarshalTimestamps(b.timestampsData[:0], timestamps, b.bh.PrecisionBits)
    b.bh.TimestampsBlockOffset = timestampsBlockOffset
    b.bh.TimestampsBlockSize = uint32(len(b.timestampsData))
    b.bh.MaxTimestamp = timestamps[len(timestamps)-1]
    b.timestamps = b.timestamps[:0]

    b.bh.RowsCount = uint32(len(values))
    b.headerData = b.bh.Marshal(b.headerData[:0])

    b.nextIdx = 0

    return b.headerData, b.timestampsData, b.valuesData
}

encode.MarshalValues 和 MarshallTimestamps 都是对[]int64 进行压缩,压缩的流程是雷同的;

// lib/encoding/encoding.go
func MarshalValues(dst []byte, values []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {return marshalInt64Array(dst, values, precisionBits)
}
func MarshalTimestamps(dst []byte, timestamps []int64, precisionBits uint8) (result []byte, mt MarshalType, firstTimestamp int64) {return marshalInt64Array(dst, timestamps, precisionBits)
}

重点看一下[]int64 的压缩过程:

  • 若[]int64 内都是雷同的值,则保留第一个值即可;
  • 若[]int64 内是 delta 的等差数列,则保留第一个值和 delta 值;
  • 若[]int64 内是 Gauge 类型的值,则应用 delta encoding 进行压缩;
  • 若[]int64 内是 Counter 类型的值,则应用 delta2 encoding 进行压缩;

最初,还对后果应用 zstd 算法进行二次压缩:

// lib/encoding/encoding.go
func marshalInt64Array(dst []byte, a []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {// 1.[]int64 内都是雷同的值
    if isConst(a) {firstValue = a[0]
        return dst, MarshalTypeConst, firstValue
    }
    // 2.[]int64 内是 delta 雷同的等差数列
    if isDeltaConst(a) {firstValue = a[0]
        dst = MarshalVarInt64(dst, a[1]-a[0])
        return dst, MarshalTypeDeltaConst, firstValue
    }
    bb := bbPool.Get()
    // 3.[]int64 内是 Gauge 类型的数值
    if isGauge(a) {
        // 应用 delta encoding 压缩
        // Gauge values are better compressed with delta encoding.
        mt = MarshalTypeZSTDNearestDelta
        pb := precisionBits
        ...
        bb.B, firstValue = marshalInt64NearestDelta(bb.B[:0], a, pb)
    } else {// 4.[]int64 内是 Counter 类型的数值,应用 delta2 encoding
        // Non-gauge values, i.e. counters are better compressed with delta2 encoding.
        mt = MarshalTypeZSTDNearestDelta2
        bb.B, firstValue = marshalInt64NearestDelta2(bb.B[:0], a, precisionBits)
    }
    // 5. 最初还用 zstd 对后果进行二次压缩
    // Try compressing the result.
    dstOrig := dst
    if len(bb.B) >= minCompressibleBlockSize {    //minCompressibleBlockSize 常量 =128
        compressLevel := getCompressLevel(len(a))
        dst = CompressZSTDLevel(dst, bb.B, compressLevel)
    }
    ...
    return dst, mt, firstValue
}

看一下 delta encoding 的具体实现:

// lib/encoding/nearest_delta.go
// marshalInt64NearestDelta encodes src using `nearest delta` encoding
// with the given precisionBits and appends the encoded value to dst.
// precisionBits 默认 =64,即无损压缩
func marshalInt64NearestDelta(dst []byte, src []int64, precisionBits uint8) (result []byte, firstValue int64) {
    ...
    // 1. 计算 src 内 value 的 delta
    // 即 delta=curValue-preValue
    firstValue = src[0]
    v := src[0]
    src = src[1:]
    is := GetInt64s(len(src))
    if precisionBits == 64 {
        // Fast path.
        for i, next := range src {
            d := next - v
            v += d
            is.A[i] = d
        }
    } else {....}
    // 2. 将[]delta 后果应用 zigzag 进行压缩
    dst = MarshalVarInt64s(dst, is.A)
    PutInt64s(is)
    return dst, firstValue
}

MarshalVarInt64s()负责将[]delta 应用 zigzag 进行压缩:

// lib/encoding/int.go
// MarshalVarInt64s appends marshaled vs to dst and returns the result.
func MarshalVarInt64s(dst []byte, vs []int64) []byte {
    for _, v := range vs {
        if v < 0x40 && v > -0x40 {
            // Fast path
            c := int8(v)
            v := (c << 1) ^ (c >> 7) // zig-zag encoding without branching.
            dst = append(dst, byte(v))
            continue
        }
 
        v = (v << 1) ^ (v >> 63) // zig-zag encoding without branching.
        u := uint64(v)
        for u > 0x7f {dst = append(dst, 0x80|byte(u))
            u >>= 7
        }
        dst = append(dst, byte(u))
    }
    return dst
}
退出移动版