关于prometheus:prometheus源码分析compaction-和-retention

6次阅读

共计 7973 个字符,预计需要花费 20 分钟才能阅读完成。

为什么要 compact?

  • 待删除的数据未被真正删除,仅记录在 tombstone 文件中;
  • 相邻 block 的 index 文件个别状况是雷同,compact 能够缩小 disk usage;
  • 当查问的后果波及 N 个 block 时,blocks 数据的合并会消耗大量资源;

compact 不会无止境的运行,prometheus 中限度单个 block 最长的时间跨度 =31d(744hour),或者 1 /10 的 retentionTime,两者取最小值。

{ // Max block size  settings.
   if cfg.tsdb.MaxBlockDuration == 0 {maxBlockDuration, err := model.ParseDuration("31d")
      // When the time retention is set and not too big use to define the max block duration.
      if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration {maxBlockDuration = cfg.tsdb.RetentionDuration / 10}
      cfg.tsdb.MaxBlockDuration = maxBlockDuration
   }
}

compact 的过程中,波及到基于 retention 的数据删除,能够基于 sizeRetention 或 timeRetention。

一. 整体框架

prometheus 中启动 1 个 goroutine 执行 db.run()。
在 db.run()中执行了 1min 的 loop 循环:

  • 查看 Head block 是否须要 compact,若是,则先将 head block compact 成 disk block;
  • 而后 compact disk block:

    • 首先,生成 compact 打算,即找出能够 compact 的 blocks;
    • 而后,将找进去的 blocks 执行 compact 过程;
    • 最初,删除掉过期的 blocks;

二.compact 的整体逻辑代码

代码入口:

  • 1min 的循环 loop,在 loop 前,有 backoff 的工夫微调;
// tsdb/db.go
func (db *DB) run() {backoff := time.Duration(0)
    for {
        // 工夫回退
        select {case <-time.After(backoff):
        }
        select {case <-time.After(1 * time.Minute):
            select {case db.compactc <- struct{}{}:
            default:
            }
        // 1min 执行 1 次
        case <-db.compactc:    
            if db.autoCompact {if err := db.Compact(); err != nil {    // 失败的话,指数回退    
                    backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
                } else {backoff = 0}
            }
            ......
        }
    }
}

具体 compact 的逻辑:

// tsdb/db.go
func (db *DB) Compact() (err error) {
    .....
    // 先查看 head block 是否须要 compact
    // 行将 head block --> disk block
    for {if !db.head.compactable() {break}
        mint := db.head.MinTime()
        maxt := rangeForTimestamp(mint, db.head.chunkRange)
        head := NewRangeHead(db.head, mint, maxt-1)
        if err := db.compactHead(head); err != nil {return err}
    }
    // 再 compact 磁盘 block
    return db.compactBlocks()}

先查看 head block 是否须要 compact
当 head block 的时间跨度 > chunkRange 的 1.5 倍时,则须要将 head block 变成 disk block:

// tsdb/head.go
func (h *Head) compactable() bool {return h.MaxTime()-h.MinTime() > h.chunkRange/2*3}

三.disk block compact 的过程剖析

磁盘 block 的 compact 过程:

  • 学生成 compact plan;
  • 再执行 compact;
  • 最初 reload 删掉源文件;
// tsdb/db.go
func (db *DB) compactBlocks() (err error) {
    // Check for compactions of multiple blocks.
    for {
        // 学生成 compact plan
        plan, err := db.compactor.Plan(db.dir)        
        if len(plan) == 0 {    // 没有 plan,间接返回
            break
        }    
        // 再执行 compact
        uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
        if err != nil {return errors.Wrapf(err, "compact %s", plan)
        }
        runtime.GC()
        // 最初 reload 删掉源文件
        if err := db.reload(); err != nil {if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid)
            }
            return errors.Wrap(err, "reload blocks")
        }
        runtime.GC()}
    return nil
}

1. 生成 compact plan

