写入 VictoriaMetrics 的数据,首先被保留在内存 shard,内存 shard 的数据定期的被压缩到 inmemoryPart(内存中),inmemoryPart 的数据被定期的合并到磁盘 part 中。
tv 的压缩,产生在内存 shard 转换为 inmemoryPart 的过程中。
一. 入口代码
shard 中寄存原始数据,通过压缩后,数据被寄存到 inmemoryPart:
- rows 被压缩到 inmemoryPart 的入口:mp.InitFromRows(rows);
// lib/storage/partition.go
func (pt *partition) addRowsPart(rows []rawRow) {
...
mp := getInmemoryPart()
mp.InitFromRows(rows) // 这里:将 rows 数据压缩后保留到 inmemoryPart 中
...
p, err := mp.NewPart()
pw := &partWrapper{
p: p,
mp: mp,
refCount: 1,
}
pt.partsLock.Lock()
pt.smallParts = append(pt.smallParts, pw)
ok := len(pt.smallParts) <= maxSmallPartsPerPartition // 常量 =256
pt.partsLock.Unlock()
if ok {return // 以后 len(smallParts)<=256,间接返回
}
// The added part exceeds available limit. Help merging parts.
//
// Prioritize assisted merges over searches.
...
}
二. 整体流程
shard 的 rows 按 (tsid,timestamp) 排序,而后将其分为 N 个 block,每个 block 最多 8k rows;
对每个 block 中进行压缩,压缩后 block 中蕴含:
- tsid: 即 seriesId;
- timestamps: []int64
- values: 原为[]float64,转为[]int64 和 scale(保留在 header 中);
因为 block 中的 values 和 times 均为[]int64,故可应用雷同的压缩算法:
对于[]int64 的压缩,VictoriaMetrics 针对不同的场景做了优化:
- 若[]int64 内都是雷同的值,则保留第一个值即可;
- 若[]int64 内是 delta 雷同的等差数列,则保留第一个值和 delta 值即可;
- 若[]int64 内是 Gauge 类型值,则先计算 value 的 delta,而后用 zigzag 压缩;
- 若[]int64 内是 Counter 类型值,则先计算 value 的 delta of delta,而后用 zigzag 压缩;
三. 源码剖析
1. 代码入口
由 RawRowsMarshaller 执行序列化和压缩:
// lib/storage/inmemory_part.go
// InitFromRows initializes mp from the given rows.
func (mp *inmemoryPart) InitFromRows(rows []rawRow) {if len(rows) == 0 {logger.Panicf("BUG: Inmemory.InitFromRows must accept at least one row")
}
mp.Reset()
rrm := getRawRowsMarshaler()
rrm.marshalToInmemoryPart(mp, rows)
putRawRowsMarshaler(rrm)
mp.creationTime = fasttime.UnixTimestamp()}
2. 划分 block,每个 block 别离执行
- 首先,对所有的 rows 依照 tsid,timestamp 进行排序;
- 而后,将 rows 依照 metricID 和 maxRowsPerBlock(8K),划分为 1 个个的 Block;
-
而后,Block 内的 values 值由[]float64 转为[]int64+scale;
- scale 保留在 block.header 中;
- 最初,执行 block 的压缩;
// lib/storage/raw_row.go
func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawRow) {
...
rrm.bsw.InitFromInmemoryPart(mp)
ph := &mp.ph
ph.Reset()
// 1. 对所有的 rows 执行 sort(按 tsid,timestamp)
// Sort rows by (TSID, Timestamp) if they aren't sorted yet.
rrs := rawRowsSort(rows)
if !sort.IsSorted(&rrs) {sort.Sort(&rrs)
}
// 2. 按 block 进行划分
// Group rows into blocks.
var scale int16
var rowsMerged uint64
r := &rows[0]
tsid := &r.TSID
precisionBits := r.PrecisionBits
tmpBlock := getBlock()
defer putBlock(tmpBlock)
for i := range rows {r = &rows[i]
if r.TSID.MetricID == tsid.MetricID && len(rrm.auxTimestamps) < maxRowsPerBlock { // maxRowsPerBlock 常量 =8*1024
rrm.auxTimestamps = append(rrm.auxTimestamps, r.Timestamp)
rrm.auxFloatValues = append(rrm.auxFloatValues, r.Value)
continue
}
// 3.1. 将[]float64 转为[]int64 和 scale
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
// 3.2. 执行序列化和压缩
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
tsid = &r.TSID
precisionBits = r.PrecisionBits
rrm.auxTimestamps = append(rrm.auxTimestamps[:0], r.Timestamp)
rrm.auxFloatValues = append(rrm.auxFloatValues[:0], r.Value)
}
// 3.3. 最初一个 block 的压缩
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
...
rrm.bsw.MustClose()}
[]float64 转[]int64+scale,由 decimal.AppendFloatToDecimal 实现:
// lib/decimal/decimal.go
// AppendFloatToDecimal converts each item in src to v*10^e and appends
// each v to dst returning it as va.
//
// It tries minimizing each item in dst.
func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
...
// 比方,输出{1.2, 3.4, 5, 6, 7.8}
// 输入:{12 34 50 60 78},scale=-1
...
}
3.block 内的压缩
- 首先,将 block 内的 timestamps 和 values 进行压缩,压缩为[]byte;
- 而后,将 timestampsData 的[]byte 写入;
- 最初,将 valuesData 的[]byte 写入;
// lib/storage/block_stream_writer.go
// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
...
// 1. 将 block 内的 timestamp/values 进行压缩
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
...
// 2. 写入 timestampsData
fs.MustWriteData(bsw.timestampsWriter, timestampsData)
...
// 3. 写入 valuesData
fs.MustWriteData(bsw.valuesWriter, valuesData)
...
// 更新 partHeader
updatePartHeader(b, ph)
}
重点看一下 block 内数据的压缩过程:
- 顺次压缩 values 和 timestamps;
- PrecisionBits 默认 =64,即无损压缩;
// lib/storage/block.go
// MarshalData marshals the block into binary representation.
func (b *Block) MarshalData(timestampsBlockOffset, valuesBlockOffset uint64) ([]byte, []byte, []byte) {
...
timestamps := b.timestamps[b.nextIdx:]
values := b.values[b.nextIdx:]
...
//1. 压缩 values, PrecisionBits 默认 =64,即无损压缩
b.valuesData, b.bh.ValuesMarshalType, b.bh.FirstValue = encoding.MarshalValues(b.valuesData[:0], values, b.bh.PrecisionBits)
b.bh.ValuesBlockOffset = valuesBlockOffset
b.bh.ValuesBlockSize = uint32(len(b.valuesData))
b.values = b.values[:0]
// 2. 压缩 timestamps, PrecisionBits 默认 =64,即无损压缩
b.timestampsData, b.bh.TimestampsMarshalType, b.bh.MinTimestamp = encoding.MarshalTimestamps(b.timestampsData[:0], timestamps, b.bh.PrecisionBits)
b.bh.TimestampsBlockOffset = timestampsBlockOffset
b.bh.TimestampsBlockSize = uint32(len(b.timestampsData))
b.bh.MaxTimestamp = timestamps[len(timestamps)-1]
b.timestamps = b.timestamps[:0]
b.bh.RowsCount = uint32(len(values))
b.headerData = b.bh.Marshal(b.headerData[:0])
b.nextIdx = 0
return b.headerData, b.timestampsData, b.valuesData
}
encode.MarshalValues 和 MarshallTimestamps 都是对[]int64 进行压缩,压缩的流程是雷同的;
// lib/encoding/encoding.go
func MarshalValues(dst []byte, values []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {return marshalInt64Array(dst, values, precisionBits)
}
func MarshalTimestamps(dst []byte, timestamps []int64, precisionBits uint8) (result []byte, mt MarshalType, firstTimestamp int64) {return marshalInt64Array(dst, timestamps, precisionBits)
}
重点看一下[]int64 的压缩过程:
- 若[]int64 内都是雷同的值,则保留第一个值即可;
- 若[]int64 内是 delta 的等差数列,则保留第一个值和 delta 值;
- 若[]int64 内是 Gauge 类型的值,则应用 delta encoding 进行压缩;
- 若[]int64 内是 Counter 类型的值,则应用 delta2 encoding 进行压缩;
最初,还对后果应用 zstd 算法进行二次压缩:
// lib/encoding/encoding.go
func marshalInt64Array(dst []byte, a []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {// 1.[]int64 内都是雷同的值
if isConst(a) {firstValue = a[0]
return dst, MarshalTypeConst, firstValue
}
// 2.[]int64 内是 delta 雷同的等差数列
if isDeltaConst(a) {firstValue = a[0]
dst = MarshalVarInt64(dst, a[1]-a[0])
return dst, MarshalTypeDeltaConst, firstValue
}
bb := bbPool.Get()
// 3.[]int64 内是 Gauge 类型的数值
if isGauge(a) {
// 应用 delta encoding 压缩
// Gauge values are better compressed with delta encoding.
mt = MarshalTypeZSTDNearestDelta
pb := precisionBits
...
bb.B, firstValue = marshalInt64NearestDelta(bb.B[:0], a, pb)
} else {// 4.[]int64 内是 Counter 类型的数值,应用 delta2 encoding
// Non-gauge values, i.e. counters are better compressed with delta2 encoding.
mt = MarshalTypeZSTDNearestDelta2
bb.B, firstValue = marshalInt64NearestDelta2(bb.B[:0], a, precisionBits)
}
// 5. 最初还用 zstd 对后果进行二次压缩
// Try compressing the result.
dstOrig := dst
if len(bb.B) >= minCompressibleBlockSize { //minCompressibleBlockSize 常量 =128
compressLevel := getCompressLevel(len(a))
dst = CompressZSTDLevel(dst, bb.B, compressLevel)
}
...
return dst, mt, firstValue
}
看一下 delta encoding 的具体实现:
// lib/encoding/nearest_delta.go
// marshalInt64NearestDelta encodes src using `nearest delta` encoding
// with the given precisionBits and appends the encoded value to dst.
// precisionBits 默认 =64,即无损压缩
func marshalInt64NearestDelta(dst []byte, src []int64, precisionBits uint8) (result []byte, firstValue int64) {
...
// 1. 计算 src 内 value 的 delta
// 即 delta=curValue-preValue
firstValue = src[0]
v := src[0]
src = src[1:]
is := GetInt64s(len(src))
if precisionBits == 64 {
// Fast path.
for i, next := range src {
d := next - v
v += d
is.A[i] = d
}
} else {....}
// 2. 将[]delta 后果应用 zigzag 进行压缩
dst = MarshalVarInt64s(dst, is.A)
PutInt64s(is)
return dst, firstValue
}
MarshalVarInt64s()负责将[]delta 应用 zigzag 进行压缩:
// lib/encoding/int.go
// MarshalVarInt64s appends marshaled vs to dst and returns the result.
func MarshalVarInt64s(dst []byte, vs []int64) []byte {
for _, v := range vs {
if v < 0x40 && v > -0x40 {
// Fast path
c := int8(v)
v := (c << 1) ^ (c >> 7) // zig-zag encoding without branching.
dst = append(dst, byte(v))
continue
}
v = (v << 1) ^ (v >> 63) // zig-zag encoding without branching.
u := uint64(v)
for u > 0x7f {dst = append(dst, 0x80|byte(u))
u >>= 7
}
dst = append(dst, byte(u))
}
return dst
}