关于influxdb:InfluxDB集群-查询database源码分析

74次阅读

共计 3347 个字符,预计需要花费 9 分钟才能阅读完成。

curl 向 influxdb 集群的任意 meta 节点发送查问 database 申请,都能够返回集群中所有 database 信息:

curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"

原理是每个 meta 节点都保护了最新的 snapshot 信息,当有查问申请时,返回本地的 snapshot 中的 databases 信息;snapshot 信息由后盾 goroutine 定期的向 leader 查问失去。

整体流程:

HTTP handler

与 create database 的流程相似,咱们从 ExecuteStatement 开始:

// cluster/statement_executor.go
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
    ......
    switch stmt := stmt.(type) {
    case *influxql.ShowDatabasesStatement:
        rows, err = e.executeShowDatabasesStatement(stmt, ctx)
    }
    .....
}

同样的,还是 metaClient 调用,这次是 Database() 函数:

func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {dis := e.MetaClient.Databases()
    a := ctx.ExecutionOptions.Authorizer

    row := &models.Row{Name: "databases", Columns: []string{"name"}}
    for _, di := range dis {
        // Only include databases that the user is authorized to read or write.
        if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) {row.Values = append(row.Values, []interface{}{di.Name})
        }
    }
    return []*models.Row{row}, nil
}

这里的 Database() 返回的是 client 保留的 cacheData:

// services/meta/client.go
func (c *Client) Databases() []DatabaseInfo {dbs := c.Data().Databases
    if dbs == nil {return []DatabaseInfo{}}
    return dbs
}
func (c *Client) Data() *Data {c.mu.RLock()
    defer c.mu.RUnlock()
    return c.cacheData
}

snapshot cacheData 的保护

client 的 cacheData,由 client 的后盾 goroutine 保护更新:

// services/meta/client.go
func (c *Client) Open() error {c.changed = make(chan struct{})
    c.closing = make(chan struct{})
    c.cacheData = c.retryUntilSnapshot(0)

    go c.pollForUpdates()

    return nil
}

调用 retryUntilSnapshot,而后更新 cacheData:

// services/meta/client.go
func (c *Client) pollForUpdates() {
    for {data := c.retryUntilSnapshot(c.index())
        if data == nil {
            // this will only be nil if the client has been closed,
            // so we can exit out
            return
        }
        // update the data and notify of the change
        c.mu.Lock()
        idx := c.cacheData.Index
        c.cacheData = data
        c.updateAuthCache()
           
        c.mu.Unlock()}
}

retryUntilSnapshot() 执行取数据:

  • 它尝试向不同的 server 发送 http 申请,GET /index,直到返回数据为止;
  • 当发送到 follower 节点时,可能没有 index 前残缺的数据,会返回 error,通过 for 进行重试;
func (c *Client) retryUntilSnapshot(idx uint64) *Data {
    currentServer := 0
    for {
        ....
        if currentServer >= len(c.metaServers) {currentServer = 0}
        server := c.metaServers[currentServer]
        ....
        data, err := c.getSnapshot(server, idx)
        if err == nil {return data}
        time.Sleep(errSleep)
        currentServer++
    }
}

getSnapshot() 理论发送 HTTP 申请 (8091),GET /index:

func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {resp, err := http.Get(c.url(server) + fmt.Sprintf("?index=%d", index))
    if err != nil {return nil, err}
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {return nil, fmt.Errorf("meta server returned non-200: %s", resp.Status)
    }

    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {return nil, err}
    data := &Data{}
    if err := data.UnmarshalBinary(b); err != nil {return nil, err}
    return data, nil
}

再看一下 GET /index 接口的实现:

  • 查问该节点上 index 前的最新 snapshot 数据,而后 protobuf 序列化,返回给 client;
// services/meta/handler.go
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
    ....
    // get the current index that client has
    index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)
    select {
    // 取的是 index 后的最新数据
    case <-h.store.afterIndex(index):
        // Send updated snapshot to client.
        ss, err := h.store.snapshot()
        if err != nil {h.httpError(err, w, http.StatusInternalServerError)
            return
        }
        b, err := ss.MarshalBinary()
        if err != nil {h.httpError(err, w, http.StatusInternalServerError)
            return
        }
        w.Header().Add("Content-Type", "application/octet-stream")
        w.Write(b)
        return
    }
    ....
}

正文完
 0