共计 5917 个字符,预计需要花费 15 分钟才能阅读完成。
prometheus 中的指标 t / v 数据保留在 block/chunks 下,label 数据保留在 block/index 下。
对于 t / v 数据,prometheus 采纳 Facebook Gorilla 论文的压缩形式:
- timestamp: delta-of-delta 形式压缩时序点的工夫值;
- value: xor 形式压缩时序点的 value 值;
依照上述压缩形式,能够将一个 16byte 的时序点压缩成 1.37byte,压缩率十分高。
时序点 t / v 的压缩
1)timestamp 压缩
在时序上,相邻两个点的工夫戳的差值个别是固定的,若隔 60s pull 一次,那么 timestamp 差值个别都是 60s,比方
- p1: 10:00:00,p2: 10:01:00,p3: 10:01:59,p4:10:03:00,p5:10:04:00,p6:10:05:00
- 工夫戳的差值为:60s,59s,61s,60s,60s;
Gorilla 论文采纳 delta-of-delta 形式压缩 timestamp:
- 第一个时序点的工夫戳 t0,被残缺存储起来;
- 第二个时序点的工夫戳 t1,存储 delta=t1-t0;
-
对后续的工夫戳 tn,首先计算 dod 值:delta=(tn – tn-1) – (tn-1 – tn-2);
- 如果 dod=0,则应用 1bit=“0”存储该工夫戳;
- 如果 dod=[-8191, 8192],则先存入“10”作为标识,再用 14bit 存储该 dod 值;
- 如果 dod=[-65535, 65536],则先存入“110”作为标识,再用 17bit 存储该 dod 值;
- 如果 dod=[-524287, 524288],则先存入“1110”作为标识,再用 20bit 存储该 dod 值;
- 如果 dod>524288,则先存入“1111”作为标识,再用 64bit 存储该 dod 值;
在实践中发现,95% 的 timestamp 可能依照 dod= 0 的情景进行存储。
2)value 压缩
Gorilla 论文对时序点 value 的压缩基于:
- 相邻时序点的 value 值不会产生显著变动;
- value 大多是浮点数,当两个 value 十分靠近的时候,这两个浮点数的符号位、指数位和尾数局部的前几 bit 都是雷同的;
value 值的压缩算法:
- 第一个时序点的 value 值不压缩,间接保留;
-
从第二个点开始,将其 value 与上一个 value 进行 XOR 运算;
- 若 XOR 运算后果为“0”,则示意前后两个 value 雷同,仅存入 1bit 的“0”值即可;
-
否则,存入 1bit 值“1”;
- 若 XOR 后果中非 0 的局部蕴含在前一个 XOR 后果中,那么再写入 1bit 值“0”,而后存入 XOR 中非 0 的局部;
- 否则,写入 1bit 值“1”,用 5bit 存入 XOR 中前值 0 的个数,6bit 存入两头非 0 的长度,最初再存入两头的非 0 位;
数据显示,大概有 60% 的 value 值仅用 1bit 存储,有 30% 的 value 值落入“10”范畴,残余 10% 的 value 值落入“11”范畴。
3) 压缩示例
输出时序序列值
10:00:00 3.1 | |
10:01:01 3.2 | |
10:02:00 3.0 | |
10:02:59 3.2 | |
10:03:00 3.1 |
那么将存入
10:00:00 3.1 | |
61 3.2 xor 3.1 | |
-2(59-61) 3.0 xor 3.2 | |
0(59-59) 3.2 xor 3.0 | |
2(61-59) 3.1 xor 3.2 |
写入 t / v 的源码剖析
xorAppender 负责写入 t / v 的值,t=int64,v=float64
// tsdb/chunkenc/xor.go | |
func (a *xorAppender) Append(t int64, v float64) { | |
var tDelta uint64 | |
num := binary.BigEndian.Uint16(a.b.bytes()) | |
// 第一个点,残缺记录 t1 和 v1 的值 | |
if num == 0 {buf := make([]byte, binary.MaxVarintLen64) | |
for _, b := range buf[:binary.PutVarint(buf, t)] {a.b.writeByte(b) // 写入 t1 的值 | |
} | |
a.b.writeBits(math.Float64bits(v), 64) // 写入 v1 的值 | |
} else if num == 1 { // 第二个点 | |
tDelta = uint64(t - a.t) | |
buf := make([]byte, binary.MaxVarintLen64) | |
for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {a.b.writeByte(b) // 写入 tDeleta=t2-t1 | |
} | |
a.writeVDelta(v) // 写入 v2^v1 的值 | |
} else { // 第三个点及当前的点 | |
tDelta = uint64(t - a.t) | |
dod := int64(tDelta - a.tDelta) // 计算 dod | |
// Gorilla has a max resolution of seconds, Prometheus milliseconds. | |
// Thus we use higher value range steps with larger bit size. | |
switch { | |
case dod == 0: | |
a.b.writeBit(zero) // 写入 0 | |
case bitRange(dod, 14): //dod=[-8191,8192], 先存入 10 作为标识,再用 14bit 存储 dod 的值 | |
a.b.writeBits(0x02, 2) // '10' | |
a.b.writeBits(uint64(dod), 14) | |
case bitRange(dod, 17): //dod=[-65535,65536], 先存入 110 作为标识,再用 17bit 存储该 dod 的值 | |
a.b.writeBits(0x06, 3) // '110' | |
a.b.writeBits(uint64(dod), 17) | |
case bitRange(dod, 20): //dod=[-524287,524288], 先存入 1110 作为标识,再用 20bit 存储该 dod 的值 | |
a.b.writeBits(0x0e, 4) // '1110' | |
a.b.writeBits(uint64(dod), 20) | |
default: //dod>524288, 先存入 1111 作为标识,再用 64bit 存储该 dod 的值 | |
a.b.writeBits(0x0f, 4) // '1111' | |
a.b.writeBits(uint64(dod), 64) | |
} | |
a.writeVDelta(v) // 写入 vn^vn-1 | |
} | |
a.t = t // 写入的最初一个 t | |
a.v = v // 写入的最初一个 v | |
binary.BigEndian.PutUint16(a.b.bytes(), num+1) | |
a.tDelta = tDelta // 写入的最初一个 tDelta | |
} |
再看一下应用 xor 写入 VDelta 的源码:
// tsdb/chunkenc/xor.go | |
func (a *xorAppender) writeVDelta(v float64) {vDelta := math.Float64bits(v) ^ math.Float64bits(a.v) // 以后 value 与上一个 value 进行 xor | |
if vDelta == 0 { //xor=0, 存入 1bit'0' 即可 | |
a.b.writeBit(zero) | |
return | |
} | |
a.b.writeBit(one) // 先存入管制位 '1' | |
leading := uint8(bits.LeadingZeros64(vDelta)) // 计算 vdelta 前置 0 的个数 | |
trailing := uint8(bits.TrailingZeros64(vDelta)) // 计算 vdelta 后置 0 的个数 | |
// Clamp number of leading zeros to avoid overflow when encoding. | |
if leading >= 32 {leading = 31} | |
if a.leading != 0xff && leading >= a.leading && trailing >= a.trailing {a.b.writeBit(zero) | |
a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing)) | |
} else { | |
a.leading, a.trailing = leading, trailing | |
a.b.writeBit(one) | |
a.b.writeBits(uint64(leading), 5) | |
// Note that if leading == trailing == 0, then sigbits == 64. But that value doesn't actually fit into the 6 bits we have. | |
// Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0). | |
// So instead we write out a 0 and adjust it back to 64 on unpacking. | |
sigbits := 64 - leading - trailing | |
a.b.writeBits(uint64(sigbits), 6) | |
a.b.writeBits(vDelta>>trailing, int(sigbits)) | |
} | |
} |
读取 t / v 的源码剖析
xorIterator 负责 t / v 数据的读取:根本就是写入过程的反过程
// tsdb/chunkenc/xor.go | |
func (it *xorIterator) Next() bool { | |
if it.err != nil || it.numRead == it.numTotal {return false} | |
// 读第 1 个点 | |
if it.numRead == 0 {t, err := binary.ReadVarint(&it.br) //time 原值读取 | |
if err != nil { | |
it.err = err | |
return false | |
} | |
v, err := it.br.readBits(64) //value 原值读取 | |
if err != nil { | |
it.err = err | |
return false | |
} | |
it.t = t | |
it.val = math.Float64frombits(v) | |
it.numRead++ // 读取数量 +1 | |
return true | |
} | |
// 读第 2 个点 | |
if it.numRead == 1 {tDelta, err := binary.ReadUvarint(&it.br) // 读取 tDelta | |
if err != nil { | |
it.err = err | |
return false | |
} | |
it.tDelta = tDelta | |
it.t = it.t + int64(it.tDelta) // 计算 time | |
return it.readValue() // 读取 xor 并计算出原值} | |
// 读第 3 个及当前的点 | |
var d byte | |
// 读前缀,最多 4bit | |
// read delta-of-delta | |
for i := 0; i < 4; i++ { | |
d <<= 1 | |
bit, err := it.br.readBit() | |
if err != nil { | |
it.err = err | |
return false | |
} | |
if bit == zero {break} | |
d |= 1 | |
} | |
var sz uint8 | |
var dod int64 | |
switch d { | |
case 0x00: | |
// dod == 0 // 前缀 =0 | |
case 0x02: | |
sz = 14 // 前缀 =10,用 14bit 保留 dod | |
case 0x06: // 前缀 =110,用 17bit 保留 dod | |
sz = 17 | |
case 0x0e: // 前缀 =1110,用 20bit 保留 dod | |
sz = 20 | |
case 0x0f: // 前缀 =1111,用 64bit 保留 dod | |
bits, err := it.br.readBits(64) | |
if err != nil { | |
it.err = err | |
return false | |
} | |
dod = int64(bits) | |
} | |
if sz != 0 {bits, err := it.br.readBits(int(sz)) | |
if err != nil { | |
it.err = err | |
return false | |
} | |
if bits > (1 << (sz - 1)) { | |
// or something | |
bits = bits - (1 << sz) | |
} | |
dod = int64(bits) // 读取并计算 dod 的值 | |
} | |
it.tDelta = uint64(int64(it.tDelta) + dod) // 计算 tdelta | |
it.t = it.t + int64(it.tDelta) // 计算 time | |
return it.readValue() // 读取 xor 的值} |
再看一下读 xor 值的流程:将上一个 value 与 xor 的值进行异或
// tsdb/chunkenc/xor.go | |
func (it *xorIterator) readValue() bool {bit, err := it.br.readBit() // 读第 1 个 bit | |
if err != nil { | |
it.err = err | |
return false | |
} | |
if bit == zero {// 如果第 1 个 bit=0,value 放弃不变 ( 故无需更新) | |
// it.val = it.val | |
} else {bit, err := it.br.readBit() | |
if err != nil { | |
it.err = err | |
return false | |
} | |
if bit == zero { | |
// reuse leading/trailing zero bits | |
// it.leading, it.trailing = it.leading, it.trailing | |
} else {bits, err := it.br.readBits(5) | |
if err != nil { | |
it.err = err | |
return false | |
} | |
it.leading = uint8(bits) | |
bits, err = it.br.readBits(6) | |
if err != nil { | |
it.err = err | |
return false | |
} | |
mbits := uint8(bits) | |
// 0 significant bits here means we overflowed and we actually need 64; see comment in encoder | |
if mbits == 0 {mbits = 64} | |
it.trailing = 64 - it.leading - mbits | |
} | |
mbits := int(64 - it.leading - it.trailing) | |
bits, err := it.br.readBits(mbits) | |
if err != nil { | |
it.err = err | |
return false | |
} | |
vbits := math.Float64bits(it.val) // 拿到上一个 value | |
vbits ^= (bits << it.trailing) // 与 xor 的值进行异或,失去本地的 value | |
it.val = math.Float64frombits(vbits) // v1^v2=xor,那么 v2=v1^xor | |
} | |
it.numRead++ | |
return true | |
} |