乐趣区

关于prometheus:prometheus源码分析checkpoint

什么是 checkpoint?

checkout 是 wal 中一个目录:

# ls -alh
-rw-r--r--  1 root root  11M 12 月 14 17:00 00000999
-rw-r--r--  1 root root  11M 12 月 14 19:00 00001000
-rw-r--r--  1 root root 2.1M 12 月 14 19:22 00001001
drwxr-xr-x  2 root root   22 12 月 14 19:00 checkpoint.00000998
# ls -alh checkpoint.00000998/
-rw-r--r-- 1 root root 384K 12 月 14 19:00 00000000

prometheus 写入指标时,将指标写入内存和 wal,而后返回写入胜利。wal 保障了 prometheus crash-free 的能力。

prometheus 内存中缓存了最近 2hour 的指标数据,而后 2hour 的数据被压缩成 block,存储在硬盘上;此时这 2hour 的 WAL 数据就能够被删除了。

checkpoint 就是用来清理 wal 日志的。

当 2hour 的内存数据被压缩成 block 存储至硬盘时:

  • 该工夫之前的 wal 日志就能够删除了;因为曾经长久化到硬盘了,即便 prometheus 实例宕掉,也不会丢数据;
  • 此时,prometheus 生成一个 checkpoint,进行 wal 日志的清理;

checkpoint 的整体流程

假如之前 checkpoint 为 checkpoint.m,在 segment n 处,进行 block 的存储,此时 checkpoint 的流程如下:

  • 读取文件:

    • checkpoint.m 目录下所有文件
    • segment m+1~n 之间的所有文件
  • 遍历文件内容,将 block 之后的 series 和 samples 数据,写入 checkpoint.n 目录;

checkpoint 生成之后,会删除以下文件:

  • 删除 segment.n 之前的 segment;
  • 删除之前的 checkpoint:< n(如 checkpoint.m);

checkpoint 的代码剖析

入口代码:

// tsdb/head.go
// Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) (err error) {
    ....
    // 生成新的 checkpoint
    wal.Checkpoint(h.logger, h.wal, first, last, keep, mint)
    // 将 segment.n 之前的 segment 删掉
    err := h.wal.Truncate(last + 1)
    // 将之前的 checkpoint 删掉: < last
    err := wal.DeleteCheckpoints(h.wal.Dir(), last)
}

checkpoint 的过程代码:

// tsdb/wal/checkpoint.go
// from/to:segment id
// keep:用于判断 series 是否保留
// mint: sample.timestamp >= mint 时保留
func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
    var sgmReader io.ReadCloser
    {var sgmRange []SegmentRange
        // 读 checkpoint.m
        dir, idx, err := LastCheckpoint(w.Dir())
        last := idx + 1
        if err == nil {
            from = last
            sgmRange = append(sgmRange, SegmentRange{Dir: dir, Last: math.MaxInt32})
        }
        // 读 segment.m+1 ~ n
        sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to})
        sgmReader, err = NewSegmentsRangeReader(sgmRange...)
    }
    
    r := NewReader(sgmReader)
    // 遍历所有的 record
    for r.Next() {rec := r.Record()
        switch dec.Type(rec) {
        case record.Series:
            series, err = dec.Series(rec, series)
            for _, s := range series {
                // series 是否保留
                if keep(s.Ref) {repl = append(repl, s)
                }
            }
            // 写入 buf
            if len(repl) > 0 {buf = enc.Series(repl, buf)
            }
        case record.Samples:
            samples, err = dec.Samples(rec, samples)
            repl := samples[:0]
            for _, s := range samples {
                // timestamp >= mint 须要保留
                if s.T >= mint {repl = append(repl, s)
                }
            }
            // 写入 buf
            if len(repl) > 0 {buf = enc.Samples(repl, buf)
            }
        }
        recs = append(recs, buf[start:])
        // Flush records in 1 MB increments.
        if len(buf) > 1*1024*1024 {
            // 写入 checkpoint.n 目录中
            if err := cp.Log(recs...); err != nil {return nil, errors.Wrap(err, "flush records")
            }
            buf, recs = buf[:0], recs[:0]
        }
    }
    // Flush remaining records.
    err := cp.Log(recs...)
    err := cp.Close();
    ......
}

参考:

1.https://ganeshvernekar.com/bl…

退出移动版