关于influxdb:InfluxDB集群-添加DataNode源码分析

53次阅读

共计 3987 个字符,预计需要花费 10 分钟才能阅读完成。

influxdb 集群中,client 在 node1 上执行 add dataNode node3 的命令:

influxd_ctl add-data node3:8088

整体流程如下:

  • node1 收到 CLI 命令,向本人的 8084 发送 GET /add-data 申请,request body: {“node”: “node3:8088”};
  • node1 向 node3 的 8088 端口发送 TCP 音讯 AddDataNode;
  • node3 收到 AddDataNode 后,被动将本人退出集群;

CLI 命令解决

命令行解析的代码入口:

// cmd/influxd_ctl/cli/cli.go
func (c *CommandLine) Run() error {
    switch cmd {
    case "add-data":
        return do_add_data(c)
    }
}

向本机的集群治理端口 8084,发送 GET /add-data:

func do_add_data(c *CommandLine) error {
    var node string
    node = c.CMD[1]
    // 向本机的 8084 发送 http add-data
    url := c.getURL("add-data", map[string]string{"node": node})
    resp, err := http.Get(url)
    if err != nil {return err}
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    ......
    return nil
}

node1 向 node3 发送 AddDataNode

node1 解决 GET /add-data 的 handler:

// services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/add-data":
            h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
        }
    }
}

结构了 1 个 AddDataNode 的 request,而后通过 TCP 发送给 node3:

func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {ps := r.URL.Query()
    if len(ps["node"]) != 1 {http.Error(w, "", http.StatusBadRequest)
    }
    node := ps["node"][0]
    req := Request{AddDataNode, "", 0}
    err := h.call(node, req)
    h.sendResponse(err, w)
}

h.call() 发送 TCP 音讯:AddDataNode

// 发送 tcp 音讯
func (h *handler) call(node string, req Request) error {conn, err := tcp.Dial("tcp", node, MuxHeader)
    if err != nil {return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
    }
    defer conn.Close()
    if err = json.NewEncoder(conn).Encode(req); err != nil {return fmt.Errorf("encode and send request failed. %v", err)
    }
    // read response
    resp, err := h.readResponse(conn)
    ...
    return nil
}

node3 将本人退出集群

node3 接管到 TCP 音讯:AddDataNode

// services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {r := &Request{}
    err := json.NewDecoder(conn).Decode(r)
    switch r.Type {
    case AddDataNode:
        h.handleAddDataNode(r, conn)
    }
}

由 h.Server.DataServerJoin() 解决节点退出集群:

func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {if err := h.Server.DataServerJoin(); err != nil {Err := fmt.Sprintf("DataServerJoin failed. %v", err)
        if err := json.NewEncoder(conn).Encode(Response{Err}); err != nil {h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err))
        }
    } else {if err := json.NewEncoder(conn).Encode(Response{}); err != nil {h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err))
        }
    }
}

节点退出集群:

  • 由 metaClient 向集群发送增加 DataNode 的音讯;
  • 本机更新节点信息 node.json 文件;
// cmd/influxd/run/server.go
func (s *Server) DataServerJoin() (err error) {
    if s.config.Data.Enabled {
        // If we've already created a data node for our id, we're done
        if _, err = s.MetaClient.DataNode(s.Node.GetDataID()); err == nil {return nil}
        // 向 leader 发送增加 data 节点的的申请
        n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
        // 重试,直到胜利
        for err != nil {log.Printf("Unable to create data node. retry in 1s: %s", err.Error())
            time.Sleep(time.Second)
            n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
        }
        s.Node.ID = n.ID
        // 更新本机的 node.json 内容
        if serr := s.Node.Save(); serr != nil {return serr}
        return nil
    }
    return fmt.Errorf("Data node is not enabled")
}

metaClient.CreateDataNode() 理论是向 Leader 发送 Command_CreateDataNodeCommand:

// CreateDataNode will create a new data node in the metastore
func (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) {
    cmd := &internal.CreateDataNodeCommand{HTTPAddr: proto.String(httpAddr),
        TCPAddr:  proto.String(tcpAddr),
    }
    if err := c.retryUntilExec(internal.Command_CreateDataNodeCommand, internal.E_CreateDataNodeCommand_Command, cmd); err != nil {return nil, err}
    n, err := c.DataNodeByTCPHost(tcpAddr)
    if err != nil {return nil, err}
    c.nodeID = n.ID
    return n, nil
}

Raft Leader 收到 Command_CreateDatNodeCommand:

// services/meta/store_fsm.go
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
    var cmd internal.Command
    if err := proto.Unmarshal(l.Data, &cmd); err != nil {panic(fmt.Errorf("cannot marshal command: %x", l.Data))
    }
    switch cmd.GetType() {
    case internal.Command_CreateDataNodeCommand:
            return fsm.applyCreateDataNodeCommand(&cmd)
    }
}

将新节点更新到状态机:

func (fsm *storeFSM) applyCreateDataNodeCommand(cmd *internal.Command) interface{} {ext, _ := proto.GetExtension(cmd, internal.E_CreateDataNodeCommand_Command)
    v := ext.(*internal.CreateDataNodeCommand)

    other := fsm.data.Clone()
    other.CreateDataNode(v.GetHTTPAddr(), v.GetTCPAddr())
    fsm.data = other
    return nil
}

正文完
 0