wal.watcher is used in the remote_write pipeline:
- wal.watcher reads the newest WAL data in real time;
- it then Appends that data to the QueueManager;
- finally, the QueueManager sends it to remote storage.
1. Overall framework
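When wal.watcher starts, it records wal.startTimestamp = now().
- wal.watcher first reads the segments in the checkpoint directory, calling readSegment() to parse the WAL data in them;
- it then reads the segments in the wal directory, again calling readSegment() to parse the WAL data.
Because new data is continuously appended to the wal directory, wal.watcher keeps reading it in real time.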
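How readSegment() processes the WAL data:
- a record.Series record is added to the QueueManager's series cache;
- for a record.Samples record, each sample's timestamp is checked, and only samples with a timestamp greater than wal.startTimestamp are Appended to the QueueManager.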
2. Process code
The entry point is QueueManager.Start():
```go
// storage/remote/queue_manager.go
func (t *QueueManager) Start() {
	// ......
	t.watcher.Start()
	// ......
}
```
Watcher.Start() does its main work in w.loop(), which it launches as a goroutine:
```go
// tsdb/wal/watcher.go
func (w *Watcher) Start() {
	w.setMetrics()
	level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)

	go w.loop()
}
```
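w.loop() keeps calling w.Run() until the watcher is told to quit. If Run() returns an error, the loop logs it and retries after 5 seconds. Note that w.startTimestamp is reset to the current time before every call to Run():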
```go
// tsdb/wal/watcher.go
func (w *Watcher) loop() {
	defer close(w.done)

	// We may encounter failures processing the WAL; we should wait and retry.
	for !isClosed(w.quit) {
		w.SetStartTime(time.Now()) // sets w.startTimestamp
		if err := w.Run(); err != nil {
			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
		}

		select {
		case <-w.quit:
			return
		case <-time.After(5 * time.Second):
		}
	}
}
```
w.Run() carries out the actual work:
- first, it reads the segments in the checkpoint directory with a LiveReader;
- then, it reads the segments in the wal directory with a LiveReader.
```go
// tsdb/wal/watcher.go
// Error checks after each call are omitted in this excerpt.
func (w *Watcher) Run() error {
	_, lastSegment, err := w.firstAndLast()

	lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
	if err == nil {
		// First, read the segments in the checkpoint directory with a LiveReader.
		if err = w.readCheckpoint(lastCheckpoint); err != nil {
			return errors.Wrap(err, "readCheckpoint")
		}
	}
	w.lastCheckpoint = lastCheckpoint

	currentSegment, err := w.findSegmentForIndex(checkpointIndex)

	// Then, read the segments in the wal directory with a LiveReader.
	for !isClosed(w.quit) {
		if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {
			return err
		}
		// ...
		currentSegment++
	}
	return nil
}
```
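Reading the segments in the checkpoint directory:
- iterate over all segments in the checkpoint directory;
- for each segment, read the file with a LiveReader and parse it with readSegment(). Since readSegment() is called with tail=false here, only series records take effect; samples in the checkpoint are skipped.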
```go
// tsdb/wal/watcher.go

// Read all the series records from a Checkpoint directory.
// Error checks after each call are omitted in this excerpt.
func (w *Watcher) readCheckpoint(checkpointDir string) error {
	index, err := checkpointNum(checkpointDir)

	// Ensure we read the whole contents of every segment in the checkpoint dir.
	segs, err := w.segments(checkpointDir)
	for _, seg := range segs {
		sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
		defer sr.Close()

		r := NewLiveReader(w.logger, w.readerMetrics, sr)
		if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
			return errors.Wrap(err, "readSegment")
		}
	}
	return nil
}
```
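Reading the segments in the wal directory:
- w.watch() opens segment segmentNum and calls w.readSegment() on it on every tick of readTicker, so it keeps reading as new data is appended. io.EOF only means the reader has caught up; any other error ends watch().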
```go
// tsdb/wal/watcher.go
func (w *Watcher) watch(segmentNum int, tail bool) error {
	segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
	reader := NewLiveReader(w.logger, w.readerMetrics, segment)

	readTicker := time.NewTicker(readPeriod)
	defer readTicker.Stop()

	// .......

	gcSem := make(chan struct{}, 1)
	for {
		select {
		case <-w.quit:
			return nil

		// ......

		case <-readTicker.C:
			err = w.readSegment(reader, segmentNum, tail)

			// Otherwise, when we are tailing, non-EOFs are fatal.
			if err != io.EOF {
				return err
			}
		}
	}
}
```
3. Code that reads the data
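The reading happens in w.readSegment():
- r.Next(): reads the next record from the segment file with the LiveReader.
readSegment() then decodes each record and forwards it:
- if the record is of type series, it is stored in the QueueManager;
- if the record is of type samples, samples with sample.T > w.startTimestamp are selected and sent to the QueueManager. This happens only when tail is true, i.e. when reading the latest segment, the one currently being written.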
```go
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
	// ......
	for r.Next() && !isClosed(w.quit) {
		// Read one record.
		rec := r.Record()

		switch dec.Type(rec) {
		case record.Series:
			series, err := dec.Series(rec, series[:0])
			// Store the series information in the QueueManager.
			w.writer.StoreSeries(series, segmentNum)

		case record.Samples:
			// For samples: skip the record unless this is the last segment,
			// the one currently being written.
			if !tail {
				break
			}
			samples, err := dec.Samples(rec, samples[:0])
			for _, s := range samples {
				if s.T > w.startTimestamp { // samples within the valid time range
					// ....
					send = append(send, s)
				}
			}
			if len(send) > 0 {
				// Send the samples to the QueueManager.
				w.writer.Append(send)
				send = send[:0]
			}

		// ......
		}
	}
	return r.Err()
}
```