什么是checkpoint?

checkout是wal中一个目录:

# ls -alh-rw-r--r--  1 root root  11M 12月 14 17:00 00000999-rw-r--r--  1 root root  11M 12月 14 19:00 00001000-rw-r--r--  1 root root 2.1M 12月 14 19:22 00001001drwxr-xr-x  2 root root   22 12月 14 19:00 checkpoint.00000998# ls -alh checkpoint.00000998/-rw-r--r-- 1 root root 384K 12月 14 19:00 00000000

prometheus写入指标时,将指标写入内存和wal,而后返回写入胜利。wal保障了prometheus crash-free的能力。

prometheus内存中缓存了最近2hour的指标数据,而后2hour的数据被压缩成block,存储在硬盘上;此时这2hour的WAL数据就能够被删除了。

checkpoint就是用来清理wal日志的。

当2hour的内存数据被压缩成block存储至硬盘时:

  • 该工夫之前的wal日志就能够删除了;因为曾经长久化到硬盘了,即便prometheus实例宕掉,也不会丢数据;
  • 此时,prometheus生成一个checkpoint,进行wal日志的清理;

checkpoint的整体流程

假如之前checkpoint为checkpoint.m,在segment n处,进行block的存储,此时checkpoint的流程如下:

  • 读取文件:

    • checkpoint.m目录下所有文件
    • segment m+1~n之间的所有文件
  • 遍历文件内容,将block之后的series和samples数据,写入checkpoint.n目录;

checkpoint生成之后,会删除以下文件:

  • 删除segment.n之前的segment;
  • 删除之前的checkpoint:< n(如checkpoint.m);

checkpoint的代码剖析

入口代码:

// tsdb/head.go// Truncate removes old data before mint from the head.func (h *Head) Truncate(mint int64) (err error) {    ....    // 生成新的checkpoint    wal.Checkpoint(h.logger, h.wal, first, last, keep, mint)    // 将segment.n之前的segment删掉    err := h.wal.Truncate(last + 1)    // 将之前的checkpoint删掉: < last    err := wal.DeleteCheckpoints(h.wal.Dir(), last)}

checkpoint的过程代码:

// tsdb/wal/checkpoint.go// from/to:segment id// keep:用于判断series是否保留// mint: sample.timestamp >= mint时保留func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {    var sgmReader io.ReadCloser    {        var sgmRange []SegmentRange        // 读checkpoint.m        dir, idx, err := LastCheckpoint(w.Dir())        last := idx + 1        if err == nil {            from = last            sgmRange = append(sgmRange, SegmentRange{Dir: dir, Last: math.MaxInt32})        }        // 读segment.m+1 ~ n        sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to})        sgmReader, err = NewSegmentsRangeReader(sgmRange...)    }        r := NewReader(sgmReader)    // 遍历所有的record    for r.Next() {        rec := r.Record()        switch dec.Type(rec) {        case record.Series:            series, err = dec.Series(rec, series)            for _, s := range series {                // series是否保留                if keep(s.Ref) {                    repl = append(repl, s)                }            }            // 写入buf            if len(repl) > 0 {                buf = enc.Series(repl, buf)            }        case record.Samples:            samples, err = dec.Samples(rec, samples)            repl := samples[:0]            for _, s := range samples {                // timestamp >= mint须要保留                if s.T >= mint {                    repl = append(repl, s)                }            }            // 写入buf            if len(repl) > 0 {                buf = enc.Samples(repl, buf)            }        }        recs = append(recs, buf[start:])        // Flush records in 1 MB increments.        if len(buf) > 1*1024*1024 {            // 写入checkpoint.n目录中            if err := cp.Log(recs...); err != nil {                return nil, errors.Wrap(err, "flush records")            }            buf, recs = buf[:0], recs[:0]        }    }    // Flush remaining records.    err := cp.Log(recs...)    err := cp.Close();    ......}

参考:

1.https://ganeshvernekar.com/bl...