乐趣区

关于golang:聊聊promtail的Client

本文主要研究一下 promtail 的 Client

Client

loki/pkg/promtail/client/client.go

// Client is an entry handler that pushes the entries it receives to Loki
// and that can be shut down by its owner.
type Client interface {
    // Embedding api.EntryHandler makes every Client usable wherever an
    // entry handler is expected.
    api.EntryHandler

    // Stop goroutine sending batch of entries.
    Stop()
}

Client 接口内嵌了 api.EntryHandler 接口，并定义了 Stop 方法

EntryHandler

loki/pkg/promtail/api/types.go

// EntryHandler is implemented by anything that can "handle" log entries.
type EntryHandler interface {
    // Handle receives a single entry described by its label set, its
    // timestamp and its text, and reports an error if the entry could
    // not be handled.
    Handle(labels model.LabelSet, time time.Time, entry string) error
}

EntryHandler 接口定义了 Handle 方法

client

loki/pkg/promtail/client/client.go

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
    // logger receives the error/warning messages emitted while sending.
    logger  log.Logger
    // cfg supplies the push URL, batch limits (BatchWait, BatchSize),
    // backoff settings and per-request timeout read by run/sendBatch/send.
    cfg     Config
    // client is the HTTP client used by send to perform the push request.
    client  *http.Client
    // quit is closed by Stop to tell the run loop to exit.
    quit    chan struct{}
    // once guarantees quit is closed only one time even if Stop is
    // called repeatedly.
    once    sync.Once
    // entries carries entries from Handle to the run loop.
    entries chan entry
    // wg is marked done when run returns; Stop waits on it.
    wg      sync.WaitGroup

    // externalLabels, when non-empty, are merged with the labels of every
    // entry passed to Handle.
    externalLabels model.LabelSet
}

// Handle implements EntryHandler. It queues one log line for the next batch
// of its tenant by handing it to the run loop over c.entries; the actual push
// to Loki happens asynchronously. It always returns nil.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
    labels := ls
    if len(c.externalLabels) != 0 {
        labels = c.externalLabels.Merge(labels)
    }

    // The tenant ID may have been overridden by the pipeline stages through
    // a reserved label: resolve it first, then strip that label so it is not
    // sent along with the entry.
    tenantID := c.getTenantID(labels)
    if _, reserved := labels[ReservedLabelTenantID]; reserved {
        // Work on a copy so the label set owned by the caller stays intact.
        labels = labels.Clone()
        delete(labels, ReservedLabelTenantID)
    }

    line := logproto.Entry{Timestamp: t, Line: s}
    c.entries <- entry{tenantID, labels, line}
    return nil
}

// Stop shuts the client down: it closes the quit channel (only on the first
// call) and then blocks until the wait group is released.
func (c *client) Stop() {
    signalQuit := func() {
        close(c.quit)
    }
    c.once.Do(signalQuit)

    c.wg.Wait()
}

client 定义了 logger、cfg、client、quit、once、entries、wg、externalLabels 属性；它实现了 Client 接口的 Handle、Stop 方法；Handle 方法先在 externalLabels 非空时将其与传入的 LabelSet 合并，再判断 LabelSet 是否包含 ReservedLabelTenantID，如果包含则会执行 ls.Clone() 然后将该 label 移除，之后构造 entry 发送到 c.entries 这个 channel；Stop 方法通过 once 执行 close(c.quit)，然后执行 c.wg.Wait() 等待

run

loki/pkg/promtail/client/client.go

// run is the client's batching loop. It keeps one pending batch per tenant,
// fed by the entries received on c.entries, and sends a batch either when
// adding an entry would exceed c.cfg.BatchSize or when the batch has been
// waiting for at least c.cfg.BatchWait. The loop exits when c.quit is closed;
// on exit it stops its ticker, sends every pending batch and marks c.wg done.
func (c *client) run() {
    batches := map[string]*batch{}

    // Given the client handles multiple batches (1 per tenant) and each batch
    // can be created at a different point in time, we look for batches whose
    // max wait time has been reached every 10 times per BatchWait, so that the
    // maximum delay we have sending batches is 10% of the max waiting time.
    // We apply a lower bound of 10ms to the ticker period, to avoid too
    // frequent checks in case the BatchWait is very low.
    minWaitCheckFrequency := 10 * time.Millisecond
    maxWaitCheckFrequency := c.cfg.BatchWait / 10
    if maxWaitCheckFrequency < minWaitCheckFrequency {
        maxWaitCheckFrequency = minWaitCheckFrequency
    }

    maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

    defer func() {
        // Stop the ticker so its resources are released once the loop is
        // gone; this happens before wg.Done so Stop() callers observe a
        // fully cleaned-up client.
        maxWaitCheck.Stop()

        // Send all pending batches
        for tenantID, batch := range batches {
            c.sendBatch(tenantID, batch)
        }

        c.wg.Done()
    }()

    for {
        select {
        case <-c.quit:
            return

        case e := <-c.entries:
            batch, ok := batches[e.tenantID]

            // If the batch doesn't exist yet, we create a new one with the entry
            if !ok {
                batches[e.tenantID] = newBatch(e)
                break
            }

            // If adding the entry to the batch will increase the size over the max
            // size allowed, we do send the current batch and then create a new one
            if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
                c.sendBatch(e.tenantID, batch)

                batches[e.tenantID] = newBatch(e)
                break
            }

            // The max size of the batch isn't reached, so we can add the entry
            batch.add(e)

        case <-maxWaitCheck.C:
            // Send all batches whose max wait time has been reached
            for tenantID, batch := range batches {
                if batch.age() < c.cfg.BatchWait {
                    continue
                }

                c.sendBatch(tenantID, batch)
                delete(batches, tenantID)
            }
        }
    }
}

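client 的 run 方法创建 time.NewTicker(maxWaitCheckFrequency)，然后进入 for 循环：如果是 c.entries 读取到了数据，则在该 tenant 的 batch 不存在时通过 newBatch(e) 新建，在加入该 entry 后会超过 BatchSize 时先执行 c.sendBatch 发送当前 batch 再新建，否则执行 batch.add(e)；如果是 maxWaitCheck 触发了则遍历 batches，对等待时间已达到 BatchWait 的 batch 执行 c.sendBatch(tenantID, batch) 及 delete；最后 quit 的时候，还有 defer 方法遍历 batches 执行 c.sendBatch(tenantID, batch)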

sendBatch

loki/pkg/promtail/client/client.go

// sendBatch encodes the given batch and pushes it for tenantID, retrying with
// the configured backoff while the failure is retryable. It records the
// encoded/sent/dropped byte and entry counters, the request duration and the
// retry count along the way; after a successful push it also refreshes the
// lag gauge of every stream carrying the latency label. Errors are logged,
// never returned.
func (c *client) sendBatch(tenantID string, batch *batch) {
    payload, numEntries, err := batch.encode()
    if err != nil {
        level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
        return
    }

    host := c.cfg.URL.Host
    payloadSize := float64(len(payload))
    encodedBytes.WithLabelValues(host).Add(payloadSize)

    var (
        ctx     = context.Background()
        retries = util.NewBackoff(ctx, c.cfg.BackoffConfig)
        status  int
    )
    for retries.Ongoing() {
        began := time.Now()
        status, err = c.send(ctx, tenantID, payload)
        requestDuration.WithLabelValues(strconv.Itoa(status), host).Observe(time.Since(began).Seconds())

        if err != nil {
            // Only retry 429s, 500s and connection-level errors.
            retryable := status <= 0 || status == 429 || status/100 == 5
            if !retryable {
                break
            }

            level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
            batchRetries.WithLabelValues(host).Inc()
            retries.Wait()
            continue
        }

        // The push succeeded: account for it and update the lag metric.
        sentBytes.WithLabelValues(host).Add(payloadSize)
        sentEntries.WithLabelValues(host).Add(float64(numEntries))
        for _, stream := range batch.streams {
            parsed, parseErr := parser.ParseMetric(stream.Labels)
            if parseErr != nil {
                // is this possible?
                level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", parseErr)
                return
            }

            // Build the gauge labels only when the stream has the latency
            // label; if it appears more than once the last one wins.
            var lagLabels model.LabelSet
            for _, lbl := range parsed {
                if lbl.Name != LatencyLabel {
                    continue
                }
                lagLabels = model.LabelSet{
                    model.LabelName(HostLabel):    model.LabelValue(host),
                    model.LabelName(LatencyLabel): model.LabelValue(lbl.Value),
                }
            }
            if lagLabels == nil {
                continue
            }

            // Lag is measured from the timestamp of the stream's last entry.
            newest := stream.Entries[len(stream.Entries)-1]
            streamLag.With(lagLabels).Set(time.Since(newest.Timestamp).Seconds())
        }
        return
    }

    // Reaching this point with an error means the batch is given up on.
    if err != nil {
        level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
        droppedBytes.WithLabelValues(host).Add(payloadSize)
        droppedEntries.WithLabelValues(host).Add(float64(numEntries))
    }
}

sendBatch 方法先通过 batch.encode() 编码为 buf，然后通过 c.send(ctx, tenantID, buf) 进行发送；发送失败时，对 429、5xx 以及连接级别的错误会按 backoff 进行重试，最终仍失败则记录 dropped 相关的 metrics

send

loki/pkg/promtail/client/client.go

// send performs a single push attempt: it POSTs buf to the configured URL,
// bounded by c.cfg.Timeout, and returns the HTTP status code of the response.
// When the request cannot be built or executed it returns -1 and the error;
// when the server answers with a non-2xx status it returns that status and an
// error carrying the first line of the response body.
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
    reqCtx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
    defer cancel()

    req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
    if err != nil {
        return -1, err
    }
    req = req.WithContext(reqCtx)
    req.Header.Set("Content-Type", contentType)
    req.Header.Set("User-Agent", UserAgent)

    // If the tenant ID is not empty promtail is running in multi-tenant mode, so
    // we should send it to Loki
    if tenantID != "" {
        req.Header.Set("X-Scope-OrgID", tenantID)
    }

    resp, err := c.client.Do(req)
    if err != nil {
        return -1, err
    }
    defer helpers.LogError("closing response body", resp.Body.Close)

    if resp.StatusCode/100 == 2 {
        return resp.StatusCode, nil
    }

    // Non-2xx: surface at most maxErrMsgLen bytes of the body's first line.
    var line string
    scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
    if scanner.Scan() {
        line = scanner.Text()
    }
    return resp.StatusCode, fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
}

send 方法执行一个 POST 的 http 请求发送到 c.cfg.URL.String()

小结

promtail 的 client 定义了 logger、cfg、client、quit、once、entries、wg、externalLabels 属性；它实现了 Client 接口的 Handle、Stop 方法；Handle 方法构造 entry 发送到 c.entries 这个 channel；Stop 方法执行 close(c.quit) 并等待 c.wg；然后它还有一个 run 方法将 entry 添加到 batch，然后将 batch 通过 http 的 POST 请求发送到指定的地址。

doc

  • promtail
退出移动版