写入VictoriaMetrics的数据,首先被保存在内存shard中;内存shard的数据定期地被压缩到inmemoryPart(仍在内存中),inmemoryPart的数据再被定期地合并到磁盘part中。
tv(timestamp/value)的压缩,发生在内存shard转换为inmemoryPart的过程中。
一. 入口代码
shard中存放原始数据,经过压缩后,数据被存放到inmemoryPart:
- rows被压缩到inmemoryPart的入口:mp.InitFromRows(rows);
// lib/storage/partition.go

// addRowsPart (excerpt; `...` marks code elided by the article) converts rows
// into an in-memory part, wraps it in a partWrapper and appends the wrapper
// to pt.smallParts while holding pt.partsLock.
func (pt *partition) addRowsPart(rows []rawRow) {
	...
	mp := getInmemoryPart()
	mp.InitFromRows(rows) // Here: rows are compressed and stored into the inmemoryPart.
	...
	p, err := mp.NewPart()
	pw := &partWrapper{
		p:        p,
		mp:       mp,
		refCount: 1,
	}
	pt.partsLock.Lock()
	pt.smallParts = append(pt.smallParts, pw)
	ok := len(pt.smallParts) <= maxSmallPartsPerPartition // Constant; equals 256 according to the article.
	pt.partsLock.Unlock()
	if ok {
		return // The number of small parts is still within the limit, so return directly.
	}
	// The added part exceeds available limit. Help merging parts.
	//
	// Prioritize assisted merges over searches.
	...
}
二. 整体流程
shard的rows按(tsid,timestamp)排序,然后将其分为N个block,每个block最多8k rows;
对每个block进行压缩,压缩后block中包含:
- tsid: 即seriesId;
- timestamps: []int64
- values: 原为[]float64,转为[]int64和scale(scale保存在header中);
因为block中的values和timestamps均为[]int64,故可使用相同的压缩算法:
对于[]int64的压缩,VictoriaMetrics针对不同的场景做了优化:
- 若[]int64内都是相同的值,则保存第一个值即可;
- 若[]int64内是delta相同的等差数列,则保存第一个值和delta值即可;
- 若[]int64内是Gauge类型值,则先计算value的delta,然后用zigzag编码压缩;
- 若[]int64内是Counter类型值,则先计算value的delta of delta,然后用zigzag编码压缩;
三. 源码分析
1.代码入口
由rawRowsMarshaler执行序列化和压缩:
// lib/storage/inmemory_part.go// InitFromRows initializes mp from the given rows.func (mp *inmemoryPart) InitFromRows(rows []rawRow) { if len(rows) == 0 { logger.Panicf("BUG: Inmemory.InitFromRows must accept at least one row") } mp.Reset() rrm := getRawRowsMarshaler() rrm.marshalToInmemoryPart(mp, rows) putRawRowsMarshaler(rrm) mp.creationTime = fasttime.UnixTimestamp()}
2.划分block,每个block分别执行压缩
- 首先,对所有的rows按照tsid,timestamp进行排序;
- 然后,将rows按照metricID和maxRowsPerBlock(8K),划分为一个个的Block;
- 然后,Block内的values值由[]float64转为[]int64+scale;
- scale保存在block.header中;
- 最后,执行block的压缩;
// lib/storage/raw_row.go

// marshalToInmemoryPart (excerpt; `...` marks code elided by the article)
// sorts rows by (TSID, Timestamp), groups consecutive rows sharing one
// MetricID into blocks of at most maxRowsPerBlock rows, and writes every
// block to mp through rrm.bsw.
//
// rows must be non-empty because rows[0] is read unconditionally.
// NOTE(review): the aux buffers are presumably reset in the elided prologue — confirm.
func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawRow) {
	...
	rrm.bsw.InitFromInmemoryPart(mp)
	ph := &mp.ph
	ph.Reset()

	// Step 1: sort all the rows by (tsid, timestamp).
	// Sort rows by (TSID, Timestamp) if they aren't sorted yet.
	rrs := rawRowsSort(rows)
	if !sort.IsSorted(&rrs) {
		sort.Sort(&rrs)
	}

	// Step 2: split the rows into blocks.
	// Group rows into blocks.
	var scale int16
	var rowsMerged uint64
	r := &rows[0]
	tsid := &r.TSID
	precisionBits := r.PrecisionBits
	tmpBlock := getBlock()
	defer putBlock(tmpBlock)
	for i := range rows {
		r = &rows[i]
		if r.TSID.MetricID == tsid.MetricID && len(rrm.auxTimestamps) < maxRowsPerBlock { // maxRowsPerBlock is a constant; 8*1024 according to the article.
			rrm.auxTimestamps = append(rrm.auxTimestamps, r.Timestamp)
			rrm.auxFloatValues = append(rrm.auxFloatValues, r.Value)
			continue
		}
		// Step 3.1: convert the accumulated []float64 into []int64 plus a scale.
		rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
		tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
		// Step 3.2: serialize and compress the block.
		rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
		// Start the next block with the current row.
		tsid = &r.TSID
		precisionBits = r.PrecisionBits
		rrm.auxTimestamps = append(rrm.auxTimestamps[:0], r.Timestamp)
		rrm.auxFloatValues = append(rrm.auxFloatValues[:0], r.Value)
	}
	// Step 3.3: compress the last block, which the loop leaves pending.
	rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
	tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
	rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
	...
	rrm.bsw.MustClose()
}
[]float64转[]int64+scale,由decimal.AppendFloatToDecimal实现:
// lib/decimal/decimal.go

