Prometheus starts with min_shards shards. While running, it uses metrics such as samplesIn and samplesOut to compute a new shard count, and then reshards to that count.

Default shard settings in remote_write:

  • min_shards=1;
  • max_shards=1000;

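With this configuration, the shard count in practice is:

  • initially, min_shards = 1;
  • a goroutine periodically computes a new desired_shards from the current input/output rates and then reshards, adjusting the number of shards dynamically.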
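The excerpts below elide what happens to the fractional desired value. The sketch here assumes it is rounded up and kept within [min_shards, max_shards]. That rule is an assumption for illustration, and clampShards is a hypothetical name.

```go
package main

import (
	"fmt"
	"math"
)

// clampShards is a hypothetical helper: it rounds a fractional desired shard
// count up to a whole number and keeps it within [minShards, maxShards].
// The rounding and clamping rule is an assumption, not taken from the source.
func clampShards(desired float64, minShards, maxShards int) int {
	n := int(math.Ceil(desired))
	if n > maxShards {
		return maxShards
	}
	if n < minShards {
		return minShards
	}
	return n
}

func main() {
	const minShards, maxShards = 1, 1000 // the defaults quoted above
	fmt.Println(clampShards(0.3, minShards, maxShards))
	fmt.Println(clampShards(2.6, minShards, maxShards))
	fmt.Println(clampShards(5000, minShards, maxShards))
}
```

With the defaults, this prints 1, 3 and 1000.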

Entry point

The entry point is the QueueManager discussed earlier:

// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
    ....
    go t.updateShardsLoop()    // compute the new shard count
    go t.reshardLoop()         // apply the new shard count
}

Updating shards: reshardLoop()

Updating shards is simple: stop the old shards, then start the new ones.

func (t *QueueManager) reshardLoop() {
    defer t.wg.Done()

    for {
        select {
        case numShards := <-t.reshardChan:
            // We start the newShards after we have stopped (and therefore completely
            // flushed) the oldShards, to guarantee we only ever deliver samples in
            // order.
            t.shards.stop()
            t.shards.start(numShards)
        case <-t.quit:
            return
        }
    }
}
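The next sketch suggests why the old shards are stopped before the new ones start. As a simplification, it assumes each series is routed to a shard by its series ID modulo the shard count; shardFor and the ID are hypothetical. Once the count changes, the same series can land in a different queue. Samples still waiting in the old queue could then be overtaken by newer samples in the new one.

```go
package main

import "fmt"

// shardFor is a hypothetical routing rule: a series always goes to the
// shard given by its ID modulo the current number of shards.
func shardFor(seriesID uint64, numShards int) int {
	return int(seriesID % uint64(numShards))
}

func main() {
	const seriesID = 7
	// With 3 shards this series is handled by shard 1 ...
	fmt.Println(shardFor(seriesID, 3))
	// ... but after resharding to 4 it moves to shard 3, so the old queues
	// must be flushed first to keep this series' samples in order.
	fmt.Println(shardFor(seriesID, 4))
}
```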

Computing the new shard count: updateShardsLoop()

Computing the new shard count is a bit more involved.
The core logic lives in t.calculateDesiredShards():

func (t *QueueManager) updateShardsLoop() {
    defer t.wg.Done()

    ticker := time.NewTicker(shardUpdateDuration)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            desiredShards := t.calculateDesiredShards()    // the core logic is here
            if !t.shouldReshard(desiredShards) {
                continue
            }
            // Resharding can take some time, and we want this loop
            // to stay close to shardUpdateDuration.
            select {
            case t.reshardChan <- desiredShards:
                level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", desiredShards)
                t.numShards = desiredShards
            default:
                level.Info(t.logger).Log("msg", "Currently resharding, skipping.")
            }
        case <-t.quit:
            return
        }
    }
}
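The inner select with a default branch keeps updateShardsLoop on its fixed cadence. If reshardLoop is still busy and not receiving, the send is skipped instead of blocking. The minimal standalone sketch below isolates that pattern. trySend is a hypothetical name, and the unbuffered channel is an assumption about how reshardChan is created.

```go
package main

import (
	"fmt"
	"runtime"
)

// trySend attempts a non-blocking send, mirroring the select/default in
// updateShardsLoop: it returns false immediately if the send cannot proceed.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	reshardChan := make(chan int) // unbuffered (assumed)

	// Nobody is receiving: the send is skipped instead of blocking.
	fmt.Println(trySend(reshardChan, 4))

	// Once a receiver is waiting, a send can go through.
	done := make(chan int)
	go func() { done <- <-reshardChan }()
	for !trySend(reshardChan, 4) {
		runtime.Gosched() // let the receiver goroutine reach its receive
	}
	fmt.Println(<-done)
}
```

The first call prints false. The retry loop then delivers 4 once the receiving goroutine is ready.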

1. Which metrics are used to compute shards?

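  • samplesIn: incoming sample rate;
  • samplesOut: outgoing sample rate;
  • samplesDropped: dropped sample rate;
  • samplesOutDuration: time spent sending samples, also tracked as a rate.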

All of these are ewmaRate values, smoothed with an exponentially weighted moving average (EWMA):

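Reference: https://www.cnblogs.com/jiangxinyang/p/9705198.html
The more recent a value, the larger its influence on the result. With $x_t$ the instantaneous rate measured in interval $t$ and $\alpha = 0.2$:

$$
v_t = \alpha x_t + (1-\alpha)\, v_{t-1} = \alpha x_t + \alpha(1-\alpha)\, x_{t-1} + \alpha(1-\alpha)^2\, x_{t-2} + \cdots
$$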
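In the unrolled form, the observation k ticks old carries weight α(1-α)^k. The small sketch below lists these weights; ewmaWeights is a hypothetical helper, not part of Prometheus. With α = 0.2 the weights start 0.2, 0.16, 0.128, and their sum approaches 1.

```go
package main

import "fmt"

// ewmaWeights returns the weight alpha*(1-alpha)^k that an EWMA assigns to
// the observation k ticks in the past, for k = 0..n-1.
func ewmaWeights(alpha float64, n int) []float64 {
	w := make([]float64, n)
	factor := alpha
	for k := 0; k < n; k++ {
		w[k] = factor
		factor *= 1 - alpha // each step back in time shrinks the weight
	}
	return w
}

func main() {
	for k, w := range ewmaWeights(0.2, 5) {
		fmt.Printf("x_{t-%d}: %.4f\n", k, w)
	}
}
```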

Implementation:

const ewmaWeight          = 0.2
const shardUpdateDuration = 10 * time.Second

// QueueManager fields (excerpt)
samplesDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),

func newEWMARate(alpha float64, interval time.Duration) *ewmaRate {
    return &ewmaRate{
        alpha:    alpha,
        interval: interval,
    }
}

// Updates the rate.
// tick assumes to be called every r.interval.
func (r *ewmaRate) tick() {
    newEvents := atomic.SwapInt64(&r.newEvents, 0)
    instantRate := float64(newEvents) / r.interval.Seconds()    // latest instantaneous rate

    r.mutex.Lock()
    defer r.mutex.Unlock()

    if r.init {
        // exponentially weighted moving average
        r.lastRate += r.alpha * (instantRate - r.lastRate)
    } else if newEvents > 0 {
        r.init = true
        r.lastRate = instantRate
    }
}

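2. Which algorithm is used to compute shards?
Little's Law: $L = \lambda W$, where $L$ is the average number of items in the system, $\lambda$ the average arrival rate, and $W$ the average time each item spends in the system.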

For example:

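  • suppose we run a store where, on average, 2 customers walk in per minute (λ);
  • each customer spends 4 minutes in the store, from entering to leaving (W);
  • then the store holds on average 2 × 4 = 8 customers at any given time (L).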
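Little's Law itself is a single multiplication. The trivial sketch below reproduces the store example; littlesLaw is a hypothetical name.

```go
package main

import "fmt"

// littlesLaw returns L = lambda * W: the average number of items in the
// system, given the arrival rate lambda and the average time W per item.
func littlesLaw(lambda, w float64) float64 {
	return lambda * w
}

func main() {
	// 2 customers per minute, 4 minutes each.
	fmt.Println(littlesLaw(2, 4))
}
```

This prints 8. In the shard calculation, λ plays the role of samples arriving per second and W the send time per sample. L is then the number of senders that must be busy at once.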

3. The formula
Let's go straight to the code: calculateDesiredShards() computes desiredShards:

func (t *QueueManager) calculateDesiredShards() int {
    // update the EWMA values
    t.samplesOut.tick()
    t.samplesDropped.tick()
    t.samplesOutDuration.tick()

    // We use the number of incoming samples as a prediction of how much work we
    // will need to do next iteration.  We add to this any pending samples
    // (received - sent) so we can catch up with any backlog. We use the average
    // outgoing batch latency to work out how many shards we need.
    var (
        samplesInRate      = t.samplesIn.rate()
        samplesOutRate     = t.samplesOut.rate()
        samplesKeptRatio   = samplesOutRate / (t.samplesDropped.rate() + samplesOutRate)
        samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second)
        samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
        highestSent        = t.metrics.highestSentTimestamp.Get()
        highestRecv        = highestTimestamp.Get()
        delay              = highestRecv - highestSent
        samplesPending     = delay * samplesInRate * samplesKeptRatio
    )

    if samplesOutRate <= 0 {
        return t.numShards
    }

    // When behind we will try to catch up on a proportion of samples per tick.
    // This works similarly to an integral accumulator in that pending samples
    // is the result of the error integral.
    const integralGain = 0.1 / float64(shardUpdateDuration/time.Second)

    var (
        timePerSample = samplesOutDuration / samplesOutRate
        desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
    )
    t.metrics.desiredNumShards.Set(desiredShards)
    .....
}
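To experiment with the formula outside Prometheus, the arithmetic can be lifted into a pure function. This sketch rests on several assumptions:

- desiredShards is a hypothetical name;
- all rates are already-smoothed per-second values;
- outDurationSec is the send time in seconds accumulated per second, i.e. the excerpt's samplesOutDuration.rate() / time.Second;
- delaySec is highestRecv - highestSent in seconds.

The elided tail of the original is not reproduced.

```go
package main

import "fmt"

const shardUpdateSeconds = 10.0 // shardUpdateDuration expressed in seconds

// desiredShards reproduces the arithmetic of calculateDesiredShards.
// ok is false when nothing has been sent yet; the caller should then keep
// its current shard count, as the original does.
func desiredShards(inRate, outRate, droppedRate, outDurationSec, delaySec float64) (shards float64, ok bool) {
	if outRate <= 0 {
		return 0, false
	}
	keptRatio := outRate / (droppedRate + outRate) // fraction not dropped
	pending := delaySec * inRate * keptRatio       // estimated backlog
	const integralGain = 0.1 / shardUpdateSeconds  // 0.01 per second
	timePerSample := outDurationSec / outRate      // seconds of sending per sample
	return timePerSample * (inRate*keptRatio + integralGain*pending), true
}

func main() {
	// 10,000 samples/s in and out, no drops, 2 s of send time per second, no backlog.
	d, _ := desiredShards(10000, 10000, 0, 2, 0)
	fmt.Printf("%.2f\n", d)
}
```

This prints 2.00: two shards' worth of work. The check on outRate is moved before the division so that the sketch never computes a NaN ratio.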

As the code shows, the final formula is:

desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)

where:

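  • timePerSample = samplesOutDuration / samplesOutRate: the time spent sending each sample;
  • samplesInRate: the incoming sample rate;
  • samplesKeptRatio = samplesOutRate / (samplesOutRate + samplesDroppedRate): the fraction of samples sent rather than dropped;
  • integralGain*samplesPending: a correction term that accounts for the backlog of pending samples; with integralGain = 0.1 / 10 = 0.01 per second, it aims to work off about 10% of the backlog per 10-second update.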
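Here is a worked example with illustrative numbers, not measurements:

- 10,000 samples/s arrive and none are dropped;
- 2 s of send time accumulate per second;
- the sent timestamp lags the received one by 30 s.

With shardUpdateDuration = 10 s, integralGain = 0.01:

$$
\begin{aligned}
\text{timePerSample} &= \frac{2}{10000} = 0.0002\ \text{s} \\
\text{samplesPending} &= 30 \times 10000 \times 1 = 300000 \\
\text{desiredShards} &= 0.0002 \times (10000 \times 1 + 0.01 \times 300000) = 0.0002 \times 13000 = 2.6
\end{aligned}
$$

Without the backlog the result would be exactly 2. The correction term adds 0.6 shards so that the queue can catch up.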

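The whole calculation follows Little's Law: capacity = incoming rate × time spent per item. On top of that, it folds in the kept ratio and a backlog correction term to keep the estimate accurate.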