Overall flow
- The remote write configuration supports multiple remote storages (RemoteWriteConfigs); each remote storage is served by one QueueManager;
- the WAL watcher sends samples to the QueueManager;
- one QueueManager manages several shards, and each shard holds at most capacity samples;
- each shard sends data to the remote endpoint either on a timer (batch_send_deadline) or once a batch fills up (max_samples_per_send); the relevant queue_config knobs are sketched right after this list.
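All of these knobs come from the per-remote queue_config section. A minimal sketch of the fields this walkthrough relies on, assuming they mirror config.QueueConfig in the Prometheus version discussed here (the defaults are the ones cited below):

import "github.com/prometheus/common/model"

// Sketch (assumed field set) of the queue_config knobs used in this walkthrough.
type QueueConfig struct {
    Capacity          int            // per-shard channel buffer, default 500
    MinShards         int            // initial/minimum number of shards, default 1
    MaxShards         int            // upper bound for dynamic resharding
    MaxSamplesPerSend int            // size trigger for a batch, default 100
    BatchSendDeadline model.Duration // time trigger for a batch, default 5s
    MinBackoff        model.Duration // initial retry backoff, default 30ms
    MaxBackoff        model.Duration // retry backoff cap, default 100ms
}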
Code entry point
Entry point: storage/remote/write.go
Its main job is to initialize the QueueManagers and then call Start() to set them to work.
// Initialize the QueueManagers from the config, then set them to work.
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
    .....
    newQueues := make(map[string]*QueueManager)
    newHashes := []string{}
    for _, rwConf := range conf.RemoteWriteConfigs {
        hash, err := toHash(rwConf)
        if err != nil {
            return err
        }
        // Don't allow duplicate remote write configs.
        if _, ok := newQueues[hash]; ok {
            return fmt.Errorf("duplicate remote write configs are not allowed, found duplicate for URL: %s", rwConf.URL)
        }
        // Set the queue name to the config hash if the user has not set
        // a name in their remote write config so we can still differentiate
        // between queues that have the same remote write endpoint.
        name := hash[:6]
        if rwConf.Name != "" {
            name = rwConf.Name
        }
        c, err := NewWriteClient(name, &ClientConfig{
            URL:              rwConf.URL,
            Timeout:          rwConf.RemoteTimeout,
            HTTPClientConfig: rwConf.HTTPClientConfig,
        })
        if err != nil {
            return err
        }
        queue, ok := rws.queues[hash]
        ......
        endpoint := rwConf.URL.String()
        newQueues[hash] = NewQueueManager( // initialize the QueueManager
            newQueueManagerMetrics(rws.reg, name, endpoint),
            rws.watcherMetrics,
            rws.liveReaderMetrics,
            rws.logger,
            rws.walDir, // this is /prometheus; the WAL watcher turns it into /prometheus/wal
            rws.samplesIn,
            rwConf.QueueConfig,
            conf.GlobalConfig.ExternalLabels,
            rwConf.WriteRelabelConfigs,
            c,
            rws.flushDeadline,
        )
        // Keep track of which queues are new so we know which to start.
        newHashes = append(newHashes, hash)
    }
    // Anything remaining in rws.queues is a queue whose config has
    // changed or was removed from the overall remote write config.
    for _, q := range rws.queues {
        q.Stop()
    }
    for _, hash := range newHashes { // set the new QueueManagers to work
        newQueues[hash].Start()
    }
    rws.queues = newQueues
    return nil
}
Let's look at what the QueueManager does in Start():
- shards.start(): starts one goroutine per shard to do the work;
- watcher.Start(): watches the WAL for changes and writes newly appended data into the shards;
- updateShardsLoop(): periodically computes a new shard count from the samples-in / samples-out rates (see the simplified sketch after the Start() listing);
- reshardLoop(): applies the new shard count;
func (t *QueueManager) Start() {
    // Register the Prometheus metrics for this queue.
    t.metrics.register()
    t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
    t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards))
    t.metrics.minNumShards.Set(float64(t.cfg.MinShards))
    t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards))

    t.shards.start(t.numShards) // numShards defaults to cfg.MinShards, i.e. 1; this initializes each shard and starts sending
    t.watcher.Start()           // the WAL watcher starts watching for changes

    t.wg.Add(2)
    go t.updateShardsLoop() // periodically compute the desired shard count from samples in / samples out
    go t.reshardLoop()      // apply the new shard count
}
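updateShardsLoop() and reshardLoop() are not expanded in this post. A simplified sketch of the idea behind the shard calculation (assumed naming; the real calculateDesiredShards also accounts for dropped samples and the pending backlog): estimate how many seconds it costs to send one sample, multiply by the incoming sample rate, and clamp the result to [MinShards, MaxShards].

import "math"

// Simplified sketch (not the verbatim calculateDesiredShards): samplesInRate and
// samplesOutRate are samples/second in and out, sendSecondsRate is the seconds
// spent sending per second of wall time (derived from samplesOutDuration).
func desiredShards(samplesInRate, samplesOutRate, sendSecondsRate float64, minShards, maxShards int) int {
    if samplesOutRate <= 0 {
        return minShards // nothing sent yet; keep the minimum
    }
    timePerSample := sendSecondsRate / samplesOutRate   // seconds spent per sample sent
    n := int(math.Ceil(timePerSample * samplesInRate))  // shards needed to keep up with ingestion
    if n < minShards {
        n = minShards
    }
    if n > maxShards {
        n = maxShards
    }
    return n
}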
shards.start() walkthrough
Start one goroutine per shard and let it do the work:
// Each shards object owns one queue per shard (N shards in total).
// Each shard's queue holds up to capacity samples (default cfg.Capacity = 500).
// start the shards; must be called before any call to enqueue.
func (s *shards) start(n int) {
    s.mtx.Lock()
    defer s.mtx.Unlock()

    s.qm.metrics.pendingSamples.Set(0)
    s.qm.metrics.numShards.Set(float64(n))

    newQueues := make([]chan sample, n) // N shards; initially just 1
    for i := 0; i < n; i++ {
        newQueues[i] = make(chan sample, s.qm.cfg.Capacity) // each shard buffers at most capacity samples, default 500
    }
    s.queues = newQueues

    var hardShutdownCtx context.Context
    hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
    s.softShutdown = make(chan struct{})
    s.running = int32(n)
    s.done = make(chan struct{})
    atomic.StoreUint32(&s.droppedOnHardShutdown, 0)
    for i := 0; i < n; i++ {
        go s.runShard(hardShutdownCtx, i, newQueues[i]) // this is where the actual sending happens
    }
}
The shard goroutine does the sending:
- each shard buffers incoming samples in a pendingSamples array;
- runShard() receives the samples the watcher enqueued and appends them to that array;
A batch is sent to the remote endpoint when either:
- the timer fires: cfg.BatchSendDeadline (default 5s); or
- the batch is full: the pendingSamples array reaches cfg.MaxSamplesPerSend (default 100).
func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
    defer func() {
        if atomic.AddInt32(&s.running, -1) == 0 {
            close(s.done)
        }
    }()

    shardNum := strconv.Itoa(shardID)

    // Send batches of at most MaxSamplesPerSend samples to the remote storage.
    // If we have fewer samples than that, flush them out after a deadline
    // anyways.
    var (
        max            = s.qm.cfg.MaxSamplesPerSend
        nPending       = 0
        pendingSamples = allocateTimeSeries(max)
        buf            []byte
    )

    timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
    stop := func() {
        if !timer.Stop() {
            select {
            case <-timer.C:
            default:
            }
        }
    }
    defer stop()

    for {
        select {
        case <-ctx.Done():
            // In this case we drop all samples in the buffer and the queue.
            // Remove them from pending and mark them as failed.
            droppedSamples := nPending + len(queue)
            s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
            s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
            atomic.AddUint32(&s.droppedOnHardShutdown, uint32(droppedSamples))
            return

        // Receive data and buffer it in pendingSamples.
        case sample, ok := <-queue:
            if !ok {
                if nPending > 0 {
                    level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", nPending)
                    s.sendSamples(ctx, pendingSamples[:nPending], &buf)
                    s.qm.metrics.pendingSamples.Sub(float64(nPending))
                    level.Debug(s.qm.logger).Log("msg", "Done flushing.")
                }
                return
            }

            // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
            // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
            // stop reading from the queue. This makes it safe to reference pendingSamples by index.
            pendingSamples[nPending].Labels = labelsToLabelsProto(sample.labels, pendingSamples[nPending].Labels)
            pendingSamples[nPending].Samples[0].Timestamp = sample.t
            pendingSamples[nPending].Samples[0].Value = sample.v
            nPending++

            // The batch reached cfg.MaxSamplesPerSend: send it.
            if nPending >= max {
                s.sendSamples(ctx, pendingSamples, &buf)
                nPending = 0
                s.qm.metrics.pendingSamples.Sub(float64(max))

                stop()
                timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
            }

        // The timer fired: cfg.BatchSendDeadline elapsed.
        case <-timer.C:
            if nPending > 0 {
                level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", nPending, "shard", shardNum)
                s.sendSamples(ctx, pendingSamples[:nPending], &buf)
                s.qm.metrics.pendingSamples.Sub(float64(nPending))
                nPending = 0
            }
            timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
        }
    }
}
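pendingSamples is pre-allocated by allocateTimeSeries, which is why the loop above can write Labels and Samples[0] by index without allocating on the hot path. A sketch of that helper, assuming it matches the code version shown here (prompb is github.com/prometheus/prometheus/prompb):

// allocateTimeSeries (assumed): pre-allocate capacity TimeSeries entries, each
// with exactly one Sample slot, so runShard can fill pendingSamples[nPending]
// in place instead of allocating per sample.
func allocateTimeSeries(capacity int) []prompb.TimeSeries {
    timeseries := make([]prompb.TimeSeries, capacity)
    // Each batch entry carries exactly one sample, so give it a single
    // pre-allocated Sample up front.
    for i := range timeseries {
        timeseries[i].Samples = []prompb.Sample{{}}
    }
    return timeseries
}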
Receiving samples
The watcher watches for new WAL data and calls QueueManager.Append() to hand over the samples.
QueueManager.Append():
- calls shards.enqueue() to enqueue each sample on its shard;
- if enqueueing fails, it backs off exponentially: the wait doubles on every retry, up to the maximum backoff;
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
outer:
    for _, s := range samples {
        t.seriesMtx.Lock()
        lbls, ok := t.seriesLabels[s.Ref]
        if !ok {
            t.metrics.droppedSamplesTotal.Inc()
            t.samplesDropped.incr(1)
            if _, ok := t.droppedSeries[s.Ref]; !ok {
                level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
            }
            t.seriesMtx.Unlock()
            continue
        }
        t.seriesMtx.Unlock()

        // This will only loop if the queues are being resharded.
        backoff := t.cfg.MinBackoff
        for {
            select {
            case <-t.quit:
                return false
            default:
            }

            if t.shards.enqueue(s.Ref, sample{
                labels: lbls,
                t:      s.T,
                v:      s.V,
            }) {
                continue outer
            }

            t.metrics.enqueueRetriesTotal.Inc()
            time.Sleep(time.Duration(backoff))
            backoff = backoff * 2
            if backoff > t.cfg.MaxBackoff {
                backoff = t.cfg.MaxBackoff
            }
        }
    }
    return true
}
The enqueue flow in shards:
- the sample's series ref modulo the number of shards picks the target shard, so all samples of a given series go through the same shard;
- enqueueing is simply a channel send: queue <- sample.
// enqueue a sample. If we are currently in the process of shutting down or resharding,
// will return false; in this case, you should back off and retry.
func (s *shards) enqueue(ref uint64, sample sample) bool {
    s.mtx.RLock()
    defer s.mtx.RUnlock()

    select {
    case <-s.softShutdown:
        return false
    default:
    }

    shard := uint64(ref) % uint64(len(s.queues))
    select {
    case <-s.softShutdown:
        return false
    case s.queues[shard] <- sample:
        s.qm.metrics.pendingSamples.Inc()
        return true
    }
}
min_backoff and max_backoff during sends
Sending is done in s.sendSamples(), which delegates to sendSamplesWithBackoff():
- if a send fails, it backs off, starting at backoff = MinBackoff = 30ms;
- if sends keep failing, the backoff doubles each time, capped at MaxBackoff = 100ms;
- the backoff is implemented as a plain time.Sleep(backoff).
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) {
    begin := time.Now()
    err := s.sendSamplesWithBackoff(ctx, samples, buf) // does the actual work
    if err != nil {
        level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err)
        s.qm.metrics.failedSamplesTotal.Add(float64(len(samples)))
    }

    // These counters are used to calculate the dynamic sharding, and as such
    // should be maintained irrespective of success or failure.
    s.qm.samplesOut.incr(int64(len(samples)))
    s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
    atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())
}
The actual send happens in s.sendSamplesWithBackoff():
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error {
    req, highest, err := buildWriteRequest(samples, *buf)
    if err != nil {
        // Failing to build the write request is non-recoverable, since it will
        // only error if marshaling the proto to bytes fails.
        return err
    }

    backoff := s.qm.cfg.MinBackoff
    reqSize := len(*buf)
    sampleCount := len(samples)
    *buf = req
    try := 0

    // An anonymous function allows us to defer the completion of our per-try spans
    // without causing a memory leak, and it has the nice effect of not propagating any
    // parameters for sendSamplesWithBackoff/3.
    attemptStore := func() error {
        span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch")
        defer span.Finish()

        span.SetTag("samples", sampleCount)
        span.SetTag("request_size", reqSize)
        span.SetTag("try", try)
        span.SetTag("remote_name", s.qm.storeClient.Name())
        span.SetTag("remote_url", s.qm.storeClient.Endpoint())

        begin := time.Now()
        err := s.qm.client().Store(ctx, *buf) // HTTP-POST the data to the remote endpoint
        s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())

        if err != nil {
            span.LogKV("error", err)
            ext.Error.Set(span, true)
            return err
        }
        return nil
    }

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        err = attemptStore()

        // The send failed.
        if err != nil {
            // If the error is unrecoverable, we should not retry.
            if _, ok := err.(recoverableError); !ok {
                return err
            }

            // If we make it this far, we've encountered a recoverable error and will retry.
            s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
            level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)

            time.Sleep(time.Duration(backoff)) // back off by sleeping
            backoff = backoff * 2              // double the backoff
            if backoff > s.qm.cfg.MaxBackoff { // cap the backoff at cfg.MaxBackoff
                backoff = s.qm.cfg.MaxBackoff
            }

            try++
            continue
        }

        // Since we retry forever on recoverable errors, this needs to stay inside the loop.
        s.qm.metrics.succeededSamplesTotal.Add(float64(sampleCount))
        s.qm.metrics.bytesSent.Add(float64(reqSize))
        s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
        return nil
    }
}
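buildWriteRequest() is not shown above. A minimal sketch of what it roughly does, assuming the gogo/protobuf, golang/snappy and prompb packages: wrap the batch in a prompb.WriteRequest, track the highest sample timestamp (which feeds the highestSentTimestamp metric above), and snappy-compress the marshaled protobuf that Store() then POSTs to the remote endpoint. The real function also reuses the caller-provided buffer.

import (
    "github.com/gogo/protobuf/proto"
    "github.com/golang/snappy"
    "github.com/prometheus/prometheus/prompb"
)

// Sketch of buildWriteRequest (assumed, not the verbatim implementation):
// marshal the batch into a snappy-compressed prompb.WriteRequest and return
// the highest sample timestamp seen in the batch.
func buildWriteRequestSketch(samples []prompb.TimeSeries) ([]byte, int64, error) {
    var highest int64
    for _, ts := range samples {
        for _, s := range ts.Samples {
            if s.Timestamp > highest {
                highest = s.Timestamp
            }
        }
    }
    data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: samples})
    if err != nil {
        return nil, highest, err
    }
    // Remote write bodies are snappy-compressed protobuf.
    return snappy.Encode(nil, data), highest, nil
}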