乐趣区

关于golang:聊聊cortex的ReadRing

本文主要研究一下 cortex 的 ReadRing

ReadRing

cortex/pkg/ring/ring.go

// ReadRing represents the read interface to the ring. It embeds
// prometheus.Collector, so every implementation can also be registered as a
// Prometheus metrics collector.
type ReadRing interface {
    prometheus.Collector

    // Get returns n (or more) ingesters which form the replicas for the given key.
    // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
    // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
    Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error)

    // GetAllHealthy returns all healthy instances in the ring, for the given operation.
    // This function doesn't check if the quorum is honored, so it doesn't fail if the
    // number of unhealthy ingesters is greater than the tolerated max unavailable.
    GetAllHealthy(op Operation) (ReplicationSet, error)

    // GetReplicationSetForOperation returns all instances where the input operation should be executed.
    // The resulting ReplicationSet doesn't necessarily contain all healthy instances
    // in the ring, but could contain the minimum set of instances required to execute
    // the input operation.
    GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

    // ReplicationFactor and IngesterCount carry no documentation in the original.
    // NOTE(review): presumably the configured replication factor and the number of
    // instances currently in the ring, respectively — confirm against the implementation.
    ReplicationFactor() int
    IngesterCount() int

    // ShuffleShard returns a subring for the provided identifier (e.g. a tenant ID)
    // and size (number of instances).
    ShuffleShard(identifier string, size int) ReadRing

    // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
    // all instances that have been part of the identifier's shard since "now - lookbackPeriod".
    ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing

    // HasInstance returns whether the ring contains an instance matching the provided instanceID.
    HasInstance(instanceID string) bool
}

ReadRing 内嵌了 prometheus.Collector,定义了 Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance 方法

Get

cortex/pkg/ring/ring.go

// Get implements ReadRing. Starting at the position searchToken reports for
// key, it visits the ring tokens in order (wrapping around at the end of the
// list) and collects the instances owning them. The walk stops once enough
// distinct hosts have been gathered or every token has been visited once. The
// collected instances are then passed to the ring's strategy Filter, whose
// result becomes the returned ReplicationSet.
//
// bufDescs, bufHosts and bufZones are scratch slices: they are truncated to
// length zero and appended to, so their previous contents are overwritten.
//
// It returns ErrEmptyRing when there is no ring descriptor or no tokens, and
// ErrInconsistentTokensInfo when a token has no matching instance entry.
func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
    r.mtx.RLock()
    defer r.mtx.RUnlock()

    tokenCount := len(r.ringTokens)
    if r.ringDesc == nil || tokenCount == 0 {
        return ReplicationSet{}, ErrEmptyRing
    }

    // The "already seen" sets are plain slices rather than maps: for the
    // handful of entries involved, a linear scan beats a map lookup.
    seenHosts := bufHosts[:0]
    seenZones := bufZones[:0]
    picked := bufDescs[:0]

    wanted := r.cfg.ReplicationFactor
    first := searchToken(r.ringTokens, key)

    for step := 0; step < tokenCount && len(seenHosts) < wanted; step++ {
        // The modulo wraps the walk around to the beginning of the token list.
        token := r.ringTokens[(first+step)%tokenCount]

        info, found := r.ringInstanceByToken[token]
        if !found {
            // Only reachable if the ring bookkeeping itself is buggy.
            return ReplicationSet{}, ErrInconsistentTokensInfo
        }

        // Each host may contribute at most one replica.
        if util.StringsContain(seenHosts, info.InstanceID) {
            continue
        }

        // With zone-awareness on, each zone may contribute at most one
        // replica too. Instances without a zone are exempt from this rule.
        if zoned := r.cfg.ZoneAwarenessEnabled && info.Zone != ""; zoned {
            if util.StringsContain(seenZones, info.Zone) {
                continue
            }
            seenZones = append(seenZones, info.Zone)
        }

        seenHosts = append(seenHosts, info.InstanceID)
        desc := r.ringDesc.Ingesters[info.InstanceID]

        // Some instance states require looking for one extra replica when
        // the instance is included; the operation decides which.
        if op.ShouldExtendReplicaSetOnState(desc.State) {
            wanted++
        }

        picked = append(picked, desc)
    }

    live, tolerated, err := r.strategy.Filter(picked, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
    if err != nil {
        return ReplicationSet{}, err
    }

    replicas := ReplicationSet{
        Ingesters: live,
        MaxErrors: tolerated,
    }
    return replicas, nil
}

Get 方法从 searchToken 返回的位置开始遍历 r.ringTokens,先通过 r.ringInstanceByToken[token] 获取 info,再通过 r.ringDesc.Ingesters[info.InstanceID] 获取 ingester,最后通过 r.strategy.Filter 过滤出 liveIngesters

GetAllHealthy

cortex/pkg/ring/ring.go

// GetAllHealthy implements ReadRing. It returns every instance in the ring
// descriptor that IsHealthy accepts for op at the current time. No quorum
// check is performed and the returned set tolerates zero errors.
//
// It returns ErrEmptyRing when there is no ring descriptor or the descriptor
// holds no instances.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
    r.mtx.RLock()
    defer r.mtx.RUnlock()

    if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
        return ReplicationSet{}, ErrEmptyRing
    }

    // A single timestamp is used so every instance is judged against the
    // same instant.
    checkedAt := time.Now()
    healthy := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))

    for _, desc := range r.ringDesc.Ingesters {
        if !r.IsHealthy(&desc, op, checkedAt) {
            continue
        }
        healthy = append(healthy, desc)
    }

    result := ReplicationSet{
        Ingesters: healthy,
        MaxErrors: 0,
    }
    return result, nil
}

