乐趣区

关于golang:聊聊cortex的Ingester

本文主要研究一下 cortex 的 Ingester

Ingester

cortex/pkg/api/api.go

// Ingester is the contract the API layer requires from an ingester. It is
// declared as an interface so that alternative ingester implementations can
// be passed into the API.RegisterIngester() method.
type Ingester interface {
	// The server API of the Ingester service.
	client.IngesterServer

	// Push is listed explicitly in addition to the embedded
	// client.IngesterServer.
	Push(context.Context, *client.WriteRequest) (*client.WriteResponse, error)

	// HTTP handlers exposed by the ingester.
	FlushHandler(http.ResponseWriter, *http.Request)
	ShutdownHandler(http.ResponseWriter, *http.Request)
}

Ingester 接口内嵌了 client.IngesterServer,并定义了 FlushHandler、ShutdownHandler、Push 方法

client.IngesterServer

cortex/pkg/ingester/client/cortex.pb.go

// IngesterServer is the server API for Ingester service.
//
// The methods are grouped by call shape: first those that receive a context
// plus one request message and return one response message, then those that
// are handed a stream server object.
type IngesterServer interface {
	// Request/response methods.
	Push(context.Context, *WriteRequest) (*WriteResponse, error)
	Query(context.Context, *QueryRequest) (*QueryResponse, error)
	LabelValues(context.Context, *LabelValuesRequest) (*LabelValuesResponse, error)
	LabelNames(context.Context, *LabelNamesRequest) (*LabelNamesResponse, error)
	UserStats(context.Context, *UserStatsRequest) (*UserStatsResponse, error)
	AllUserStats(context.Context, *UserStatsRequest) (*UsersStatsResponse, error)
	MetricsForLabelMatchers(context.Context, *MetricsForLabelMatchersRequest) (*MetricsForLabelMatchersResponse, error)
	MetricsMetadata(context.Context, *MetricsMetadataRequest) (*MetricsMetadataResponse, error)

	// Stream-based methods.
	QueryStream(*QueryRequest, Ingester_QueryStreamServer) error
	// TransferChunks lets a leaving ingester (the client side) stream its
	// chunks directly to a joining ingester (the server side).
	TransferChunks(Ingester_TransferChunksServer) error
}

client.IngesterServer 接口定义了 Push、Query、QueryStream、LabelValues、LabelNames、UserStats、AllUserStats、MetricsForLabelMatchers、MetricsMetadata、TransferChunks 方法

Push

cortex/pkg/ingester/ingester.go

// Push implements client.IngesterServer.
//
// When blocks storage is enabled the request is delegated to v2Push.
// Otherwise every sample of every series in req is handed to i.append.
// A sample rejected with a *validationError does not abort the request: the
// first such error is remembered and returned, together with an empty
// response, once all samples have been processed. Any other append error
// aborts the request immediately.
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
	if err := i.checkRunningOrStopping(); err != nil {
		return nil, err
	}
	if i.cfg.BlocksStorageEnabled {
		return i.v2Push(ctx, req)
	}

	// NOTE: deserialisation uses `unsafe`, so nothing taken from `req` may be
	// retained once ReuseSlice has run.
	defer client.ReuseSlice(req.Timeseries)

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		// The underlying error is not propagated; callers see this fixed message.
		return nil, fmt.Errorf("no user id")
	}

	// Metadata is best-effort and does not halt the request on errors, so it
	// is processed before the samples. Otherwise an error could be returned
	// before it is ingested.
	i.pushMetadata(ctx, userID, req.GetMetadata())

	var (
		firstPartialErr *validationError
		record          *WALRecord
	)
	if i.cfg.WALConfig.WALEnabled {
		record = recordPool.Get().(*WALRecord)
		record.UserID = userID
		// Churn is assumed to be low in most cases, so the pooled Series
		// slice is dropped rather than kept around.
		record.Series = nil
		// Reuse the pooled Samples backing array when it is large enough.
		if want := len(req.Timeseries); cap(record.Samples) >= want {
			record.Samples = record.Samples[:0]
		} else {
			record.Samples = make([]tsdb_record.RefSample, 0, want)
		}
	}

	for _, series := range req.Timeseries {
		ingested := 0
		for _, sample := range series.Samples {
			// append() copies the memory in `series.Labels` except on the error path
			appendErr := i.append(ctx, userID, series.Labels, model.Time(sample.TimestampMs), model.SampleValue(sample.Value), req.Source, record)
			if appendErr != nil {
				i.metrics.ingestedSamplesFail.Inc()

				valErr, isValidation := appendErr.(*validationError)
				if !isValidation {
					// non-validation error: abandon this request
					return nil, grpcForwardableError(userID, http.StatusInternalServerError, appendErr)
				}
				if firstPartialErr == nil {
					firstPartialErr = valErr
				}
				continue
			}
			ingested++
		}

		if i.cfg.ActiveSeriesMetricsEnabled && ingested > 0 {
			// updateActiveSeries will copy labels if necessary.
			i.updateActiveSeries(userID, time.Now(), series.Labels)
		}
	}

	if record != nil {
		// Only reached when no non-validation error aborted the loop above.
		if err := i.wal.Log(record); err != nil {
			return nil, err
		}
		recordPool.Put(record)
	}

	resp := &client.WriteResponse{}
	if firstPartialErr == nil {
		return resp, nil
	}
	// grpcForwardableError turns the error into a string so it no longer references `req`
	return resp, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)
}

