乐趣区

关于prometheus:prometheus-remotewrite解析二-源码解读

整体流程

  • remoteConfigs 支持配置多个 remoteStorage,每个 remoteStorage 使用 1 个 QueueManager;
  • watcher 将 sample 发送给 QueueManager;
  • 1 个 QueueManager 中管理多个 shard,每个 shard 的容量为 capacity;
  • 每个 shard 会定时 (batch_send_deadline)、定量 (max_samples_per_send) 地向 remote endpoint 发送数据;

代码入口

入口:storage/remote/write.go
主要工作是初始化 QueueManager,然后调用 Start() 让其干活。

// ApplyConfig builds one QueueManager per entry in conf.RemoteWriteConfigs,
// stops every queue still held in rws.queues, starts the newly built queues,
// and finally installs the new set as rws.queues.
//
// It returns an error when a config cannot be hashed, when two configs hash
// to the same value, or when the write client cannot be created.
//
// NOTE(review): the "....." / "......" lines mark code elided by the article,
// and the trailing "##" text is the article's own annotation rather than Go
// comment syntax, so this excerpt does not compile as shown.
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
    .....
    // newQueues is keyed by config hash; newHashes records which of those
    // queues must be started at the end of this call.
    newQueues := make(map[string]*QueueManager)
    newHashes := []string{}
    for _, rwConf := range conf.RemoteWriteConfigs {hash, err := toHash(rwConf)
        if err != nil {return err}
        // Don't allow duplicate remote write configs.
        if _, ok := newQueues[hash]; ok {return fmt.Errorf("duplicate remote write configs are not allowed, found duplicate for URL: %s", rwConf.URL)
        }
        // Set the queue name to the config hash if the user has not set
        // a name in their remote write config so we can still differentiate
        // between queues that have the same remote write endpoint.
        name := hash[:6]
        if rwConf.Name != "" {name = rwConf.Name}
        c, err := NewWriteClient(name, &ClientConfig{
            URL:              rwConf.URL,
            Timeout:          rwConf.RemoteTimeout,
            HTTPClientConfig: rwConf.HTTPClientConfig,
        })
        if err != nil {return err}
        // Look up an existing queue with the same hash. The code that consumes
        // queue/ok is elided below — presumably it reuses unchanged queues;
        // verify against the full source.
        queue, ok := rws.queues[hash]
        ......
        endpoint := rwConf.URL.String()
        // Construct the QueueManager for this remote write config.
        newQueues[hash] = NewQueueManager(                        ## 初始化 QueueManager
            newQueueManagerMetrics(rws.reg, name, endpoint),
            rws.watcherMetrics,
            rws.liveReaderMetrics,
            rws.logger,
            // Per the article, walDir is the data directory (e.g. /prometheus)
            // and the WAL watcher turns it into /prometheus/wal — not
            // verifiable from this excerpt.
            rws.walDir,            ## 这里是 /prometheus,在 walwatcher 中会被初始化为 /prometheus/wal
            rws.samplesIn,
            rwConf.QueueConfig,
            conf.GlobalConfig.ExternalLabels,
            rwConf.WriteRelabelConfigs,
            c,
            rws.flushDeadline,
        )
        // Keep track of which queues are new so we know which to start.
        newHashes = append(newHashes, hash)
    }
    // Anything remaining in rws.queues is a queue who's config has
    // changed or was removed from the overall remote write config.
    for _, q := range rws.queues {q.Stop()
    }
    // Put the freshly created QueueManagers to work.
    for _, hash := range newHashes {        ##QueueManager 干活
        newQueues[hash].Start()}
    rws.queues = newQueues
    return nil
}

具体看一下 QueueManager 做的事情:

  • shards.start():为每个 shard 启动 1 个 Goroutine 干活;
  • watcher.Start(): 启动 watcher 监听 WAL 的变化,将 WAL 新增数据写入 shards;
  • updateShardsLoop(): 定期根据 sample in / sample out 计算新的 shard 数量;
  • reshardLoop(): 更新 shard;