抉择 blocks 的优先级如下:

  • 首先,查看是否有 Overlap 的 block,若存在则间接返回;

    • 个别状况下,block 之间的 timestamp 没有 overlap,除非是被动写入了 < 以后 timestamp 的时序序列;

    Moreover, Prometheus itself does not produce overlapping blocks, it’s only possible if you backfill some data into Prometheus.

  • 其次,按 timeRange 在 blocks 中抉择能够 compact 的 blocks,若找到则返回;
  • 最初,找满足 numTombstoneSeries /numSeriesTotal > 5% 的 block,该 plan 仅输入 1 个 block,它会生成新的 block(不含 tombstone 中的数据);
// tsdb/compact.go
func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {sort.Slice(dms, func(i, j int) bool {return dms[i].meta.MinTime < dms[j].meta.MinTime
    })
    // 1. 先查看是否有 overlap 的 block
    // 若有,间接返回
    res := c.selectOverlappingDirs(dms)
    if len(res) > 0 {return res, nil}
    // 排除掉最新的 block
    dms = dms[:len(dms)-1]
    // 2. 而后,在 block dir 中抉择能够压缩的 block
    // 若找到,则返回
    for _, dm := range c.selectDirs(dms) {res = append(res, dm.dir)
    }
    if len(res) > 0 {return res, nil}
    // 3. 找 block,满足 numTombstone / numSeries > 5%
    // Compact any blocks with big enough time range that have >5% tombstones.
    for i := len(dms) - 1; i >= 0; i-- {meta := dms[i].meta
        if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {break}
        if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {return []string{dms[i].dir}, nil
        }
    }
    return nil, nil
}

如何在 blocks 中抉择出要 compact 的 block?

  • 将所有的 blocks 按 minTime 从小到大排序,在筛选 blocks 时,先排除最新的 blocks,即去掉 blocks[size-1];
  • 按 range 查找 block 列表,range={6hour, 18hour, 54hour, ….},3 的倍数递增;
  • 先按 range=6hour 找,找到在 6hour 内的 blocks,则返回;否则,按 18hour 找;

以 range=6hour 为例,筛选 parts 的条件:

  • 假如以后 block: {b1, b2, b3, b-latest},将其按 6hour 进行切割

    • 切完后:[b1, b2], [b3], b-latest
  • 对于[b1, b2]:

    • 若 b2.maxTime – b1.minTime = 6hour,则 [b1, b2] 能够执行压缩;
    • 若 b2.maxTime < b3.minTime,则 [b1, b2] 能够执行压缩;
// tsdb/db.go
// 返回[2, 6, 18, 54, ......],即 2,2*3,2*3*3,...
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)

// 抉择 deletableBlocks
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {if len(c.ranges) < 2 || len(ds) < 1 {return nil}
    highTime := ds[len(ds)-1].meta.MinTime
    // range[1:] = {6hour, 18hour, 54hour, ....}
    for _, iv := range c.ranges[1:] {parts := splitByRange(ds, iv)    // 按 range=iv 查找 blocks
        if len(parts) == 0 {continue}
    Outer:    
        for _, p := range parts {mint := p[0].meta.MinTime
            maxt := p[len(p)-1].meta.MaxTime
            // Pick the range of blocks if it spans the full range (potentially with gaps)
            // or is before the most recent block.
            // This ensures we don't compact blocks prematurely when another one of the same
            // size still fits in the range.
            if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {    // 满足条件
                return p
            }
        }
    }
    return nil
}

2. 执行 compact

将上一步抉择的 plan 作为 compact 的起源,执行 compact:

  • 将 plan 中的 blocks 数据 compact 生成新的 block;
  • 将 plan 中的 blocks 元数据合并生成新的 meta.json;

    • 元数据中指明 source、parents、minTime、maxTime,同时将 compactLevel + 1;
