本文主要研究一下loki的Query

Query

loki/pkg/logql/engine.go

// Query is a LogQL query to be executed.type Query interface {    // Exec processes the query.    Exec(ctx context.Context) (Result, error)}// Result is the result of a query execution.type Result struct {    Data       promql_parser.Value    Statistics stats.Result}
Query接口定义了Exec方法,返回Result;Result定义了Data、Statistics属性

Exec

loki/pkg/logql/engine.go

// Exec Implements `Query`. It handles instrumentation & defers to Eval.func (q *query) Exec(ctx context.Context) (Result, error) {    log, ctx := spanlogger.New(ctx, "query.Exec")    defer log.Finish()    rangeType := GetRangeType(q.params)    timer := prometheus.NewTimer(queryTime.WithLabelValues(string(rangeType)))    defer timer.ObserveDuration()    // records query statistics    var statResult stats.Result    start := time.Now()    ctx = stats.NewContext(ctx)    data, err := q.Eval(ctx)    statResult = stats.Snapshot(ctx, time.Since(start))    statResult.Log(level.Debug(log))    status := "200"    if err != nil {        status = "500"        if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) {            status = "400"        }    }    if q.record {        RecordMetrics(ctx, q.params, status, statResult)    }    return Result{        Data:       data,        Statistics: statResult,    }, err}
Exec方法执行q.Eval(ctx)及stats.Snapshot,并在q.record为true时通过RecordMetrics记录指标

Eval

loki/pkg/logql/engine.go

func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {    ctx, cancel := context.WithTimeout(ctx, q.timeout)    defer cancel()    expr, err := q.parse(ctx, q.params.Query())    if err != nil {        return nil, err    }    switch e := expr.(type) {    case SampleExpr:        value, err := q.evalSample(ctx, e)        return value, err    case LogSelectorExpr:        iter, err := q.evaluator.Iterator(ctx, e, q.params)        if err != nil {            return nil, err        }        defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)        streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())        return streams, err    default:        return nil, errors.New("Unexpected type (%T): cannot evaluate")    }}
Eval方法执行q.parse解析为Expr,之后根据Expr的类型做不同处理,如果是SampleExpr类型则执行q.evalSample;如果是LogSelectorExpr类型则执行q.evaluator.Iterator,再通过readStreams读取结果;其他类型则返回错误

Snapshot

loki/pkg/logql/stats/context.go

func Snapshot(ctx context.Context, execTime time.Duration) Result {    // ingester data is decoded from grpc trailers.    res := decodeTrailers(ctx)    // collect data from store.    s, ok := ctx.Value(storeKey).(*StoreData)    if ok {        res.Store.TotalChunksRef = s.TotalChunksRef        res.Store.TotalChunksDownloaded = s.TotalChunksDownloaded        res.Store.ChunksDownloadTime = s.ChunksDownloadTime.Seconds()    }    // collect data from chunks iteration.    c, ok := ctx.Value(chunksKey).(*ChunkData)    if ok {        res.Store.HeadChunkBytes = c.HeadChunkBytes        res.Store.HeadChunkLines = c.HeadChunkLines        res.Store.DecompressedBytes = c.DecompressedBytes        res.Store.DecompressedLines = c.DecompressedLines        res.Store.CompressedBytes = c.CompressedBytes        res.Store.TotalDuplicates = c.TotalDuplicates    }    existing, err := GetResult(ctx)    if err != nil {        res.ComputeSummary(execTime)        return res    }    existing.Merge(res)    existing.ComputeSummary(execTime)    return *existing}
Snapshot方法从ctx.Value取出StoreData及ChunkData计算res,然后再通过GetResult取出已有的Result,进行Merge及ComputeSummary;如果GetResult返回错误,则直接对res执行ComputeSummary并返回

小结

loki的Query接口定义了Exec方法,返回Result;Result定义了Data、Statistics属性;query实现了Query接口,其Exec方法执行q.Eval(ctx)及stats.Snapshot,并在q.record为true时记录指标。

doc

  • cortex