关于prometheus:prometheus-remotewrite解析三-reshard分析

52次阅读

共计 4040 个字符,预计需要花费 11 分钟才能阅读完成。

prometheus 初始应用 min_shards 运行,在运行过程中,利用 sampleIn/sampleOut 等指标计算新的 shards,而后更新 shards 运行。

remote_write 中 shard 的默认配置:

  • min_shards=1;
  • max_shards=1000;

按此配置,理论运行的 shards 值:

  • 初始 =min_shards=1;
  • 有 1 个 Goroutine 依据以后的输入输出状况,定时计算新的 desired_shards,而后进行 reshard,以此动静调整 shard 数量;

入口

入口在上文讲到的 QueueManager:

// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
   ....
    go t.updateShardsLoop()    // 计算新的 shard
    go t.reshardLoop()        // 更新 shard}

更新 shard: reshardLoop()

更新 shard 很简略,stop 老的 shard,而后 start 新的 shard;

func (t *QueueManager) reshardLoop() {defer t.wg.Done()

    for {
        select {
        case numShards := <-t.reshardChan:
            // We start the newShards after we have stopped (the therefore completely
            // flushed) the oldShards, to guarantee we only every deliver samples in
            // order.
            t.shards.stop()
            t.shards.start(numShards)
        case <-t.quit:
            return
        }
    }
}

计算新 shard: updateShardsLoop()

计算新 shard 的过程略微简单一些。
外围逻辑在 t.calculateDesiredShards():

func (t *QueueManager) updateShardsLoop() {defer t.wg.Done()

    ticker := time.NewTicker(shardUpdateDuration)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            desiredShards := t.calculateDesiredShards()    // 外围逻辑在这里
            if !t.shouldReshard(desiredShards) {continue}
            // Resharding can take some time, and we want this loop
            // to stay close to shardUpdateDuration.
            select {
            case t.reshardChan <- desiredShards:
                level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", desiredShards)
                t.numShards = desiredShards
            default:
                level.Info(t.logger).Log("msg", "Currently resharding, skipping.")
            }
        case <-t.quit:
            return
        }
    }
}

1. 根据哪些指标计算 shards?

  • samplesIn: 输出速率;
  • samplesOut: 输入速率;
  • samplesDropped: 抛弃速率;

这些指标都是 ewmaRate 类型,应用指标加权均匀计算:

// 参考:https://www.cnblogs.com/jiangxinyang/p/9705198.html
// 越是最近的值,对后果的影响越大
v(t) = β(vt-1) + β*β(vt-2) + β*β*β(vt-3)+.....(β=0.2)

代码实现:

const ewmaWeight          = 0.2
const shardUpdateDuration = 10 * time.Second
samplesDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),

func newEWMARate(alpha float64, interval time.Duration) *ewmaRate {
    return &ewmaRate{
        alpha:    alpha,
        interval: interval,
    }
}

// 更新速率的办法
// tick assumes to be called every r.interval.
func (r *ewmaRate) tick() {newEvents := atomic.SwapInt64(&r.newEvents, 0)
    instantRate := float64(newEvents) / r.interval.Seconds()    // 最新的速率
    r.mutex.Lock()
    defer r.mutex.Unlock()
    if r.init {
        // 指数加权均匀
        r.lastRate += r.alpha * (instantRate - r.lastRate)   
    } else if newEvents > 0 {
        r.init = true
        r.lastRate = instantRate
    }
}

2. 采样何种算法计算 shards?
利特尔法令:吞吐量 L = λW

举例来讲:

  • 如果咱们开 1 个商店,均匀每分钟进店 2 个客人(λ);
  • 每个客人从进店到出店消耗 4 分钟(W);
  • 那么咱们的商店承载量 =2*4= 8 人;

3. 算法公式
间接看代码,calculateDesiredShards()计算 desiredShard:

func (t *QueueManager) calculateDesiredShards() int {
    // 更新 ewma 的值
    t.samplesOut.tick()
    t.samplesDropped.tick()
    t.samplesOutDuration.tick()

    // We use the number of incoming samples as a prediction of how much work we
    // will need to do next iteration.  We add to this any pending samples
    // (received - send) so we can catch up with any backlog. We use the average
    // outgoing batch latency to work out how many shards we need.
    var (samplesInRate      = t.samplesIn.rate()
        samplesOutRate     = t.samplesOut.rate()
        samplesKeptRatio   = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate)
        samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second)
        samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
        highestSent        = t.metrics.highestSentTimestamp.Get()
        highestRecv        = highestTimestamp.Get()
        delay              = highestRecv - highestSent
        samplesPending     = delay * samplesInRate * samplesKeptRatio
    )

    if samplesOutRate <= 0 {return t.numShards}

    // When behind we will try to catch up on a proporation of samples per tick.
    // This works similarly to an integral accumulator in that pending samples
    // is the result of the error integral.
    const integralGain = 0.1 / float64(shardUpdateDuration/time.Second)

    var (
        timePerSample = samplesOutDuration / samplesOutRate
        desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
    )
    t.metrics.desiredNumShards.Set(desiredShards)
    .....
}

能够看到,最终的计算公式:

desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)

其中:

  • timePerSample=samplesOutDuration/samplesOutRate: 每个 sample 被输入破费的工夫;
  • samplesInRate: 输出速率;
  • samplesKeptRatio=sampleOut/(samplesOut+samplesDrop): 输入的成功率;
  • integralGain*samplesPending:修改值,将 pending 的数据计算进去;

整个计算过程遵循利特尔法令:承载量 = 输出流量 * 单个流量的消耗工夫,只是两头退出了一些确保精确的成功率、修改值等。

正文完
 0