Prometheus source code analysis: wal.watcher and remote_write

wal.watcher is used in the remote_write flow:

  • wal.watcher reads the newest data from the WAL in real time;
  • it then appends that data to the QueueManager;
  • finally, the QueueManager sends it to remote storage.

1. Overall framework

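When wal.watcher starts, it records w.startTimestamp = now(), and then:

  • first reads the segments under the checkpoint directory, calling readSegment() to read the WAL data in them;
  • then reads the segments under the wal directory, again calling readSegment() to read the WAL data in them.

Because data is appended to the wal directory in real time, wal.watcher keeps reading it as it arrives.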

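How readSegment() handles the data it reads from the WAL:

  • for a record.Series record, it adds the series to the QueueManager's series cache;
  • for a record.Samples record, it checks whether the sample's timestamp is greater than w.startTimestamp; if so, it appends the sample to the QueueManager.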
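The timestamp rule above can be sketched in a few lines of Go. This is only an illustration: the `sample` type and the `newerThan` function are hypothetical names, not Prometheus code. The one thing taken from the text is the strict "greater than the start timestamp" comparison, which keeps data written before the watcher started from being sent again.

```go
package main

import "fmt"

// sample is a hypothetical stand-in for a WAL sample with an integer timestamp T.
type sample struct {
	T int64
	V float64
}

// newerThan returns the samples whose timestamp is strictly greater than start.
func newerThan(samples []sample, start int64) []sample {
	var out []sample
	for _, s := range samples {
		if s.T > start {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	start := int64(1000) // pretend the watcher started at t=1000
	in := []sample{{T: 900, V: 1}, {T: 1000, V: 2}, {T: 1001, V: 3}}
	// Only the sample with T=1001 passes the filter.
	fmt.Println(len(newerThan(in, start)))
}
```

A sample whose timestamp equals the start timestamp is dropped, because the comparison is strict.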

2. Code walkthrough

The entry point is QueueManager.Start():

// storage/remote/queue_manager.go
func (t *QueueManager) Start() {
    ......
    t.watcher.Start()
    ......
}

Its main work is done in w.loop(), which Start() launches in a goroutine:

// tsdb/wal/watcher.go
func (w *Watcher) Start() {
    w.setMetrics()
    level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)
    go w.loop()
}

w.loop() keeps calling w.Run() until the watcher quits, waiting 5 seconds between attempts:

// tsdb/wal/watcher.go
func (w *Watcher) loop() {
    defer close(w.done)
    // We may encounter failures processing the WAL; we should wait and retry.
    for !isClosed(w.quit) {
        w.SetStartTime(time.Now()) // sets w.startTimestamp
        if err := w.Run(); err != nil {
            level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
        }
        select {
        case <-w.quit:
            return
        case <-time.After(5 * time.Second):
        }
    }
}

w.Run() is the concrete execution flow:

  • first, it reads the segments under the checkpoint directory through a LiveReader;
  • then, it reads the segments under the wal directory through a LiveReader.

// tsdb/wal/watcher.go
// (error checks are omitted in this excerpt)
func (w *Watcher) Run() error {
    _, lastSegment, err := w.firstAndLast()
    lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
    if err == nil {
        // First, read the segments under the checkpoint directory with a LiveReader.
        if err = w.readCheckpoint(lastCheckpoint); err != nil {
            return errors.Wrap(err, "readCheckpoint")
        }
    }
    w.lastCheckpoint = lastCheckpoint

    currentSegment, err := w.findSegmentForIndex(checkpointIndex)
    for !isClosed(w.quit) {
        // Then, read the segments under the wal directory with a LiveReader.
        // The tail argument is true only once currentSegment has reached lastSegment.
        if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {
            return err
        }
        ...
        currentSegment++
    }
    return nil
}
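The second argument to w.watch() is `currentSegment >= lastSegment`, so older segments are replayed with tail set to false and only the newest one is tailed. The sketch below lists that plan for a given range. The `step` type and `plan` function are hypothetical, and the sketch stops at lastSegment, whereas the real loop keeps advancing as new segments appear.

```go
package main

import "fmt"

// step describes one call to watch: which segment, and whether it is tailed.
type step struct {
	Segment int
	Tail    bool
}

// plan lists the watch calls from current up to last, using the same
// "segment >= last" rule as the excerpt to decide the tail flag.
func plan(current, last int) []step {
	var steps []step
	for seg := current; seg <= last; seg++ {
		steps = append(steps, step{Segment: seg, Tail: seg >= last})
	}
	return steps
}

func main() {
	for _, s := range plan(3, 5) {
		fmt.Printf("segment %d tail=%v\n", s.Segment, s.Tail)
	}
}
```

For the range 3 to 5, segments 3 and 4 are replayed with tail=false and segment 5 is read with tail=true.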

Reading the segments under the checkpoint directory:

  • iterate over every segment in the checkpoint directory;
  • for each segment, read the file with a LiveReader and parse and process it with readSegment(). The tail argument is false here, so samples in the checkpoint are skipped and only the series records are used.

// tsdb/wal/watcher.go
// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string) error {
    index, err := checkpointNum(checkpointDir)
    // Ensure we read the whole contents of every segment in the checkpoint dir.
    segs, err := w.segments(checkpointDir)

    for _, seg := range segs {
        sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
        defer sr.Close()

        r := NewLiveReader(w.logger, w.readerMetrics, sr)
        if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
            return errors.Wrap(err, "readSegment")
        }
    }
    return nil
}

Reading the segments under the wal directory:

  • w.watch() calls w.readSegment() on every tick of readTicker, so segmentNum is read continuously.

// tsdb/wal/watcher.go
func (w *Watcher) watch(segmentNum int, tail bool) error {
    segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
    reader := NewLiveReader(w.logger, w.readerMetrics, segment)

    readTicker := time.NewTicker(readPeriod)
    defer readTicker.Stop()
    .......
    gcSem := make(chan struct{}, 1)
    for {
        select {
        case <-w.quit:
            return nil
        ......
        case <-readTicker.C:
            err = w.readSegment(reader, segmentNum, tail)
            // Otherwise, when we are tailing, non-EOFs are fatal.
            if err != io.EOF {
                return err
            }
        }
    }
}

3. Code that reads the data

The reading is done in w.readSegment():

  • r.Next(): reads the segment file through the LiveReader;
  • readSegment() parses each record it reads and forwards it:

    • if the record is of type series, it is stored in the QueueManager;
    • if the record is of type samples, each sample is checked for sample.T > w.startTimestamp, and the samples that qualify are sent to the QueueManager.

func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
    ......
    for r.Next() && !isClosed(w.quit) { // read one record
        rec := r.Record()
        switch dec.Type(rec) {
        case record.Series:
            series, err := dec.Series(rec, series[:0])
            // Store the series information in the QueueManager.
            w.writer.StoreSeries(series, segmentNum)
        case record.Samples:
            // Skip samples unless this is the last segment, the one still being written.
            if !tail {
                break
            }
            samples, err := dec.Samples(rec, samples[:0])
            for _, s := range samples {
                if s.T > w.startTimestamp { // the sample is recent enough to send
                    ....
                    send = append(send, s)
                }
            }
            if len(send) > 0 {
                // Send the samples to the QueueManager.
                w.writer.Append(send)
                send = send[:0]
            }
        ......
        }
    }
    return r.Err()
}
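To make the dispatch easier to follow, here is a minimal self-contained sketch of the same pattern with an in-memory writer. The `rec`, `writer`, `memoryWriter` and `dispatch` names and their simplified signatures are assumptions for illustration. Only the two method names, StoreSeries and Append, and the tail check come from the excerpt.

```go
package main

import "fmt"

type recordKind int

const (
	kindSeries recordKind = iota
	kindSamples
)

// rec is a hypothetical decoded record: either series names or sample values.
type rec struct {
	Kind    recordKind
	Series  []string
	Samples []float64
}

// writer has the two calls used in the excerpt, with simplified, assumed signatures.
type writer interface {
	StoreSeries(series []string, segment int)
	Append(samples []float64)
}

// memoryWriter keeps everything in memory so the effect of dispatch can be inspected.
type memoryWriter struct {
	series  map[string]int // series name -> segment it was last seen in
	samples []float64
}

func (m *memoryWriter) StoreSeries(series []string, segment int) {
	for _, s := range series {
		m.series[s] = segment
	}
}

func (m *memoryWriter) Append(samples []float64) {
	m.samples = append(m.samples, samples...)
}

// dispatch always stores series, but forwards samples only when tail is true.
func dispatch(w writer, records []rec, segment int, tail bool) {
	for _, r := range records {
		switch r.Kind {
		case kindSeries:
			w.StoreSeries(r.Series, segment)
		case kindSamples:
			if !tail {
				break // leaves the switch only; the for loop continues
			}
			w.Append(r.Samples)
		}
	}
}

func main() {
	records := []rec{
		{Kind: kindSeries, Series: []string{"up"}},
		{Kind: kindSamples, Samples: []float64{1, 0}},
	}
	old := &memoryWriter{series: map[string]int{}}
	dispatch(old, records, 7, false)
	live := &memoryWriter{series: map[string]int{}}
	dispatch(live, records, 8, true)
	fmt.Println(len(old.series), len(old.samples))
	fmt.Println(len(live.series), len(live.samples))
}
```

In Go, a bare break inside a switch case exits the switch, not the enclosing for loop, so a non-tail segment still has all of its series records processed.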
