client通过GET /query查问influxdb集群中的指标数据:

curl -G 'http://ops1:8086/query?pretty=true' --data-urlencode "db=falcon" --data-urlencode "q=SELECT * FROM \"cpu.user\" order by time desc limit 10"

influxdb集群中的数据分shard在不同的节点上存储,client查问的指标数据,可能不在以后节点上,也可能以后节点和其它节点上都有,所以在查问时,即须要查问以后节点,也须要查问近程节点,而后将数据合并后返回client。

整体流程:

  • node1在8086上接管/query申请,而后依据查问条件,确定指标数据在哪些节点上(node1&node2);
  • 依据查问条件,本机查问指标数据,失去localData;
  • 向远端节点node2发送查问申请,失去remoteData;
  • 将localData和remoteData合并返回client;

HTTP handler

http handler入口:

// services/httpd/handler.gofunc (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) {    ......    // Execute query.    results := h.QueryExecutor.ExecuteQuery(q, opts, closing)    ......}

执行查问,可能有多个查问语句:

// query/executor.gofunc (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result {    results := make(chan *Result)    go e.executeQuery(query, opt, closing, results)    return results}func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) {    ......    for ; i < len(query.Statements); i++ {        .....        err = e.StatementExecutor.ExecuteStatement(stmt, ctx)        .....    }    .....}

本地和远端查问

对于每个statement,其查问过程:

  • 基于statement,创立iterator(含本地节点和远端节点);
  • 基于iterator,创立emitter,迭代emitter.Emit()拿到后果;
// cluster/statement_executor.gofunc (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {    cur, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions)    ....    em := query.NewEmitter(cur, ctx.ChunkSize)    defer em.Close()    for {        row, partial, err := em.Emit()        result := &query.Result{            Series:  []*models.Row{row},            Partial: partial,        }        .......        err := ctx.Send(result)    }}

重点看一下iterator的创立过程:

// cluster/statement_executor.gofunc (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) (query.Cursor, error) {    sopt := query.SelectOptions{        NodeID:      opt.NodeID,        MaxSeriesN:  e.MaxSelectSeriesN,        MaxPointN:   e.MaxSelectPointN,        MaxBucketsN: e.MaxSelectBucketsN,        Authorizer:  opt.Authorizer,    }    // Create a set of iterators from a selection.    cur, err := query.Select(ctx, stmt, e.ShardMapper, sopt)    if err != nil {        return nil, err    }    return cur, nil}

持续走:

// query/select.gofunc Select(ctx context.Context, stmt *influxql.SelectStatement, shardMapper ShardMapper, opt SelectOptions) (Cursor, error) {    s, err := Prepare(stmt, shardMapper, opt)    if err != nil {        return nil, err    }    // Must be deferred so it runs after Select.    defer s.Close()    return s.Select(ctx)}func (p *preparedStatement) Select(ctx context.Context) (Cursor, error) {    .....    cur, err := buildCursor(ctx, p.stmt, p.ic, opt)    ....}

持续走:

// query/select.gofunc buildAuxIterator(ctx context.Context, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions) (Iterator, error) {    ......    if err := func() error {        for _, source := range sources {            switch source := source.(type) {            case *influxql.Measurement:                input, err := ic.CreateIterator(ctx, source, opt)    //这里是要害                if err != nil {                    return err                }                inputs = append(inputs, input)    }(), err != nil {    }}

调用链条比拟深,到这里比拟容易理解了:

  • 先创立LocalShard的iterator;
  • 再创立remoteShard的iterator;
  • 最初将iterator合并返回;
func (c *ClusterShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {    ics := []query.Iterator{}    localIterator, err := c.LocalShardMapping.CreateIterator(ctx, m, opt)    ics = append(ics, localIterator)    ....    for _, sg := range c.RemoteShardGroup[source] {        ri, err := sg.CreateIterator(ctx, m, opt)        .....        ics = append(ics, ri)    }    return query.Iterators(ics).Merge(opt)}