Push 方法首先执行 checkRunningOrStopping;若 i.cfg.BlocksStorageEnabled 为 true 则执行 i.v2Push(ctx, req);否则遍历 req.Timeseries,对其中的每个样本执行 i.append

FlushHandler

cortex/pkg/ingester/flush.go

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
//
// With blocks storage enabled the request is handed to v2FlushHandler.
// Otherwise sweepUsers(true) is invoked and the handler replies with
// 204 No Content; going by the log message, the chunks are only queued for
// flushing at that point.
func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
	if !i.cfg.BlocksStorageEnabled {
		level.Info(util.Logger).Log("msg", "starting to flush all the chunks")
		i.sweepUsers(true)
		level.Info(util.Logger).Log("msg", "chunks queued for flushing")
		w.WriteHeader(http.StatusNoContent)
		return
	}

	i.v2FlushHandler(w, r)
}

FlushHandler 方法在 i.cfg.BlocksStorageEnabled 为 true 时执行 i.v2FlushHandler(w, r);否则执行 i.sweepUsers(true),并返回 http.StatusNoContent

ShutdownHandler

cortex/pkg/ingester/ingester.go

// ShutdownHandler stops the ingester in response to an HTTP request. It
// triggers the following set of operations in order:
//   - Change the state of ring to stop accepting writes.
//   - Flush all the chunks.
//
// Flush-on-shutdown and unregister-on-shutdown are both forced on for the
// duration of the stop and set back to their previous values afterwards.
// The handler always replies with 204 No Content.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
	// Chunks should be flushed if the transfer fails, whatever the original
	// flag says.
	prevFlush := i.lifecycler.FlushOnShutdown()
	i.lifecycler.SetFlushOnShutdown(true)

	// An HTTP-triggered shutdown unregisters no matter what.
	prevUnregister := i.lifecycler.ShouldUnregisterOnShutdown()
	i.lifecycler.SetUnregisterOnShutdown(true)

	// The stop error is discarded; the response does not depend on it.
	_ = services.StopAndAwaitTerminated(context.Background(), i)

	// Put both flags back the way they were.
	i.lifecycler.SetFlushOnShutdown(prevFlush)
	i.lifecycler.SetUnregisterOnShutdown(prevUnregister)

	w.WriteHeader(http.StatusNoContent)
}

ShutdownHandler 方法先通过 i.lifecycler.FlushOnShutdown()、i.lifecycler.ShouldUnregisterOnShutdown() 保存原有设置,并将这两项都设置为 true;然后执行 services.StopAndAwaitTerminated;最后恢复原有设置并返回 http.StatusNoContent

Query

cortex/pkg/ingester/ingester.go

// Query implements client.IngesterServer.
//
// When blocks storage is enabled the request is delegated to v2Query.
// Otherwise the samples in the requested time range are collected from every
// in-memory series of the tenant that matches the request's matchers. If the
// tenant has no state, an empty response is returned. Collection stops with
// an HTTP 413 error once the running sample count exceeds the tenant's
// MaxSamplesPerQuery limit. The series and sample counts are observed in the
// metrics even when an error is returned, and whatever has been collected so
// far is returned alongside the error.
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
	if err := i.checkRunningOrStopping(); err != nil {
		return nil, err
	}
	if i.cfg.BlocksStorageEnabled {
		return i.v2Query(ctx, req)
	}

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	from, through, matchers, err := client.FromQueryRequest(req)
	if err != nil {
		return nil, err
	}

	i.metrics.queries.Inc()

	// The read lock is held only for the state lookup itself.
	i.userStatesMtx.RLock()
	state, found, err := i.userStates.getViaContext(ctx)
	i.userStatesMtx.RUnlock()
	if err != nil {
		return nil, err
	}
	if !found {
		return &client.QueryResponse{}, nil
	}

	var (
		resp                     = &client.QueryResponse{}
		seriesCount, sampleCount = 0, 0
		sampleLimit              = i.limits.MaxSamplesPerQuery(userID)
	)

	// collect is called for each matching series and appends its in-range
	// samples to resp.
	collect := func(_ context.Context, _ model.Fingerprint, ms *memorySeries) error {
		pairs, err := ms.samplesForRange(from, through)
		if err != nil {
			return err
		}
		if len(pairs) == 0 {
			return nil
		}
		seriesCount++

		sampleCount += len(pairs)
		if sampleCount > sampleLimit {
			return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "exceeded maximum number of samples in a query (%d)", sampleLimit)
		}

		out := client.TimeSeries{
			Labels:  client.FromLabelsToLabelAdapters(ms.metric),
			Samples: make([]client.Sample, len(pairs)),
		}
		for idx, p := range pairs {
			out.Samples[idx] = client.Sample{
				Value:       float64(p.Value),
				TimestampMs: int64(p.Timestamp),
			}
		}
		resp.Timeseries = append(resp.Timeseries, out)
		return nil
	}

	err = state.forSeriesMatching(ctx, matchers, collect, nil, 0)
	i.metrics.queriedSeries.Observe(float64(seriesCount))
	i.metrics.queriedSamples.Observe(float64(sampleCount))
	return resp, err
}

Query 方法先执行 i.checkRunningOrStopping();若 i.cfg.BlocksStorageEnabled 为 true 则执行 i.v2Query(ctx, req);否则通过 state.forSeriesMatching 遍历匹配的 series,并用 series.samplesForRange(from, through) 获取数据

小结

cortex 的 Ingester 接口(pkg/api)内嵌了 client.IngesterServer,并定义了 FlushHandler、ShutdownHandler、Push 方法;pkg/ingester 中的 Ingester 类型实现了该接口。

doc

  • cortex
退出移动版