本文次要钻研一下cortex的Distributor

Distributor

cortex/pkg/distributor/distributor.go

// Distributor is a storage.SampleAppender and a client.Querier which// forwards appends and queries to individual ingesters.type Distributor struct {    services.Service    cfg           Config    ingestersRing ring.ReadRing    ingesterPool  *ring_client.Pool    limits        *validation.Overrides    // The global rate limiter requires a distributors ring to count    // the number of healthy instances    distributorsRing *ring.Lifecycler    // For handling HA replicas.    HATracker *haTracker    // Per-user rate limiter.    ingestionRateLimiter *limiter.RateLimiter    // Manager for subservices (HA Tracker, distributor ring and client pool)    subservices        *services.Manager    subservicesWatcher *services.FailureWatcher}
Distributor用于转发、追加、查问ingesters

Push

cortex/pkg/distributor/distributor.go

// Push implements client.IngesterServerfunc (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {    userID, err := tenant.TenantID(ctx)    if err != nil {        return nil, err    }    source := util.GetSourceIPsFromOutgoingCtx(ctx)    var firstPartialErr error    removeReplica := false    numSamples := 0    for _, ts := range req.Timeseries {        numSamples += len(ts.Samples)    }    // Count the total samples in, prior to validation or deduplication, for comparison with other metrics.    incomingSamples.WithLabelValues(userID).Add(float64(numSamples))    // Count the total number of metadata in.    incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))    // A WriteRequest can only contain series or metadata but not both. This might change in the future.    // For each timeseries or samples, we compute a hash to distribute across ingesters;    // check each sample/metadata and discard if outside limits.    validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))    validatedMetadata := make([]*client.MetricMetadata, 0, len(req.Metadata))    metadataKeys := make([]uint32, 0, len(req.Metadata))    seriesKeys := make([]uint32, 0, len(req.Timeseries))    validatedSamples := 0    if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {        cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)        removeReplica, err = d.checkSample(ctx, userID, cluster, replica)        if err != nil {            // Ensure the request slice is reused if the series get deduped.            client.ReuseSlice(req.Timeseries)            if errors.Is(err, replicasNotMatchError{}) {                // These samples have been deduped.                dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))                return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())            }            if errors.Is(err, tooManyClustersError{}) {                validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))                return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())            }            return nil, err        }        // If there wasn't an error but removeReplica is false that means we didn't find both HA labels.        if !removeReplica {            nonHASamples.WithLabelValues(userID).Add(float64(numSamples))        }    }    latestSampleTimestampMs := int64(0)    defer func() {        // Update this metric even in case of errors.        if latestSampleTimestampMs > 0 {            latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)        }    }()    // For each timeseries, compute a hash to distribute across ingesters;    // check each sample and discard if outside limits.    for _, ts := range req.Timeseries {        // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.        if len(ts.Samples) > 0 {            latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)        }        if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {            l := relabel.Process(client.FromLabelAdaptersToLabels(ts.Labels), mrc...)            ts.Labels = client.FromLabelsToLabelAdapters(l)        }        // If we found both the cluster and replica labels, we only want to include the cluster label when        // storing series in Cortex. If we kept the replica label we would end up with another series for the same        // series we're trying to dedupe when HA tracking moves over to a different replica.        if removeReplica {            removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)        }        for _, labelName := range d.limits.DropLabels(userID) {            removeLabel(labelName, &ts.Labels)        }        if len(ts.Labels) == 0 {            continue        }        // We rely on sorted labels in different places:        // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns        // different tokens, which is bad.        // 2) In validation code, when checking for duplicate label names. As duplicate label names are rejected        // later in the validation phase, we ignore them here.        sortLabelsIfNeeded(ts.Labels)        // Generate the sharding token based on the series labels without the HA replica        // label and dropped labels (if any)        key, err := d.tokenForLabels(userID, ts.Labels)        if err != nil {            return nil, err        }        validatedSeries, err := d.validateSeries(ts, userID)        // Errors in validation are considered non-fatal, as one series in a request may contain        // invalid data but all the remaining series could be perfectly valid.        if err != nil && firstPartialErr == nil {            firstPartialErr = err        }        // validateSeries would have returned an emptyPreallocSeries if there were no valid samples.        if validatedSeries == emptyPreallocSeries {            continue        }        seriesKeys = append(seriesKeys, key)        validatedTimeseries = append(validatedTimeseries, validatedSeries)        validatedSamples += len(ts.Samples)    }    for _, m := range req.Metadata {        err := validation.ValidateMetadata(d.limits, userID, m)        if err != nil {            if firstPartialErr == nil {                firstPartialErr = err            }            continue        }        metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))        validatedMetadata = append(validatedMetadata, m)    }    receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))    receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))    if len(seriesKeys) == 0 && len(metadataKeys) == 0 {        // Ensure the request slice is reused if there's no series or metadata passing the validation.        client.ReuseSlice(req.Timeseries)        return &client.WriteResponse{}, firstPartialErr    }    now := time.Now()    totalN := validatedSamples + len(validatedMetadata)    if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {        // Ensure the request slice is reused if the request is rate limited.        client.ReuseSlice(req.Timeseries)        // Return a 4xx here to have the client discard the data and not retry. If a client        // is sending too much data consistently we will unlikely ever catch up otherwise.        validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))        validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))        return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))    }    subRing := d.ingestersRing    // Obtain a subring if required.    if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {        subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))    }    keys := append(seriesKeys, metadataKeys...)    initialMetadataIndex := len(seriesKeys)    op := ring.WriteNoExtend    if d.cfg.ExtendWrites {        op = ring.Write    }    err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {        timeseries := make([]client.PreallocTimeseries, 0, len(indexes))        var metadata []*client.MetricMetadata        for _, i := range indexes {            if i >= initialMetadataIndex {                metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])            } else {                timeseries = append(timeseries, validatedTimeseries[i])            }        }        // Use a background context to make sure all ingesters get samples even if we return early        localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)        defer cancel()        localCtx = user.InjectOrgID(localCtx, userID)        if sp := opentracing.SpanFromContext(ctx); sp != nil {            localCtx = opentracing.ContextWithSpan(localCtx, sp)        }        // Get clientIP(s) from Context and add it to localCtx        localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)        return d.send(localCtx, ingester, timeseries, metadata, req.Source)    }, func() { client.ReuseSlice(req.Timeseries) })    if err != nil {        return nil, err    }    return &client.WriteResponse{}, firstPartialErr}
Push办法在d.cfg.ShardingStrategy为util.ShardingStrategyShuffle时,会通过d.ingestersRing.ShuffleShard确定subRing;之后通过ring.DoBatch提交keys,其callback函数执行d.send(localCtx, ingester, timeseries, metadata, req.Source)

