共计 3347 个字符,预计需要花费 9 分钟才能阅读完成。
curl 向 influxdb 集群的任意 meta 节点发送查问 database 申请,都能够返回集群中所有 database 信息:
curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"
原理是每个 meta 节点都保护了最新的 snapshot 信息,当有查问申请时,返回本地的 snapshot 中的 databases 信息;snapshot 信息由后盾 goroutine 定期的向 leader 查问失去。
整体流程:
HTTP handler
与 create database 的流程相似,咱们从 ExecuteStatement 开始:
// cluster/statement_executor.go | |
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { | |
...... | |
switch stmt := stmt.(type) { | |
case *influxql.ShowDatabasesStatement: | |
rows, err = e.executeShowDatabasesStatement(stmt, ctx) | |
} | |
..... | |
} |
同样的,还是 metaClient 调用,这次是 Database() 函数:
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {dis := e.MetaClient.Databases() | |
a := ctx.ExecutionOptions.Authorizer | |
row := &models.Row{Name: "databases", Columns: []string{"name"}} | |
for _, di := range dis { | |
// Only include databases that the user is authorized to read or write. | |
if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) {row.Values = append(row.Values, []interface{}{di.Name}) | |
} | |
} | |
return []*models.Row{row}, nil | |
} |
这里的 Database() 返回的是 client 保留的 cacheData:
// services/meta/client.go | |
func (c *Client) Databases() []DatabaseInfo {dbs := c.Data().Databases | |
if dbs == nil {return []DatabaseInfo{}} | |
return dbs | |
} | |
func (c *Client) Data() *Data {c.mu.RLock() | |
defer c.mu.RUnlock() | |
return c.cacheData | |
} |
snapshot cacheData 的保护
client 的 cacheData,由 client 的后盾 goroutine 保护更新:
// services/meta/client.go | |
func (c *Client) Open() error {c.changed = make(chan struct{}) | |
c.closing = make(chan struct{}) | |
c.cacheData = c.retryUntilSnapshot(0) | |
go c.pollForUpdates() | |
return nil | |
} |
调用 retryUntilSnapshot,而后更新 cacheData:
// services/meta/client.go | |
func (c *Client) pollForUpdates() { | |
for {data := c.retryUntilSnapshot(c.index()) | |
if data == nil { | |
// this will only be nil if the client has been closed, | |
// so we can exit out | |
return | |
} | |
// update the data and notify of the change | |
c.mu.Lock() | |
idx := c.cacheData.Index | |
c.cacheData = data | |
c.updateAuthCache() | |
c.mu.Unlock()} | |
} |
retryUntilSnapshot() 执行取数据:
- 它尝试向不同的 server 发送 http 申请,GET /index,直到返回数据为止;
- 当发送到 follower 节点时,可能没有 index 前残缺的数据,会返回 error,通过 for 进行重试;
func (c *Client) retryUntilSnapshot(idx uint64) *Data { | |
currentServer := 0 | |
for { | |
.... | |
if currentServer >= len(c.metaServers) {currentServer = 0} | |
server := c.metaServers[currentServer] | |
.... | |
data, err := c.getSnapshot(server, idx) | |
if err == nil {return data} | |
time.Sleep(errSleep) | |
currentServer++ | |
} | |
} |
getSnapshot() 理论发送 HTTP 申请 (8091),GET /index:
func (c *Client) getSnapshot(server string, index uint64) (*Data, error) {resp, err := http.Get(c.url(server) + fmt.Sprintf("?index=%d", index)) | |
if err != nil {return nil, err} | |
defer resp.Body.Close() | |
if resp.StatusCode != http.StatusOK {return nil, fmt.Errorf("meta server returned non-200: %s", resp.Status) | |
} | |
b, err := ioutil.ReadAll(resp.Body) | |
if err != nil {return nil, err} | |
data := &Data{} | |
if err := data.UnmarshalBinary(b); err != nil {return nil, err} | |
return data, nil | |
} |
再看一下 GET /index 接口的实现:
- 查问该节点上 index 前的最新 snapshot 数据,而后 protobuf 序列化,返回给 client;
// services/meta/handler.go | |
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { | |
.... | |
// get the current index that client has | |
index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64) | |
select { | |
// 取的是 index 后的最新数据 | |
case <-h.store.afterIndex(index): | |
// Send updated snapshot to client. | |
ss, err := h.store.snapshot() | |
if err != nil {h.httpError(err, w, http.StatusInternalServerError) | |
return | |
} | |
b, err := ss.MarshalBinary() | |
if err != nil {h.httpError(err, w, http.StatusInternalServerError) | |
return | |
} | |
w.Header().Add("Content-Type", "application/octet-stream") | |
w.Write(b) | |
return | |
} | |
.... | |
} |
正文完