// AppendFloatToDecimal converts each item in src to v*10^e and appends
// each v to dst returning it as va.
//
// It tries minimizing each item in dst.
//
// The body is elided by the article (`...`); only the example below is shown.
func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
	...
	// For example, for the input {1.2, 3.4, 5, 6, 7.8}
	// the output is {12 34 50 60 78} with scale=-1.
	...
}
3.block内的压缩
- 首先,将block内的timestamps和values进行压缩,压缩为[]byte;
- 然后,将timestampsData的[]byte写入;
- 最后,将valuesData的[]byte写入;
// lib/storage/block_stream_writer.go

// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
//
// Excerpt: `...` marks code elided by the article.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
	...
	// Step 1: compress the timestamps and values held by the block.
	headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
	...
	// Step 2: write timestampsData.
	fs.MustWriteData(bsw.timestampsWriter, timestampsData)
	...
	// Step 3: write valuesData.
	fs.MustWriteData(bsw.valuesWriter, valuesData)
	...
	// Update the part header.
	updatePartHeader(b, ph)
}
重点看一下block内数据的压缩过程:
- 依次压缩values和timestamps;
- PrecisionBits默认=64,即无损压缩;
// lib/storage/block.go

// MarshalData marshals the block into binary representation.
//
// It returns b.headerData, b.timestampsData and b.valuesData, in that order.
// Excerpt: `...` marks code elided by the article.
func (b *Block) MarshalData(timestampsBlockOffset, valuesBlockOffset uint64) ([]byte, []byte, []byte) {
	...
	// Only the rows starting at b.nextIdx are marshaled.
	timestamps := b.timestamps[b.nextIdx:]
	values := b.values[b.nextIdx:]
	...
	// Step 1: compress values (article note: PrecisionBits defaults to 64, i.e. lossless compression).
	b.valuesData, b.bh.ValuesMarshalType, b.bh.FirstValue = encoding.MarshalValues(b.valuesData[:0], values, b.bh.PrecisionBits)
	b.bh.ValuesBlockOffset = valuesBlockOffset
	b.bh.ValuesBlockSize = uint32(len(b.valuesData))
	b.values = b.values[:0]
	// Step 2: compress timestamps (article note: PrecisionBits defaults to 64, i.e. lossless compression).
	b.timestampsData, b.bh.TimestampsMarshalType, b.bh.MinTimestamp = encoding.MarshalTimestamps(b.timestampsData[:0], timestamps, b.bh.PrecisionBits)
	b.bh.TimestampsBlockOffset = timestampsBlockOffset
	b.bh.TimestampsBlockSize = uint32(len(b.timestampsData))
	b.bh.MaxTimestamp = timestamps[len(timestamps)-1]
	b.timestamps = b.timestamps[:0]
	// Finalize the header and serialize it.
	b.bh.RowsCount = uint32(len(values))
	b.headerData = b.bh.Marshal(b.headerData[:0])
	b.nextIdx = 0
	return b.headerData, b.timestampsData, b.valuesData
}
encoding.MarshalValues和encoding.MarshalTimestamps都是对[]int64进行压缩,压缩的流程是相同的;
// lib/encoding/encoding.gofunc MarshalValues(dst []byte, values []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) { return marshalInt64Array(dst, values, precisionBits)}func MarshalTimestamps(dst []byte, timestamps []int64, precisionBits uint8) (result []byte, mt MarshalType, firstTimestamp int64) { return marshalInt64Array(dst, timestamps, precisionBits)}
重点看一下[]int64的压缩过程:
- 若[]int64内都是相同的值,则保存第一个值即可;
- 若[]int64内是delta相同的等差数列,则保存第一个值和delta值;
- 若[]int64内是Gauge类型的值,则使用delta encoding进行压缩;
- 若[]int64内是Counter类型的值,则使用delta2 encoding进行压缩;
最后,当编码结果的长度不小于minCompressibleBlockSize时,还会对结果使用zstd算法进行二次压缩:
// lib/encoding/encoding.go

