使用curl向influxdb集群的任意meta节点发送查询database的请求,都能够返回集群中所有database信息:

curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"

原理是每个meta节点都维护了最新的snapshot信息,当有查询请求时,返回本地snapshot中的databases信息;snapshot信息由后台goroutine定期地向leader查询得到。

整体流程:

HTTP handler

与create database的流程类似,我们从ExecuteStatement开始:

// cluster/statement_executor.go

// ExecuteStatement (excerpt) dispatches a statement according to its concrete
// type. Only the SHOW DATABASES branch is shown: it hands the statement to
// executeShowDatabasesStatement and stores the result in rows and err.
// The dotted lines mark code omitted from this excerpt; rows and err are
// assigned with "=", so they are presumably declared in the omitted part.
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
	......
	switch stmt := stmt.(type) {
	case *influxql.ShowDatabasesStatement:
		rows, err = e.executeShowDatabasesStatement(stmt, ctx)
	}
	.....
}

同样的,还是通过metaClient调用,这次是Databases()函数:

func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {    dis := e.MetaClient.Databases()    a := ctx.ExecutionOptions.Authorizer    row := &models.Row{Name: "databases", Columns: []string{"name"}}    for _, di := range dis {        // Only include databases that the user is authorized to read or write.        if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) {            row.Values = append(row.Values, []interface{}{di.Name})        }    }    return []*models.Row{row}, nil}

这里的Databases()返回的是client保存的cacheData中的databases:

// services/meta/client.gofunc (c *Client) Databases() []DatabaseInfo {    dbs := c.Data().Databases    if dbs == nil {        return []DatabaseInfo{}    }    return dbs}func (c *Client) Data() *Data {    c.mu.RLock()    defer c.mu.RUnlock()    return c.cacheData}

snapshot cacheData的维护

client的cacheData,由client的后台goroutine维护更新:

// services/meta/client.go

// Open initializes the client: it creates the changed and closing channels,
// loads the initial cacheData by calling retryUntilSnapshot with index 0,
// and then starts pollForUpdates in its own goroutine. It always returns nil.
func (c *Client) Open() error {
	// The channels are created before the first snapshot fetch; keep this
	// order, since the fetch/poll code may rely on them existing.
	c.changed = make(chan struct{})
	c.closing = make(chan struct{})

	// Synchronous initial load of the cached data.
	c.cacheData = c.retryUntilSnapshot(0)

	// Keep cacheData up to date in the background from here on.
	go c.pollForUpdates()

	return nil
}

调用retryUntilSnapshot,然后更新cacheData:

// services/meta/client.gofunc (c *Client) pollForUpdates() {    for {        data := c.retryUntilSnapshot(c.index())        if data == nil {            // this will only be nil if the client has been closed,            // so we can exit out            return        }        // update the data and notify of the change        c.mu.Lock()        idx := c.cacheData.Index        c.cacheData = data        c.updateAuthCache()                   c.mu.Unlock()    }}

retryUntilSnapshot()负责获取数据:

  • 它尝试向不同的server发送http请求,GET /index,直到返回数据为止;
  • 当发送到follower节点时,可能没有index前完整的数据,会返回error,通过for循环进行重试;
// retryUntilSnapshot (excerpt) keeps asking meta servers for a snapshot at
// the given index until one request succeeds, and returns that data.
// Servers are tried in order from c.metaServers, wrapping back to the first
// one after the last. After a failed attempt it sleeps for errSleep and moves
// on to the next server. The dotted lines mark code omitted from this
// excerpt.
func (c *Client) retryUntilSnapshot(idx uint64) *Data {
	currentServer := 0
	for {
		....
		// Wrap around so the index always stays within c.metaServers.
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		server := c.metaServers[currentServer]
		....
		data, err := c.getSnapshot(server, idx)
		if err == nil {
			return data
		}
		// The error itself is discarded; back off, then try the next server.
		time.Sleep(errSleep)
		currentServer++
	}
}

getSnapshot()实际发送HTTP请求(8091),GET /index:

func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {    resp, err := http.Get(c.url(server) + fmt.Sprintf("?index=%d", index))    if err != nil {        return nil, err    }    defer resp.Body.Close()    if resp.StatusCode != http.StatusOK {        return nil, fmt.Errorf("meta server returned non-200: %s", resp.Status)    }    b, err := ioutil.ReadAll(resp.Body)    if err != nil {        return nil, err    }    data := &Data{}    if err := data.UnmarshalBinary(b); err != nil {        return nil, err    }    return data, nil}

再看一下GET /index接口的实现:

  • 等待该节点上出现index之后的数据,取最新的snapshot,然后protobuf序列化,返回给client;
// services/meta/handler.go

// serveSnapshot (excerpt) handles a snapshot request. It reads the client's
// current index from the "index" query parameter, waits on the channel
// returned by h.store.afterIndex(index), then takes a store snapshot,
// marshals it to binary and writes it back with Content-Type
// "application/octet-stream". Snapshot or marshal failures are reported as
// HTTP 500 through h.httpError. The dotted lines mark code omitted from this
// excerpt.
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
	....
	// get the current index that client has
	// NOTE(review): the err returned here is not checked in the visible
	// excerpt (the err inside the case below is a new variable) — confirm
	// that a malformed index is rejected in the full source.
	index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)
	select {
	// Proceed once afterIndex(index) signals; presumably this means the
	// store holds data newer than the client's index.
	case <-h.store.afterIndex(index):
		// Send updated snapshot to client.
		ss, err := h.store.snapshot()
		if err != nil {
			h.httpError(err, w, http.StatusInternalServerError)
			return
		}
		b, err := ss.MarshalBinary()
		if err != nil {
			h.httpError(err, w, http.StatusInternalServerError)
			return
		}
		w.Header().Add("Content-Type", "application/octet-stream")
		// The result of Write is ignored; a failed write is not reported.
		w.Write(b)
		return
	}
	....
}