本文主要研究一下cortex的ReadRing

ReadRing

cortex/pkg/ring/ring.go

// ReadRing represents the read interface to the ring.
//
// It embeds prometheus.Collector, so every implementation must also satisfy
// that interface.
type ReadRing interface {
	prometheus.Collector

	// Get returns n (or more) ingesters which form the replicas for the given key.
	// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
	// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
	Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error)

	// GetAllHealthy returns all healthy instances in the ring, for the given operation.
	// This function doesn't check if the quorum is honored, so doesn't fail if the number
	// of unhealthy ingesters is greater than the tolerated max unavailable.
	GetAllHealthy(op Operation) (ReplicationSet, error)

	// GetReplicationSetForOperation returns all instances where the input operation should be executed.
	// The resulting ReplicationSet doesn't necessarily contain all healthy instances
	// in the ring, but could contain the minimum set of instances required to execute
	// the input operation.
	GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

	// NOTE(review): the two methods below are undocumented here. Presumably
	// ReplicationFactor reports the configured replication factor and
	// IngesterCount the number of instances in the ring — confirm against
	// the implementation.
	ReplicationFactor() int
	IngesterCount() int

	// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
	// and size (number of instances).
	ShuffleShard(identifier string, size int) ReadRing

	// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
	// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
	ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing

	// HasInstance returns whether the ring contains an instance matching the provided instanceID.
	HasInstance(instanceID string) bool
}
ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法

Get

cortex/pkg/ring/ring.go

// Get returns n (or more) ingesters which form the replicas for the given key.
//
// Starting at the index returned by searchToken(r.ringTokens, key), it walks
// r.ringTokens in order (wrapping around at the end) and collects instances
// with distinct IDs and, when zone-awareness is enabled, distinct non-empty
// zones. The walk stops once the target number of instances has been gathered
// or every token has been visited once. The collected instances are then
// passed to r.strategy.Filter, whose results populate the returned
// ReplicationSet.
//
// bufDescs, bufHosts and bufZones are truncated to length zero and reused as
// scratch storage, so their previous contents are overwritten.
//
// It returns ErrEmptyRing when the ring has no descriptor or no tokens,
// ErrInconsistentTokensInfo when a token has no entry in
// r.ringInstanceByToken, and any error returned by r.strategy.Filter.
// The ring's read lock is held for the duration of the call.
func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	if r.ringDesc == nil || len(r.ringTokens) == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	var (
		// n is the target number of distinct instances; it starts at the
		// configured replication factor and may grow inside the loop.
		n          = r.cfg.ReplicationFactor
		ingesters  = bufDescs[:0]
		start      = searchToken(r.ringTokens, key)
		iterations = 0

		// We use a slice instead of a map because it's faster to search within a
		// slice than lookup a map for a very low number of items.
		distinctHosts = bufHosts[:0]
		distinctZones = bufZones[:0]
	)
	// iterations bounds the walk to one full pass over the tokens, since i
	// itself is wrapped on every step and never reaches a natural end.
	for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
		iterations++
		// Wrap i around in the ring.
		i %= len(r.ringTokens)

		token := r.ringTokens[i]
		info, ok := r.ringInstanceByToken[token]
		if !ok {
			// This should never happen unless a bug in the ring code.
			return ReplicationSet{}, ErrInconsistentTokensInfo
		}

		// We want n *distinct* ingesters && distinct zones.
		if util.StringsContain(distinctHosts, info.InstanceID) {
			continue
		}

		// Ignore if the ingesters don't have a zone set.
		if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
			if util.StringsContain(distinctZones, info.Zone) {
				continue
			}
			distinctZones = append(distinctZones, info.Zone)
		}

		distinctHosts = append(distinctHosts, info.InstanceID)
		ingester := r.ringDesc.Ingesters[info.InstanceID]

		// Check whether the replica set should be extended given we're including
		// this instance.
		if op.ShouldExtendReplicaSetOnState(ingester.State) {
			n++
		}

		ingesters = append(ingesters, ingester)
	}

	// Filter always receives the configured replication factor, not the
	// possibly extended n.
	liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
	if err != nil {
		return ReplicationSet{}, err
	}

	return ReplicationSet{
		Ingesters: liveIngesters,
		MaxErrors: maxFailure,
	}, nil
}
Get方法先通过r.ringInstanceByToken[token]获取info,再通过r.ringDesc.Ingesters[info.InstanceID]获取ingester,最后通过r.strategy.Filter过滤出liveIngesters

GetAllHealthy

cortex/pkg/ring/ring.go

// GetAllHealthy implements ReadRing.func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {    r.mtx.RLock()    defer r.mtx.RUnlock()    if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {        return ReplicationSet{}, ErrEmptyRing    }    now := time.Now()    ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))    for _, ingester := range r.ringDesc.Ingesters {        if r.IsHealthy(&ingester, op, now) {            ingesters = append(ingesters, ingester)        }    }    return ReplicationSet{        Ingesters: ingesters,        MaxErrors: 0,    }, nil}
GetAllHealthy方法遍历r.ringDesc.Ingesters,然后通过r.IsHealthy(&ingester, op, now)提取healthy的ingester

GetReplicationSetForOperation

cortex/pkg/ring/ring.go

