共计 3347 个字符,预计需要花费 9 分钟才能阅读完成。
使用 curl 向 influxdb 集群的任意 meta 节点发送查询 database 的请求,都能够返回集群中所有 database 的信息:
curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"
原理是每个 meta 节点都维护了最新的 snapshot 信息,当有查询请求时,返回本地 snapshot 中的 databases 信息;snapshot 信息由后台 goroutine 定期地向 leader 查询得到。
整体流程:
HTTP handler
与 create database 的流程相似,咱们从 ExecuteStatement 开始:
// cluster/statement_executor.go
// ExecuteStatement dispatches on the concrete type of stmt. Only the
// SHOW DATABASES branch is shown here; the dotted lines mark code that the
// excerpt elides.
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
......
switch stmt := stmt.(type) {
case *influxql.ShowDatabasesStatement:
// rows and err are assigned (not declared) here; their declarations are
// presumably in the elided code above.
rows, err = e.executeShowDatabasesStatement(stmt, ctx)
}
.....
}
同样的,还是 metaClient 调用,这次是 Databases() 函数:
// executeShowDatabasesStatement answers SHOW DATABASES. It returns a single
// row named "databases" with one "name" column, holding every database from
// the meta client that the caller's authorizer allows to be read or written.
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {
	infos := e.MetaClient.Databases()
	authz := ctx.ExecutionOptions.Authorizer

	result := &models.Row{
		Name:    "databases",
		Columns: []string{"name"},
	}

	for _, info := range infos {
		// Skip databases the user may neither read nor write. The write check
		// only runs when the read check fails.
		if !(authz.AuthorizeDatabase(influxql.ReadPrivilege, info.Name) ||
			authz.AuthorizeDatabase(influxql.WritePrivilege, info.Name)) {
			continue
		}
		result.Values = append(result.Values, []interface{}{info.Name})
	}

	return []*models.Row{result}, nil
}
这里的 Databases() 返回的是 client 保存的 cacheData:
// services/meta/client.go
// Databases returns the database list from the client's cached data.
// When the cached list is nil, an empty non-nil slice is returned instead.
func (c *Client) Databases() []DatabaseInfo {
	if cached := c.Data().Databases; cached != nil {
		return cached
	}
	return []DatabaseInfo{}
}
// Data returns the client's current cacheData pointer, read while holding
// the read lock on c.mu.
func (c *Client) Data() *Data {
	c.mu.RLock()
	current := c.cacheData
	c.mu.RUnlock()
	return current
}
snapshot cacheData 的维护
client 的 cacheData,由 client 的后台 goroutine 维护更新:
// services/meta/client.go
// Open prepares the client for use: it creates the closing and changed
// signal channels, fetches an initial snapshot (index 0) into cacheData, and
// starts the background polling goroutine. It always returns nil.
func (c *Client) Open() error {
	c.closing = make(chan struct{})
	c.changed = make(chan struct{})

	// Fetch the first snapshot before polling starts.
	initial := c.retryUntilSnapshot(0)
	c.cacheData = initial

	go c.pollForUpdates()

	return nil
}
调用 retryUntilSnapshot,而后更新 cacheData:
// services/meta/client.go
// pollForUpdates is the background loop started by Open. Each iteration asks
// retryUntilSnapshot for data newer than the client's current index, swaps it
// into cacheData under c.mu, refreshes the auth cache, and, when the index
// actually advanced, signals waiters by closing c.changed and installing a
// fresh channel. The loop exits when retryUntilSnapshot returns nil.
func (c *Client) pollForUpdates() {
	for {
		data := c.retryUntilSnapshot(c.index())
		if data == nil {
			// this will only be nil if the client has been closed,
			// so we can exit out
			return
		}

		// update the data and notify of the change
		c.mu.Lock()
		idx := c.cacheData.Index
		c.cacheData = data
		c.updateAuthCache()
		if idx < data.Index {
			// Closing the channel wakes every goroutine waiting on it; a new
			// channel is installed so the next change can be signalled too.
			close(c.changed)
			c.changed = make(chan struct{})
		}
		c.mu.Unlock()
	}
}
retryUntilSnapshot() 执行取数据:
- 它尝试向不同的 server 发送 http 请求,GET /index,直到返回数据为止;
- 当发送到 follower 节点时,可能没有 index 前完整的数据,会返回 error,通过 for 循环进行重试;
// retryUntilSnapshot keeps calling getSnapshot with idx, cycling through
// c.metaServers, and returns the first snapshot fetched without error.
// The "...." lines mark code that the excerpt elides.
func (c *Client) retryUntilSnapshot(idx uint64) *Data {
currentServer := 0
for {
....
// Wrap around to the first server once the end of the list is passed.
if currentServer >= len(c.metaServers) {currentServer = 0}
server := c.metaServers[currentServer]
....
data, err := c.getSnapshot(server, idx)
if err == nil {return data}
// The request failed: pause for errSleep, then move on to the next server.
time.Sleep(errSleep)
currentServer++
}
}
getSnapshot() 实际发送 HTTP 请求 (8091),GET /index:
// getSnapshot issues an HTTP GET to the given meta server, passing index as
// the "index" query parameter, and decodes the response body into a Data
// value via UnmarshalBinary. A transport error, a non-200 status, a body read
// failure, or a decode failure each yield a nil *Data and the error.
func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {
	endpoint := c.url(server) + fmt.Sprintf("?index=%d", index)

	resp, err := http.Get(endpoint)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("meta server returned non-200: %s", resp.Status)
	}

	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	snapshot := new(Data)
	if decodeErr := snapshot.UnmarshalBinary(raw); decodeErr != nil {
		return nil, decodeErr
	}
	return snapshot, nil
}
再看一下 GET /index 接口的实现:
- 查询该节点上 index 之后的最新 snapshot 数据,然后 protobuf 序列化,返回给 client;
// services/meta/handler.go
// serveSnapshot parses the "index" query parameter, waits on
// h.store.afterIndex(index), then writes the store's snapshot in binary form
// with Content-Type application/octet-stream. Snapshot or marshal failures
// are reported through h.httpError with status 500.
// The "...." lines mark code that the excerpt elides.
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
....
// get the current index that client has
// NOTE(review): the err returned by ParseUint is not checked in the visible
// excerpt — confirm against the full source.
index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)
select {
// Gets the latest data after index.
case <-h.store.afterIndex(index):
// Send updated snapshot to client.
ss, err := h.store.snapshot()
if err != nil {h.httpError(err, w, http.StatusInternalServerError)
return
}
b, err := ss.MarshalBinary()
if err != nil {h.httpError(err, w, http.StatusInternalServerError)
return
}
w.Header().Add("Content-Type", "application/octet-stream")
w.Write(b)
return
}
....
}
正文完