
InfluxDB Cluster: Source Code Analysis of Creating a Database

A database can be created either with a CLI command or over HTTP; both paths go through the same logic.

This article uses HTTP as the example and walks through the source code for creating a database in cluster mode.

curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
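The same request can be sent from Go with the standard library. This is a minimal sketch that assumes an InfluxDB node is listening on localhost:8086; `buildCreateDBForm` is a hypothetical helper, not part of InfluxDB. It sends the same `q` form parameter as the curl command.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// buildCreateDBForm builds the form body carrying the InfluxQL statement
// in the "q" parameter, like `--data-urlencode "q=..."` in curl.
func buildCreateDBForm(db string) url.Values {
	return url.Values{"q": {"CREATE DATABASE " + db}}
}

func main() {
	// Assumes an InfluxDB node is listening on localhost:8086.
	resp, err := http.PostForm("http://localhost:8086/query", buildCreateDBForm("mydb"))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```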

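A database is part of the cluster's metadata, so it must be kept strongly consistent through Raft.
If the create-database request reaches a follower node, the follower answers with a NotLeader redirect response that carries the leader's URL, and the client resends the request to the Leader.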

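Overall flow: HTTP handler (port 8086) → query Executor → cluster StatementExecutor → metaClient.CreateDatabase() → POST /execute to a meta node (port 8091) → Raft → storeFSM.Apply() on each node.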

HTTP handler

Port 8086 is the port httpd listens on. Its handler registers the query route:

// services/httpd/handler.go
func NewHandler(c Config) *Handler {
    ......
    h.AddRoutes([]Route{
        {
            "query", // Query serving route.
            "POST", "/query", true, true, h.serveQuery,
        },    
    })
}

Continuing down the call chain:

// services/httpd/handler.go
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
    ...
    // Execute query.
    results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
    ...
}
// query/executor.go
func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) {
    ......
    // Send any other statements to the underlying statement executor.
    err = e.StatementExecutor.ExecuteStatement(stmt, ctx)
    ......
}
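The hand-off from the query executor to the statement executor follows a common Go pattern: run the statements in a goroutine and stream one result per statement over a channel, stopping early if the client goes away. The sketch below uses hypothetical `Statement`, `Result`, and `StatementExecutor` types in place of the InfluxDB ones, and its stop-at-first-error policy is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Statement is a hypothetical stand-in for influxql.Statement.
type Statement string

// StatementExecutor is the hypothetical interface the executor delegates to.
type StatementExecutor interface {
	ExecuteStatement(stmt Statement) error
}

// Result reports the outcome of one statement.
type Result struct {
	StatementID int
	Err         error
}

type Executor struct {
	StatementExecutor StatementExecutor
}

// ExecuteQuery starts execution in a goroutine and returns a channel that
// yields one Result per executed statement; the channel is closed at the end.
func (e *Executor) ExecuteQuery(stmts []Statement, closing <-chan struct{}) <-chan *Result {
	results := make(chan *Result)
	go e.executeQuery(stmts, closing, results)
	return results
}

func (e *Executor) executeQuery(stmts []Statement, closing <-chan struct{}, results chan *Result) {
	defer close(results)
	for i, stmt := range stmts {
		// Send the statement to the underlying statement executor.
		err := e.StatementExecutor.ExecuteStatement(stmt)
		select {
		case results <- &Result{StatementID: i, Err: err}:
		case <-closing:
			return // the client went away; stop early
		}
		if err != nil {
			return // sketch policy: stop at the first failing statement
		}
	}
}

// recordingExecutor is a fake that remembers what it was asked to run.
type recordingExecutor struct{ seen []Statement }

func (r *recordingExecutor) ExecuteStatement(stmt Statement) error {
	r.seen = append(r.seen, stmt)
	if stmt == "BAD" {
		return errors.New("bad statement")
	}
	return nil
}

func main() {
	e := &Executor{StatementExecutor: &recordingExecutor{}}
	for r := range e.ExecuteQuery([]Statement{"CREATE DATABASE mydb"}, make(chan struct{})) {
		fmt.Println(r.StatementID, r.Err) // 0 <nil>
	}
}
```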

The statement executor recognizes the CREATE DATABASE statement:

// cluster/statement_executor.go
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
    ...
    switch stmt := stmt.(type) {
    case *influxql.CreateDatabaseStatement:
        if ctx.ReadOnly {
            messages = append(messages, query.ReadOnlyWarning(stmt.String()))
        }
        err = e.executeCreateDatabaseStatement(stmt)
    ......
    }
}
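The type switch is how the statement executor routes each parsed statement to a dedicated method. Below is a cut-down sketch with hypothetical statement and context types; as in the snippet above, a read-only context only records a warning rather than rejecting the statement.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical statement types standing in for influxql AST nodes.
type CreateDatabaseStatement struct{ Name string }
type DropDatabaseStatement struct{ Name string }

func (s *CreateDatabaseStatement) String() string { return "CREATE DATABASE " + s.Name }
func (s *DropDatabaseStatement) String() string   { return "DROP DATABASE " + s.Name }

type Statement interface{ String() string }

// ExecutionContext keeps only the ReadOnly flag and collected warnings.
type ExecutionContext struct {
	ReadOnly bool
	Messages []string
}

type StatementExecutor struct {
	created []string // databases "created" by this sketch
}

// ExecuteStatement picks a handler based on the concrete statement type.
func (e *StatementExecutor) ExecuteStatement(stmt Statement, ctx *ExecutionContext) error {
	switch stmt := stmt.(type) {
	case *CreateDatabaseStatement:
		if ctx.ReadOnly {
			// Record a warning, then carry on, as the snippet above does.
			ctx.Messages = append(ctx.Messages, "read-only warning: "+stmt.String())
		}
		return e.executeCreateDatabaseStatement(stmt)
	default:
		return fmt.Errorf("unsupported statement: %s", stmt)
	}
}

func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *CreateDatabaseStatement) error {
	if stmt.Name == "" {
		return errors.New("invalid name")
	}
	e.created = append(e.created, stmt.Name)
	return nil
}

func main() {
	e := &StatementExecutor{}
	ctx := &ExecutionContext{}
	fmt.Println(e.ExecuteStatement(&CreateDatabaseStatement{Name: "mydb"}, ctx)) // <nil>
	fmt.Println(e.ExecuteStatement(&DropDatabaseStatement{Name: "mydb"}, ctx))  // unsupported statement: DROP DATABASE mydb
}
```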

Ultimately, the work is done by calling metaClient.CreateDatabase():

// cluster/statement_executor.go
func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {
    if !meta.ValidName(stmt.Name) {
        // TODO This should probably be in `(*meta.Data).CreateDatabase`
        // but can't go there until 1.1 is used everywhere
        return meta.ErrInvalidName
    }

    if !stmt.RetentionPolicyCreate {
        _, err := e.MetaClient.CreateDatabase(stmt.Name)
        return err
    }
    .....
}

metaClient

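When metaClient executes CreateDatabase, it:

  • first wraps CreateDatabase in a command;
  • then sends the command to a meta node via HTTP (port 8091) POST /execute;

    • if the node returns errRedirect, it extracts the Leader's URL and resends the request to the Leader;
    • it retries up to maxRetries times until the request succeeds.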

Building the command and sending it to the cluster for execution:

// services/meta/client.go
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
    if db := c.Database(name); db != nil {
        return db, nil
    }
    cmd := &internal.CreateDatabaseCommand{
        Name: proto.String(name),
    }
    err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd)
    if err != nil {
        return nil, err
    }
    return c.Database(name), nil
}
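From the caller's point of view, `CreateDatabase` is idempotent: if the database is already in the client's cached metadata, no command is sent at all. The following is a minimal sketch with a hypothetical in-memory `Client`. Its `exec` method stands in for `retryUntilExec`, and it also plays the part of `waitForIndex` by bringing the cache up to date.

```go
package main

import "fmt"

type DatabaseInfo struct{ Name string }

// Client is a hypothetical meta client with a local metadata cache.
type Client struct {
	cache map[string]*DatabaseInfo
	execs int // how many commands were sent to the "cluster"
}

func (c *Client) Database(name string) *DatabaseInfo { return c.cache[name] }

// exec pretends the cluster applied the command and the cache caught up.
func (c *Client) exec(name string) error {
	c.execs++
	c.cache[name] = &DatabaseInfo{Name: name}
	return nil
}

// CreateDatabase short-circuits on a cache hit; otherwise it executes the
// command and reads the result back from the (now updated) cache.
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
	if db := c.Database(name); db != nil {
		return db, nil
	}
	if err := c.exec(name); err != nil {
		return nil, err
	}
	return c.Database(name), nil
}

func main() {
	c := &Client{cache: map[string]*DatabaseInfo{}}
	for i := 0; i < 2; i++ {
		if _, err := c.CreateDatabase("mydb"); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println("exec calls:", c.execs) // exec calls: 1
}
```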

Sending POST /execute, retrying up to maxRetries times until it succeeds:

func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
    var err error
    var index uint64
    tries := 0
    currentServer := 0
    var redirectServer string

    for {
        ...
        // build the url to hit the redirect server or the next metaserver
        var url string
        if redirectServer != "" {
            url = redirectServer
            redirectServer = ""
        } else {
            if currentServer >= len(c.metaServers) {
                currentServer = 0
            }
            server := c.metaServers[currentServer]
            url = fmt.Sprintf("http://%s/execute", server)
        }

        index, err = c.exec(url, typ, desc, value)
        tries++
        currentServer++

        if err == nil {
            c.waitForIndex(index)
            return nil
        }
        if tries > maxRetries {
            return err
        }
        // If the response is a redirect, retry against the leader.
        if e, ok := err.(errRedirect); ok {
            redirectServer = e.host
            continue
        }
        time.Sleep(errSleep)
    }
}

How a node handles the CreateDatabase command

Each node listens on port 8091 and handles POST /execute requests; the request body is the protobuf-serialized command:

// services/meta/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "POST":
        h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r)
    }
}

When handling the request, the handler first applies the body to the store. If that returns ErrNotLeader, it sends the client a redirect response:

// services/meta/handler.go
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
    .......
    var resp *internal.Response
    if err := h.store.apply(body); err != nil {
        // If we aren't the leader, redirect client to the leader.
        if err == raft.ErrNotLeader {
            l := h.store.leaderHTTP()
            scheme := "http://"
            l = scheme + l + "/execute"
            http.Redirect(w, r, l, http.StatusTemporaryRedirect)
            return
        }
        // Error wasn't a leadership error so pass it back to client.
        resp = &internal.Response{
            OK:    proto.Bool(false),
            Error: proto.String(err.Error()),
        }
    }
    ......
}

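Now let's look at what store.apply() actually does. It submits the command to Raft; once the log entry is committed, each node's FSM applies it in storeFSM.Apply():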

// services/meta/store_fsm.go
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
    var cmd internal.Command
    if err := proto.Unmarshal(l.Data, &cmd); err != nil {
        panic(fmt.Errorf("cannot marshal command: %x", l.Data))
    }
    err := func() interface{} {
        switch cmd.GetType() {
        case internal.Command_CreateDatabaseCommand:
            return fsm.applyCreateDatabaseCommand(&cmd)
        // ... cases for the other command types ...
        default:
            panic(fmt.Errorf("cannot apply command: %x", l.Data))
        }
    }()
    ....
}

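As with adding a node, the new database's information is written into fsm.data: the FSM clones the current data, creates the database on the clone, and then swaps the clone in: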

// services/meta/store_fsm.go
func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {
    ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command)
    v := ext.(*internal.CreateDatabaseCommand)
    other := fsm.data.Clone()
    if err := other.CreateDatabase(v.GetName()); err != nil {
        return err
    }
    ....
    fsm.data = other
    ....
}
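The clone-modify-swap sequence is copy-on-write. Readers holding the old `fsm.data` pointer never see a half-applied change, and a failing command leaves the state untouched. This is a minimal sketch with a hypothetical, cut-down `Data` type that holds only database names. Treating a duplicate name as a no-op and an empty name as an error are assumptions of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidName = errors.New("invalid database name")

// Data is a hypothetical, cut-down metadata snapshot.
type Data struct {
	Databases []string
}

// Clone deep-copies the slice so appends on the clone cannot write into
// a backing array shared with the original snapshot.
func (d *Data) Clone() *Data {
	return &Data{Databases: append([]string(nil), d.Databases...)}
}

func (d *Data) CreateDatabase(name string) error {
	if name == "" {
		return ErrInvalidName
	}
	for _, db := range d.Databases {
		if db == name {
			return nil // already exists: a no-op in this sketch
		}
	}
	d.Databases = append(d.Databases, name)
	return nil
}

type storeFSM struct{ data *Data }

// applyCreateDatabase modifies a clone and publishes it only on success.
func (fsm *storeFSM) applyCreateDatabase(name string) error {
	other := fsm.data.Clone()
	if err := other.CreateDatabase(name); err != nil {
		return err
	}
	fsm.data = other
	return nil
}

func main() {
	fsm := &storeFSM{data: &Data{}}
	before := fsm.data
	if err := fsm.applyCreateDatabase("mydb"); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(before.Databases, fsm.data.Databases) // [] [mydb]
}
```

The deep copy in `Clone` matters: copying only the slice header would let `append` on the clone overwrite elements that readers of the old snapshot can still see.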