Why compact?

  • Data marked for deletion is not actually removed; it is only recorded in tombstone files;
  • The index files of adjacent blocks are usually largely the same, so compaction can reduce disk usage;
  • When a query result spans N blocks, merging data across those blocks consumes a lot of resources;

Compaction does not run indefinitely: Prometheus caps the maximum time span of a single block at 31d (744 hours), or at 1/10 of the retention time, whichever is smaller.

{ // Max block size  settings.
    if cfg.tsdb.MaxBlockDuration == 0 {
        maxBlockDuration, err := model.ParseDuration("31d")
        if err != nil {
            panic(err)
        }
        // When the time retention is set and not too big use to define the max block duration.
        if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration {
            maxBlockDuration = cfg.tsdb.RetentionDuration / 10
        }
        cfg.tsdb.MaxBlockDuration = maxBlockDuration
    }
}
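
To make the rule above concrete, here is a small self-contained sketch (illustration only, not Prometheus code; the maxBlockDuration helper is hypothetical) that reproduces the min(31d, retentionTime/10) calculation for a few retention settings:

package main

import (
    "fmt"
    "time"
)

// maxBlockDuration mirrors the rule above: cap a single block at 31d,
// or at 1/10 of the retention time when that is smaller.
func maxBlockDuration(retention time.Duration) time.Duration {
    max := 31 * 24 * time.Hour // 744h
    if retention != 0 && retention/10 < max {
        max = retention / 10
    }
    return max
}

func main() {
    fmt.Println(maxBlockDuration(0))                    // retention unset -> 744h
    fmt.Println(maxBlockDuration(15 * 24 * time.Hour))  // 15d retention   -> 36h
    fmt.Println(maxBlockDuration(365 * 24 * time.Hour)) // 1y retention    -> 744h (capped at 31d)
}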

Compaction also involves retention-based data deletion, driven by either sizeRetention or timeRetention.

I. Overall framework

Prometheus starts a goroutine that runs db.run().
db.run() loops once every minute:

  • Check whether the head block needs compaction; if so, first compact the head block into a disk block;
  • Then compact the disk blocks:

    • First, generate a compaction plan, i.e. find the blocks that can be compacted;
    • Then, run the compaction over the selected blocks;
    • Finally, delete the expired blocks;

II. The overall compaction logic

Code entry point:

  • A loop that fires once per minute; before each iteration a backoff delay is applied;
// tsdb/db.go
func (db *DB) run() {
    backoff := time.Duration(0)
    for {
        // back off before the next iteration
        select {
        case <-time.After(backoff):
        }
        select {
        case <-time.After(1 * time.Minute):
            select {
            case db.compactc <- struct{}{}:
            default:
            }
        // fires once per minute
        case <-db.compactc:
            if db.autoCompact {
                if err := db.Compact(); err != nil {
                    // on failure, back off exponentially
                    backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
                } else {
                    backoff = 0
                }
            }
            ......
        }
    }
}

The compaction logic proper:

// tsdb/db.go
func (db *DB) Compact() (err error) {
    .....
    // First check whether the head block needs compaction,
    // i.e. whether the head block should become a disk block.
    for {
        if !db.head.compactable() {
            break
        }
        mint := db.head.MinTime()
        maxt := rangeForTimestamp(mint, db.head.chunkRange)
        head := NewRangeHead(db.head, mint, maxt-1)
        if err := db.compactHead(head); err != nil {
            return err
        }
    }
    // Then compact the disk blocks.
    return db.compactBlocks()
}

First, check whether the head block needs compaction.
When the head block's time span exceeds 1.5 × chunkRange, the head block needs to be turned into a disk block:

// tsdb/head.go
func (h *Head) compactable() bool {
    return h.MaxTime()-h.MinTime() > h.chunkRange/2*3
}
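
For intuition: with the default 2h chunkRange, the head becomes compactable once it spans more than 3 hours, and each head compaction cuts a block aligned to a 2h boundary. The following standalone sketch illustrates this; rangeForTimestamp is re-implemented here purely for illustration and the sample timestamps are made up:

package main

import "fmt"

const chunkRange = int64(2 * 60 * 60 * 1000) // 2h in milliseconds

// rangeForTimestamp returns the end of the chunkRange-aligned window containing t.
func rangeForTimestamp(t, width int64) int64 {
    return (t/width)*width + width
}

// compactable mirrors Head.compactable: compact once the span exceeds 1.5 * chunkRange.
func compactable(mint, maxt int64) bool {
    return maxt-mint > chunkRange/2*3
}

func main() {
    mint := int64(0)                // head starts at t=0
    maxt := int64(3*60*60*1000 + 1) // just over 3h of data in the head
    fmt.Println(compactable(mint, maxt))             // true: the head needs compaction
    fmt.Println(rangeForTimestamp(mint, chunkRange)) // 7200000: the first disk block covers [0h, 2h)
}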

III. The disk block compaction process

The compaction process for disk blocks:

  • First, generate the compaction plan;
  • Then, execute the compaction;
  • Finally, reload and delete the source blocks;
// tsdb/db.go
func (db *DB) compactBlocks() (err error) {
    // Check for compactions of multiple blocks.
    for {
        // First, generate the compaction plan.
        plan, err := db.compactor.Plan(db.dir)
        if err != nil {
            return errors.Wrap(err, "plan compaction")
        }
        // No plan, nothing to do.
        if len(plan) == 0 {
            break
        }
        // Then, execute the compaction.
        uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
        if err != nil {
            return errors.Wrapf(err, "compact %s", plan)
        }
        runtime.GC()
        // Finally, reload and delete the source blocks.
        if err := db.reload(); err != nil {
            if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
                return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid)
            }
            return errors.Wrap(err, "reload blocks")
        }
        runtime.GC()
    }
    return nil
}

1. Generating the compaction plan

Blocks are chosen with the following priorities:

  • First, check whether any blocks overlap; if so, return them directly;

    • Normally, blocks do not overlap in time, unless series with timestamps earlier than the current time were deliberately written in;
    Moreover, Prometheus itself does not produce overlapping blocks, it's only possible if you backfill some data into Prometheus.
  • Second, pick compactable blocks by time range; if any are found, return them;
  • Finally, look for a block where numTombstoneSeries / numSeriesTotal > 5%; such a plan contains only that one block, and compacting it produces a new block without the tombstoned data;
// tsdb/compact.go
func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
    sort.Slice(dms, func(i, j int) bool {
        return dms[i].meta.MinTime < dms[j].meta.MinTime
    })
    // 1. First check for overlapping blocks.
    // If there are any, return them directly.
    res := c.selectOverlappingDirs(dms)
    if len(res) > 0 {
        return res, nil
    }
    // Exclude the newest block.
    dms = dms[:len(dms)-1]
    // 2. Then pick compactable blocks from the block dirs.
    // If any are found, return them.
    for _, dm := range c.selectDirs(dms) {
        res = append(res, dm.dir)
    }
    if len(res) > 0 {
        return res, nil
    }
    // 3. Look for a block where numTombstones / numSeries > 5%.
    // Compact any blocks with big enough time range that have >5% tombstones.
    for i := len(dms) - 1; i >= 0; i-- {
        meta := dms[i].meta
        if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
            break
        }
        if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
            return []string{dms[i].dir}, nil
        }
    }
    return nil, nil
}

How are the blocks to compact chosen from all blocks?

  • Sort all blocks by minTime in ascending order; when picking candidates, first exclude the newest block, i.e. drop blocks[size-1];
  • Search for candidate blocks range by range, with range = {6h, 18h, 54h, ...}, growing by a factor of 3 (a sketch of how this ladder is built follows this list);
  • Start with range=6h: if a group of blocks fits within a 6h window, return it; otherwise move on to 18h, and so on;
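
The range ladder itself comes from ExponentialBlockRanges: start at MinBlockDuration and multiply by the step size at each level. A minimal sketch of that idea (a simplified re-implementation, for illustration only):

// Sketch: build the exponential range ladder used by the planner.
// With minSize = 2h, steps = 10, stepSize = 3 this yields [2h, 6h, 18h, 54h, ...];
// the planner skips ranges[0] (the head block duration) and starts at 6h.
func exponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
    ranges := make([]int64, 0, steps)
    cur := minSize
    for i := 0; i < steps; i++ {
        ranges = append(ranges, cur)
        cur *= int64(stepSize)
    }
    return ranges
}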

Taking range=6h as an example, the conditions for selecting a group of blocks (parts) are:

  • Suppose the current blocks are {b1, b2, b3, b-latest}; split them into 6h windows (a sketch of this splitByRange bucketing follows the selectDirs code below)

    • After splitting: [b1, b2], [b3], b-latest
  • For [b1, b2]:

    • if b2.maxTime - b1.minTime = 6h, then [b1, b2] can be compacted;
    • if b2.maxTime <= b3.minTime, i.e. the group lies entirely before the most recent candidate block, then [b1, b2] can also be compacted;
// tsdb/db.go
// Returns [2, 6, 18, 54, ...] (hours), i.e. 2, 2*3, 2*3*3, ...
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)

// tsdb/compact.go
// Select the block dirs that can be compacted.
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
    if len(c.ranges) < 2 || len(ds) < 1 {
        return nil
    }
    highTime := ds[len(ds)-1].meta.MinTime
    // ranges[1:] = {6h, 18h, 54h, ...}
    for _, iv := range c.ranges[1:] {
        parts := splitByRange(ds, iv) // group the blocks into windows of width iv
        if len(parts) == 0 {
            continue
        }
    Outer:
        for _, p := range parts {
            // Do not select the range if it has a block whose compaction failed.
            for _, dm := range p {
                if dm.meta.Compaction.Failed {
                    continue Outer
                }
            }
            mint := p[0].meta.MinTime
            maxt := p[len(p)-1].meta.MaxTime
            // Pick the range of blocks if it spans the full range (potentially with gaps)
            // or is before the most recent block.
            // This ensures we don't compact blocks prematurely when another one of the same
            // size still fits in the range.
            if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 { // conditions satisfied
                return p
            }
        }
    }
    return nil
}
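
selectDirs relies on splitByRange to group blocks into windows aligned to the current range iv. Below is a simplified sketch of that bucketing (illustration only; it assumes non-negative timestamps and omits some edge cases handled by the real function):

// Sketch: group consecutive blocks into windows of width tr, aligned to multiples of tr.
// A block that does not fit entirely inside one window is skipped.
func splitByRangeSketch(ds []dirMeta, tr int64) [][]dirMeta {
    var parts [][]dirMeta
    for i := 0; i < len(ds); {
        m := ds[i].meta
        // Start of the tr-aligned window containing this block's MinTime.
        t0 := tr * (m.MinTime / tr)
        // Skip blocks that span more than one window.
        if m.MaxTime > t0+tr {
            i++
            continue
        }
        // Collect all following blocks that fall entirely inside [t0, t0+tr].
        var group []dirMeta
        for ; i < len(ds); i++ {
            if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
                break
            }
            group = append(group, ds[i])
        }
        if len(group) > 0 {
            parts = append(parts, group)
        }
    }
    return parts
}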

2. Executing the compaction

The plan chosen in the previous step is the input to the compaction:

  • The data of the blocks in the plan is compacted into a new block;
  • The metadata of the blocks in the plan is merged into a new meta.json;

    • The metadata records source, parents, minTime and maxTime, and the compaction level is incremented by 1 (a sketch of this merge follows the code below);
// tsdb/compact.go
// dest = db.dir, dirs = plan, open = db.blocks
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
    var (
        blocks []BlockReader
        bs     []*Block
        metas  []*BlockMeta
        uids   []string
    )
    // Load every block listed in the plan dirs into []blocks.
    for _, d := range dirs {
        meta, _, err := readMetaFile(d)
        var b *Block
        // Use already open blocks if we can, to avoid
        // having the index data in memory twice.
        for _, o := range open {
            if meta.ULID == o.Meta().ULID {
                b = o
                break
            }
        }
        if b == nil {
            b, err = OpenBlock(c.logger, d, c.chunkPool)
            defer b.Close()
        }
        metas = append(metas, meta)
        blocks = append(blocks, b)
        bs = append(bs, b)
        uids = append(uids, meta.ULID.String())
    }
    // New ULID for the compacted block.
    uid = ulid.MustNew(ulid.Now(), rand.Reader)
    // New metadata, merged from the source blocks.
    meta := compactBlockMetas(uid, metas...)
    // Merge the blocks and write the meta info into meta.json;
    // merge the indexes and write the index file.
    err = c.write(dest, meta, blocks...)
    var merr tsdb_errors.MultiError
    merr.Add(err)
    return uid, merr
}
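
The merged meta.json is produced by compactBlockMetas. A simplified sketch of the merge (illustration only; source de-duplication and sorting are omitted): the new block spans the earliest minTime to the latest maxTime of its parents, records the parents and sources, and bumps the compaction level by 1.

// Sketch: merge the metadata of the source blocks into the metadata of the new block.
func compactBlockMetasSketch(uid ulid.ULID, parents ...*BlockMeta) *BlockMeta {
    res := &BlockMeta{
        ULID:    uid,
        MinTime: parents[0].MinTime,
        MaxTime: parents[0].MaxTime,
    }
    for _, b := range parents {
        if b.MinTime < res.MinTime {
            res.MinTime = b.MinTime
        }
        if b.MaxTime > res.MaxTime {
            res.MaxTime = b.MaxTime
        }
        if b.Compaction.Level > res.Compaction.Level {
            res.Compaction.Level = b.Compaction.Level
        }
        // Remember which blocks the new one was built from.
        res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
            ULID:    b.ULID,
            MinTime: b.MinTime,
            MaxTime: b.MaxTime,
        })
        res.Compaction.Sources = append(res.Compaction.Sources, b.Compaction.Sources...)
    }
    // The new block sits one compaction level above its highest parent.
    res.Compaction.Level++
    return res
}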

3. Deleting the source blocks: reload

Check retentionTime and retentionSize, and delete the blocks that no longer satisfy them.

  • From all blocks, pick the ones that can be deleted:

    • blocks whose entire time range is already beyond retentionTime;
    • once the cumulative size of all blocks exceeds retentionSize, the oldest blocks beyond that limit;
  • Delete the deletable blocks;
// tsdb/db.go
// reload blocks and trigger head truncation if new blocks appeared.
// Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {
    loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
    ......    // handling of corrupted blocks elided
    // Pick the deletable blocks (beyond retentionTime or retentionSize)...
    deletable := db.deletableBlocks(loadable)
    // ...and delete them.
    if err := db.deleteBlocks(deletable); err != nil {
        return err
    }
    // Garbage collect data in the head if the most recent persisted block
    // covers data of its current time range.
    if len(loadable) == 0 {
        return nil
    }
    maxt := loadable[len(loadable)-1].Meta().MaxTime
    return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}

Blocks beyond retentionTime

Because the blocks are sorted by maxTime in descending order, blocks[0] is the most recent block.
While iterating over the blocks, once (blocks[0].maxTime - blockA.maxTime) > retentionDuration, blockA and every block after it can be deleted.

// tsdb/db.go
func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
    // Time retention is disabled or no blocks to work with.
    if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {
        return
    }
    deletable = make(map[ulid.ULID]*Block)
    for i, block := range blocks {
        // The difference between the first block and this block is larger than
        // the retention period so any blocks after that are added as deletable.
        if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {
            for _, b := range blocks[i:] {
                deletable[b.meta.ULID] = b
            }
            break
        }
    }
    return deletable
}

Blocks beyond retentionSize

Total block size = walSize + headChunkSize + block1Size + block2Size + ...
Blocks are sorted by maxTime in descending order, so blocks[0] is the most recent block.
While iterating, sumBlockSize += blockSize; once sumBlockSize > retentionSize, that block and every block after it are added to the deletion list:

// tsdb/db.go
func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
    // Size retention is disabled or no blocks to work with.
    if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {
        return
    }
    deletable = make(map[ulid.ULID]*Block)
    walSize, _ := db.Head().wal.Size()
    headChunksSize := db.Head().chunkDiskMapper.Size()
    // Initializing size counter with WAL size and Head chunks
    // written to disk, as that is part of the retention strategy.
    blocksSize := walSize + headChunksSize
    for i, block := range blocks {
        blocksSize += block.Size()
        if blocksSize > int64(db.opts.MaxBytes) {
            // Add this and all following blocks for deletion.
            for _, b := range blocks[i:] {
                deletable[b.meta.ULID] = b
            }
            break
        }
    }
    return deletable
}

References:

1.https://ganeshvernekar.com/blog/prometheus-tsdb-compaction-an...