在 InfluxDB 集群中,client 在 node1 上执行将 node3 添加为数据节点(data node)的命令:
influxd_ctl add-data node3:8088
整体流程如下:
- node1 收到 CLI 命令,向自己的 8084 端口发送 GET /add-data 请求,节点地址通过 URL 查询参数传递:node=node3:8088;
- node1 向 node3 的 8088 端口发送 TCP 消息 AddDataNode;
- node3 收到 AddDataNode 后,主动将自己加入集群。
CLI 命令处理
命令行解析的代码入口:
// cmd/influxd_ctl/cli/cli.go
// Run dispatches the parsed CLI subcommand. Only the "add-data" case is
// shown in this excerpt; it delegates to do_add_data.
// NOTE(review): `cmd` is not defined in the visible code and there is no
// return after the switch; both presumably live in the elided parts of the
// real function — confirm against cmd/influxd_ctl/cli/cli.go.
func (c *CommandLine) Run() error {
switch cmd {
case "add-data":
return do_add_data(c)
}
}
向本机的集群管理端口 8084 发送 GET /add-data 请求:
// do_add_data implements `influxd_ctl add-data <host:port>`: it takes the
// node address from c.CMD[1], sends GET add-data with a "node" query
// parameter to the URL built by c.getURL, and reads the response body.
// NOTE(review): c.CMD[1] is read without a length check; presumably the
// argument count is validated by the caller — confirm.
// NOTE(review): http.Get uses the default client, which has no timeout, and
// the response status code is not checked in the visible code.
func do_add_data(c *CommandLine) error {
var node string
node = c.CMD[1]
// Send the add-data HTTP request to local port 8084.
url := c.getURL("add-data", map[string]string{"node": node})
resp, err := http.Get(url)
if err != nil {return err}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
// Handling of err/body is elided in this excerpt.
......
return nil
}
node1 向 node3 发送 AddDataNode
node1 处理 GET /add-data 的 handler:
// services/admin_cluster/handler.go
// ServeHTTP routes admin-cluster HTTP requests.
//
// A GET for path "/add-data" is forwarded to addDataNode, wrapped by
// WrapHandler under the name "add-data". Any other method/path combination
// falls through without this handler writing anything.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet && r.URL.Path == "/add-data" {
		h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
	}
}
构造了 1 个 AddDataNode 的 request,然后通过 TCP 发送给 node3:
// addDataNode handles GET /add-data?node=<host:port>.
//
// It sends an AddDataNode request to the given node via h.call and reports
// the outcome through h.sendResponse. The "node" query parameter must be
// present exactly once; otherwise the request is rejected with
// 400 Bad Request and nothing is sent.
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {
	nodes := r.URL.Query()["node"]
	if len(nodes) != 1 {
		http.Error(w, "exactly one node parameter is required", http.StatusBadRequest)
		// Without this return, a missing parameter panics on nodes[0] and a
		// repeated one is still acted on after the 400 has been written.
		return
	}

	req := Request{AddDataNode, "", 0}
	err := h.call(nodes[0], req)
	h.sendResponse(err, w)
}
h.call() 发送 TCP 消息 AddDataNode:
// call sends req, JSON-encoded, to node over a TCP connection opened with
// tcp.Dial("tcp", node, MuxHeader), then reads the reply with
// h.readResponse. The connection is closed when call returns.
// It returns an error if dialing or encoding/sending the request fails.
// NOTE(review): the handling of resp/err after readResponse is elided (`...`)
// in this excerpt; as shown, resp is unused and call returns nil.
// NOTE(review): no deadline is set on conn here; if the peer never replies,
// readResponse may block indefinitely — confirm it applies a timeout.
func (h *handler) call(node string, req Request) error {conn, err := tcp.Dial("tcp", node, MuxHeader)
if err != nil {return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
}
defer conn.Close()
if err = json.NewEncoder(conn).Encode(req); err != nil {return fmt.Errorf("encode and send request failed. %v", err)
}
// Read the peer's reply.
resp, err := h.readResponse(conn)
...
return nil
}
node3 将自己加入集群
node3 接收到 TCP 消息 AddDataNode:
// services/admin_cluster/tcphandler.go
// handleConn decodes one JSON-encoded Request from conn and dispatches it by
// type. AddDataNode requests are passed to handleAddDataNode.
//
// If the request cannot be decoded, the failure is logged and nothing is
// dispatched. Dispatching anyway would switch on a zero-value Request; if
// AddDataNode happens to be the zero value of the type (not visible here),
// garbage input would make this node join the cluster.
func (h *TCPHandler) handleConn(conn net.Conn) {
	r := &Request{}
	if err := json.NewDecoder(conn).Decode(r); err != nil {
		h.Logger.Info("Decode admin tcp request failed: " + err.Error())
		return
	}

	switch r.Type {
	case AddDataNode:
		h.handleAddDataNode(r, conn)
	}
}
由 h.Server.DataServerJoin() 处理节点加入集群:
// handleAddDataNode joins this server to the cluster via
// h.Server.DataServerJoin and writes a JSON-encoded Response back on conn.
// On success the Response is empty; on failure it carries
// "DataServerJoin failed. <err>".
//
// A failure to encode the reply is logged and otherwise ignored. conn is
// not closed here. The request r is not inspected.
func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {
	var resp Response
	if err := h.Server.DataServerJoin(); err != nil {
		resp = Response{fmt.Sprintf("DataServerJoin failed. %v", err)}
	}

	// A single encode path for both outcomes. The log text previously read
	// "resposne", which made it hard to search for.
	if err := json.NewEncoder(conn).Encode(resp); err != nil {
		h.Logger.Info("Encode admin tcp response failed.", zap.Error(err))
	}
}
节点加入集群:
- 若本节点已注册为 data node,则直接返回;
- 否则由 metaClient 向集群发送添加 DataNode 的请求,失败时每秒重试一次,直到成功;
- 本机更新节点信息文件 node.json。
// cmd/influxd/run/server.go
// DataServerJoin registers this server as a data node with the meta service
// and persists the resulting node state locally (node.json).
//
// The steps are:
//   - If data is disabled in the config, it returns an error.
//   - If the meta client already knows a data node with this server's data
//     ID, it returns nil without doing anything else.
//   - Otherwise it asks the meta service to create a data node for this
//     server's HTTP and TCP addresses, retrying once per second until that
//     succeeds. It then stores the assigned ID in s.Node and saves s.Node.
//
// The only errors returned are the "not enabled" error and a failure from
// s.Node.Save.
func (s *Server) DataServerJoin() (err error) {
	if !s.config.Data.Enabled {
		return fmt.Errorf("Data node is not enabled")
	}

	// Already registered under our data ID: nothing to do.
	if _, err = s.MetaClient.DataNode(s.Node.GetDataID()); err == nil {
		return nil
	}

	// Ask the leader to add this data node, retrying until it succeeds.
	// NOTE(review): there is no retry limit or cancellation; if the meta
	// service is never reachable, this call (and any caller waiting on it)
	// blocks forever.
	node, createErr := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
	for createErr != nil {
		log.Printf("Unable to create data node. retry in 1s: %s", createErr.Error())
		time.Sleep(time.Second)
		node, createErr = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
	}

	// NOTE(review): the lookup above uses s.Node.GetDataID(), but the new ID
	// is stored in s.Node.ID; presumably GetDataID reads that field — confirm.
	s.Node.ID = node.ID

	// Persist the updated node info (node.json).
	if saveErr := s.Node.Save(); saveErr != nil {
		return saveErr
	}
	return nil
}
metaClient.CreateDataNode() 实际上是向 Leader 发送 Command_CreateDataNodeCommand:
// CreateDataNode asks the meta service to create a data node with the given
// HTTP and TCP addresses.
//
// It submits a CreateDataNodeCommand through c.retryUntilExec. It then looks
// the node up by its TCP address and remembers that node's ID on the client.
// It returns the looked-up node. The error returned is the first failure
// from either executing the command or the lookup.
func (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) {
	err := c.retryUntilExec(
		internal.Command_CreateDataNodeCommand,
		internal.E_CreateDataNodeCommand_Command,
		&internal.CreateDataNodeCommand{
			HTTPAddr: proto.String(httpAddr),
			TCPAddr:  proto.String(tcpAddr),
		},
	)
	if err != nil {
		return nil, err
	}

	node, err := c.DataNodeByTCPHost(tcpAddr)
	if err != nil {
		return nil, err
	}
	c.nodeID = node.ID
	return node, nil
}
Raft Leader 收到 Command_CreateDataNodeCommand 并将其写入 Raft 日志;日志提交后,由状态机 storeFSM 的 Apply 方法应用该命令:
// services/meta/store_fsm.go
// Apply is the raft FSM callback for a committed log entry. It decodes
// l.Data as an internal.Command and dispatches on its type to the matching
// apply* method. The method's return value is returned from Apply.
//
// It panics if the entry cannot be decoded or carries a command type it does
// not handle. Every replica applies the same entry, so silently skipping an
// unknown command would hide a version mismatch instead of surfacing it.
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
	var cmd internal.Command
	if err := proto.Unmarshal(l.Data, &cmd); err != nil {
		panic(fmt.Errorf("cannot unmarshal command: %x: %v", l.Data, err))
	}

	switch cmd.GetType() {
	case internal.Command_CreateDataNodeCommand:
		return fsm.applyCreateDataNodeCommand(&cmd)
	default:
		// Also gives the function a terminating statement; without it Go
		// rejects the function with "missing return".
		panic(fmt.Errorf("cannot apply command: %x", l.Data))
	}
}
将新节点更新到状态机:
// applyCreateDataNodeCommand applies a CreateDataNodeCommand to the state
// machine. It clones fsm.data, adds a data node with the command's HTTP and
// TCP addresses to the clone, and then replaces fsm.data with the clone.
// It always returns nil.
func (fsm *storeFSM) applyCreateDataNodeCommand(cmd *internal.Command) interface{} {
	// The GetExtension error is ignored. Presumably a missing or invalid
	// extension then makes the type assertion below panic.
	raw, _ := proto.GetExtension(cmd, internal.E_CreateDataNodeCommand_Command)
	create := raw.(*internal.CreateDataNodeCommand)

	next := fsm.data.Clone()
	// NOTE(review): any result from CreateDataNode (e.g. a duplicate-node
	// error) is discarded, so the command still reports success. This is
	// presumably so that a re-joining node is idempotent — confirm.
	next.CreateDataNode(create.GetHTTPAddr(), create.GetTCPAddr())
	fsm.data = next
	return nil
}