关于influxdb:InfluxDB集群-查询database源码分析

curl向influxdb集群的任意meta节点发送查问database申请,都能够返回集群中所有database信息:

curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"

原理是每个meta节点都保护了最新的snapshot信息,当有查问申请时,返回本地的snapshot中的databases信息;snapshot信息由后盾goroutine定期的向leader查问失去。

整体流程:

HTTP handler

与create database的流程相似,咱们从ExecuteStatement开始:

// cluster/statement_executor.go
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
    ......
    switch stmt := stmt.(type) {
    case *influxql.ShowDatabasesStatement:
        rows, err = e.executeShowDatabasesStatement(stmt, ctx)
    }
    .....
}

同样的,还是metaClient调用,这次是Database()函数:

func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {
    dis := e.MetaClient.Databases()
    a := ctx.ExecutionOptions.Authorizer

    row := &models.Row{Name: "databases", Columns: []string{"name"}}
    for _, di := range dis {
        // Only include databases that the user is authorized to read or write.
        if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) {
            row.Values = append(row.Values, []interface{}{di.Name})
        }
    }
    return []*models.Row{row}, nil
}

这里的Database()返回的是client保留的cacheData:

// services/meta/client.go
func (c *Client) Databases() []DatabaseInfo {
    dbs := c.Data().Databases
    if dbs == nil {
        return []DatabaseInfo{}
    }
    return dbs
}
func (c *Client) Data() *Data {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.cacheData
}

snapshot cacheData的保护

client的cacheData,由client的后盾goroutine保护更新:

// services/meta/client.go
func (c *Client) Open() error {
    c.changed = make(chan struct{})
    c.closing = make(chan struct{})
    c.cacheData = c.retryUntilSnapshot(0)

    go c.pollForUpdates()

    return nil
}

调用retryUntilSnapshot,而后更新cacheData:

// services/meta/client.go
func (c *Client) pollForUpdates() {
    for {
        data := c.retryUntilSnapshot(c.index())
        if data == nil {
            // this will only be nil if the client has been closed,
            // so we can exit out
            return
        }
        // update the data and notify of the change
        c.mu.Lock()
        idx := c.cacheData.Index
        c.cacheData = data
        c.updateAuthCache()
           
        c.mu.Unlock()
    }
}

retryUntilSnapshot()执行取数据:

  • 它尝试向不同的server发送http申请,GET /index,直到返回数据为止;
  • 当发送到follower节点时,可能没有index前残缺的数据,会返回error,通过for进行重试;
func (c *Client) retryUntilSnapshot(idx uint64) *Data {
    currentServer := 0
    for {
        ....
        if currentServer >= len(c.metaServers) {
            currentServer = 0
        }
        server := c.metaServers[currentServer]
        ....
        data, err := c.getSnapshot(server, idx)
        if err == nil {
            return data
        }
        time.Sleep(errSleep)
        currentServer++
    }
}

getSnapshot()理论发送HTTP申请(8091),GET /index:

func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {
    resp, err := http.Get(c.url(server) + fmt.Sprintf("?index=%d", index))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("meta server returned non-200: %s", resp.Status)
    }

    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    data := &Data{}
    if err := data.UnmarshalBinary(b); err != nil {
        return nil, err
    }
    return data, nil
}

再看一下GET /index接口的实现:

  • 查问该节点上index前的最新snapshot数据,而后protobuf序列化,返回给client;
// services/meta/handler.go
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
    ....
    // get the current index that client has
    index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)
    select {
    //取的是index后的最新数据
    case <-h.store.afterIndex(index):
        // Send updated snapshot to client.
        ss, err := h.store.snapshot()
        if err != nil {
            h.httpError(err, w, http.StatusInternalServerError)
            return
        }
        b, err := ss.MarshalBinary()
        if err != nil {
            h.httpError(err, w, http.StatusInternalServerError)
            return
        }
        w.Header().Add("Content-Type", "application/octet-stream")
        w.Write(b)
        return
    }
    ....
}

【腾讯云】云产品限时秒杀,爆款1核2G云服务器,首年50元

阿里云限时活动-2核2G-5M带宽-60G SSD-1000G月流量 ,特惠价99元/年(原价1234.2元/年,可以直接买3年),速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表评论

您的电子邮箱地址不会被公开。

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据