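A database can be created either with a CLI command or over HTTP; both paths go through the same logic.

This article uses HTTP as the example and walks through the source-code flow for creating a database in cluster mode.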

curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"

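A database is part of the cluster's meta information, so it must be kept strongly consistent through Raft.
If a create database request is sent to a follower node, the follower returns a NotLeader redirect response (carrying the leader's URL), and the client re-sends the request to the leader node.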

Overall flow:

HTTP handler

8086 is the port httpd listens on; its handler:

// services/httpd/handler.go
func NewHandler(c Config) *Handler {
    ......
    h.AddRoutes([]Route{
        {
            "query", // Query serving route.
            "POST", "/query", true, true, h.serveQuery,
        },
    })
}

Continuing down the call chain:

// services/httpd/handler.go
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
    ...
    // Execute query.
    results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
    ...
}

// query/executor.go
func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) {
    ......
    // Send any other statements to the underlying statement executor.
    err = e.StatementExecutor.ExecuteStatement(stmt, ctx)
    ......
}

The statement executor recognizes the CREATE DATABASE statement:

// cluster/statement_executor.go
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
    ...
    switch stmt := stmt.(type) {
    case *influxql.CreateDatabaseStatement:
        if ctx.ReadOnly {
            messages = append(messages, query.ReadOnlyWarning(stmt.String()))
        }
        err = e.executeCreateDatabaseStatement(stmt)
    ......
    }
}
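The dispatch above relies on a Go type switch over the parsed statement. The following is a minimal, self-contained sketch of that pattern only; the `statement` interface, the two statement types, and the returned strings are hypothetical stand-ins, not the influxql types.

```go
package main

import "fmt"

// statement is a hypothetical stand-in for influxql.Statement.
type statement interface {
	String() string
}

// createDatabaseStatement is a hypothetical stand-in for
// influxql.CreateDatabaseStatement.
type createDatabaseStatement struct{ Name string }

func (s *createDatabaseStatement) String() string { return "CREATE DATABASE " + s.Name }

// dropSeriesStatement exists only to exercise the default branch.
type dropSeriesStatement struct{}

func (s *dropSeriesStatement) String() string { return "DROP SERIES" }

// executeStatement picks a handler based on the concrete statement type.
func executeStatement(stmt statement) (string, error) {
	switch s := stmt.(type) {
	case *createDatabaseStatement:
		// Inside this case, s has the concrete type *createDatabaseStatement.
		return "create database " + s.Name, nil
	default:
		return "", fmt.Errorf("unsupported statement: %s", stmt.String())
	}
}

func main() {
	out, err := executeStatement(&createDatabaseStatement{Name: "mydb"})
	fmt.Println(out, err)

	_, err = executeStatement(&dropSeriesStatement{})
	fmt.Println(err)
}
```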

The call is ultimately made through metaClient.CreateDatabase():

// cluster/statement_executor.go
func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {
    if !meta.ValidName(stmt.Name) {
        // TODO This should probably be in `(*meta.Data).CreateDatabase`
        // but can't go there until 1.1 is used everywhere
        return meta.ErrInvalidName
    }
    if !stmt.RetentionPolicyCreate {
        _, err := e.MetaClient.CreateDatabase(stmt.Name)
        return err
    }
    .....
}

metaClient

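When metaClient executes CreateDatabase, it:

  • first wraps CreateDatabase in a command;
  • then sends this command to the other nodes via HTTP (port 8091) with POST /execute;

    • if a node returns errRedirect, it extracts the leader's URL and retries the request against the leader;
    • it retries up to maxRetries times, until the request succeeds;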

Build the command and send it to the cluster for execution:

// services/meta/client.go
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
    if db := c.Database(name); db != nil {
        return db, nil
    }

    cmd := &internal.CreateDatabaseCommand{
        Name: proto.String(name),
    }

    err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd)
    if err != nil {
        return nil, err
    }

    return c.Database(name), nil
}

Send POST /execute, retrying up to maxRetries times until it succeeds:

func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
    var index uint64
    tries := 0
    currentServer := 0
    var redirectServer string

    for {
        ...
        // build the url to hit the redirect server or the next metaserver
        var url string
        if redirectServer != "" {
            url = redirectServer
            redirectServer = ""
        } else {
            server := c.metaServers[currentServer]
            url = fmt.Sprintf("http://%s/execute", server)
        }

        index, err = c.exec(url, typ, desc, value)
        tries++
        currentServer++

        if err == nil {
            c.waitForIndex(index)
            return nil
        }

        if tries > maxRetries {
            return err
        }

        // if a redirect response was returned
        if e, ok := err.(errRedirect); ok {
            redirectServer = e.host
            continue
        }

        time.Sleep(errSleep)
    }
}

How a node handles the CreateDatabase command

Each node listens on port 8091 and handles POST /execute requests; the request body is a protobuf-serialized command:

// services/meta/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "POST":
        h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r)
    }
}

When handling the request, the handler first applies the body to the store; if that returns ErrNotLeader, it sends a redirect response back to the client:

// services/meta/handler.go
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
    .......
    var resp *internal.Response
    if err := h.store.apply(body); err != nil {
        // If we aren't the leader, redirect client to the leader.
        if err == raft.ErrNotLeader {
            l := h.store.leaderHTTP()
            scheme := "http://"
            l = scheme + l + "/execute"
            http.Redirect(w, r, l, http.StatusTemporaryRedirect)
            return
        }

        // Error wasn't a leadership error so pass it back to client.
        resp = &internal.Response{
            OK:    proto.Bool(false),
            Error: proto.String(err.Error()),
        }
    }
    ......
}

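Let's look at what store.apply() leads to. The command is handed to Raft, and each committed log entry is then passed to the state machine's Apply method: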

// services/meta/store_fsm.go
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
    var cmd internal.Command
    if err := proto.Unmarshal(l.Data, &cmd); err != nil {
        panic(fmt.Errorf("cannot marshal command: %x", l.Data))
    }

    err := func() interface{} {
        switch cmd.GetType() {
        case internal.Command_CreateDatabaseCommand:
            return fsm.applyCreateDatabaseCommand(&cmd)
        }
    }()
    ....
}

As with adding a node, the information about the newly created database is written into fsm.data:

// services/meta/store_fsm.go
func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {
    ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command)
    v := ext.(*internal.CreateDatabaseCommand)

    other := fsm.data.Clone()
    if err := other.CreateDatabase(v.GetName()); err != nil {
        return err
    }
    ....
    fsm.data = other
    ....
}
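The clone-then-swap step deserves a closer look: the change is made on a copy, and fsm.data is replaced only if the change succeeds, so a failed command leaves the current state untouched. Below is a minimal sketch of that idea; the `data` and `fsm` types, their fields, and the validation rule are simplified assumptions, not the real meta.Data.

```go
package main

import (
	"errors"
	"fmt"
)

// data is a simplified, hypothetical stand-in for the cluster meta data.
type data struct {
	databases []string
}

// clone returns a deep copy so that edits do not touch the original.
func (d *data) clone() *data {
	other := &data{databases: make([]string, len(d.databases))}
	copy(other.databases, d.databases)
	return other
}

var errNameRequired = errors.New("database name required")

// createDatabase adds name; creating an existing database is a no-op.
func (d *data) createDatabase(name string) error {
	if name == "" {
		return errNameRequired
	}
	for _, db := range d.databases {
		if db == name {
			return nil
		}
	}
	d.databases = append(d.databases, name)
	return nil
}

// fsm holds the current meta data snapshot.
type fsm struct {
	data *data
}

// applyCreateDatabase mutates a clone and swaps it in only on success.
func (f *fsm) applyCreateDatabase(name string) error {
	other := f.data.clone()
	if err := other.createDatabase(name); err != nil {
		return err // f.data still points at the old snapshot
	}
	f.data = other
	return nil
}

func main() {
	f := &fsm{data: &data{}}
	fmt.Println(f.applyCreateDatabase("mydb"), f.data.databases)
	fmt.Println(f.applyCreateDatabase(""), f.data.databases)
}
```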