共计 7973 个字符,预计需要花费 20 分钟才能阅读完成。
为什么要 compact?
- 待删除的数据未被真正删除,仅记录在 tombstone 文件中;
- 相邻 block 的 index 文件个别状况是雷同,compact 能够缩小 disk usage;
- 当查问的后果波及 N 个 block 时,blocks 数据的合并会消耗大量资源;
compact 不会无止境的运行,prometheus 中限度单个 block 最长的时间跨度 =31d(744hour),或者 1 /10 的 retentionTime,两者取最小值。
{ // Max block size settings.
if cfg.tsdb.MaxBlockDuration == 0 {maxBlockDuration, err := model.ParseDuration("31d")
// When the time retention is set and not too big use to define the max block duration.
if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration {maxBlockDuration = cfg.tsdb.RetentionDuration / 10}
cfg.tsdb.MaxBlockDuration = maxBlockDuration
}
}
compact 的过程中,波及到基于 retention 的数据删除,能够基于 sizeRetention 或 timeRetention。
一. 整体框架
prometheus 中启动 1 个 goroutine 执行 db.run()。
在 db.run()中执行了 1min 的 loop 循环:
- 查看 Head block 是否须要 compact,若是,则先将 head block compact 成 disk block;
-
而后 compact disk block:
- 首先,生成 compact 打算,即找出能够 compact 的 blocks;
- 而后,将找进去的 blocks 执行 compact 过程;
- 最初,删除掉过期的 blocks;
二.compact 的整体逻辑代码
代码入口:
- 1min 的循环 loop,在 loop 前,有 backoff 的工夫微调;
// tsdb/db.go
func (db *DB) run() {backoff := time.Duration(0)
for {
// 工夫回退
select {case <-time.After(backoff):
}
select {case <-time.After(1 * time.Minute):
select {case db.compactc <- struct{}{}:
default:
}
// 1min 执行 1 次
case <-db.compactc:
if db.autoCompact {if err := db.Compact(); err != nil { // 失败的话,指数回退
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {backoff = 0}
}
......
}
}
}
具体 compact 的逻辑:
// tsdb/db.go
func (db *DB) Compact() (err error) {
.....
// 先查看 head block 是否须要 compact
// 行将 head block --> disk block
for {if !db.head.compactable() {break}
mint := db.head.MinTime()
maxt := rangeForTimestamp(mint, db.head.chunkRange)
head := NewRangeHead(db.head, mint, maxt-1)
if err := db.compactHead(head); err != nil {return err}
}
// 再 compact 磁盘 block
return db.compactBlocks()}
先查看 head block 是否须要 compact
当 head block 的时间跨度 > chunkRange 的 1.5 倍时,则须要将 head block 变成 disk block:
// tsdb/head.go
func (h *Head) compactable() bool {return h.MaxTime()-h.MinTime() > h.chunkRange/2*3}
三.disk block compact 的过程剖析
磁盘 block 的 compact 过程:
- 学生成 compact plan;
- 再执行 compact;
- 最初 reload 删掉源文件;
// tsdb/db.go
func (db *DB) compactBlocks() (err error) {
// Check for compactions of multiple blocks.
for {
// 学生成 compact plan
plan, err := db.compactor.Plan(db.dir)
if len(plan) == 0 { // 没有 plan,间接返回
break
}
// 再执行 compact
uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
if err != nil {return errors.Wrapf(err, "compact %s", plan)
}
runtime.GC()
// 最初 reload 删掉源文件
if err := db.reload(); err != nil {if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid)
}
return errors.Wrap(err, "reload blocks")
}
runtime.GC()}
return nil
}
1. 生成 compact plan
抉择 blocks 的优先级如下:
-
首先,查看是否有 Overlap 的 block,若存在则间接返回;
- 个别状况下,block 之间的 timestamp 没有 overlap,除非是被动写入了 < 以后 timestamp 的时序序列;
Moreover, Prometheus itself does not produce overlapping blocks, it’s only possible if you backfill some data into Prometheus.
- 其次,按 timeRange 在 blocks 中抉择能够 compact 的 blocks,若找到则返回;
- 最初,找满足 numTombstoneSeries /numSeriesTotal > 5% 的 block,该 plan 仅输入 1 个 block,它会生成新的 block(不含 tombstone 中的数据);
// tsdb/compact.go
func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {sort.Slice(dms, func(i, j int) bool {return dms[i].meta.MinTime < dms[j].meta.MinTime
})
// 1. 先查看是否有 overlap 的 block
// 若有,间接返回
res := c.selectOverlappingDirs(dms)
if len(res) > 0 {return res, nil}
// 排除掉最新的 block
dms = dms[:len(dms)-1]
// 2. 而后,在 block dir 中抉择能够压缩的 block
// 若找到,则返回
for _, dm := range c.selectDirs(dms) {res = append(res, dm.dir)
}
if len(res) > 0 {return res, nil}
// 3. 找 block,满足 numTombstone / numSeries > 5%
// Compact any blocks with big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {break}
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {return []string{dms[i].dir}, nil
}
}
return nil, nil
}
如何在 blocks 中抉择出要 compact 的 block?
- 将所有的 blocks 按 minTime 从小到大排序,在筛选 blocks 时,先排除最新的 blocks,即去掉 blocks[size-1];
- 按 range 查找 block 列表,range={6hour, 18hour, 54hour, ….},3 的倍数递增;
- 先按 range=6hour 找,找到在 6hour 内的 blocks,则返回;否则,按 18hour 找;
以 range=6hour 为例,筛选 parts 的条件:
-
假如以后 block: {b1, b2, b3, b-latest},将其按 6hour 进行切割
- 切完后:[b1, b2], [b3], b-latest
-
对于[b1, b2]:
- 若 b2.maxTime – b1.minTime = 6hour,则 [b1, b2] 能够执行压缩;
- 若 b2.maxTime < b3.minTime,则 [b1, b2] 能够执行压缩;
// tsdb/db.go
// 返回[2, 6, 18, 54, ......],即 2,2*3,2*3*3,...
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
// 抉择 deletableBlocks
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {if len(c.ranges) < 2 || len(ds) < 1 {return nil}
highTime := ds[len(ds)-1].meta.MinTime
// range[1:] = {6hour, 18hour, 54hour, ....}
for _, iv := range c.ranges[1:] {parts := splitByRange(ds, iv) // 按 range=iv 查找 blocks
if len(parts) == 0 {continue}
Outer:
for _, p := range parts {mint := p[0].meta.MinTime
maxt := p[len(p)-1].meta.MaxTime
// Pick the range of blocks if it spans the full range (potentially with gaps)
// or is before the most recent block.
// This ensures we don't compact blocks prematurely when another one of the same
// size still fits in the range.
if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 { // 满足条件
return p
}
}
}
return nil
}
2. 执行 compact
将上一步抉择的 plan 作为 compact 的起源,执行 compact:
- 将 plan 中的 blocks 数据 compact 生成新的 block;
-
将 plan 中的 blocks 元数据合并生成新的 meta.json;
- 元数据中指明 source、parents、minTime、maxTime,同时将 compactLevel + 1;
// tsdb/compact.go
// dest = db.dir, dirs = plan, open = db.blocks
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
var (blocks []BlockReader
bs []*Block
metas []*BlockMeta
uids []string)
// 将 dir 下所有 block 加载,放入[]blocks
for _, d := range dirs {meta, _, err := readMetaFile(d)
var b *Block
// Use already open blocks if we can, to avoid
// having the index data in memory twice.
for _, o := range open {if meta.ULID == o.Meta().ULID {
b = o
break
}
}
if b == nil {b, err = OpenBlock(c.logger, d, c.chunkPool)
defer b.Close()}
metas = append(metas, meta)
blocks = append(blocks, b)
bs = append(bs, b)
uids = append(uids, meta.ULID.String())
}
// 新的 uid
uid = ulid.MustNew(ulid.Now(), rand.Reader)
// 新的 metadata
meta := compactBlockMetas(uid, metas...)
// 合并 blocks,将 meta 信息写入 meta.json
// 合并 index,写入 index 文件
err = c.write(dest, meta, blocks...)
var merr tsdb_errors.MultiError
merr.Add(err)
return uid, merr
}
3. 删掉源文件:reload
查看 retentionTime 和 retentionSize,删除不满足条件的 blocks。
-
从所有的 blocks 中抉择能够被删除的 blocks:
- 整个 block 的工夫,已齐全超过 retentionTime;
- 所有 block 的 size,已超过 retentionSize,删除满足 retentionSize 的最晚工夫的那些 blocks;
- 将 deleteable blocks 删除;
// tsdb/db.go
// reload blocks and trigger head truncation if new blocks appeared.
// Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
// 抉择可删除的 blocks(已超过 retentionTime 或 retentionSize)
deletable := db.deletableBlocks(loadable)
if err := db.deleteBlocks(deletable); err != nil { // 删除 blocks
return err
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(loadable) == 0 {return nil}
maxt := loadable[len(loadable)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
超过 retentionTime 的 blocks
因为 blocks 按 maxTime 从大到小排序,blocks[0]是最近工夫的 block;
当遍历 blocks 时,当发现(block0.maxTime – blockA.maxTime) > retentionDuration 时,可认为以后 blockA 以及其后的 block,都能够被删掉了。
// tsdb/db.go
func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
// Time retention is disabled or no blocks to work with.
if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {return}
deletable = make(map[ulid.ULID]*Block)
for i, block := range blocks {
// The difference between the first block and this block is larger than
// the retention period so any blocks after that are added as deletable.
if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {for _, b := range blocks[i:] {deletable[b.meta.ULID] = b
}
break
}
}
return deletable
}
超过 retentionSize 的 blocks
总的 blockSize = walSize + headChunkSize + block1Size + block2Size + …
blocks 按 maxTime 从大到小排序,blocks[0]是最近工夫的 block。
遍历 blocks 时,sumBlockSize += blockSize, 若发现 sumBlockSize > retentionSize,则将该 block 以及其后的 block 退出待删除列表:
// tsdb/db.go
func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
// Size retention is disabled or no blocks to work with.
if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {return}
deletable = make(map[ulid.ULID]*Block)
walSize, _ := db.Head().wal.Size()
headChunksSize := db.Head().chunkDiskMapper.Size()
// Initializing size counter with WAL size and Head chunks
// written to disk, as that is part of the retention strategy.
blocksSize := walSize + headChunksSize
for i, block := range blocks {blocksSize += block.Size()
if blocksSize > int64(db.opts.MaxBytes) {
// Add this and all following blocks for deletion.
for _, b := range blocks[i:] {deletable[b.meta.ULID] = b
}
break
}
}
return deletable
}
参考:
1.https://ganeshvernekar.com/blog/prometheus-tsdb-compaction-an…