// tsdb/compact.go
// dest = db.dir, dirs = plan, open = db.blocks
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
    var (blocks []BlockReader
        bs     []*Block
        metas  []*BlockMeta
        uids   []string)    
    // 将 dir 下所有 block 加载,放入[]blocks
    for _, d := range dirs {meta, _, err := readMetaFile(d)    
        
        var b *Block
        // Use already open blocks if we can, to avoid
        // having the index data in memory twice.
        for _, o := range open {if meta.ULID == o.Meta().ULID {
                b = o
                break
            }
        }
        if b == nil {b, err = OpenBlock(c.logger, d, c.chunkPool)            
            defer b.Close()}
        metas = append(metas, meta)
        blocks = append(blocks, b)
        bs = append(bs, b)
        uids = append(uids, meta.ULID.String())
    }
    
    // 新的 uid    
    uid = ulid.MustNew(ulid.Now(), rand.Reader)
    // 新的 metadata
    meta := compactBlockMetas(uid, metas...)
    // 合并 blocks,将 meta 信息写入 meta.json
    // 合并 index,写入 index 文件
    err = c.write(dest, meta, blocks...)
    var merr tsdb_errors.MultiError
    merr.Add(err)    
    return uid, merr
}

3. 删掉源文件:reload

查看 retentionTime 和 retentionSize,删除不满足条件的 blocks。

  • 从所有的 blocks 中抉择能够被删除的 blocks:

    • 整个 block 的工夫,已齐全超过 retentionTime;
    • 所有 block 的 size,已超过 retentionSize,删除满足 retentionSize 的最晚工夫的那些 blocks;
  • 将 deleteable blocks 删除;
// tsdb/db.go
// reload blocks and trigger head truncation if new blocks appeared.
// Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)    

    // 抉择可删除的 blocks(已超过 retentionTime 或 retentionSize)
    deletable := db.deletableBlocks(loadable)

    if err := db.deleteBlocks(deletable); err != nil {    // 删除 blocks
        return err
    }

    // Garbage collect data in the head if the most recent persisted block
    // covers data of its current time range.
    if len(loadable) == 0 {return nil}
    maxt := loadable[len(loadable)-1].Meta().MaxTime
    return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")

超过 retentionTime 的 blocks

因为 blocks 按 maxTime 从大到小排序,blocks[0]是最近工夫的 block;
当遍历 blocks 时,当发现(block0.maxTime – blockA.maxTime) > retentionDuration 时,可认为以后 blockA 以及其后的 block,都能够被删掉了。

// tsdb/db.go
func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
    // Time retention is disabled or no blocks to work with.
    if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {return}
    deletable = make(map[ulid.ULID]*Block)
    for i, block := range blocks {
        // The difference between the first block and this block is larger than
        // the retention period so any blocks after that are added as deletable.
        if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {for _, b := range blocks[i:] {deletable[b.meta.ULID] = b
            }
            break
        }
    }
    return deletable
}

超过 retentionSize 的 blocks

总的 blockSize = walSize + headChunkSize + block1Size + block2Size + …
blocks 按 maxTime 从大到小排序,blocks[0]是最近工夫的 block。
遍历 blocks 时,sumBlockSize += blockSize, 若发现 sumBlockSize > retentionSize,则将该 block 以及其后的 block 退出待删除列表:

// tsdb/db.go
func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
    // Size retention is disabled or no blocks to work with.
    if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {return}
    deletable = make(map[ulid.ULID]*Block)
    walSize, _ := db.Head().wal.Size()
    headChunksSize := db.Head().chunkDiskMapper.Size()
    // Initializing size counter with WAL size and Head chunks
    // written to disk, as that is part of the retention strategy.
    blocksSize := walSize + headChunksSize
    for i, block := range blocks {blocksSize += block.Size()
        if blocksSize > int64(db.opts.MaxBytes) {
            // Add this and all following blocks for deletion.
            for _, b := range blocks[i:] {deletable[b.meta.ULID] = b
            }
            break
        }
    }
    return deletable
}

参考:

1.https://ganeshvernekar.com/blog/prometheus-tsdb-compaction-an…

正文完
 0