在influxdb集群中,client在node1上执行删除数据节点node3的命令:
influxd_ctl remove-data node3:8088
整体流程如下:
- node1收到CLI命令后,向本机的8084端口发送GET /remove-data请求,参数通过URL查询串传递:node=node3:8088,force=false(指定force选项时为true);
- node1向node3发送TCP消息RemoveDataNode;仅当该消息处理失败且指定了force时,node1才通过metaClient向集群发送DeleteDataNode();
node3收到TCP RemoveDataNode消息后:
- 先告诉集群删除数据节点:metaClient.DeleteDataNode();
- 再删除该节点上的shards数据;
CLI命令处理
命令行解析的代码入口:
// cmd/influxd_ctl/cli/cli.go

// Run dispatches the parsed CLI subcommand. Only the "remove-data" case is
// shown in this excerpt; it is delegated to do_remove_data.
// NOTE(review): cmd and the function's final return live in code elided from
// this excerpt — as shown, this does not compile on its own.
func (c *CommandLine) Run() error {
	switch cmd {
	case "remove-data":
		return do_remove_data(c)
	}
}
向本机的集群管理端口8084发送GET /remove-data请求:
// do_remove_data implements `influxd_ctl remove-data [options] <host:port>`.
//
// The last positional argument is taken as the address of the data node to
// remove, and force is sent as "true" when o.Force is set (otherwise "false").
// Both values are passed as query parameters of an HTTP GET to the URL built by
// c.getURL("remove-data", ...), and the response body is read.
// NOTE(review): fs.Args() is indexed without a visible length check;
// presumably the elided code validates the argument count — confirm.
// NOTE(review): http.Get uses the default client (no timeout), and the visible
// code does not check resp.StatusCode.
func do_remove_data(c *CommandLine) error {
	var node string
	force := "false"
	....
	node = fs.Args()[len(fs.Args())-1]
	if o.Force {
		force = "true"
	}
	// Send the remove-data HTTP request to this machine's admin port (8084).
	url := c.getURL("remove-data", map[string]string{"node": node, "force": force})
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	...
	return nil
}
node1上对/remove-data请求的处理:
// services/admin_cluster/handler.go

// ServeHTTP routes admin-cluster HTTP requests. Only GET /remove-data is shown
// in this excerpt; it is served by removeDataNode, wrapped via WrapHandler
// under the name "remove-data".
// NOTE(review): requests matching no case get no explicit response here —
// confirm whether the full file has a default (e.g. 404/405) branch.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/remove-data":
			h.WrapHandler("remove-data", h.removeDataNode).ServeHTTP(w, r)
		}
	}
}
handler对/remove-data请求的处理:
- 向node3发送TCP消息:RemoveDataNode;
- 若上一步失败且指定了force,则直接删除集群中该节点的信息:metaClient.DeleteDataNode();
func (h *handler) removeDataNode(w http.ResponseWriter, r *http.Request) { ps := r.URL.Query() if len(ps["node"]) != 1 { http.Error(w, "", http.StatusBadRequest) } var force bool if len(ps["force"]) == 1 && ps["force"][0] == "true" { force = true } node := ps["node"][0] req := Request{RemoveDataNode, "", 0} err := h.call(node, req) if err != nil && force { if nodeInfo, merr := h.MetaClient.DataNodeByTCPHost(node); merr != nil { h.sendResponse(merr, w) } else if merr = h.MetaClient.DeleteDataNode(nodeInfo.ID); merr != nil { h.sendResponse(merr, w) } h.sendResponse(nil, w) } else { h.sendResponse(err, w) }}
metaClient.DeleteDataNode()会向Leader发送Command,而后Leader更新状态机fsm中的dataNodes信息。
node3处理TCP RemoveDataNode消息
TCPHandler:
// services/admin_cluster/tcphandler.gofunc (h *TCPHandler) handleConn(conn net.Conn) { r := &Request{} err := json.NewDecoder(conn).Decode(r) switch r.Type { case RemoveDataNode: h.handleRemoveDataNode(r, conn) }}
handleRemoveDataNode()的具体内容:
- 删除集群中该节点的信息:metaClient.DeleteDataNode();
- 删除该节点上所有数据库的shards数据:TSDBStore.DeleteDatabase();
func (h *TCPHandler) handleRemoveDataNode(r *Request, conn net.Conn) { msg := "" func() { if h.TSDBStore == nil { msg = fmt.Sprintf("Datanode is not enabled on %s", h.Server.TCPAddr()) return } if nodeInfo, err := h.MetaClient.DataNodeByTCPHost(h.Server.TCPAddr()); err != nil { msg = fmt.Sprintf("Datanode %s is not in cluster", h.Server.TCPAddr()) } else { // update meta first if err = h.MetaClient.DeleteDataNode(nodeInfo.ID); err != nil { msg = err.Error() } // delete data dbs := h.TSDBStore.Databases() for _, db := range dbs { if err := h.TSDBStore.DeleteDatabase(db); err != nil { msg = fmt.Sprintf("Remove data node meta successfully. But remove data directory failed. %v Please remove directory manully.", err) return } } } }() if err := json.NewEncoder(conn).Encode(Response{msg}); err != nil { h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err)) }}
删除节点上的shards数据:即删除该数据库的数据目录以及WAL目录
// tsdb/store.go

// DeleteDatabase removes database name from this store. It deletes the
// database's data directory under s.path and its WAL directory under
// s.EngineOptions.Config.WALDir. It then drops the database's entries from
// s.shards, s.databases and s.indexes.
// It returns nil without doing anything if the database is not known locally,
// and returns the first directory-removal error, leaving the in-memory entries
// in place in that case.
// NOTE(review): the maps are mutated after s.mu.RUnlock(); presumably the
// elided section acquires s.mu.Lock() — confirm, otherwise these deletes race
// with concurrent readers.
// NOTE(review): name is joined into dbPath without visible validation; a name
// containing ".." could resolve outside s.path — confirm the elided code guards
// against this.
func (s *Store) DeleteDatabase(name string) error {
	s.mu.RLock()
	if _, ok := s.databases[name]; !ok {
		s.mu.RUnlock()
		// no files locally, so nothing to do
		return nil
	}
	// Collect the shards that belong to this database.
	shards := s.filterShards(func(sh *Shard) bool {
		return sh.database == name
	})
	s.mu.RUnlock()
	......
	dbPath := filepath.Clean(filepath.Join(s.path, name))
	// Remove the database's data directory.
	if err := os.RemoveAll(dbPath); err != nil {
		return err
	}
	// Remove the database's WAL directory.
	if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {
		return err
	}
	for _, sh := range shards {
		delete(s.shards, sh.id)
	}
	// Remove database from store list of databases
	delete(s.databases, name)
	// Remove shared index for database if using inmem index.
	delete(s.indexes, name)
	return nil
}