共计 3580 个字符,预计需要花费 9 分钟才能阅读完成。
wal.watcher 用在 remote_write 的流程中:
- wal.watcher 会实时读取 wal 最新的数据;
- 然后将其 Append 给 QueueManager;
- 最后由 QueueManager 发送给 remote storage。
一. 整体框架
wal.watcher 在启动时(以及每次重新执行 w.Run() 之前),记录 w.startTimestamp = time.Now();
- wal.watcher 先读 checkpoint 目录下的 segment,调用 readSegment() 读取 wal 中的数据;
- wal.watcher 再读 wal 目录下的 segment,调用 readSegment() 读取 wal 中的数据;
因为 wal 目录中的数据是实时追加的,所以 wal.watcher 会实时读取其中的数据。
readSegment() 读取 wal 中数据的流程:
- 若遇到 record.Series,则将其添加到 QueueManager 的 series 缓存中;
- 若遇到 record.Samples,且当前 segment 处于 tail 状态(即正在被追读的最新 segment),则检查其时间是否 > w.startTimestamp,若是,则将其 Append 到 QueueManager;
二. 流程代码
入口是 QueueManager.Start():
// storage/remote/queue_manager.go
// Start starts the QueueManager. Among its other (elided) steps it calls
// t.watcher.Start(), which begins tailing the WAL in the background.
func (t *QueueManager) Start() {
......
t.watcher.Start()
......
}
w.Start() 会启动一个 goroutine 执行 w.loop(),其主要工作在 w.loop() 中:
// tsdb/wal/watcher.go
// Start sets up the watcher's metrics (w.setMetrics), logs that the watcher
// for queue w.name is starting, and runs w.loop in a new goroutine.
// It returns immediately without waiting for the loop.
func (w *Watcher) Start() {w.setMetrics()
level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)
go w.loop()}
w.loop() 保证除非退出,否则会一直执行 w.Run():每次执行前重置 startTimestamp;w.Run() 出错时记录日志;每次 w.Run() 返回后,若未退出则等待 5 秒再重新执行:
// tsdb/wal/watcher.go
// loop keeps calling w.Run until w.quit is closed.
//
// Before every attempt it resets the watcher's start time to now. readSegment
// only forwards samples whose timestamp is after that start time. An error
// from Run is logged and not returned. After every Run (successful or not),
// loop either exits because w.quit was closed or waits 5 seconds and tries
// again. w.done is closed when loop returns.
func (w *Watcher) loop() {defer close(w.done)
// We may encounter failures processing the WAL; we should wait and retry.
for !isClosed(w.quit) {w.SetStartTime(time.Now()) // sets w.startTimestamp
if err := w.Run(); err != nil {level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
}
// Back off before retrying, but exit promptly if asked to quit.
select {
case <-w.quit:
return
case <-time.After(5 * time.Second):
}
}
}
w.Run() 是具体的执行流程:
- 首先,用 LiveReader 读取 checkpoint 目录下的 segment;
- 然后,用 LiveReader 读取 wal 目录下的 segment;
// tsdb/wal/watcher.go
// Run performs one full pass of the watcher (excerpt; some statements elided).
//
//  1. If LastCheckpoint finds a checkpoint (err == nil), its segments are read
//     via readCheckpoint. Any LastCheckpoint error skips this step.
//  2. Starting from the segment returned by w.findSegmentForIndex(checkpointIndex),
//     WAL segments are watched one after another. A segment is tailed (sample
//     records forwarded) when its number is >= lastSegment, i.e. the last
//     segment reported by w.firstAndLast when Run began.
//
// Run returns a wrapped error if reading the checkpoint fails, the error from
// watch if watching a segment fails, and nil once w.quit is closed.
// NOTE(review): the errors from w.firstAndLast and w.findSegmentForIndex are
// not checked in this excerpt; presumably that handling was elided.
func (w *Watcher) Run() error {_, lastSegment, err := w.firstAndLast()
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
if err == nil {if err = w.readCheckpoint(lastCheckpoint); err != nil { // first, read the segments in the checkpoint directory with a LiveReader
return errors.Wrap(err, "readCheckpoint")
}
}
w.lastCheckpoint = lastCheckpoint
currentSegment, err := w.findSegmentForIndex(checkpointIndex)
for !isClosed(w.quit) {
// then, read the segments in the WAL directory with a LiveReader
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {return err}
...
currentSegment++
}
return nil
}
读 checkpoint 目录下的 segment:
- 遍历 checkpoint 目录的所有 segment;
- 对每个 segment,使用 LiveReader 读取文件,使用 readSegment() 解析处理;
// tsdb/wal/watcher.go
// Read all the series records from a Checkpoint directory.
//
// readCheckpoint opens every segment in checkpointDir and reads each one to the
// end with a LiveReader. It passes tail=false to readSegment, so sample records
// in the checkpoint are skipped and only series records are stored. io.EOF from
// readSegment is the normal end of a segment. Any other error is wrapped and
// returned.
// NOTE(review): `defer sr.Close()` runs when readCheckpoint returns, not at the
// end of each iteration, so all checkpoint segments stay open until then.
// NOTE(review): errors from checkpointNum, w.segments and OpenReadSegment are
// not checked in this excerpt; presumably that handling was elided.
func (w *Watcher) readCheckpoint(checkpointDir string) error {index, err := checkpointNum(checkpointDir)
// Ensure we read the whole contents of every segment in the checkpoint dir.
segs, err := w.segments(checkpointDir)
for _, seg := range segs {sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
defer sr.Close()
r := NewLiveReader(w.logger, w.readerMetrics, sr)
if err := w.readSegment(r, index, false); err != io.EOF && err != nil {return errors.Wrap(err, "readSegment")
}
}
return nil
}
读 wal 目录下的 segment:
- 调用 w.readSegment() 持续地读取编号为 segmentNum 的 segment;
// tsdb/wal/watcher.go
// watch opens WAL segment segmentNum and polls it for newly appended records.
// Polling happens on every tick of a ticker with period readPeriod (excerpt;
// other select cases are elided).
//
// tail is passed through to readSegment and controls whether sample records
// are forwarded. watch returns nil as soon as w.quit is closed. On each tick,
// an io.EOF from readSegment means "no more data yet", so polling continues.
// Any other result, including nil, is returned and ends the watch.
// NOTE(review): the error from OpenReadSegment is not checked in this excerpt;
// presumably that handling was elided.
func (w *Watcher) watch(segmentNum int, tail bool) error {segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
readTicker := time.NewTicker(readPeriod)
defer readTicker.Stop()
.......
gcSem := make(chan struct{}, 1)
for {
select {
case <-w.quit:
return nil
......
case <-readTicker.C:
err = w.readSegment(reader, segmentNum, tail)
// Otherwise, when we are tailing, non-EOFs are fatal.
if err != io.EOF {return err}
}
}
}
三. 读数据的代码
读取在 w.readSegment() 中:
- r.Next(): 使用 LiveReader 读取 segment 文件;
- 在 readSegment() 中对读取的数据进行解析、发送;
- 若 record 是 series 类型,则将其保存到 QueueManager;
- 若 record 是 samples 类型(且 tail 为 true),检查 sample.T > w.startTimestamp,然后将符合条件的 samples 发送给 QueueManager;
// readSegment decodes records from r until r.Next() returns false or w.quit is
// closed, then returns r.Err() (excerpt; other record types and error handling
// are elided).
//
//   - Series records are passed to w.writer.StoreSeries together with segmentNum.
//   - Sample records are handled only when tail is true. Samples with
//     s.T > w.startTimestamp are collected into send and passed to
//     w.writer.Append in one batch per record.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
......
for r.Next() && !isClosed(w.quit) { // read one record
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series, err := dec.Series(rec, series[:0])
// store the series info in the writer (the QueueManager in the remote-write path)
w.writer.StoreSeries(series, segmentNum)
case record.Samples:
// For samples: skip unless this segment is being tailed (tail == true).
// Note that `break` only leaves the switch; the loop moves on to the next record.
if !tail {break}
samples, err := dec.Samples(rec, samples[:0])
for _, s := range samples {
if s.T > w.startTimestamp { // only samples newer than the watcher's start time
....
send = append(send, s)
}
}
if len(send) > 0 {
// send the samples to the writer (the QueueManager), then reuse the buffer
w.writer.Append(send)
send = send[:0]
}
......
}
}
return r.Err()}
正文完
发表至: prometheus
2023-03-14