// GetReplicationSetForOperation implements ReadRing.
//
// It first collects every instance that r.IsHealthy accepts for op and
// records the zone of each instance that is not healthy. Then:
//
//   - With zone-awareness enabled, up to min(len(r.ringZones), RF)/2 zones may
//     contain an unhealthy instance. All instances of such zones are dropped
//     from the result, and the returned MaxUnavailableZones is that budget
//     minus the number of failing zones. MaxErrors stays 0.
//   - Otherwise, at least max(len(Ingesters), RF) - RF/2 healthy instances are
//     required, and MaxErrors is the number of healthy instances beyond that
//     minimum. MaxUnavailableZones stays 0.
//
// RF is r.cfg.ReplicationFactor. It returns ErrEmptyRing when the ring has no
// descriptor or no tokens, and ErrTooManyFailedIngesters when the applicable
// failure budget is exceeded. The ring's read lock is held for the duration
// of the call.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.ringDesc == nil || len(r.ringTokens) == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	// Build the initial replication set, excluding unhealthy instances.
	healthyInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
	// zoneFailures is the set of zones that own at least one unhealthy instance.
	zoneFailures := make(map[string]struct{})
	now := time.Now()

	for _, ingester := range r.ringDesc.Ingesters {
		if r.IsHealthy(&ingester, op, now) {
			healthyInstances = append(healthyInstances, ingester)
		} else {
			zoneFailures[ingester.Zone] = struct{}{}
		}
	}

	// Max errors and max unavailable zones are mutually exclusive. We initialise both
	// to 0 and then we update them whether zone-awareness is enabled or not.
	maxErrors := 0
	maxUnavailableZones := 0

	if r.cfg.ZoneAwarenessEnabled {
		// Given data is replicated to RF different zones, we can tolerate a number of
		// RF/2 failing zones. However, we need to protect from the case the ring currently
		// contains instances in a number of zones < RF.
		numReplicatedZones := util.Min(len(r.ringZones), r.cfg.ReplicationFactor)
		minSuccessZones := (numReplicatedZones / 2) + 1
		maxUnavailableZones = minSuccessZones - 1

		if len(zoneFailures) > maxUnavailableZones {
			return ReplicationSet{}, ErrTooManyFailedIngesters
		}

		if len(zoneFailures) > 0 {
			// We remove all instances (even healthy ones) from zones with at least
			// 1 failing ingester. Due to how replication works when zone-awareness is
			// enabled (data is replicated to RF different zones), there's no benefit in
			// querying healthy instances from "failing zones". A zone is considered
			// failed if there is a single error.
			filteredInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
			for _, ingester := range healthyInstances {
				if _, ok := zoneFailures[ingester.Zone]; !ok {
					filteredInstances = append(filteredInstances, ingester)
				}
			}

			healthyInstances = filteredInstances
		}

		// Since we removed all instances from zones containing at least 1 failing
		// instance, we have to decrease the max unavailable zones accordingly.
		maxUnavailableZones -= len(zoneFailures)
	} else {
		// Calculate the number of required ingesters;
		// ensure we always require at least RF-1 when RF=3.
		numRequired := len(r.ringDesc.Ingesters)
		if numRequired < r.cfg.ReplicationFactor {
			numRequired = r.cfg.ReplicationFactor
		}
		// We can tolerate this many failures
		numRequired -= r.cfg.ReplicationFactor / 2

		if len(healthyInstances) < numRequired {
			return ReplicationSet{}, ErrTooManyFailedIngesters
		}

		// Healthy instances beyond the required minimum are allowed to fail.
		maxErrors = len(healthyInstances) - numRequired
	}

	return ReplicationSet{
		Ingesters:           healthyInstances,
		MaxErrors:           maxErrors,
		MaxUnavailableZones: maxUnavailableZones,
	}, nil
}
GetReplicationSetForOperation先提取healthyInstances,然后根据r.cfg.ZoneAwarenessEnabled分别处理:开启时剔除存在不健康实例的zone中的所有实例并计算maxUnavailableZones,未开启时则计算maxErrors

ShuffleShard

cortex/pkg/ring/ring.go

func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {    // Nothing to do if the shard size is not smaller then the actual ring.    if size <= 0 || r.IngesterCount() <= size {        return r    }    if cached := r.getCachedShuffledSubring(identifier, size); cached != nil {        return cached    }    result := r.shuffleShard(identifier, size, 0, time.Now())    r.setCachedShuffledSubring(identifier, size, result)    return result}
ShuffleShard方法在size<=0或r.IngesterCount()<=size时直接返回r本身;否则先从r.getCachedShuffledSubring获取,如果为nil则执行r.shuffleShard,再执行r.setCachedShuffledSubring

HasInstance

cortex/pkg/ring/ring.go

// HasInstance returns whether the ring contains an instance matching the provided instanceID.func (r *Ring) HasInstance(instanceID string) bool {    r.mtx.RLock()    defer r.mtx.RUnlock()    instances := r.ringDesc.GetIngesters()    _, ok := instances[instanceID]    return ok}
HasInstance通过r.ringDesc.GetIngesters()获取instances,再根据instanceID判断是否存在

小结

cortex的ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法。

doc

  • cortex