共计 3260 个字符,预计需要花费 9 分钟才能阅读完成。
client 通过 GET /query 查询 influxdb 集群中的指标数据:
curl -G 'http://ops1:8086/query?pretty=true' --data-urlencode "db=falcon" --data-urlencode "q=SELECT * FROM \"cpu.user\"order by time desc limit 10"
influxdb 集群中的数据分 shard 在不同的节点上存储,client 查询的指标数据,可能不在当前节点上,也可能当前节点和其它节点上都有,所以在查询时,既需要查询当前节点,也需要查询远程节点,然后将数据合并后返回 client。
整体流程:
- node1 在 8086 上接收 /query 请求,然后根据查询条件,确定指标数据在哪些节点上 (node1&node2);
- 根据查询条件,本机查询指标数据,得到 localData;
- 向远端节点 node2 发送查询请求,得到 remoteData;
- 将 localData 和 remoteData 合并返回 client;
HTTP handler
http handler 入口:
// services/httpd/handler.go | |
// serveQuery is the HTTP handler entry point for query requests (excerpt;
// elided code is marked with dots). It passes the query q, together with opts
// and the closing channel, to h.QueryExecutor.ExecuteQuery and keeps the
// returned value in results.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { | |
...... | |
// Execute query. | |
results := h.QueryExecutor.ExecuteQuery(q, opts, closing) | |
...... | |
} |
执行查询,可能有多个查询语句:
// query/executor.go | |
// ExecuteQuery creates an unbuffered results channel, starts executeQuery in
// a new goroutine with that channel, and returns the channel to the caller
// right away without waiting for the goroutine.
func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result {results := make(chan *Result) | |
go e.executeQuery(query, opt, closing, results) | |
return results | |
} | |
// executeQuery walks query.Statements by index and, on each iteration, calls
// e.StatementExecutor.ExecuteStatement with stmt and ctx (excerpt; the setup
// of i, stmt and ctx and the handling of err are in the elided parts).
func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { | |
...... | |
for ; i < len(query.Statements); i++ { | |
..... | |
err = e.StatementExecutor.ExecuteStatement(stmt, ctx) | |
..... | |
} | |
..... | |
} |
本地和远端查询
对于每个 statement,其查询过程:
- 基于 statement,创建 iterator(含本地节点和远端节点);
- 基于 iterator,创建 emitter,迭代 emitter.Emit() 拿到结果;
// cluster/statement_executor.go | |
// executeSelectStatement runs one SELECT statement (excerpt; elided code is
// marked with dots). It obtains a cursor from e.createIterators, wraps it in
// an emitter built with ctx.ChunkSize, and then loops: each row returned by
// em.Emit is placed in a query.Result along with the partial flag and passed
// to ctx.Send. The emitter is closed when the function returns.
// NOTE(review): the loop exit condition and error handling are not visible
// in this excerpt.
func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {cur, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions) | |
.... | |
em := query.NewEmitter(cur, ctx.ChunkSize) | |
defer em.Close() | |
for {row, partial, err := em.Emit() | |
result := &query.Result{Series: []*models.Row{row}, | |
Partial: partial, | |
} | |
....... | |
err := ctx.Send(result) | |
} | |
} |
重点看一下 iterator 的创建过程:
// cluster/statement_executor.go | |
// createIterators builds the cursor for a SELECT statement. It fills a
// query.SelectOptions from the execution options (NodeID, Authorizer) and
// from the executor's MaxSelectSeriesN / MaxSelectPointN / MaxSelectBucketsN
// fields, then calls query.Select with e.ShardMapper. It returns the cursor
// from query.Select, or nil and the error if that call fails.
func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) (query.Cursor, error) { | |
sopt := query.SelectOptions{ | |
NodeID: opt.NodeID, | |
MaxSeriesN: e.MaxSelectSeriesN, | |
MaxPointN: e.MaxSelectPointN, | |
MaxBucketsN: e.MaxSelectBucketsN, | |
Authorizer: opt.Authorizer, | |
} | |
// Create a set of iterators from a selection. | |
cur, err := query.Select(ctx, stmt, e.ShardMapper, sopt) | |
if err != nil {return nil, err} | |
return cur, nil | |
} |
继续走:
// query/select.go | |
// Select prepares stmt with the given shardMapper and options via Prepare,
// then returns the result of calling Select on the prepared statement. If
// Prepare fails, it returns nil and that error. The prepared statement is
// closed by a deferred call once s.Select has returned.
func Select(ctx context.Context, stmt *influxql.SelectStatement, shardMapper ShardMapper, opt SelectOptions) (Cursor, error) {s, err := Prepare(stmt, shardMapper, opt) | |
if err != nil {return nil, err} | |
// Must be deferred so it runs after Select. | |
defer s.Close() | |
return s.Select(ctx) | |
} | |
// Select builds the cursor for the prepared statement by calling buildCursor
// with p.stmt, p.ic and opt (excerpt; the construction of opt and the return
// path are in the elided parts).
func (p *preparedStatement) Select(ctx context.Context) (Cursor, error) { | |
..... | |
cur, err := buildCursor(ctx, p.stmt, p.ic, opt) | |
.... | |
} |
继续走:
// query/select.go | |
// buildAuxIterator (excerpt) ranges over sources and, for every source whose
// dynamic type is *influxql.Measurement, asks ic.CreateIterator for an
// iterator and appends it to inputs; an error from CreateIterator is returned
// from the inner closure.
// NOTE(review): the closing lines of this excerpt look truncated/garbled in
// the article (unbalanced braces after the append); consult the original
// query/select.go for the exact text.
func buildAuxIterator(ctx context.Context, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions) (Iterator, error) { | |
...... | |
if err := func() error { | |
for _, source := range sources {switch source := source.(type) { | |
case *influxql.Measurement: | |
input, err := ic.CreateIterator(ctx, source, opt) // this is the key step | |
if err != nil {return err} | |
inputs = append(inputs, input) | |
}(), err != nil {} | |
} |
调用链条比较深,到这里比较容易理解了:
- 先创建 LocalShard 的 iterator;
- 再创建 remoteShard 的 iterator;
- 最后将 iterator 合并返回;
// CreateIterator collects iterators for measurement m from two places and
// merges them (excerpt; elided code is marked with dots). It first asks
// c.LocalShardMapping for an iterator, then asks each entry of
// c.RemoteShardGroup for one, appending every iterator to ics. The return
// value is whatever query.Iterators(ics).Merge(opt) yields.
// NOTE(review): handling of the err values is in the elided parts.
func (c *ClusterShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {ics := []query.Iterator{} | |
localIterator, err := c.LocalShardMapping.CreateIterator(ctx, m, opt) | |
ics = append(ics, localIterator) | |
.... | |
for _, sg := range c.RemoteShardGroup {ri, err := sg.CreateIterator(ctx, m, opt) | |
..... | |
ics = append(ics, ri) | |
} | |
return query.Iterators(ics).Merge(opt) | |
} |
正文完