wal.watcher is used in the remote_write pipeline:

  • wal.watcher reads the newest data from the WAL in real time;
  • it then Appends that data to the QueueManager;
  • finally, the QueueManager sends it to the remote storage.

I. Overall structure

When wal.watcher starts, it records wal.startTimestamp = now();

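  • wal.watcher first reads the segments under the checkpoint directory, calling readSegment() to read the data in them;
  • wal.watcher then reads the segments under the wal directory, again calling readSegment() to read their data;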

Because data in the wal directory is appended continuously, wal.watcher keeps reading it as it arrives.
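The start-up order can be sketched as two small helpers. This assumes the Prometheus on-disk naming, where checkpoint directories are called `checkpoint.` followed by a zero-padded segment index and WAL segments are named by their zero-padded number. `lastCheckpoint` and `firstSegmentAtOrAfter` are hypothetical stand-ins for what `LastCheckpoint()` and `findSegmentForIndex()` are used for in Run(), not their actual code.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"
)

const checkpointPrefix = "checkpoint." // assumed naming convention

// lastCheckpoint returns the checkpoint directory with the highest index.
// ok is false when there is no checkpoint yet (e.g. a fresh server).
func lastCheckpoint(dirNames []string) (name string, index int, ok bool) {
	index = -1
	for _, n := range dirNames {
		if !strings.HasPrefix(n, checkpointPrefix) {
			continue
		}
		i, err := strconv.Atoi(strings.TrimPrefix(n, checkpointPrefix))
		if err != nil {
			continue // e.g. an unfinished "checkpoint.00000012.tmp"
		}
		if i > index {
			name, index, ok = n, i, true
		}
	}
	return name, index, ok
}

// firstSegmentAtOrAfter picks the first WAL segment whose number is >= the
// checkpoint index; replay of the wal directory starts there.
func firstSegmentAtOrAfter(segs []int, index int) (int, error) {
	sorted := append([]int(nil), segs...)
	sort.Ints(sorted)
	for _, s := range sorted {
		if s >= index {
			return s, nil
		}
	}
	return -1, errors.New("failed to find segment for index")
}

func main() {
	name, idx, ok := lastCheckpoint([]string{"checkpoint.00000003", "checkpoint.00000007", "00000008", "00000009"})
	fmt.Println(name, idx, ok) // checkpoint.00000007 7 true
	start, _ := firstSegmentAtOrAfter([]int{8, 9, 10}, idx)
	fmt.Println(start) // 8
}
```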

How readSegment() processes the data it reads from the WAL:

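  • on a record.Series record, it adds the series to the QueueManager's series cache;
  • on a record.Samples record, it checks whether each sample's timestamp > wal.startTimestamp, and if so Appends the sample to the QueueManager (this only happens for the segment currently being tailed);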

II. Code walkthrough

The entry point is QueueManager.Start():

```go
// storage/remote/queue_manager.go
func (t *QueueManager) Start() {
	......
	t.watcher.Start()
	......
}
```

Watcher.Start() does little besides launching w.loop(), where the main work happens:

```go
// tsdb/wal/watcher.go
func (w *Watcher) Start() {
	w.setMetrics()
	level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)
	go w.loop()
}
```

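w.loop() makes sure w.Run() keeps being executed until the watcher quits: whenever Run() returns, it waits 5 seconds and runs it again: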

```go
// tsdb/wal/watcher.go
func (w *Watcher) loop() {
	defer close(w.done)

	// We may encounter failures processing the WAL; we should wait and retry.
	for !isClosed(w.quit) {
		w.SetStartTime(time.Now()) // sets w.startTimestamp
		if err := w.Run(); err != nil {
			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
		}

		select {
		case <-w.quit:
			return
		case <-time.After(5 * time.Second):
		}
	}
}
```

w.Run() contains the actual processing:

  • first, it reads the segments under the checkpoint directory with a LiveReader;
  • then, it reads the segments under the wal directory with a LiveReader;
```go
// tsdb/wal/watcher.go
func (w *Watcher) Run() error {
	_, lastSegment, err := w.firstAndLast()

	lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
	if err == nil {
		// First, read the segments under the checkpoint directory with a LiveReader.
		if err = w.readCheckpoint(lastCheckpoint); err != nil {
			return errors.Wrap(err, "readCheckpoint")
		}
	}
	w.lastCheckpoint = lastCheckpoint

	currentSegment, err := w.findSegmentForIndex(checkpointIndex)

	for !isClosed(w.quit) {
		// Then, read the segments under the wal directory with a LiveReader.
		if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {
			return err
		}
		...
		currentSegment++
	}
	return nil
}
```

Reading the segments under the checkpoint directory:

  • iterate over all segments in the checkpoint directory;
  • for each segment, read the file with a LiveReader and parse and process it with readSegment();
```go
// tsdb/wal/watcher.go

// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string) error {
	index, err := checkpointNum(checkpointDir)

	// Ensure we read the whole contents of every segment in the checkpoint dir.
	segs, err := w.segments(checkpointDir)

	for _, seg := range segs {
		sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
		defer sr.Close()

		r := NewLiveReader(w.logger, w.readerMetrics, sr)
		if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
			return errors.Wrap(err, "readSegment")
		}
	}
	return nil
}
```

Reading the segments under the wal directory:

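  • on every tick of readTicker, call w.readSegment() to continue reading segment segmentNum; when tailing, io.EOF only means there is no new data yet, while any other error ends watch();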
```go
// tsdb/wal/watcher.go
func (w *Watcher) watch(segmentNum int, tail bool) error {
	segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
	reader := NewLiveReader(w.logger, w.readerMetrics, segment)

	readTicker := time.NewTicker(readPeriod)
	defer readTicker.Stop()
	.......
	gcSem := make(chan struct{}, 1)
	for {
		select {
		case <-w.quit:
			return nil
		......
		case <-readTicker.C:
			err = w.readSegment(reader, segmentNum, tail)

			// Otherwise, when we are tailing, non-EOFs are fatal.
			if err != io.EOF {
				return err
			}
		}
	}
}
```

III. The code that reads the data

The reading happens in w.readSegment():

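  • r.Next(): reads the segment file one record at a time through the LiveReader;
  • readSegment() then decodes each record and forwards it:

    • if the record is of type series, it is stored in the QueueManager;
    • if the record is of type samples (and tail is true), it checks sample.T > w.startTimestamp and sends the samples that pass to the QueueManager;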
```go
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
	......
	for r.Next() && !isClosed(w.quit) { // read one record
		rec := r.Record()

		switch dec.Type(rec) {
		case record.Series:
			series, err := dec.Series(rec, series[:0])
			// Store the series information in the QueueManager.
			w.writer.StoreSeries(series, segmentNum)

		case record.Samples:
			// Samples are skipped unless this is the last segment,
			// i.e. the one currently being written (tail).
			if !tail {
				break
			}
			samples, err := dec.Samples(rec, samples[:0])
			for _, s := range samples {
				if s.T > w.startTimestamp { // only samples newer than the start time
					....
					send = append(send, s)
				}
			}
			if len(send) > 0 {
				// Send the samples to the QueueManager.
				w.writer.Append(send)
				send = send[:0]
			}

		......
		}
	}
	return r.Err()
}
```