写入VictoriaMetrics的数据,首先被保留在内存shard,内存shard的数据定期的被压缩到inmemoryPart(内存中),inmemoryPart的数据被定期的合并到磁盘part中。

tv的压缩,产生在内存shard转换为inmemoryPart的过程中。

一. 入口代码

shard中寄存原始数据,通过压缩后,数据被寄存到inmemoryPart:

  • rows被压缩到inmemoryPart的入口:mp.InitFromRows(rows);
// lib/storage/partition.gofunc (pt *partition) addRowsPart(rows []rawRow) {    ...    mp := getInmemoryPart()    mp.InitFromRows(rows)        //这里:将rows数据压缩后保留到inmemoryPart中    ...    p, err := mp.NewPart()        pw := &partWrapper{        p:        p,        mp:       mp,        refCount: 1,    }    pt.partsLock.Lock()    pt.smallParts = append(pt.smallParts, pw)    ok := len(pt.smallParts) <= maxSmallPartsPerPartition    //常量=256    pt.partsLock.Unlock()    if ok {        return        // 以后len(smallParts)<=256,间接返回    }    // The added part exceeds available limit. Help merging parts.    //    // Prioritize assisted merges over searches.    ...}

二. 整体流程

shard的rows按(tsid,timestamp)排序,而后将其分为N个block,每个block最多8k rows;

对每个block中进行压缩,压缩后block中蕴含:

  • tsid: 即seriesId;
  • timestamps: []int64
  • values: 原为[]float64,转为[]int64和scale(保留在header中);

因为block中的values和times均为[]int64,故可应用雷同的压缩算法:

对于[]int64的压缩,VictoriaMetrics针对不同的场景做了优化:

  • 若[]int64内都是雷同的值,则保留第一个值即可;
  • 若[]int64内是delta雷同的等差数列,则保留第一个值和delta值即可;
  • 若[]int64内是Gauge类型值,则先计算value的delta,而后用zigzag压缩;
  • 若[]int64内是Counter类型值,则先计算value的delta of delta,而后用zigzag压缩;

三. 源码剖析

1.代码入口

由RawRowsMarshaller执行序列化和压缩:

// lib/storage/inmemory_part.go// InitFromRows initializes mp from the given rows.func (mp *inmemoryPart) InitFromRows(rows []rawRow) {    if len(rows) == 0 {        logger.Panicf("BUG: Inmemory.InitFromRows must accept at least one row")    }    mp.Reset()    rrm := getRawRowsMarshaler()    rrm.marshalToInmemoryPart(mp, rows)    putRawRowsMarshaler(rrm)    mp.creationTime = fasttime.UnixTimestamp()}

2.划分block,每个block别离执行

  • 首先,对所有的rows依照tsid,timestamp进行排序;
  • 而后,将rows依照metricID和maxRowsPerBlock(8K),划分为1个个的Block;
  • 而后,Block内的values值由[]float64转为[]int64+scale;

    • scale保留在block.header中;
  • 最初,执行block的压缩;
// lib/storage/raw_row.gofunc (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawRow) {    ...    rrm.bsw.InitFromInmemoryPart(mp)    ph := &mp.ph    ph.Reset()    // 1.对所有的rows执行sort(按tsid,timestamp)    // Sort rows by (TSID, Timestamp) if they aren't sorted yet.    rrs := rawRowsSort(rows)    if !sort.IsSorted(&rrs) {        sort.Sort(&rrs)    }    // 2.按block进行划分    // Group rows into blocks.    var scale int16    var rowsMerged uint64    r := &rows[0]    tsid := &r.TSID    precisionBits := r.PrecisionBits    tmpBlock := getBlock()    defer putBlock(tmpBlock)    for i := range rows {        r = &rows[i]        if r.TSID.MetricID == tsid.MetricID && len(rrm.auxTimestamps) < maxRowsPerBlock {    // maxRowsPerBlock常量=8*1024            rrm.auxTimestamps = append(rrm.auxTimestamps, r.Timestamp)            rrm.auxFloatValues = append(rrm.auxFloatValues, r.Value)            continue        }        // 3.1.将[]float64转为[]int64和scale        rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)        tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)        // 3.2.执行序列化和压缩        rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)        tsid = &r.TSID        precisionBits = r.PrecisionBits        rrm.auxTimestamps = append(rrm.auxTimestamps[:0], r.Timestamp)        rrm.auxFloatValues = append(rrm.auxFloatValues[:0], r.Value)    }    // 3.3.最初一个block的压缩    rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)    tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)    rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)    ...    rrm.bsw.MustClose()}

[]float64转[]int64+scale,由decimal.AppendFloatToDecimal实现:

// lib/decimal/decimal.go// AppendFloatToDecimal converts each item in src to v*10^e and appends// each v to dst returning it as va.//// It tries minimizing each item in dst.func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {    ...    // 比方,输出{1.2, 3.4, 5, 6, 7.8}    // 输入:{12 34 50 60 78},scale=-1    ...}

3.block内的压缩

  • 首先,将block内的timestamps和values进行压缩,压缩为[]byte;
  • 而后,将timestampsData的[]byte写入;
  • 最初,将valuesData的[]byte写入;
// lib/storage/block_stream_writer.go// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {    ...    // 1.将block内的timestamp/values进行压缩    headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)    ...    // 2.写入timestampsData    fs.MustWriteData(bsw.timestampsWriter, timestampsData)    ...    // 3.写入valuesData    fs.MustWriteData(bsw.valuesWriter, valuesData)    ...    // 更新partHeader    updatePartHeader(b, ph)}

