Sending a `show databases` query with curl to any meta node in an InfluxDB cluster returns the information for every database in the cluster:
curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"
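For readers who prefer to issue the query from code, the request that the curl command sends can be built in Go roughly as follows. This is a minimal sketch: the helper name buildShowDatabasesRequest is hypothetical, and the host node-1:8086 is taken from the curl example above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// buildShowDatabasesRequest (hypothetical helper) builds a POST /query
// request whose form-encoded body carries q=show databases, like the
// curl command does with --data-urlencode.
func buildShowDatabasesRequest(baseURL string) (*http.Request, error) {
	form := url.Values{}
	form.Set("q", "show databases")

	req, err := http.NewRequest(http.MethodPost, baseURL+"/query", strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	return req, nil
}

func main() {
	req, err := buildShowDatabasesRequest("http://node-1:8086")
	if err != nil {
		panic(err)
	}
	// Only the request is built here; pass it to http.DefaultClient.Do
	// to actually send it to a running node.
	fmt.Println(req.Method, req.URL.String())
}
```

The sketch only constructs the request, so it runs without a cluster; sending it requires a reachable node.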
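This works because every meta node maintains an up-to-date snapshot. When a query arrives, the node answers with the databases information from its local snapshot. The snapshot itself is refreshed by a background goroutine that periodically queries the leader.
Overall flow: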
HTTP handler
As with the create database flow, we start from ExecuteStatement:
    // cluster/statement_executor.go
    func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
        ......
        switch stmt := stmt.(type) {
        case *influxql.ShowDatabasesStatement:
            rows, err = e.executeShowDatabasesStatement(stmt, ctx)
        }
        .....
    }
Again the work is delegated to metaClient, this time through the Databases() function:
    func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {
        dis := e.MetaClient.Databases()
        a := ctx.ExecutionOptions.Authorizer

        row := &models.Row{Name: "databases", Columns: []string{"name"}}
        for _, di := range dis {
            // Only include databases that the user is authorized to read or write.
            if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) {
                row.Values = append(row.Values, []interface{}{di.Name})
            }
        }
        return []*models.Row{row}, nil
    }
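The filtering step in executeShowDatabasesStatement can be tried in isolation. The sketch below is illustrative only: privilege, authorizer, grants, and visibleDatabases are simplified stand-ins invented for this example, not the influxql or query types.

```go
package main

import "fmt"

// privilege and authorizer are simplified stand-ins (assumptions) for the
// privilege constants and Authorizer used in the excerpt.
type privilege int

const (
	readPrivilege privilege = iota
	writePrivilege
)

type authorizer interface {
	AuthorizeDatabase(p privilege, name string) bool
}

// grants maps a database name to the privileges a user holds on it.
type grants map[string][]privilege

func (g grants) AuthorizeDatabase(p privilege, name string) bool {
	for _, held := range g[name] {
		if held == p {
			return true
		}
	}
	return false
}

// visibleDatabases keeps only the names the user may read or write,
// mirroring the condition inside the loop of the excerpt.
func visibleDatabases(a authorizer, names []string) []string {
	visible := []string{}
	for _, name := range names {
		if a.AuthorizeDatabase(readPrivilege, name) || a.AuthorizeDatabase(writePrivilege, name) {
			visible = append(visible, name)
		}
	}
	return visible
}

func main() {
	user := grants{
		"metrics": {readPrivilege},
		"events":  {writePrivilege},
	}
	// "secrets" has no grant, so it is filtered out.
	fmt.Println(visibleDatabases(user, []string{"metrics", "events", "secrets"}))
}
```

Either privilege is enough for a database to be listed, which is why a write-only grant on "events" still makes it visible.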
Databases() simply returns the cacheData held by the client:
    // services/meta/client.go
    func (c *Client) Databases() []DatabaseInfo {
        dbs := c.Data().Databases
        if dbs == nil {
            return []DatabaseInfo{}
        }
        return dbs
    }

    func (c *Client) Data() *Data {
        c.mu.RLock()
        defer c.mu.RUnlock()
        return c.cacheData
    }
Maintaining the snapshot cacheData
The client's cacheData is kept up to date by a background goroutine that the client starts in Open():
    // services/meta/client.go
    func (c *Client) Open() error {
        c.changed = make(chan struct{})
        c.closing = make(chan struct{})
        c.cacheData = c.retryUntilSnapshot(0)

        go c.pollForUpdates()

        return nil
    }
pollForUpdates() calls retryUntilSnapshot in a loop and then replaces cacheData with the result:
    // services/meta/client.go
    func (c *Client) pollForUpdates() {
        for {
            data := c.retryUntilSnapshot(c.index())
            if data == nil {
                // this will only be nil if the client has been closed,
                // so we can exit out
                return
            }

            // update the data and notify of the change
            c.mu.Lock()
            idx := c.cacheData.Index
            c.cacheData = data
            c.updateAuthCache()
            c.mu.Unlock()
        }
    }
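The combination of Data() taking a read lock and pollForUpdates() replacing the pointer under the write lock means readers always see a complete snapshot, never a half-updated one. A minimal sketch of that pattern follows; the data and client types and the replace method are simplified stand-ins written for this example, not the real meta client.

```go
package main

import (
	"fmt"
	"sync"
)

// data is a simplified stand-in (assumption) for the meta Data type.
type data struct {
	Index     uint64
	Databases []string
}

type client struct {
	mu    sync.RWMutex
	cache *data
}

// Data returns the current snapshot pointer under a read lock.
func (c *client) Data() *data {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cache
}

// replace swaps in a newer snapshot under the write lock. The old snapshot
// is never modified, so readers that already hold it are unaffected.
func (c *client) replace(d *data) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cache = d
}

func main() {
	c := &client{cache: &data{Index: 1, Databases: []string{"db0"}}}

	old := c.Data() // a reader grabs the snapshot before the update
	c.replace(&data{Index: 2, Databases: []string{"db0", "db1"}})

	fmt.Println(old.Index, len(old.Databases))           // the old reader still sees index 1
	fmt.Println(c.Data().Index, len(c.Data().Databases)) // new readers see index 2
}
```

Replacing the whole pointer instead of mutating the struct in place keeps the critical section short and lets readers use a snapshot after releasing the lock.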
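retryUntilSnapshot() does the actual fetching:
- It sends an HTTP request, GET /index, to one meta server after another until one of them returns data;
- When the request lands on a follower node, the follower may not yet have complete data up to the requested index and returns an error, in which case the for loop retries with the next server;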
    func (c *Client) retryUntilSnapshot(idx uint64) *Data {
        currentServer := 0
        for {
            ....
            if currentServer >= len(c.metaServers) {
                currentServer = 0
            }
            server := c.metaServers[currentServer]
            ....
            data, err := c.getSnapshot(server, idx)
            if err == nil {
                return data
            }
            time.Sleep(errSleep)
            currentServer++
        }
    }
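The round-robin retry can be exercised without a cluster by injecting the fetch step. In the sketch below, fetchFunc, fetchRoundRobin, errUnavailable, and the meta-N:8091 server names are all hypothetical. The maxAttempts cap is an assumption added so the example always terminates; the excerpt above loops until it succeeds.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a hypothetical error standing in for a failed fetch.
var errUnavailable = errors.New("snapshot not available")

// fetchFunc is a hypothetical stand-in for getSnapshot.
type fetchFunc func(server string) (string, error)

// fetchRoundRobin tries each server in turn, wrapping around at the end of
// the list, until one succeeds or maxAttempts is exhausted.
func fetchRoundRobin(servers []string, fetch fetchFunc, pause time.Duration, maxAttempts int) (string, error) {
	if len(servers) == 0 {
		return "", errors.New("no servers configured")
	}
	current := 0
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if current >= len(servers) {
			current = 0 // wrap around to the first server
		}
		data, err := fetch(servers[current])
		if err == nil {
			return data, nil
		}
		time.Sleep(pause) // back off briefly before trying the next server
		current++
	}
	return "", errors.New("all attempts failed")
}

func main() {
	var tried []string
	fetch := func(server string) (string, error) {
		tried = append(tried, server)
		if server != "meta-3:8091" {
			return "", errUnavailable
		}
		return "snapshot from " + server, nil
	}

	servers := []string{"meta-1:8091", "meta-2:8091", "meta-3:8091"}
	data, err := fetchRoundRobin(servers, fetch, time.Millisecond, 10)
	fmt.Println(data, err, len(tried))
}
```

Here the first two servers fail and the third answers, so the call returns after three attempts.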
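getSnapshot() is where the HTTP request (port 8091), GET /index, is actually sent, with the client's current index passed as the ?index= query parameter: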
    func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {
        resp, err := http.Get(c.url(server) + fmt.Sprintf("?index=%d", index))
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()

        if resp.StatusCode != http.StatusOK {
            return nil, fmt.Errorf("meta server returned non-200: %s", resp.Status)
        }

        b, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return nil, err
        }

        data := &Data{}
        if err := data.UnmarshalBinary(b); err != nil {
            return nil, err
        }
        return data, nil
    }
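The request/response shape of getSnapshot() can be reproduced against a local test server. The sketch below makes several assumptions: the snapshot type, fetchSnapshot, and newFakeMetaServer are hypothetical names, and JSON is swapped in for the binary encoding used by Data.UnmarshalBinary so the example stays self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// snapshot is a simplified, JSON-encoded stand-in (assumption) for Data.
type snapshot struct {
	Index     uint64   `json:"index"`
	Databases []string `json:"databases"`
}

// fetchSnapshot follows the same steps as getSnapshot: GET with ?index=,
// reject non-200 responses, read the body, decode it.
func fetchSnapshot(baseURL string, index uint64) (*snapshot, error) {
	resp, err := http.Get(fmt.Sprintf("%s?index=%d", baseURL, index))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("meta server returned non-200: %s", resp.Status)
	}
	b, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	s := &snapshot{}
	if err := json.Unmarshal(b, s); err != nil {
		return nil, err
	}
	return s, nil
}

// newFakeMetaServer starts a local test server that always answers with
// the given snapshot, as long as the index parameter parses.
func newFakeMetaServer(current snapshot) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if _, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64); err != nil {
			http.Error(w, "bad index", http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(current)
	}))
}

func main() {
	srv := newFakeMetaServer(snapshot{Index: 7, Databases: []string{"db0"}})
	defer srv.Close()

	s, err := fetchSnapshot(srv.URL+"/", 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.Index, s.Databases)
}
```

Treating any non-200 status as an error is what lets the caller's retry loop move on to the next server.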
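Finally, the implementation of the GET /index endpoint:
- It waits until the node holds data newer than the index the client supplied, takes the node's latest snapshot, serializes it with protobuf, and returns it to the client;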
    // services/meta/handler.go
    func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
        ....
        // get the current index that client has
        index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)

        select {
        // fires once data newer than index is available
        case <-h.store.afterIndex(index):
            // Send updated snapshot to client.
            ss, err := h.store.snapshot()
            if err != nil {
                h.httpError(err, w, http.StatusInternalServerError)
                return
            }
            b, err := ss.MarshalBinary()
            if err != nil {
                h.httpError(err, w, http.StatusInternalServerError)
                return
            }
            w.Header().Add("Content-Type", "application/octet-stream")
            w.Write(b)
            return
        }
        ....
    }
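The excerpt does not show how afterIndex() is implemented. One common way to get this "wake me when the index moves past N" behavior in Go is a channel that is closed and replaced on every change. The sketch below illustrates that technique under that assumption; store, advance, and isReady are hypothetical names, not the actual meta store.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a simplified stand-in (assumption) for the meta store.
type store struct {
	mu      sync.Mutex
	index   uint64
	changed chan struct{} // closed and replaced on every change
}

func newStore() *store {
	return &store{changed: make(chan struct{})}
}

// afterIndex returns a channel that is (or will be) closed once the store
// holds data newer than index.
func (s *store) afterIndex(index uint64) <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if index < s.index {
		// The caller is already behind: hand back a closed channel so a
		// receive on it completes immediately.
		ch := make(chan struct{})
		close(ch)
		return ch
	}
	return s.changed
}

// advance simulates applying a change: bump the index and wake all waiters.
func (s *store) advance() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.index++
	close(s.changed)
	s.changed = make(chan struct{})
}

// isReady reports whether ch has been closed, without blocking.
func isReady(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	s := newStore()

	wait := s.afterIndex(0)    // client and store are both at index 0
	fmt.Println(isReady(wait)) // false: nothing newer yet

	s.advance()                // store moves to index 1
	fmt.Println(isReady(wait)) // true: the waiter is released

	fmt.Println(isReady(s.afterIndex(0))) // true: a stale client gets data at once
	fmt.Println(isReady(s.afterIndex(1))) // false: an up-to-date client waits
}
```

Closing a channel wakes every goroutine blocked on it, so a single change can release any number of long-polling requests at once.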