// marshalInt64Array (excerpt; `...` marks code elided by the article) encodes
// a, appends the encoded bytes to dst and returns the extended dst together
// with the chosen MarshalType and the first value needed for decoding.
//
// The encoding is picked by the checks below: constant series, constant-delta
// series, gauge (delta encoding) or everything else (delta2 encoding).
func marshalInt64Array(dst []byte, a []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {
	// Case 1: every item in a has the same value; nothing is appended to dst.
	if isConst(a) {
		firstValue = a[0]
		return dst, MarshalTypeConst, firstValue
	}
	// Case 2: a is an arithmetic progression; only the constant delta is appended.
	if isDeltaConst(a) {
		firstValue = a[0]
		dst = MarshalVarInt64(dst, a[1]-a[0])
		return dst, MarshalTypeDeltaConst, firstValue
	}
	bb := bbPool.Get()
	// Case 3: a holds gauge-like values.
	if isGauge(a) {
		// Compress with delta encoding.
		// Gauge values are better compressed with delta encoding.
		mt = MarshalTypeZSTDNearestDelta
		pb := precisionBits
		...
		bb.B, firstValue = marshalInt64NearestDelta(bb.B[:0], a, pb)
	} else {
		// Case 4: a holds counter-like values; use delta2 encoding.
		// Non-gauge values, i.e. counters are better compressed with delta2 encoding.
		mt = MarshalTypeZSTDNearestDelta2
		bb.B, firstValue = marshalInt64NearestDelta2(bb.B[:0], a, precisionBits)
	}
	// Step 5: finally, run a second compression pass over the result with zstd.
	// Try compressing the result.
	dstOrig := dst
	if len(bb.B) >= minCompressibleBlockSize { // minCompressibleBlockSize is a constant; 128 according to the article.
		compressLevel := getCompressLevel(len(a))
		dst = CompressZSTDLevel(dst, bb.B, compressLevel)
	}
	...
	return dst, mt, firstValue
}
看一下delta encoding的具体实现:
// lib/encoding/nearest_delta.go

// marshalInt64NearestDelta encodes src using `nearest delta` encoding
// with the given precisionBits and appends the encoded value to dst.
//
// Article note: precisionBits defaults to 64, i.e. lossless compression.
// Excerpt: `...` and `....` mark code elided by the article.
func marshalInt64NearestDelta(dst []byte, src []int64, precisionBits uint8) (result []byte, firstValue int64) {
	...
	// Step 1: compute the deltas between consecutive values in src,
	// i.e. delta = current value - previous value.
	firstValue = src[0]
	v := src[0]
	src = src[1:]
	is := GetInt64s(len(src))
	if precisionBits == 64 {
		// Fast path.
		for i, next := range src {
			d := next - v
			v += d
			is.A[i] = d
		}
	} else {
		....
	}
	// Step 2: encode the deltas with zig-zag varint encoding.
	dst = MarshalVarInt64s(dst, is.A)
	PutInt64s(is)
	return dst, firstValue
}
MarshalVarInt64s()负责将[]delta先做zigzag编码,再以变长整数(varint)的形式写入:
// lib/encoding/int.go

// MarshalVarInt64s appends the zig-zag varint encoding of every item in vs
// to dst and returns the extended slice.
//
// Each item is zig-zag encoded and then emitted 7 bits at a time, lowest
// bits first, with the high bit of a byte set when more bytes follow.
func MarshalVarInt64s(dst []byte, vs []int64) []byte {
	for i := range vs {
		n := vs[i]
		if n > -0x40 && n < 0x40 {
			// The zig-zag form of such a value fits into 7 bits,
			// so it is emitted as a single byte using int8 arithmetic.
			small := int8(n)
			dst = append(dst, byte((small<<1)^(small>>7)))
			continue
		}

		// General case: branch-free zig-zag encoding followed by
		// a little-endian base-128 varint.
		rest := uint64((n << 1) ^ (n >> 63))
		for ; rest > 0x7f; rest >>= 7 {
			dst = append(dst, byte(rest)|0x80)
		}
		dst = append(dst, byte(rest))
	}
	return dst
}