共计 4387 个字符,预计需要花费 11 分钟才能阅读完成。
创立 database,能够通过 CLI 命令,也能够通过 HTTP,两种形式走的是同一套逻辑流程。
本文以 HTTP 为例,剖析集群模式下创立 database 的源码流程。
curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
database 是集群的元信息,须要 Raft 强统一;
create database 的 request 若被发送给 follower 节点,则返回 NotLeader 的 redirect 响应 (带 leader 的 url),client 从新向 Leader 节点发送 request。
整体流程:
HTTP handler
8086 是 httpd 的监听端口,其 hander:
// services/httpd/handler.go | |
func NewHandler(c Config) *Handler { | |
...... | |
h.AddRoutes([]Route{ | |
{ | |
"query", // Query serving route. | |
"POST", "/query", true, true, h.serveQuery, | |
}, | |
}) | |
} |
持续向下走:
// services/httpd/handler.go | |
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { | |
... | |
// Execute query. | |
results := h.QueryExecutor.ExecuteQuery(q, opts, closing) | |
... | |
} | |
// query/executor.go | |
func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { | |
...... | |
// Send any other statements to the underlying statement executor. | |
err = e.StatementExecutor.ExecuteStatement(stmt, ctx) | |
...... | |
} |
辨认到 Create database 语句:
// cluster/statement_executor.go | |
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { | |
... | |
switch stmt := stmt.(type) { | |
case *influxql.CreateDatabaseStatement: | |
if ctx.ReadOnly {messages = append(messages, query.ReadOnlyWarning(stmt.String())) | |
} | |
err = e.executeCreateDatabaseStatement(stmt) | |
...... | |
} | |
} |
最终是通过 metaClient.CreateDatabase() 实现调用:
// cluster/statement_executor.go | |
func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {if !meta.ValidName(stmt.Name) {// TODO This should probably be in `(*meta.Data).CreateDatabase` | |
// but can't go there until 1.1 is used everywhere | |
return meta.ErrInvalidName | |
} | |
if !stmt.RetentionPolicyCreate {_, err := e.MetaClient.CreateDatabase(stmt.Name) | |
return err | |
} | |
..... | |
} |
metaClient
metaClient 在执行 CreateDatabase 时:
- 首先将 CreateDatabase 封装成 1 个 command;
-
而后将这个 command 通过 http(8091) POST /execute 发送到其它节点;
- 若节点返回 errRedirect,则提取 Leader 的 url,从新尝试向 Leader 发送申请;
- 重试 maxRetries 次,直到胜利为止;
结构 command 并发送给集群执行:
// services/meta/client.go | |
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {if db := c.Database(name); db != nil {return db, nil} | |
cmd := &internal.CreateDatabaseCommand{Name: proto.String(name), | |
} | |
err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd) | |
if err != nil {return nil, err} | |
return c.Database(name), nil | |
} |
发送 POST /execute,重试 maxRetries 次,直到胜利:
func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error { | |
var index uint64 | |
tries := 0 | |
currentServer := 0 | |
var redirectServer string | |
for { | |
... | |
// build the url to hit the redirect server or the next metaserver | |
var url string | |
if redirectServer != "" { | |
url = redirectServer | |
redirectServer = "" | |
} else {server := c.metaServers[currentServer] | |
url = fmt.Sprintf("http://%s/execute", server) | |
} | |
index, err = c.exec(url, typ, desc, value) | |
tries++ | |
currentServer++ | |
if err == nil {c.waitForIndex(index) | |
return nil | |
} | |
if tries > maxRetries {return err} | |
// 如果返回 redirect 的 response | |
if e, ok := err.(errRedirect); ok { | |
redirectServer = e.host | |
continue | |
} | |
time.Sleep(errSleep) | |
} | |
} |
节点解决 CreateDatabase 的 Command
节点监听在 8091 端口,解决 POST /execute 申请,request body 是 protobuf 序列化的 command:
// services/meta/handler.go | |
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
switch r.Method { | |
case "POST": | |
h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r) | |
} | |
} |
handler 解决申请时,首先将 body apply 到 store,若返回 ErrNotLeader,则返回 client redirect 响应:
// services/meta/handler.go | |
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { | |
....... | |
var resp *internal.Response | |
if err := h.store.apply(body); err != nil { | |
// If we aren't the leader, redirect client to the leader. | |
if err == raft.ErrNotLeader {l := h.store.leaderHTTP() | |
scheme := "http://" | |
l = scheme + l + "/execute" | |
http.Redirect(w, r, l, http.StatusTemporaryRedirect) | |
return | |
} | |
// Error wasn't a leadership error so pass it back to client. | |
resp = &internal.Response{OK: proto.Bool(false), | |
Error: proto.String(err.Error()), | |
} | |
} | |
...... | |
} |
具体看下 store.apply() 做了什么事件:
// services/meta/store_fsm.go | |
func (fsm *storeFSM) Apply(l *raft.Log) interface{} { | |
var cmd internal.Command | |
if err := proto.Unmarshal(l.Data, &cmd); err != nil {panic(fmt.Errorf("cannot marshal command: %x", l.Data)) | |
} | |
err := func() interface{} {switch cmd.GetType() { | |
case internal.Command_CreateDatabaseCommand: | |
return fsm.applyCreateDatabaseCommand(&cmd) | |
} | |
}() | |
.... | |
} |
跟增加节点相似,把新创建的 database 的信息,更新到 fsm.data 中:
// services/meta/store_fsm.go | |
func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command) | |
v := ext.(*internal.CreateDatabaseCommand) | |
other := fsm.data.Clone() | |
if err := other.CreateDatabase(v.GetName()); err != nil {return err} | |
.... | |
fsm.data = other | |
.... | |
} |
正文完