func (t *QueueManager) Start() {
    // 注册 prometheus 的监控参数
    t.metrics.register()
    t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
    t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards))
    t.metrics.minNumShards.Set(float64(t.cfg.MinShards))
    t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards))

    t.shards.start(t.numShards)    ##shard 默认 =cfg.MinShard,也就是 1;这外面会对每个 shard 进行初始化、赋值、发送
    t.watcher.Start()    ##walwatcher 监控变动

    t.wg.Add(2)
    go t.updateShardsLoop()    ## 定期依据 sample in / sample out 计算新的 shard
    go t.reshardLoop()        ## 更新 shard 为新值}

shards.start() 解析

为每个 shard 启动 1 个 Goroutine,让 shard 干活;

// 每个 shards 有一个 queue,蕴含 N 个 shard
// 每个 shard 中有 capacity 大小(默认 cfg.Capacity=500)
// start the shards; must be called before any call to enqueue.
func (s *shards) start(n int) {s.mtx.Lock()
    defer s.mtx.Unlock()

    s.qm.metrics.pendingSamples.Set(0)
    s.qm.metrics.numShards.Set(float64(n))

    newQueues := make([]chan sample, n)    // N 个 shard, 初始只有 1 个
    for i := 0; i < n; i++ {newQueues[i] = make(chan sample, s.qm.cfg.Capacity)    // 每个 shard 最大有 capacity 个元素,初始 =500
    }

    s.queues = newQueues

    var hardShutdownCtx context.Context
    hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
    s.softShutdown = make(chan struct{})
    s.running = int32(n)
    s.done = make(chan struct{})
    atomic.StoreUint32(&s.droppedOnHardShutdown, 0)
    for i := 0; i < n; i++ {go s.runShard(hardShutdownCtx, i, newQueues[i])    ## 这里进行理论的发送
    }
}

shard Goroutine 干了发送的活:

  • runShard() 中维护一个 pendingSamples 数组,缓存从 queue 接收到的数据;
  • runShard() 从 queue 接收 watcher 写入的数据,保存到 pendingSamples 数组中;
  • 发送给 remote 的时机:

    • 定时:定时器时间到,cfg.BatchSendDeadline(默认 =5s);
    • 定量:pendingSamples 中的数量达到 cfg.MaxSamplesPerSend(默认 =100);
