本文主要研究一下 promtail 的 Client
// Client pushes entries to Loki and can be stopped
type Client interface {
// Stop goroutine sending batch of entries.
Client 接口内嵌了 api.EntryHandler 接口,并定义了 Stop 方法
// EntryHandler is something that can "handle" entries.
type EntryHandler interface {Handle(labels model.LabelSet, time time.Time, entry string) error
EntryHandler 接口定义了 Handle 方法
// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
logger log.Logger
cfg Config
client *http.Client
quit chan struct{}
once sync.Once
entries chan entry
wg sync.WaitGroup
externalLabels model.LabelSet
// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {if len(c.externalLabels) > 0 {ls = c.externalLabels.Merge(ls)
// Get the tenant ID in case it has been overridden while processing
// the pipeline stages, then remove the special label
tenantID := c.getTenantID(ls)
if _, ok := ls[ReservedLabelTenantID]; ok {
// Clone the label set to not manipulate the input one
ls = ls.Clone()
delete(ls, ReservedLabelTenantID)
c.entries <- entry{tenantID, ls, logproto.Entry{
Timestamp: t,
Line: s,
return nil
// Stop the client.
func (c *client) Stop() {c.once.Do(func() {close(c.quit) })
client 定义了 logger、cfg、client、quit、once、entries、wg、externalLabels 属性;它实现了 Client 接口的 Handle、Stop 方法;Handle 方法判断 LabelSet 是否包含 ReservedLabelTenantID,如果包含则会执行 ls.Clone() 并从副本中移除该标签,之后构造 entry 发送到 c.entries 这个 channel;Stop 方法执行 close(c.quit)
func (c *client) run() {batches := map[string]*batch{}
// Given the client handles multiple batches (1 per tenant) and each batch
// can be created at a different point in time, we look for batches whose
// max wait time has been reached every 10 times per BatchWait, so that the
// maximum delay we have sending batches is 10% of the max waiting time.
// We apply a cap of 10ms to the ticker, to avoid too frequent checks in
// case the BatchWait is very low.
minWaitCheckFrequency := 10 * time.Millisecond
maxWaitCheckFrequency := c.cfg.BatchWait / 10
if maxWaitCheckFrequency < minWaitCheckFrequency {maxWaitCheckFrequency = minWaitCheckFrequency}
maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)
defer func() {
// Send all pending batches
for tenantID, batch := range batches {c.sendBatch(tenantID, batch)
for {
select {
case <-c.quit:
case e := <-c.entries:
batch, ok := batches[e.tenantID]
// If the batch doesn't exist yet, we create a new one with the entry
if !ok {batches[e.tenantID] = newBatch(e)
// If adding the entry to the batch will increase the size over the max
// size allowed, we do send the current batch and then create a new one
if batch.sizeBytesAfter(e) > c.cfg.BatchSize {c.sendBatch(e.tenantID, batch)
batches[e.tenantID] = newBatch(e)
// The max size of the batch isn't reached, so we can add the entry
case <-maxWaitCheck.C:
// Send all batches whose max wait time has been reached
for tenantID, batch := range batches {if batch.age() < c.cfg.BatchWait {continue}
c.sendBatch(tenantID, batch)
delete(batches, tenantID)
client 的 run 方法创建 time.NewTicker(maxWaitCheckFrequency),然后进入 for 循环:如果从 c.entries 读取到了数据,则按租户找到对应的 batch(不存在则新建;若加入后会超过 BatchSize,则先发送当前 batch 再新建;否则执行 batch.add(e));如果是 maxWaitCheck 触发了,则遍历 batches,对等待时间已达到 BatchWait 的 batch 执行 c.sendBatch(tenantID, batch) 及 delete;最后 quit 的时候,还有 defer 方法遍历 batches 执行 c.sendBatch(tenantID, batch)
func (c *client) sendBatch(tenantID string, batch *batch) {buf, entriesCount, err := batch.encode()
if err != nil {level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
bufBytes := float64(len(buf))
ctx := context.Background()
backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {start := time.Now()
status, err = c.send(ctx, tenantID, buf)
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
if err == nil {sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
for _, s := range batch.streams {lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
// is this possible?
level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
var lblSet model.LabelSet
for i := range lbls {if lbls[i].Name == LatencyLabel {
lblSet = model.LabelSet{model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host),
model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),
if lblSet != nil {streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
// Only retry 429s, 500s and connection-level errors.
if status > 0 && status != 429 && status/100 != 5 {break}
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
if err != nil {level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
sendBatch 方法先通过 batch.encode() 编码为 buf,然后通过 c.send(ctx, tenantID, buf) 进行发送;发送失败时,仅对 429、5xx 以及连接级别的错误进行重试
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
if err != nil {return -1, err}
req = req.WithContext(ctx)
req.Header.Set("Content-Type", contentType)
req.Header.Set("User-Agent", UserAgent)
// If the tenant ID is not empty promtail is running in multi-tenant mode, so
// we should send it to Loki
if tenantID != "" {req.Header.Set("X-Scope-OrgID", tenantID)
resp, err := c.client.Do(req)
if err != nil {return -1, err}
defer helpers.LogError("closing response body", resp.Body.Close)
if resp.StatusCode/100 != 2 {scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {line = scanner.Text()
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
return resp.StatusCode, err
send 方法执行一个 POST 的 http 请求,发送到 c.cfg.URL.String()
promtail 的 client 定义了 logger、cfg、client、quit、once、entries、wg、externalLabels 属性;它实现了 Client 接口的 Handle、Stop 方法;Handle 方法构造 entry 发送到 c.entries 这个 channel;Stop 方法执行 close(c.quit);此外它还有一个 run 方法将 entry 添加到 batch,然后将 batch 通过 http 的 POST 请求发送到指定的地址。
- promtail