本文主要研究一下promtail的positions

Positions

loki/pkg/promtail/positions/positions.go

// Positions records how far each source has been read and exposes that
// progress to targets.
type Positions interface {
	// GetString returns the stored position for path as a raw string.
	// A JournalTarget writes a journal cursor to the positions file, whereas
	// a FileTarget writes an integer offset; use Get to read the integer
	// form.
	GetString(path string) string

	// Get returns how far the file at path has been read. It returns an
	// error if the value stored for that file is not an integer.
	Get(path string) (int64, error)

	// PutString records (asynchronously) how far the file at path has been
	// read. Unlike Put, it takes a string offset and is only useful for
	// JournalTargets, which do not have integer offsets.
	PutString(path, pos string)

	// Put records (asynchronously) how far the file at path has been read.
	Put(path string, pos int64)

	// Remove drops the position tracking for path.
	Remove(path string)

	// SyncPeriod reports how often the positions file gets resynced.
	SyncPeriod() time.Duration

	// Stop shuts down the position tracker.
	Stop()
}
Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法

positions

loki/pkg/promtail/positions/positions.go

// Positions tracks how far through each file we've read.type positions struct {    logger    log.Logger    cfg       Config    mtx       sync.Mutex    positions map[string]string    quit      chan struct{}    done      chan struct{}}func (p *positions) Stop() {    close(p.quit)    <-p.done}func (p *positions) PutString(path string, pos string) {    p.mtx.Lock()    defer p.mtx.Unlock()    p.positions[path] = pos}func (p *positions) Put(path string, pos int64) {    p.PutString(path, strconv.FormatInt(pos, 10))}func (p *positions) GetString(path string) string {    p.mtx.Lock()    defer p.mtx.Unlock()    return p.positions[path]}func (p *positions) Get(path string) (int64, error) {    p.mtx.Lock()    defer p.mtx.Unlock()    pos, ok := p.positions[path]    if !ok {        return 0, nil    }    return strconv.ParseInt(pos, 10, 64)}func (p *positions) Remove(path string) {    p.mtx.Lock()    defer p.mtx.Unlock()    p.remove(path)}func (p *positions) remove(path string) {    delete(p.positions, path)}func (p *positions) SyncPeriod() time.Duration {    return p.cfg.SyncPeriod}
positions定义了logger、cfg、mtx、positions、quit、done属性;它实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除

New

loki/pkg/promtail/positions/positions.go

// New makes a new Positions.func New(logger log.Logger, cfg Config) (Positions, error) {    positionData, err := readPositionsFile(cfg, logger)    if err != nil {        return nil, err    }    p := &positions{        logger:    logger,        cfg:       cfg,        positions: positionData,        quit:      make(chan struct{}),        done:      make(chan struct{}),    }    go p.run()    return p, nil}
New方法会通过readPositionsFile读取positionData创建positions,然后异步执行p.run()

run

loki/pkg/promtail/positions/positions.go

func (p *positions) run() {    defer func() {        p.save()        level.Debug(p.logger).Log("msg", "positions saved")        close(p.done)    }()    ticker := time.NewTicker(p.cfg.SyncPeriod)    for {        select {        case <-p.quit:            return        case <-ticker.C:            p.save()            p.cleanup()        }    }}func (p *positions) save() {    if p.cfg.ReadOnly {        return    }    p.mtx.Lock()    positions := make(map[string]string, len(p.positions))    for k, v := range p.positions {        positions[k] = v    }    p.mtx.Unlock()    if err := writePositionFile(p.cfg.PositionsFile, positions); err != nil {        level.Error(p.logger).Log("msg", "error writing positions file", "error", err)    }}func (p *positions) cleanup() {    p.mtx.Lock()    defer p.mtx.Unlock()    toRemove := []string{}    for k := range p.positions {        // If the position file is prefixed with journal, it's a        // JournalTarget cursor and not a file on disk.        if strings.HasPrefix(k, "journal-") {            continue        }        if _, err := os.Stat(k); err != nil {            if os.IsNotExist(err) {                // File no longer exists.                toRemove = append(toRemove, k)            } else {                // Can't determine if file exists or not, some other error.                level.Warn(p.logger).Log("msg", "could not determine if log file "+                    "still exists while cleaning positions file", "error", err)            }        }    }    for _, tr := range toRemove {        p.remove(tr)    }}
run方法通过time.NewTicker(p.cfg.SyncPeriod)来触发执行p.save()及p.cleanup();save方法将positions持久化到文件;cleanup方法遍历p.positions,从内存中移除文件不存在的position

readPositionsFile

loki/pkg/promtail/positions/positions.go

func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {    cleanfn := filepath.Clean(cfg.PositionsFile)    buf, err := ioutil.ReadFile(cleanfn)    if err != nil {        if os.IsNotExist(err) {            return map[string]string{}, nil        }        return nil, err    }    var p File    err = yaml.UnmarshalStrict(buf, &p)    if err != nil {        // return empty if cfg option enabled        if cfg.IgnoreInvalidYaml {            level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)            return map[string]string{}, nil        }        return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", cleanfn, err)    }    // p.Positions will be nil if the file exists but is empty    if p.Positions == nil {        p.Positions = map[string]string{}    }    return p.Positions, nil}
readPositionsFile方法从文件读取位置到p.Positions

writePositionFile

loki/pkg/promtail/positions/positions.go

func writePositionFile(filename string, positions map[string]string) error {    buf, err := yaml.Marshal(File{        Positions: positions,    })    if err != nil {        return err    }    target := filepath.Clean(filename)    temp := target + "-new"    err = ioutil.WriteFile(temp, buf, os.FileMode(positionFileMode))    if err != nil {        return err    }    return os.Rename(temp, target)}
writePositionFile方法将positions写入文件

小结

promtail的Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法;positions实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除。

doc

  • promtail