创立database,能够通过CLI命令,也能够通过HTTP,两种形式走的是同一套逻辑流程。
本文以HTTP为例,剖析集群模式下创立database的源码流程。
curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
database是集群的元信息,须要Raft强统一;
create database的request若被发送给follower节点,则返回NotLeader的redirect响应(带leader的url),client从新向Leader节点发送request。
整体流程:
HTTP handler
8086是httpd的监听端口,其hander:
// services/httpd/handler.gofunc NewHandler(c Config) *Handler { ...... h.AddRoutes([]Route{ { "query", // Query serving route. "POST", "/query", true, true, h.serveQuery, }, })}
持续向下走:
// services/httpd/handler.gofunc (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { ... // Execute query. results := h.QueryExecutor.ExecuteQuery(q, opts, closing) ...}// query/executor.gofunc (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { ...... // Send any other statements to the underlying statement executor. err = e.StatementExecutor.ExecuteStatement(stmt, ctx) ......}
辨认到Create database语句:
// cluster/statement_executor.gofunc (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { ... switch stmt := stmt.(type) { case *influxql.CreateDatabaseStatement: if ctx.ReadOnly { messages = append(messages, query.ReadOnlyWarning(stmt.String())) } err = e.executeCreateDatabaseStatement(stmt) ...... }}
最终是通过metaClient.CreateDatabase()实现调用:
// cluster/statement_executor.gofunc (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error { if !meta.ValidName(stmt.Name) { // TODO This should probably be in `(*meta.Data).CreateDatabase` // but can't go there until 1.1 is used everywhere return meta.ErrInvalidName } if !stmt.RetentionPolicyCreate { _, err := e.MetaClient.CreateDatabase(stmt.Name) return err } .....}
metaClient
metaClient在执行CreateDatabase时:
- 首先将CreateDatabase封装成1个command;
而后将这个command通过http(8091) POST /execute发送到其它节点;
- 若节点返回errRedirect,则提取Leader的url,从新尝试向Leader发送申请;
- 重试maxRetries次,直到胜利为止;
结构command并发送给集群执行:
// services/meta/client.gofunc (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) { if db := c.Database(name); db != nil { return db, nil } cmd := &internal.CreateDatabaseCommand{ Name: proto.String(name), } err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd) if err != nil { return nil, err } return c.Database(name), nil}
发送POST /execute,重试maxRetries次,直到胜利:
func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error { var index uint64 tries := 0 currentServer := 0 var redirectServer string for { ... // build the url to hit the redirect server or the next metaserver var url string if redirectServer != "" { url = redirectServer redirectServer = "" } else { server := c.metaServers[currentServer] url = fmt.Sprintf("http://%s/execute", server) } index, err = c.exec(url, typ, desc, value) tries++ currentServer++ if err == nil { c.waitForIndex(index) return nil } if tries > maxRetries { return err } //如果返回redirect的response if e, ok := err.(errRedirect); ok { redirectServer = e.host continue } time.Sleep(errSleep) }}
节点解决CreateDatabase的Command
节点监听在8091端口,解决POST /execute申请,request body是protobuf序列化的command:
// services/meta/handler.gofunc (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.Method { case "POST": h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r) }}
handler解决申请时,首先将body apply到store,若返回ErrNotLeader,则返回client redirect响应:
// services/meta/handler.gofunc (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { ....... var resp *internal.Response if err := h.store.apply(body); err != nil { // If we aren't the leader, redirect client to the leader. if err == raft.ErrNotLeader { l := h.store.leaderHTTP() scheme := "http://" l = scheme + l + "/execute" http.Redirect(w, r, l, http.StatusTemporaryRedirect) return } // Error wasn't a leadership error so pass it back to client. resp = &internal.Response{ OK: proto.Bool(false), Error: proto.String(err.Error()), } } ......}
具体看下store.apply()做了什么事件:
// services/meta/store_fsm.gofunc (fsm *storeFSM) Apply(l *raft.Log) interface{} { var cmd internal.Command if err := proto.Unmarshal(l.Data, &cmd); err != nil { panic(fmt.Errorf("cannot marshal command: %x", l.Data)) } err := func() interface{} { switch cmd.GetType() { case internal.Command_CreateDatabaseCommand: return fsm.applyCreateDatabaseCommand(&cmd) } }() ....}
跟增加节点相似,把新创建的database的信息,更新到fsm.data中:
// services/meta/store_fsm.gofunc (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} { ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command) v := ext.(*internal.CreateDatabaseCommand) other := fsm.data.Clone() if err := other.CreateDatabase(v.GetName()); err != nil { return err } .... fsm.data = other ....}