整体流程
- remoteConfigs支持配置多个remoteStorage,每个remoteStorage使用1个QueueManager;
- watcher将sample发送给QueueManager;
- 1个QueueManager中管理多个shard,每个shard的容量为capacity;
- 每个shard会定时(batch_send_deadline)、定量(max_samples_per_send)地向remote endpoint发送数据;
代码入口
入口:storage/remote/write.go
主要工作是初始化QueueManager,然后调用Start()让其干活。
// ApplyConfig initializes one QueueManager per remote-write config and then
// starts the newly created queue managers. Queues still held in rws.queues
// are stopped before the new set is installed.
//
// NOTE: this is an excerpt; the spots marked "code omitted" were elided by the
// original author and are not reproduced here.
func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
	// ..... (code omitted in this excerpt)
	newQueues := make(map[string]*QueueManager)
	newHashes := []string{}
	for _, rwConf := range conf.RemoteWriteConfigs {
		hash, err := toHash(rwConf)
		if err != nil {
			return err
		}

		// Don't allow duplicate remote write configs.
		if _, ok := newQueues[hash]; ok {
			return fmt.Errorf("duplicate remote write configs are not allowed, found duplicate for URL: %s", rwConf.URL)
		}

		// Set the queue name to the config hash if the user has not set
		// a name in their remote write config so we can still differentiate
		// between queues that have the same remote write endpoint.
		name := hash[:6]
		if rwConf.Name != "" {
			name = rwConf.Name
		}

		c, err := NewWriteClient(name, &ClientConfig{
			URL:              rwConf.URL,
			Timeout:          rwConf.RemoteTimeout,
			HTTPClientConfig: rwConf.HTTPClientConfig,
		})
		if err != nil {
			return err
		}

		queue, ok := rws.queues[hash]
		// ...... (code omitted in this excerpt)

		endpoint := rwConf.URL.String()
		// Initialize the QueueManager for this remote-write config.
		newQueues[hash] = NewQueueManager(
			newQueueManagerMetrics(rws.reg, name, endpoint),
			rws.watcherMetrics,
			rws.liveReaderMetrics,
			rws.logger,
			// Author's note: this is /prometheus; the WAL watcher turns it
			// into /prometheus/wal.
			rws.walDir,
			rws.samplesIn,
			rwConf.QueueConfig,
			conf.GlobalConfig.ExternalLabels,
			rwConf.WriteRelabelConfigs,
			c,
			rws.flushDeadline,
		)
		// Keep track of which queues are new so we know which to start.
		newHashes = append(newHashes, hash)
	}

	// Anything remaining in rws.queues is a queue whose config has
	// changed or was removed from the overall remote write config.
	for _, q := range rws.queues {
		q.Stop()
	}

	// Put the newly created QueueManagers to work.
	for _, hash := range newHashes {
		newQueues[hash].Start()
	}

	rws.queues = newQueues

	return nil
}
具体看一下QueueManager做的事情:
- shards.Start():为每个shard启动1个Goroutine干活;
- watcher.Start(): 启动watcher监听WAL的变化,将WAL新增数据写入shards;
- updateShardsLoop(): 定期根据sample in / sample out计算新的shard数量;
- reshardLoop(): 按新的shard数量更新shard;
func (t *QueueManager) Start() { // 注册prometheus的监控参数 t.metrics.register() t.metrics.shardCapacity.Set(float64(t.cfg.Capacity)) t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards)) t.metrics.minNumShards.Set(float64(t.cfg.MinShards)) t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards)) t.shards.start(t.numShards) ##shard默认=cfg.MinShard,也就是1;这外面会对每个shard进行初始化、赋值、发送 t.watcher.Start() ##walwatcher监控变动 t.wg.Add(2) go t.updateShardsLoop() ##定期依据sample in / sample out计算新的shard go t.reshardLoop() ##更新shard为新值}
shards.Start()解析
为每个shard启动1个Goroutine,让shard干活;
//每个shards有一个queue,蕴含N个shard//每个shard中有capacity大小(默认cfg.Capacity=500)// start the shards; must be called before any call to enqueue.func (s *shards) start(n int) { s.mtx.Lock() defer s.mtx.Unlock() s.qm.metrics.pendingSamples.Set(0) s.qm.metrics.numShards.Set(float64(n)) newQueues := make([]chan sample, n) //N个shard,初始只有1个 for i := 0; i < n; i++ { newQueues[i] = make(chan sample, s.qm.cfg.Capacity) //每个shard最大有capacity个元素,初始=500 } s.queues = newQueues var hardShutdownCtx context.Context hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background()) s.softShutdown = make(chan struct{}) s.running = int32(n) s.done = make(chan struct{}) atomic.StoreUint32(&s.droppedOnHardShutdown, 0) for i := 0; i < n; i++ { go s.runShard(hardShutdownCtx, i, newQueues[i]) ##这里进行理论的发送 }}
shard Goroutine干了发送的活:
- runShard()中有一个pendingSamples数组,用于暂存发送给该shard queue的数据;
- runShard()接收watcher发送的数据,保存到pendingSamples数组中;
发送给remote的时机:
- 定时:定时器时间到,cfg.BatchSendDeadline(默认=5s);
- 定量:samples数组大小达到cfg.MaxSamplesPerSend(默认=100);
func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) { defer func() { if atomic.AddInt32(&s.running, -1) == 0 { close(s.done) } }() shardNum := strconv.Itoa(shardID) // Send batches of at most MaxSamplesPerSend samples to the remote storage. // If we have fewer samples than that, flush them out after a deadline // anyways. var ( max = s.qm.cfg.MaxSamplesPerSend nPending = 0 pendingSamples = allocateTimeSeries(max) buf []byte ) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) stop := func() { if !timer.Stop() { select { case <-timer.C: default: } } } defer stop() for { select { case <-ctx.Done(): // In this case we drop all samples in the buffer and the queue. // Remove them from pending and mark them as failed. droppedSamples := nPending + len(queue) s.qm.metrics.pendingSamples.Sub(float64(droppedSamples)) s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples)) atomic.AddUint32(&s.droppedOnHardShutdown, uint32(droppedSamples)) return //接收数据,保留到pendingSamples case sample, ok := <-queue: if !ok { if nPending > 0 { level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", nPending) s.sendSamples(ctx, pendingSamples[:nPending], &buf) s.qm.metrics.pendingSamples.Sub(float64(nPending)) level.Debug(s.qm.logger).Log("msg", "Done flushing.") } return } // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. 
pendingSamples[nPending].Labels = labelsToLabelsProto(sample.labels, pendingSamples[nPending].Labels) pendingSamples[nPending].Samples[0].Timestamp = sample.t pendingSamples[nPending].Samples[0].Value = sample.v nPending++ //达到cfg.MaxSamplesPerSend,则发送 if nPending >= max { s.sendSamples(ctx, pendingSamples, &buf) nPending = 0 s.qm.metrics.pendingSamples.Sub(float64(max)) stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) } //定时器事件到:cfg.BatchSendDeadLine case <-timer.C: if nPending > 0 { level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", nPending, "shard", shardNum) s.sendSamples(ctx, pendingSamples[:nPending], &buf) s.qm.metrics.pendingSamples.Sub(float64(nPending)) nPending = 0 } timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) } }}
接收samples数据
watcher监控指标的变动,调用QueueManager.Append()写入samples;
QueueManager.Append():
- 调用shards.enqueue将sample入队;
- 入队过程中使用2倍回退算法:入队失败时回退等待,回退时间每次翻倍,直到最大回退值;
// Append queues a sample to be sent to the remote storage. Blocks until all samples are// enqueued on their shards or a shutdown signal is received.func (t *QueueManager) Append(samples []record.RefSample) bool {outer: for _, s := range samples { t.seriesMtx.Lock() lbls, ok := t.seriesLabels[s.Ref] if !ok { t.metrics.droppedSamplesTotal.Inc() t.samplesDropped.incr(1) if _, ok := t.droppedSeries[s.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) } t.seriesMtx.Unlock() continue } t.seriesMtx.Unlock() // This will only loop if the queues are being resharded. backoff := t.cfg.MinBackoff for { select { case <-t.quit: return false default: } if t.shards.enqueue(s.Ref, sample{ labels: lbls, t: s.T, v: s.V, }) { continue outer } t.metrics.enqueueRetriesTotal.Inc() time.Sleep(time.Duration(backoff)) backoff = backoff * 2 if backoff > t.cfg.MaxBackoff { backoff = t.cfg.MaxBackoff } } } return true}
shards入队的流程:
- sample的ref % shards:入队哪个shard;
- 入队用channel,直接<- sample;
// enqueue a sample. If we are currently in the process of shutting down or resharding,// will return false; in this case, you should back off and retry.func (s *shards) enqueue(ref uint64, sample sample) bool { s.mtx.RLock() defer s.mtx.RUnlock() select { case <-s.softShutdown: return false default: } shard := uint64(ref) % uint64(len(s.queues)) select { case <-s.softShutdown: return false case s.queues[shard] <- sample: s.qm.metrics.pendingSamples.Inc() return true }}
发送时min_backoff与max_backoff
发送在s.sendSamples()中实现,sendSamples()调用sendSamplesWithBackoff():
- 若发送失败,进行backoff,初始backoff=minBackoff=30ms;
- 若持续发送失败,进行2倍backoff,直到maxBackoff=100ms;
- backoff的方式:time.Sleep(backoff);
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) { begin := time.Now() err := s.sendSamplesWithBackoff(ctx, samples, buf) //具体干活 if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err) s.qm.metrics.failedSamplesTotal.Add(float64(len(samples))) } // These counters are used to calculate the dynamic sharding, and as such // should be maintained irrespective of success or failure. s.qm.samplesOut.incr(int64(len(samples))) s.qm.samplesOutDuration.incr(int64(time.Since(begin))) atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())}
具体发送动作:s.sendSamplesWithBackoff():
// sendSamples to the remote storage with backoff for recoverable errors.func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error { req, highest, err := buildWriteRequest(samples, *buf) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. return err } backoff := s.qm.cfg.MinBackoff reqSize := len(*buf) sampleCount := len(samples) *buf = req try := 0 // An anonymous function allows us to defer the completion of our per-try spans // without causing a memory leak, and it has the nice effect of not propagating any // parameters for sendSamplesWithBackoff/3. attemptStore := func() error { span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch") defer span.Finish() span.SetTag("samples", sampleCount) span.SetTag("request_size", reqSize) span.SetTag("try", try) span.SetTag("remote_name", s.qm.storeClient.Name()) span.SetTag("remote_url", s.qm.storeClient.Endpoint()) begin := time.Now() err := s.qm.client().Store(ctx, *buf) ##HTTP将数据发送进来 s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err != nil { span.LogKV("error", err) ext.Error.Set(span, true) return err } return nil } for { select { case <-ctx.Done(): return ctx.Err() default: } err = attemptStore() //若发送失败 if err != nil { // If the error is unrecoverable, we should not retry. if _, ok := err.(recoverableError); !ok { return err } // If we make it this far, we've encountered a recoverable error and will retry. s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err) time.Sleep(time.Duration(backoff)) //通过sleep进行backoff backoff = backoff * 2 //进行2倍回退 if backoff > s.qm.cfg.MaxBackoff { //backoff最大=cfg.MaxBackoff backoff = s.qm.cfg.MaxBackoff } try++ continue } // Since we retry forever on recoverable errors, this needs to stay inside the loop. 
s.qm.metrics.succeededSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.bytesSent.Add(float64(reqSize)) s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) return nil }}