influxdb集群中,client在node1上执行add dataNode node3的命令:
influxd_ctl add-data node3:8088
整体流程如下:
- node1收到CLI命令,向自己的8084端口发送GET /add-data请求,请求参数(URL query): node=node3:8088;
- node1向node3的8088端口发送TCP消息AddDataNode;
- node3收到AddDataNode后,主动将自己加入集群;
CLI命令处理
命令行解析的代码入口:
// cmd/influxd_ctl/cli/cli.go

// Run dispatches the parsed CLI command to its implementation. This is an
// excerpt: only the "add-data" case is shown, which hands the CommandLine to
// do_add_data and returns its error.
//
// NOTE(review): cmd is not declared in the visible code and the function has
// no return after the switch — presumably both were trimmed from the
// excerpt; confirm against the full source.
func (c *CommandLine) Run() error {
	switch cmd {
	case "add-data":
		return do_add_data(c)
	}
}
向本机的集群管理端口8084,发送GET /add-data:
// do_add_data implements the "add-data" CLI command: it takes the node
// address from c.CMD[1], builds the add-data URL with c.getURL, issues an
// HTTP GET to it and reads the response body.
//
// It returns the http.Get error if the request fails. What happens to the
// body afterwards is elided in this excerpt (the "......" below).
//
// NOTE(review): c.CMD[1] is read without a length check — presumably the
// argument count is validated before this point; confirm against the caller.
func do_add_data(c *CommandLine) error {
	var node string
	node = c.CMD[1]
	// Send the HTTP add-data request to port 8084 on the local machine
	// (port as stated by the original comment; getURL is not shown here).
	url := c.getURL("add-data", map[string]string{"node": node})
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	......
	return nil
}
node1向node3发送AddDataNode
node1处理GET /add-data的handler:
// services/admin_cluster/handler.gofunc (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": switch r.URL.Path { case "/add-data": h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r) } }}
构造了1个AddDataNode的request,然后通过TCP发送给node3:
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) { ps := r.URL.Query() if len(ps["node"]) != 1 { http.Error(w, "", http.StatusBadRequest) } node := ps["node"][0] req := Request{AddDataNode, "", 0} err := h.call(node, req) h.sendResponse(err, w)}
h.call()发送TCP消息:AddDataNode
// call sends req to node over TCP: it dials node with tcp.Dial, writes req
// to the connection as JSON, and then reads the reply with h.readResponse.
// The connection is closed when call returns.
//
// It returns a wrapped error when dialing or encoding fails. How the reply
// is evaluated is elided in this excerpt (the "..." below).
//
// NOTE(review): MuxHeader presumably selects the target service on the
// remote node's multiplexed TCP listener — confirm against the tcp package.
func (h *handler) call(node string, req Request) error {
	conn, err := tcp.Dial("tcp", node, MuxHeader)
	if err != nil {
		return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
	}
	defer conn.Close()
	if err = json.NewEncoder(conn).Encode(req); err != nil {
		return fmt.Errorf("encode and send request failed. %v", err)
	}
	// read response
	resp, err := h.readResponse(conn)
	...
	return nil
}
node3将自己加入集群
node3接收到TCP消息:AddDataNode
// services/admin_cluster/tcphandler.gofunc (h *TCPHandler) handleConn(conn net.Conn) { r := &Request{} err := json.NewDecoder(conn).Decode(r) switch r.Type { case AddDataNode: h.handleAddDataNode(r, conn) }}
由h.Server.DataServerJoin()处理节点加入集群:
func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) { if err := h.Server.DataServerJoin(); err != nil { Err := fmt.Sprintf("DataServerJoin failed. %v", err) if err := json.NewEncoder(conn).Encode(Response{Err}); err != nil { h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err)) } } else { if err := json.NewEncoder(conn).Encode(Response{}); err != nil { h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err)) } }}
节点加入集群:
- 由metaClient向集群发送增加DataNode的消息;
- 本机更新节点信息node.json文件;
// cmd/influxd/run/server.gofunc (s *Server) DataServerJoin() (err error) { if s.config.Data.Enabled { // If we've already created a data node for our id, we're done if _, err = s.MetaClient.DataNode(s.Node.GetDataID()); err == nil { return nil } // 向leader发送增加data节点的的申请 n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr()) // 重试,直到胜利 for err != nil { log.Printf("Unable to create data node. retry in 1s: %s", err.Error()) time.Sleep(time.Second) n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr()) } s.Node.ID = n.ID // 更新本机的node.json内容 if serr := s.Node.Save(); serr != nil { return serr } return nil } return fmt.Errorf("Data node is not enabled")}
metaClient.CreateDataNode()实际是向Leader发送Command_CreateDataNodeCommand:
// CreateDataNode will create a new data node in the metastorefunc (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) { cmd := &internal.CreateDataNodeCommand{ HTTPAddr: proto.String(httpAddr), TCPAddr: proto.String(tcpAddr), } if err := c.retryUntilExec(internal.Command_CreateDataNodeCommand, internal.E_CreateDataNodeCommand_Command, cmd); err != nil { return nil, err } n, err := c.DataNodeByTCPHost(tcpAddr) if err != nil { return nil, err } c.nodeID = n.ID return n, nil}
Raft Leader收到Command_CreateDataNodeCommand:
// services/meta/store_fsm.go

// Apply decodes the command stored in raft log entry l and dispatches it by
// command type. This is an excerpt: only Command_CreateDataNodeCommand is
// shown, which is handled by fsm.applyCreateDataNodeCommand.
//
// It panics if l.Data cannot be unmarshalled into an internal.Command.
//
// NOTE(review): the function has no return after the switch — presumably
// the remaining cases and the tail were trimmed from the excerpt; confirm
// against the full source.
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
	var cmd internal.Command
	// NOTE(review): the panic text says "marshal" although the failing call
	// is proto.Unmarshal, and the underlying err is not included.
	if err := proto.Unmarshal(l.Data, &cmd); err != nil {
		panic(fmt.Errorf("cannot marshal command: %x", l.Data))
	}
	switch cmd.GetType() {
	case internal.Command_CreateDataNodeCommand:
		return fsm.applyCreateDataNodeCommand(&cmd)
	}
}
将新节点更新到状态机:
func (fsm *storeFSM) applyCreateDataNodeCommand(cmd *internal.Command) interface{} { ext, _ := proto.GetExtension(cmd, internal.E_CreateDataNodeCommand_Command) v := ext.(*internal.CreateDataNodeCommand) other := fsm.data.Clone() other.CreateDataNode(v.GetHTTPAddr(), v.GetTCPAddr()) fsm.data = other return nil}