Why compact?
- Data marked for deletion is not actually removed; it is only recorded in the tombstones file;
- The index files of adjacent blocks are largely identical in the common case, so compaction reduces disk usage;
- When a query's result spans N blocks, merging the data from those blocks consumes a lot of resources;
Compaction does not run without bound: Prometheus caps the maximum time span of a single block at the smaller of 31d (744h) and 1/10 of the retention time.
```go
{ // Max block size settings.
	if cfg.tsdb.MaxBlockDuration == 0 {
		maxBlockDuration, err := model.ParseDuration("31d")
		// When the time retention is set and not too big use to define the max block duration.
		if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration {
			maxBlockDuration = cfg.tsdb.RetentionDuration / 10
		}
		cfg.tsdb.MaxBlockDuration = maxBlockDuration
	}
}
```
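A quick sanity check of this rule, as a minimal sketch with a hypothetical retention of 15d: 15d/10 = 36h, which is smaller than 744h, so 36h becomes the max block duration.
```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// Hypothetical values, for illustration only.
	maxBlockDuration, _ := model.ParseDuration("31d") // the 744h upper bound
	retention, _ := model.ParseDuration("15d")        // an assumed retention time

	// Same rule as above: take the smaller of 31d and retention/10.
	if retention != 0 && retention/10 < maxBlockDuration {
		maxBlockDuration = retention / 10
	}
	fmt.Println(maxBlockDuration) // 1d12h, i.e. 36h
}
```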
During compaction, retention-based data deletion also takes place; it can be driven by either the size-based retention (retentionSize) or the time-based retention (retentionTime).
I. Overall framework
Prometheus starts a goroutine that runs db.run().
db.run() executes a loop once every 1 minute:
- Check whether the head block needs compaction; if so, first compact the head block into a disk block;
Then compact the disk blocks:
- First, generate the compaction plan, i.e. find the blocks that can be compacted;
- Then, run the compaction on the selected blocks;
- Finally, delete the expired blocks;
II. Overall compaction logic (code)
Code entry point:
- A 1-minute loop; before each iteration the wait time is adjusted by a backoff (see the helper sketch after the code below);
```go
// tsdb/db.go
func (db *DB) run() {
	backoff := time.Duration(0)
	for {
		// Back off before the next iteration.
		select {
		case <-time.After(backoff):
		}
		select {
		case <-time.After(1 * time.Minute):
			select {
			case db.compactc <- struct{}{}:
			default:
			}
		// Triggered once per minute.
		case <-db.compactc:
			if db.autoCompact {
				if err := db.Compact(); err != nil {
					// On failure, back off exponentially.
					backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
				} else {
					backoff = 0
				}
			}
			......
		}
	}
}
```
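The exponential helper is not shown in the excerpt; a minimal sketch of such a doubling-and-clamping backoff (my reconstruction, not necessarily the exact Prometheus source) would look like:
```go
import "time"

// exponential doubles the current backoff and clamps it to [min, max].
func exponential(d, min, max time.Duration) time.Duration {
	d *= 2
	if d < min {
		d = min
	}
	if d > max {
		d = max
	}
	return d
}
```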
The concrete compaction logic:
```go
// tsdb/db.go
func (db *DB) Compact() (err error) {
	.....
	// First check whether the head block needs compaction,
	// i.e. turn the head block into a disk block.
	for {
		if !db.head.compactable() {
			break
		}
		mint := db.head.MinTime()
		maxt := rangeForTimestamp(mint, db.head.chunkRange)
		head := NewRangeHead(db.head, mint, maxt-1)
		if err := db.compactHead(head); err != nil {
			return err
		}
	}
	// Then compact the disk blocks.
	return db.compactBlocks()
}
```
First, check whether the head block needs compaction
When the head block's time span exceeds 1.5x the chunkRange (with the default 2h block range, that means a span of more than 3h), the head block needs to be turned into a disk block:
```go
// tsdb/head.go
func (h *Head) compactable() bool {
	return h.MaxTime()-h.MinTime() > h.chunkRange/2*3
}
```
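A quick numeric check of the 1.5x rule, as a small sketch assuming the default 2h chunk range (timestamps in milliseconds):
```go
package main

import "fmt"

func main() {
	chunkRange := int64(2 * 3600 * 1000) // assumed default block range of 2h, in ms
	minT := int64(0)
	maxT := int64(3*3600*1000) + 1 // the head spans a bit more than 3h

	// Same comparison as compactable(): the span must exceed 1.5x chunkRange.
	fmt.Println(maxT-minT > chunkRange/2*3) // true
}
```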
III. Analysis of the disk block compaction process
The compaction process for disk blocks:
- First generate the compaction plan;
- Then execute the compaction;
- Finally reload and delete the source blocks;
```go
// tsdb/db.go
func (db *DB) compactBlocks() (err error) {
	// Check for compactions of multiple blocks.
	for {
		// First generate the compaction plan.
		plan, err := db.compactor.Plan(db.dir)
		if len(plan) == 0 {
			// No plan; nothing more to do.
			break
		}
		// Then execute the compaction.
		uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
		if err != nil {
			return errors.Wrapf(err, "compact %s", plan)
		}
		runtime.GC()
		// Finally reload and delete the source blocks.
		if err := db.reload(); err != nil {
			if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
				return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid)
			}
			return errors.Wrap(err, "reload blocks")
		}
		runtime.GC()
	}
	return nil
}
```
1. Generating the compaction plan
Blocks are selected with the following priority:
- First, check whether there are overlapping blocks; if any exist, return them directly;
- Normally the time ranges of blocks do not overlap, unless series with timestamps earlier than the current time were deliberately written in;
Moreover, Prometheus itself does not produce overlapping blocks, it's only possible if you backfill some data into Prometheus.
- Next, select compactable blocks by time range; if any are found, return them;
- Finally, look for a block satisfying numTombstoneSeries / numSeriesTotal > 5%; such a plan outputs just that single block, and compaction rewrites it into a new block (without the data covered by tombstones);
```go
// tsdb/compact.go
func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
	sort.Slice(dms, func(i, j int) bool {
		return dms[i].meta.MinTime < dms[j].meta.MinTime
	})

	// 1. First check whether there are overlapping blocks.
	//    If so, return them directly.
	res := c.selectOverlappingDirs(dms)
	if len(res) > 0 {
		return res, nil
	}
	// Exclude the newest block.
	dms = dms[:len(dms)-1]

	// 2. Then select compactable blocks among the block dirs.
	//    If found, return them.
	for _, dm := range c.selectDirs(dms) {
		res = append(res, dm.dir)
	}
	if len(res) > 0 {
		return res, nil
	}

	// 3. Find a block with numTombstones / numSeries > 5%.
	// Compact any blocks with big enough time range that have >5% tombstones.
	for i := len(dms) - 1; i >= 0; i-- {
		meta := dms[i].meta
		if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
			break
		}
		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
			return []string{dms[i].dir}, nil
		}
	}

	return nil, nil
}
```
How are the blocks to compact chosen?
- Sort all blocks by minTime in ascending order; when filtering, first exclude the newest block, i.e. drop blocks[size-1];
- Look up candidate block groups range by range, with range = {6h, 18h, 54h, ...}, growing by a factor of 3;
- First try range=6h: if a group of blocks fitting within 6h is found, return it; otherwise try 18h, and so on;
Taking range=6h as an example, the conditions for picking a part are as follows (a sketch of the splitting step follows the selectDirs code below):
Suppose the current blocks are {b1, b2, b3, b-latest}; splitting them into 6h buckets
- gives: [b1, b2], [b3], b-latest
For [b1, b2]:
- if b2.maxTime - b1.minTime = 6h, then [b1, b2] can be compacted;
- or if b2.maxTime <= b3.minTime, i.e. the part ends no later than the most recent remaining block starts, then [b1, b2] can also be compacted;
```go
// tsdb/db.go
// Returns [2h, 6h, 18h, 54h, ...], i.e. 2, 2*3, 2*3*3, ...
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)

// tsdb/compact.go
// Select the dirs (blocks) that can be compacted.
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
	if len(c.ranges) < 2 || len(ds) < 1 {
		return nil
	}

	highTime := ds[len(ds)-1].meta.MinTime

	// ranges[1:] = {6h, 18h, 54h, ...}
	for _, iv := range c.ranges[1:] {
		parts := splitByRange(ds, iv) // group the blocks into buckets of width iv
		if len(parts) == 0 {
			continue
		}

	Outer:
		for _, p := range parts {
			mint := p[0].meta.MinTime
			maxt := p[len(p)-1].meta.MaxTime
			// Pick the range of blocks if it spans the full range (potentially with gaps)
			// or is before the most recent block.
			// This ensures we don't compact blocks prematurely when another one of the same
			// size still fits in the range.
			if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
				// Conditions satisfied.
				return p
			}
		}
	}

	return nil
}
```
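splitByRange itself is not shown above; the following is a simplified, self-contained sketch (my own reconstruction over a stripped-down block struct, not the real dirMeta-based implementation) of how blocks are grouped into range-aligned buckets, reproducing the [b1, b2], [b3] example:
```go
package main

import "fmt"

type block struct {
	minTime, maxTime int64 // milliseconds
}

// splitByRange groups consecutive blocks into buckets whose boundaries are
// aligned to multiples of iv; a block crossing a bucket boundary gets a
// bucket of its own.
func splitByRange(blocks []block, iv int64) [][]block {
	var parts [][]block
	for i := 0; i < len(blocks); {
		m := blocks[i]
		// Align the bucket start to a multiple of iv.
		t0 := iv * (m.minTime / iv)
		if m.minTime < 0 {
			t0 = iv * ((m.minTime - iv + 1) / iv)
		}
		// A block that crosses the bucket boundary forms its own bucket.
		if m.maxTime > t0+iv {
			parts = append(parts, []block{m})
			i++
			continue
		}
		// Collect all following blocks that still fit into [t0, t0+iv].
		var group []block
		for ; i < len(blocks); i++ {
			if blocks[i].maxTime > t0+iv {
				break
			}
			group = append(group, blocks[i])
		}
		parts = append(parts, group)
	}
	return parts
}

func main() {
	h := int64(3600 * 1000) // one hour in milliseconds
	bs := []block{{0, 2 * h}, {2 * h, 4 * h}, {4 * h, 6 * h}, {6 * h, 8 * h}}
	// The first three 2h blocks fill one 6h bucket; the newest block stands alone.
	fmt.Println(splitByRange(bs, 6*h))
}
```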
2. Executing the compaction
Take the plan selected in the previous step as the input of the compaction and execute it:
- Compact the data of the blocks in the plan into a new block;
- Merge the metadata of the blocks in the plan into a new meta.json;
- The metadata records the sources, parents, minTime and maxTime, and increases the compaction level by 1 (see the compactBlockMetas sketch after the code below);
```go
// tsdb/compact.go
// dest = db.dir, dirs = plan, open = db.blocks
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
	var (
		blocks []BlockReader
		bs     []*Block
		metas  []*BlockMeta
		uids   []string
	)
	// Load every block under dirs and collect them into []blocks.
	for _, d := range dirs {
		meta, _, err := readMetaFile(d)
		var b *Block
		// Use already open blocks if we can, to avoid
		// having the index data in memory twice.
		for _, o := range open {
			if meta.ULID == o.Meta().ULID {
				b = o
				break
			}
		}
		if b == nil {
			b, err = OpenBlock(c.logger, d, c.chunkPool)
			defer b.Close()
		}
		metas = append(metas, meta)
		blocks = append(blocks, b)
		bs = append(bs, b)
		uids = append(uids, meta.ULID.String())
	}

	// New ULID.
	uid = ulid.MustNew(ulid.Now(), rand.Reader)
	// New metadata.
	meta := compactBlockMetas(uid, metas...)
	// Merge the blocks, write the meta info into meta.json,
	// merge the indexes and write the index file.
	err = c.write(dest, meta, blocks...)

	var merr tsdb_errors.MultiError
	merr.Add(err)
	return uid, merr
}
```
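compactBlockMetas is where the metadata merge described above happens. The sketch below is a simplified illustration (ULIDs shown as plain strings and only the fields discussed here; the real struct lives in tsdb): take the min/max time of the inputs, record the inputs as parents, union their sources, and bump the compaction level by one.
```go
package main

import "fmt"

// Simplified stand-in for tsdb's BlockMeta; real ULIDs are not strings.
type BlockMeta struct {
	ULID             string
	MinTime, MaxTime int64
	Compaction       struct {
		Level   int
		Sources []string
		Parents []string
	}
}

// compactBlockMetas merges the metadata of the input blocks (assumed sorted by MinTime).
func compactBlockMetas(uid string, metas ...*BlockMeta) *BlockMeta {
	res := &BlockMeta{
		ULID:    uid,
		MinTime: metas[0].MinTime,
		MaxTime: metas[len(metas)-1].MaxTime,
	}
	sources := map[string]struct{}{}
	maxLevel := 0
	for _, m := range metas {
		if m.Compaction.Level > maxLevel {
			maxLevel = m.Compaction.Level
		}
		for _, s := range m.Compaction.Sources {
			sources[s] = struct{}{}
		}
		res.Compaction.Parents = append(res.Compaction.Parents, m.ULID)
	}
	res.Compaction.Level = maxLevel + 1 // the compaction level grows by one
	for s := range sources {
		res.Compaction.Sources = append(res.Compaction.Sources, s)
	}
	return res
}

func main() {
	a := &BlockMeta{ULID: "A", MinTime: 0, MaxTime: 7200000}
	a.Compaction.Level, a.Compaction.Sources = 1, []string{"A"}
	b := &BlockMeta{ULID: "B", MinTime: 7200000, MaxTime: 14400000}
	b.Compaction.Level, b.Compaction.Sources = 1, []string{"B"}

	m := compactBlockMetas("C", a, b)
	fmt.Println(m.MinTime, m.MaxTime, m.Compaction.Level, m.Compaction.Parents) // 0 14400000 2 [A B]
}
```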
3. Deleting the source blocks: reload
Check the retention time and retention size, and delete the blocks that no longer satisfy them.
Deletable blocks are selected from all blocks (by deletableBlocks; see the sketch after the reload code below) as follows:
- Blocks whose entire time range is already beyond the retention time;
- When the accumulated size of all blocks exceeds the retention size, the block that crosses the limit and all older blocks are deleted;
- The deletable blocks are then removed;
```go
// tsdb/db.go
// reload blocks and trigger head truncation if new blocks appeared.
// Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {
	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)

	// Select the deletable blocks (beyond the retention time or the retention size).
	deletable := db.deletableBlocks(loadable)

	// Delete the blocks.
	if err := db.deleteBlocks(deletable); err != nil {
		return err
	}

	// Garbage collect data in the head if the most recent persisted block
	// covers data of its current time range.
	if len(loadable) == 0 {
		return nil
	}
	maxt := loadable[len(loadable)-1].Meta().MaxTime
	return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
```
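deletableBlocks, called in reload() above, is essentially the union of the two retention checks shown next, applied after sorting the blocks newest-first. Roughly (a rough sketch, not the verbatim source; the real function also honors a per-block deletable flag in the block meta):
```go
func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
	deletable := make(map[ulid.ULID]*Block)

	// Sort the blocks by maxTime, newest to oldest.
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime
	})

	// Union of the time-based and size-based retention checks.
	for id, block := range db.beyondTimeRetention(blocks) {
		deletable[id] = block
	}
	for id, block := range db.beyondSizeRetention(blocks) {
		deletable[id] = block
	}
	return deletable
}
```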
Blocks beyond the retention time
Since the blocks are sorted by maxTime in descending order, blocks[0] is the most recent block;
While iterating over the blocks, once (block0.maxTime - blockA.maxTime) > retentionDuration, blockA and all blocks after it can be deleted.
```go
// tsdb/db.go
func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
	// Time retention is disabled or no blocks to work with.
	if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {
		return
	}

	deletable = make(map[ulid.ULID]*Block)
	for i, block := range blocks {
		// The difference between the first block and this block is larger than
		// the retention period so any blocks after that are added as deletable.
		if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {
			for _, b := range blocks[i:] {
				deletable[b.meta.ULID] = b
			}
			break
		}
	}
	return deletable
}
```
Blocks beyond the retention size
Total block size = walSize + headChunkSize + block1Size + block2Size + ...
The blocks are sorted by maxTime in descending order; blocks[0] is the most recent block.
While iterating over the blocks, sumBlockSize += blockSize; once sumBlockSize > retentionSize, that block and all blocks after it are added to the deletion list:
```go
// tsdb/db.go
func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
	// Size retention is disabled or no blocks to work with.
	if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {
		return
	}

	deletable = make(map[ulid.ULID]*Block)

	walSize, _ := db.Head().wal.Size()
	headChunksSize := db.Head().chunkDiskMapper.Size()
	// Initializing size counter with WAL size and Head chunks
	// written to disk, as that is part of the retention strategy.
	blocksSize := walSize + headChunksSize
	for i, block := range blocks {
		blocksSize += block.Size()
		if blocksSize > int64(db.opts.MaxBytes) {
			// Add this and all following blocks for deletion.
			for _, b := range blocks[i:] {
				deletable[b.meta.ULID] = b
			}
			break
		}
	}
	return deletable
}
```
References:
1.https://ganeshvernekar.com/blog/prometheus-tsdb-compaction-an...