本文主要研究一下promtail的Client

Client

loki/pkg/promtail/client/client.go

// Client pushes entries to Loki and can be stoppedtype Client interface {    api.EntryHandler    // Stop goroutine sending batch of entries.    Stop()}
Client接口内嵌了api.EntryHandler接口,并定义了Stop方法

EntryHandler

loki/pkg/promtail/api/types.go

// EntryHandler is something that can "handle" entries.type EntryHandler interface {    Handle(labels model.LabelSet, time time.Time, entry string) error}
EntryHandler接口定义了Handle方法

client

loki/pkg/promtail/client/client.go

// Client for pushing logs in snappy-compressed protos over HTTP.type client struct {    logger  log.Logger    cfg     Config    client  *http.Client    quit    chan struct{}    once    sync.Once    entries chan entry    wg      sync.WaitGroup    externalLabels model.LabelSet}// Handle implement EntryHandler; adds a new line to the next batch; send is async.func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {    if len(c.externalLabels) > 0 {        ls = c.externalLabels.Merge(ls)    }    // Get the tenant  ID in case it has been overridden while processing    // the pipeline stages, then remove the special label    tenantID := c.getTenantID(ls)    if _, ok := ls[ReservedLabelTenantID]; ok {        // Clone the label set to not manipulate the input one        ls = ls.Clone()        delete(ls, ReservedLabelTenantID)    }    c.entries <- entry{tenantID, ls, logproto.Entry{        Timestamp: t,        Line:      s,    }}    return nil}// Stop the client.func (c *client) Stop() {    c.once.Do(func() { close(c.quit) })    c.wg.Wait()}
client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法先合并externalLabels并取出tenantID,再判断LabelSet是否包含ReservedLabelTenantID,如果包含则先执行ls.Clone()再从副本中移除该label,之后构造entry发送到c.entries这个channel;Stop方法通过c.once确保close(c.quit)只执行一次,然后执行c.wg.Wait()等待run方法退出

run

loki/pkg/promtail/client/client.go

func (c *client) run() {    batches := map[string]*batch{}    // Given the client handles multiple batches (1 per tenant) and each batch    // can be created at a different point in time, we look for batches whose    // max wait time has been reached every 10 times per BatchWait, so that the    // maximum delay we have sending batches is 10% of the max waiting time.    // We apply a cap of 10ms to the ticker, to avoid too frequent checks in    // case the BatchWait is very low.    minWaitCheckFrequency := 10 * time.Millisecond    maxWaitCheckFrequency := c.cfg.BatchWait / 10    if maxWaitCheckFrequency < minWaitCheckFrequency {        maxWaitCheckFrequency = minWaitCheckFrequency    }    maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)    defer func() {        // Send all pending batches        for tenantID, batch := range batches {            c.sendBatch(tenantID, batch)        }        c.wg.Done()    }()    for {        select {        case <-c.quit:            return        case e := <-c.entries:            batch, ok := batches[e.tenantID]            // If the batch doesn't exist yet, we create a new one with the entry            if !ok {                batches[e.tenantID] = newBatch(e)                break            }            // If adding the entry to the batch will increase the size over the max            // size allowed, we do send the current batch and then create a new one            if batch.sizeBytesAfter(e) > c.cfg.BatchSize {                c.sendBatch(e.tenantID, batch)                batches[e.tenantID] = newBatch(e)                break            }            // The max size of the batch isn't reached, so we can add the entry            batch.add(e)        case <-maxWaitCheck.C:            // Send all batches whose max wait time has been reached            for tenantID, batch := range batches {                if batch.age() < c.cfg.BatchWait {                    continue                }                c.sendBatch(tenantID, batch)          
      delete(batches, tenantID)            }        }    }}
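client的run方法创建time.NewTicker(maxWaitCheckFrequency)(取BatchWait/10,最小为10ms),然后进入for循环:如果从c.entries读取到了数据,则按tenantID查找batch,不存在则新建;若加入该entry后会超过BatchSize,则先发送当前batch再新建;否则执行batch.add(e);如果是maxWaitCheck触发了,则遍历batches,对等待时间已达到BatchWait的batch执行c.sendBatch(tenantID, batch)并delete;最后在quit的时候,defer函数还会遍历batches执行c.sendBatch(tenantID, batch),把剩余的batch都发送出去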

sendBatch

loki/pkg/promtail/client/client.go

func (c *client) sendBatch(tenantID string, batch *batch) {    buf, entriesCount, err := batch.encode()    if err != nil {        level.Error(c.logger).Log("msg", "error encoding batch", "error", err)        return    }    bufBytes := float64(len(buf))    encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)    ctx := context.Background()    backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)    var status int    for backoff.Ongoing() {        start := time.Now()        status, err = c.send(ctx, tenantID, buf)        requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())        if err == nil {            sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)            sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))            for _, s := range batch.streams {                lbls, err := parser.ParseMetric(s.Labels)                if err != nil {                    // is this possible?                    level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)                    return                }                var lblSet model.LabelSet                for i := range lbls {                    if lbls[i].Name == LatencyLabel {                        lblSet = model.LabelSet{                            model.LabelName(HostLabel):    model.LabelValue(c.cfg.URL.Host),                            model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),                        }                    }                }                if lblSet != nil {                    streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())                }            }            return        }        // Only retry 429s, 500s and connection-level errors.        
if status > 0 && status != 429 && status/100 != 5 {            break        }        level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)        batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()        backoff.Wait()    }    if err != nil {        level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)        droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)        droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))    }}
sendBatch方法先通过batch.encode()编码为buf,然后通过c.send(ctx, tenantID, buf)进行发送;只有遇到429、5xx或连接层错误时才会按backoff重试,最终仍然失败则打印错误日志并累加droppedBytes、droppedEntries指标

send

loki/pkg/promtail/client/client.go

func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {    ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)    defer cancel()    req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))    if err != nil {        return -1, err    }    req = req.WithContext(ctx)    req.Header.Set("Content-Type", contentType)    req.Header.Set("User-Agent", UserAgent)    // If the tenant ID is not empty promtail is running in multi-tenant mode, so    // we should send it to Loki    if tenantID != "" {        req.Header.Set("X-Scope-OrgID", tenantID)    }    resp, err := c.client.Do(req)    if err != nil {        return -1, err    }    defer helpers.LogError("closing response body", resp.Body.Close)    if resp.StatusCode/100 != 2 {        scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))        line := ""        if scanner.Scan() {            line = scanner.Text()        }        err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)    }    return resp.StatusCode, err}
send方法执行一个POST的http请求发送到c.cfg.URL.String(),tenantID非空时会设置X-Scope-OrgID header;若响应状态码不是2xx,则读取响应体的第一行拼接到返回的error中

小结

promtail的client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法构造entry发送到c.entries这个channel;Stop方法执行close(c.quit)并等待run方法退出;此外它还有一个run方法将entry添加到batch,再将batch通过http的POST请求发送到指定的地址。

doc

  • promtail