// runShard is the send loop of a single shard. It copies samples read from
// queue into a reusable batch and hands the batch to sendSamples either when
// it holds MaxSamplesPerSend entries or when the BatchSendDeadline timer
// fires, whichever comes first.
//
// The loop ends when ctx is cancelled (buffered and queued samples are counted
// as dropped/failed) or when queue is closed (the partial batch is flushed
// first). The last shard goroutine to exit closes s.done.
func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
	defer func() {
		if remaining := atomic.AddInt32(&s.running, -1); remaining == 0 {
			close(s.done)
		}
	}()

	shardNum := strconv.Itoa(shardID)

	// Send batches of at most MaxSamplesPerSend samples to the remote storage.
	// If we have fewer samples than that, flush them out after a deadline
	// anyways.
	batchSize := s.qm.cfg.MaxSamplesPerSend
	filled := 0
	batch := allocateTimeSeries(batchSize)
	var sendBuf []byte

	deadline := func() time.Duration {
		return time.Duration(s.qm.cfg.BatchSendDeadline)
	}

	timer := time.NewTimer(deadline())
	// stopTimer stops the timer and, if it had already fired, drains the
	// pending tick so that a following Reset starts from a clean channel.
	stopTimer := func() {
		if timer.Stop() {
			return
		}
		select {
		case <-timer.C:
		default:
		}
	}
	defer stopTimer()

	for {
		select {
		case <-ctx.Done():
			// In this case we drop all samples in the buffer and the queue.
			// Remove them from pending and mark them as failed.
			dropped := filled + len(queue)
			s.qm.metrics.pendingSamples.Sub(float64(dropped))
			s.qm.metrics.failedSamplesTotal.Add(float64(dropped))
			atomic.AddUint32(&s.droppedOnHardShutdown, uint32(dropped))
			return

		// Receive a sample and store it in the batch.
		case smp, open := <-queue:
			if !open {
				// Queue closed: flush whatever is buffered, then exit.
				if filled > 0 {
					level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", filled)
					s.sendSamples(ctx, batch[:filled], &sendBuf)
					s.qm.metrics.pendingSamples.Sub(float64(filled))
					level.Debug(s.qm.logger).Log("msg", "Done flushing.")
				}
				return
			}

			// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
			// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
			// stop reading from the queue. This makes it safe to reference the batch by index.
			slot := &batch[filled]
			slot.Labels = labelsToLabelsProto(smp.labels, slot.Labels)
			slot.Samples[0].Timestamp = smp.t
			slot.Samples[0].Value = smp.v
			filled++

			if filled < batchSize {
				continue
			}

			// The batch reached MaxSamplesPerSend: send it and restart the
			// deadline timer.
			s.sendSamples(ctx, batch, &sendBuf)
			filled = 0
			s.qm.metrics.pendingSamples.Sub(float64(batchSize))

			stopTimer()
			timer.Reset(deadline())

		// The BatchSendDeadline timer fired: send any partial batch.
		case <-timer.C:
			if filled > 0 {
				level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", filled, "shard", shardNum)
				s.sendSamples(ctx, batch[:filled], &sendBuf)
				s.qm.metrics.pendingSamples.Sub(float64(filled))
				filled = 0
			}
			timer.Reset(deadline())
		}
	}
}

接收 samples 数据

watcher 监控指标的变动,调用 QueueManager.Append()写入 samples;

QueueManager.Append():

  • 调用 shards.enqueue 将 sample 入队;
  • 入队过程中使用 2 倍回退算法:入队失败时,回退时间翻倍,直到最大回退值;
// Append enqueues each of the given samples on its shard for delivery to the
// remote storage. It blocks until every sample has been enqueued, and returns
// false as soon as a shutdown signal is observed on t.quit.
//
// Samples whose series ref has no entry in t.seriesLabels are counted as
// dropped and skipped.
func (t *QueueManager) Append(samples []record.RefSample) bool {
	for _, rs := range samples {
		t.seriesMtx.Lock()
		lbls, known := t.seriesLabels[rs.Ref]
		if !known {
			t.metrics.droppedSamplesTotal.Inc()
			t.samplesDropped.incr(1)
			// Only log when the series was not deliberately dropped.
			if _, dropped := t.droppedSeries[rs.Ref]; !dropped {
				level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", rs.Ref)
			}
			t.seriesMtx.Unlock()
			continue
		}
		t.seriesMtx.Unlock()

		// This will only loop if the queues are being resharded. The wait
		// doubles after every failed attempt, capped at cfg.MaxBackoff.
		wait := t.cfg.MinBackoff
		for {
			// Non-blocking check for shutdown before every attempt.
			select {
			case <-t.quit:
				return false
			default:
			}

			enqueued := t.shards.enqueue(rs.Ref, sample{
				labels: lbls,
				t:      rs.T,
				v:      rs.V,
			})
			if enqueued {
				break
			}

			t.metrics.enqueueRetriesTotal.Inc()
			time.Sleep(time.Duration(wait))
			wait *= 2
			if wait > t.cfg.MaxBackoff {
				wait = t.cfg.MaxBackoff
			}
		}
	}
	return true
}

shards 入队的流程:

  • sample 的 ref % shards 数量:决定入队哪个 shard;
  • 入队用 channel,直接 <- sample;