GetAllHealthy 方法遍历 r.ringDesc.Ingesters,然后通过 r.IsHealthy(&ingester, op, now) 筛选出 healthy 的 ingester

GetReplicationSetForOperation

cortex/pkg/ring/ring.go

// GetReplicationSetForOperation implements ReadRing. It collects the instances
// that IsHealthy accepts for op and then works out how much further failure
// the caller may tolerate:
//
//   - Without zone-awareness, the result carries MaxErrors, derived from the
//     instance count and the replication factor.
//   - With zone-awareness, every instance located in a zone that has at least
//     one unhealthy instance is dropped, and the result carries
//     MaxUnavailableZones instead.
//
// MaxErrors and MaxUnavailableZones are mutually exclusive; whichever does not
// apply is zero. It returns ErrEmptyRing when there is no ring descriptor or
// no tokens, and ErrTooManyFailedIngesters when more has already failed than
// can be tolerated.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
    r.mtx.RLock()
    defer r.mtx.RUnlock()

    if r.ringDesc == nil || len(r.ringTokens) == 0 {
        return ReplicationSet{}, ErrEmptyRing
    }

    // First pass: split instances into healthy ones and the set of zones
    // that contain at least one unhealthy instance.
    total := len(r.ringDesc.Ingesters)
    healthy := make([]IngesterDesc, 0, total)
    failedZones := make(map[string]struct{})
    checkedAt := time.Now()

    for _, desc := range r.ringDesc.Ingesters {
        if r.IsHealthy(&desc, op, checkedAt) {
            healthy = append(healthy, desc)
            continue
        }
        failedZones[desc.Zone] = struct{}{}
    }

    if !r.cfg.ZoneAwarenessEnabled {
        // Number of instances that must be available: never fewer than the
        // replication factor's worth, minus the RF/2 failures we can absorb
        // (so at least RF-1 when RF=3).
        required := total
        if rf := r.cfg.ReplicationFactor; required < rf {
            required = rf
        }
        required -= r.cfg.ReplicationFactor / 2

        if len(healthy) < required {
            return ReplicationSet{}, ErrTooManyFailedIngesters
        }

        return ReplicationSet{
            Ingesters:           healthy,
            MaxErrors:           len(healthy) - required,
            MaxUnavailableZones: 0,
        }, nil
    }

    // Data is replicated to RF different zones, so RF/2 failing zones can be
    // tolerated. The Min guards against a ring that currently spans fewer
    // zones than RF. (minSuccessZones = n/2 + 1, tolerated = minSuccessZones - 1.)
    tolerated := util.Min(len(r.ringZones), r.cfg.ReplicationFactor) / 2
    failed := len(failedZones)

    if failed > tolerated {
        return ReplicationSet{}, ErrTooManyFailedIngesters
    }

    if failed > 0 {
        // A zone counts as failed as soon as one of its instances is
        // unhealthy. Because data is replicated to RF different zones there
        // is no benefit in querying the remaining healthy instances of such
        // a zone, so they are removed as well.
        kept := make([]IngesterDesc, 0, total)
        for _, desc := range healthy {
            if _, inFailedZone := failedZones[desc.Zone]; inFailedZone {
                continue
            }
            kept = append(kept, desc)
        }
        healthy = kept
    }

    // The failed zones have already been excluded from the result, so they
    // are subtracted from what the caller may still lose.
    return ReplicationSet{
        Ingesters:           healthy,
        MaxErrors:           0,
        MaxUnavailableZones: tolerated - failed,
    }, nil
}

GetReplicationSetForOperation 先提取 healthyInstances;如果开启了 r.cfg.ZoneAwarenessEnabled,则剔除存在故障实例的 zone 中的所有实例并计算 maxUnavailableZones,否则根据 ReplicationFactor 计算 maxErrors

ShuffleShard

cortex/pkg/ring/ring.go

// ShuffleShard implements ReadRing. It returns the receiver itself when size
// is not positive or the ring has no more than size instances. Otherwise it
// returns the subring for (identifier, size), taken from the cache when one is
// stored there, or computed via shuffleShard and then stored in the cache.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
    // A non-positive size leaves the ring unsharded.
    if size <= 0 {
        return r
    }

    // Nothing to do if the shard size is not smaller than the actual ring.
    if r.IngesterCount() <= size {
        return r
    }

    cached := r.getCachedShuffledSubring(identifier, size)
    if cached != nil {
        return cached
    }

    // Cache miss: build the subring (zero lookback, evaluated at the current
    // time) and remember it for subsequent calls.
    subring := r.shuffleShard(identifier, size, 0, time.Now())
    r.setCachedShuffledSubring(identifier, size, subring)

    return subring
}

ShuffleShard 方法在 size 不大于 0 或不小于 ring 的实例数时直接返回 r;否则先从 r.getCachedShuffledSubring 获取缓存,如果为 nil 则执行 r.shuffleShard,再通过 r.setCachedShuffledSubring 写入缓存

HasInstance

cortex/pkg/ring/ring.go

// HasInstance implements ReadRing. It reports whether instanceID is a key of
// the instance map obtained from the ring descriptor's GetIngesters().
func (r *Ring) HasInstance(instanceID string) bool {
    r.mtx.RLock()
    defer r.mtx.RUnlock()

    _, found := r.ringDesc.GetIngesters()[instanceID]
    return found
}

HasInstance 通过 r.ringDesc.GetIngesters() 获取 instances,再根据 instanceID 判断是否存在

小结

cortex 的 ReadRing 内嵌了 prometheus.Collector,定义了 Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance 方法。

doc

  • cortex
退出移动版