influxdb集群中,client在node1上执行add dataNode node3的命令:

influxd_ctl add-data node3:8088

整体流程如下:

  • node1收到CLI命令,向自己的8084端口发送GET /add-data请求,查询参数: node=node3:8088;
  • node1向node3的8088端口发送TCP消息AddDataNode;
  • node3收到AddDataNode后,主动将自己加入集群;

CLI命令处理

命令行解析的代码入口:

// cmd/influxd_ctl/cli/cli.go

// Run dispatches the CLI command to its handler and returns the handler's
// error. Only the "add-data" case is shown in this excerpt; it delegates to
// do_add_data.
// NOTE(review): cmd is not defined in this excerpt — presumably it is derived
// from the parsed command line; confirm against the full source.
func (c *CommandLine) Run() error {
	switch cmd {
	case "add-data":
		return do_add_data(c)
	}
}

向本机的集群管理端口8084,发送GET /add-data:

// do_add_data asks the local cluster admin HTTP endpoint to add a data node.
// The node address is read from c.CMD[1] and passed as the "node" parameter of
// the "add-data" URL built by c.getURL. An error from http.Get is returned
// as-is; the handling of the response body is elided in this excerpt.
func do_add_data(c *CommandLine) error {
	var node string
	node = c.CMD[1]
	// Send the HTTP add-data request to port 8084 on this machine.
	url := c.getURL("add-data", map[string]string{"node": node})
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	......
	return nil
}

node1向node3发送AddDataNode

node1处理GET /add-data的handler:

// services/admin_cluster/handler.go

// ServeHTTP routes admin HTTP requests by method and then by URL path. In this
// excerpt only GET /add-data is shown: it is wrapped by h.WrapHandler under the
// name "add-data" and served by h.addDataNode.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/add-data":
			h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
		}
	}
}

构造了1个AddDataNode的request,然后通过TCP发送给node3:

func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {    ps := r.URL.Query()    if len(ps["node"]) != 1 {        http.Error(w, "", http.StatusBadRequest)    }    node := ps["node"][0]    req := Request{AddDataNode, "", 0}    err := h.call(node, req)    h.sendResponse(err, w)}

h.call()发送TCP消息:AddDataNode

// call sends a TCP message to node.
// It dials node with tcp.Dial using MuxHeader, JSON-encodes req onto the
// connection, and then reads the reply with h.readResponse. Dial and encode
// failures are returned as formatted errors; the handling of the reply is
// elided in this excerpt. The connection is closed when call returns.
func (h *handler) call(node string, req Request) error {
	conn, err := tcp.Dial("tcp", node, MuxHeader)
	if err != nil {
		return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
	}
	defer conn.Close()
	if err = json.NewEncoder(conn).Encode(req); err != nil {
		return fmt.Errorf("encode and send request failed. %v", err)
	}
	// read response
	resp, err := h.readResponse(conn)
	...
	return nil
}

node3将自己加入集群

node3接收到TCP消息:AddDataNode

// services/admin_cluster/tcphandler.go

// handleConn decodes a single JSON Request from conn and dispatches on its
// Type. In this excerpt only AddDataNode is shown, which is handed to
// h.handleAddDataNode together with the connection.
// NOTE(review): the decode error err is never checked in this excerpt —
// confirm the full source handles it before switching on r.Type.
func (h *TCPHandler) handleConn(conn net.Conn) {
	r := &Request{}
	err := json.NewDecoder(conn).Decode(r)
	switch r.Type {
	case AddDataNode:
		h.handleAddDataNode(r, conn)
	}
}

由h.Server.DataServerJoin()处理节点加入集群:

func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {    if err := h.Server.DataServerJoin(); err != nil {        Err := fmt.Sprintf("DataServerJoin failed. %v", err)        if err := json.NewEncoder(conn).Encode(Response{Err}); err != nil {            h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err))        }    } else {        if err := json.NewEncoder(conn).Encode(Response{}); err != nil {            h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err))        }    }}

节点加入集群:

  • 由metaClient向集群发送增加DataNode的消息;
  • 本机更新节点信息node.json文件;
// cmd/influxd/run/server.gofunc (s *Server) DataServerJoin() (err error) {    if s.config.Data.Enabled {        // If we've already created a data node for our id, we're done        if _, err = s.MetaClient.DataNode(s.Node.GetDataID()); err == nil {            return nil        }        // 向leader发送增加data节点的的申请        n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())        // 重试,直到胜利        for err != nil {            log.Printf("Unable to create data node. retry in 1s: %s", err.Error())            time.Sleep(time.Second)            n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())        }        s.Node.ID = n.ID        // 更新本机的node.json内容        if serr := s.Node.Save(); serr != nil {            return serr        }        return nil    }    return fmt.Errorf("Data node is not enabled")}

metaClient.CreateDataNode()实际是向Leader发送Command_CreateDataNodeCommand:

// CreateDataNode will create a new data node in the metastorefunc (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) {    cmd := &internal.CreateDataNodeCommand{        HTTPAddr: proto.String(httpAddr),        TCPAddr:  proto.String(tcpAddr),    }    if err := c.retryUntilExec(internal.Command_CreateDataNodeCommand, internal.E_CreateDataNodeCommand_Command, cmd); err != nil {        return nil, err    }    n, err := c.DataNodeByTCPHost(tcpAddr)    if err != nil {        return nil, err    }    c.nodeID = n.ID    return n, nil}

Raft Leader收到Command_CreateDataNodeCommand:

// services/meta/store_fsm.go

// Apply unmarshals the data of raft log entry l into an internal.Command and
// dispatches on the command type. It panics if the entry cannot be
// unmarshaled. In this excerpt only Command_CreateDataNodeCommand is shown,
// whose result comes from fsm.applyCreateDataNodeCommand.
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
	var cmd internal.Command
	if err := proto.Unmarshal(l.Data, &cmd); err != nil {
		panic(fmt.Errorf("cannot marshal command: %x", l.Data))
	}
	switch cmd.GetType() {
	case internal.Command_CreateDataNodeCommand:
		return fsm.applyCreateDataNodeCommand(&cmd)
	}
}

将新节点更新到状态机:

func (fsm *storeFSM) applyCreateDataNodeCommand(cmd *internal.Command) interface{} {    ext, _ := proto.GetExtension(cmd, internal.E_CreateDataNodeCommand_Command)    v := ext.(*internal.CreateDataNodeCommand)    other := fsm.data.Clone()    other.CreateDataNode(v.GetHTTPAddr(), v.GetTCPAddr())    fsm.data = other    return nil}