// enqueue places a sample on the shard queue selected by ref modulo the number
// of queues, blocking while that queue is full.
//
// It returns false when s.softShutdown is signalled, either before the attempt
// or while waiting for room in the queue; in that case the caller should back
// off and retry. On success the pending-samples gauge is incremented.
func (s *shards) enqueue(ref uint64, sample sample) bool {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	// Bail out early if a soft shutdown has already been signalled.
	select {
	case <-s.softShutdown:
		return false
	default:
	}

	idx := ref % uint64(len(s.queues))
	target := s.queues[idx]

	// Block until the sample is accepted or a soft shutdown begins.
	select {
	case target <- sample:
		s.qm.metrics.pendingSamples.Inc()
		return true
	case <-s.softShutdown:
		return false
	}
}

发送时 min_backoff 与 max_backoff

发送在 s.sendSamples 实现,sendSamples 调用 sendSamplesWithBackoff:

  • 若发送失败,进行 backoff,初始 backoff=minBackoff=30ms;
  • 若连续发送失败,进行 2 倍 backoff,直到 maxBackoff=100ms;
  • backoff 的方式:time.Sleep(backoff);
// sendSamples sends one batch through sendSamplesWithBackoff and records the
// outcome. An error from the send is logged and the batch is counted as
// failed. The samples-out counter, the send duration and the last-send
// timestamp are updated whether or not the send succeeded.
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) {
	start := time.Now()
	count := len(samples)

	if err := s.sendSamplesWithBackoff(ctx, samples, buf); err != nil {
		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", count, "err", err)
		s.qm.metrics.failedSamplesTotal.Add(float64(count))
	}

	// These counters are used to calculate the dynamic sharding, and as such
	// should be maintained irrespective of success or failure.
	s.qm.samplesOut.incr(int64(count))
	s.qm.samplesOutDuration.incr(int64(time.Since(start)))
	atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())
}

具体发送动作:s.sendSamplesWithBackoff():

// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error {req, highest, err := buildWriteRequest(samples, *buf)
    if err != nil {
        // Failing to build the write request is non-recoverable, since it will
        // only error if marshaling the proto to bytes fails.
        return err
    }
    backoff := s.qm.cfg.MinBackoff
    reqSize := len(*buf)
    sampleCount := len(samples)
    *buf = req
    try := 0
    // An anonymous function allows us to defer the completion of our per-try spans
    // without causing a memory leak, and it has the nice effect of not propagating any
    // parameters for sendSamplesWithBackoff/3.
    attemptStore := func() error {span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch")
        defer span.Finish()

        span.SetTag("samples", sampleCount)
        span.SetTag("request_size", reqSize)
        span.SetTag("try", try)
        span.SetTag("remote_name", s.qm.storeClient.Name())
        span.SetTag("remote_url", s.qm.storeClient.Endpoint())

        begin := time.Now()
        err := s.qm.client().Store(ctx, *buf)        ##HTTP 将数据发送进来
        s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())

        if err != nil {span.LogKV("error", err)
            ext.Error.Set(span, true)
            return err
        }
        return nil
    }

    for {
        select {case <-ctx.Done():
            return ctx.Err()
        default:
        }
        err = attemptStore()
        // 若发送失败
        if err != nil {
            // If the error is unrecoverable, we should not retry.
            if _, ok := err.(recoverableError); !ok {return err}
            // If we make it this far, we've encountered a recoverable error and will retry.
            s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
            level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
            time.Sleep(time.Duration(backoff))    // 通过 sleep 进行 backoff
            backoff = backoff * 2                 // 进行 2 倍回退
            if backoff > s.qm.cfg.MaxBackoff {    //backoff 最大 =cfg.MaxBackoff
                backoff = s.qm.cfg.MaxBackoff
            }
            try++
            continue
        }

        // Since we retry forever on recoverable errors, this needs to stay inside the loop.
        s.qm.metrics.succeededSamplesTotal.Add(float64(sampleCount))
        s.qm.metrics.bytesSent.Add(float64(reqSize))
        s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
        return nil
    }
}
退出移动版