重点看一下block内数据的压缩过程:

  • 顺次压缩values和timestamps;
  • PrecisionBits默认=64,即无损压缩;
// lib/storage/block.go// MarshalData marshals the block into binary representation.func (b *Block) MarshalData(timestampsBlockOffset, valuesBlockOffset uint64) ([]byte, []byte, []byte) {    ...    timestamps := b.timestamps[b.nextIdx:]    values := b.values[b.nextIdx:]    ...    //1. 压缩values, PrecisionBits默认=64,即无损压缩    b.valuesData, b.bh.ValuesMarshalType, b.bh.FirstValue = encoding.MarshalValues(b.valuesData[:0], values, b.bh.PrecisionBits)    b.bh.ValuesBlockOffset = valuesBlockOffset    b.bh.ValuesBlockSize = uint32(len(b.valuesData))    b.values = b.values[:0]        // 2. 压缩timestamps, PrecisionBits默认=64,即无损压缩    b.timestampsData, b.bh.TimestampsMarshalType, b.bh.MinTimestamp = encoding.MarshalTimestamps(b.timestampsData[:0], timestamps, b.bh.PrecisionBits)    b.bh.TimestampsBlockOffset = timestampsBlockOffset    b.bh.TimestampsBlockSize = uint32(len(b.timestampsData))    b.bh.MaxTimestamp = timestamps[len(timestamps)-1]    b.timestamps = b.timestamps[:0]    b.bh.RowsCount = uint32(len(values))    b.headerData = b.bh.Marshal(b.headerData[:0])    b.nextIdx = 0    return b.headerData, b.timestampsData, b.valuesData}

encode.MarshalValues和MarshallTimestamps都是对[]int64进行压缩,压缩的流程是雷同的;

// lib/encoding/encoding.gofunc MarshalValues(dst []byte, values []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {    return marshalInt64Array(dst, values, precisionBits)}func MarshalTimestamps(dst []byte, timestamps []int64, precisionBits uint8) (result []byte, mt MarshalType, firstTimestamp int64) {    return marshalInt64Array(dst, timestamps, precisionBits)}

重点看一下[]int64的压缩过程:

  • 若[]int64内都是雷同的值,则保留第一个值即可;
  • 若[]int64内是delta的等差数列,则保留第一个值和delta值;
  • 若[]int64内是Gauge类型的值,则应用delta encoding进行压缩;
  • 若[]int64内是Counter类型的值,则应用delta2 encoding进行压缩;

最初,还对后果应用zstd算法进行二次压缩:

// lib/encoding/encoding.gofunc marshalInt64Array(dst []byte, a []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {    // 1.[]int64内都是雷同的值    if isConst(a) {        firstValue = a[0]        return dst, MarshalTypeConst, firstValue    }    // 2.[]int64内是delta雷同的等差数列    if isDeltaConst(a) {        firstValue = a[0]        dst = MarshalVarInt64(dst, a[1]-a[0])        return dst, MarshalTypeDeltaConst, firstValue    }    bb := bbPool.Get()    // 3.[]int64内是Gauge类型的数值    if isGauge(a) {        // 应用delta encoding压缩        // Gauge values are better compressed with delta encoding.        mt = MarshalTypeZSTDNearestDelta        pb := precisionBits        ...        bb.B, firstValue = marshalInt64NearestDelta(bb.B[:0], a, pb)    } else {        // 4.[]int64内是Counter类型的数值,应用delta2 encoding        // Non-gauge values, i.e. counters are better compressed with delta2 encoding.        mt = MarshalTypeZSTDNearestDelta2        bb.B, firstValue = marshalInt64NearestDelta2(bb.B[:0], a, precisionBits)    }    // 5.最初还用zstd对后果进行二次压缩    // Try compressing the result.    dstOrig := dst    if len(bb.B) >= minCompressibleBlockSize {    //minCompressibleBlockSize常量=128        compressLevel := getCompressLevel(len(a))        dst = CompressZSTDLevel(dst, bb.B, compressLevel)    }    ...    return dst, mt, firstValue}

看一下delta encoding的具体实现:

// lib/encoding/nearest_delta.go// marshalInt64NearestDelta encodes src using `nearest delta` encoding// with the given precisionBits and appends the encoded value to dst.// precisionBits默认=64,即无损压缩func marshalInt64NearestDelta(dst []byte, src []int64, precisionBits uint8) (result []byte, firstValue int64) {    ...    // 1.计算src内value的delta    // 即delta=curValue-preValue    firstValue = src[0]    v := src[0]    src = src[1:]    is := GetInt64s(len(src))    if precisionBits == 64 {        // Fast path.        for i, next := range src {            d := next - v            v += d            is.A[i] = d        }    } else {        ....    }    // 2.将[]delta后果应用zigzag进行压缩    dst = MarshalVarInt64s(dst, is.A)    PutInt64s(is)    return dst, firstValue}

MarshalVarInt64s()负责将[]delta应用zigzag进行压缩:

// lib/encoding/int.go// MarshalVarInt64s appends marshaled vs to dst and returns the result.func MarshalVarInt64s(dst []byte, vs []int64) []byte {    for _, v := range vs {        if v < 0x40 && v > -0x40 {            // Fast path            c := int8(v)            v := (c << 1) ^ (c >> 7) // zig-zag encoding without branching.            dst = append(dst, byte(v))            continue        }         v = (v << 1) ^ (v >> 63) // zig-zag encoding without branching.        u := uint64(v)        for u > 0x7f {            dst = append(dst, 0x80|byte(u))            u >>= 7        }        dst = append(dst, byte(u))    }    return dst}