DoBatch

cortex/pkg/ring/batch.go

// DoBatch request against a set of keys in the ring, handling replication and// failures. For example if we want to write N items where they may all// hit different ingesters, and we want them all replicated R ways with// quorum writes, we track the relationship between batch RPCs and the items// within them.//// Callback is passed the ingester to target, and the indexes of the keys// to send to that ingester.//// Not implemented as a method on Ring so we can test separately.func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {    if r.IngesterCount() <= 0 {        return fmt.Errorf("DoBatch: IngesterCount <= 0")    }    expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()    itemTrackers := make([]itemTracker, len(keys))    ingesters := make(map[string]ingester, r.IngesterCount())    var (        bufDescs [GetBufferSize]IngesterDesc        bufHosts [GetBufferSize]string        bufZones [GetBufferSize]string    )    for i, key := range keys {        replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])        if err != nil {            return err        }        itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors        itemTrackers[i].maxFailures = replicationSet.MaxErrors        for _, desc := range replicationSet.Ingesters {            curr, found := ingesters[desc.Addr]            if !found {                curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers)                curr.indexes = make([]int, 0, expectedTrackers)            }            ingesters[desc.Addr] = ingester{                desc:         desc,                itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),                indexes:      append(curr.indexes, i),            }        }    }    tracker := batchTracker{        done: make(chan struct{}, 1),        err:  make(chan error, 1),    }    tracker.rpcsPending.Store(int32(len(itemTrackers)))    var wg sync.WaitGroup    wg.Add(len(ingesters))    for _, i := range ingesters {        go func(i ingester) {            err := callback(i.desc, i.indexes)            tracker.record(i.itemTrackers, err)            wg.Done()        }(i)    }    // Perform cleanup at the end.    go func() {        wg.Wait()        cleanup()    }()    select {    case err := <-tracker.err:        return err    case <-tracker.done:        return nil    case <-ctx.Done():        return ctx.Err()    }}
DoBatch办法提供了callback函数用于解决ingester及indexes

Query

cortex/pkg/distributor/query.go

// Query multiple ingesters and returns a Matrix of samples.func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {    var matrix model.Matrix    err := instrument.CollectedRequest(ctx, "Distributor.Query", queryDuration, instrument.ErrorCode, func(ctx context.Context) error {        req, err := ingester_client.ToQueryRequest(from, to, matchers)        if err != nil {            return err        }        replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)        if err != nil {            return err        }        matrix, err = d.queryIngesters(ctx, replicationSet, req)        if err != nil {            return err        }        if s := opentracing.SpanFromContext(ctx); s != nil {            s.LogKV("series", len(matrix))        }        return nil    })    return matrix, err}
Query办法通过d.GetIngestersForQuery获取replicationSet,再通过d.queryIngesters获取matrix

小结

cortex的Distributor提供了Push、Query办法;Push办法会通过d.ingestersRing.ShuffleShard确定subRing;之后通过ring.DoBatch提交keys;Query办法通过d.GetIngestersForQuery获取replicationSet,再通过d.queryIngesters获取matrix。

doc

  • cortex