influxdb集群中,client在node1上执行remove dataNode node3的命令:

influxd_ctl remove-data node3:8088

整体流程如下:

  • node1收到CLI命令,向自己的8084端口发送GET /remove-data请求,参数通过URL查询串传递:node=node3:8088、force=true|false;
  • node1向node3发送TCP消息RemoveDataNode;如果发送失败且指定了force,node1再通过metaClient向集群发送DeleteDataNode();
  • node3收到node1发来的TCP消息RemoveDataNode后:

    • 先告诉集群删除数据节点:metaClient.DeleteDataNode();
    • 再删除该节点上的shards数据;

CLI命令处理

命令行解析的代码入口:

// cmd/influxd_ctl/cli/cli.go

// Run selects the handler for the command named by cmd and returns that
// handler's error.
//
// Only the "remove-data" case is shown in this excerpt; it delegates to
// do_remove_data. Where cmd is assigned, and what happens for any other
// command, is not visible here.
func (c *CommandLine) Run() error {
	switch cmd {
	case "remove-data":
		return do_remove_data(c)
	}
}

向本机的集群管理端口8084发送GET /remove-data请求:

// do_remove_data implements the "remove-data" command: it builds the
// remove-data URL with the "node" and "force" parameters, issues an HTTP GET
// against it, and reads the response body.
//
// It returns the error from http.Get if the request cannot be sent. The
// declarations of fs and o, and the handling of the response body, are elided
// ("....") in this excerpt.
func do_remove_data(c *CommandLine) error {
	var node string
	force := "false"
	....
	// The target node is the last positional argument.
	node = fs.Args()[len(fs.Args())-1]
	if o.Force {
		force = "true"
	}
	// Send the HTTP remove-data request to this node's own port 8084.
	url := c.getURL("remove-data", map[string]string{"node": node, "force": force})
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	...
	return nil
}

node1上/remove-data的处理:

// services/admin_cluster/handler.gofunc (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {    switch r.Method {    case "GET":        switch r.URL.Path {        case "/remove-data":            h.WrapHandler("remove-data", h.removeDataNode).ServeHTTP(w, r)        }    }}

handler对/remove-data的处理:

  • 向node3发送TCP消息:RemoveDataNode;
  • 如果TCP调用失败且force=true,则直接删除集群中该节点信息:metaClient.DeleteDataNode();
func (h *handler) removeDataNode(w http.ResponseWriter, r *http.Request) {    ps := r.URL.Query()    if len(ps["node"]) != 1 {        http.Error(w, "", http.StatusBadRequest)    }    var force bool    if len(ps["force"]) == 1 && ps["force"][0] == "true" {        force = true    }    node := ps["node"][0]    req := Request{RemoveDataNode, "", 0}    err := h.call(node, req)    if err != nil && force {              if nodeInfo, merr := h.MetaClient.DataNodeByTCPHost(node); merr != nil {            h.sendResponse(merr, w)        } else if merr = h.MetaClient.DeleteDataNode(nodeInfo.ID); merr != nil {            h.sendResponse(merr, w)        }        h.sendResponse(nil, w)    } else {        h.sendResponse(err, w)    }}

metaClient.DeleteDataNode()会向Leader发送Command,然后Leader更新状态机fsm中的dataNodes信息。

node3处理TCP RemoveDataNode消息

TCPHandler:

// services/admin_cluster/tcphandler.gofunc (h *TCPHandler) handleConn(conn net.Conn) {    r := &Request{}    err := json.NewDecoder(conn).Decode(r)    switch r.Type {    case RemoveDataNode:        h.handleRemoveDataNode(r, conn)    }}

handleRemoveDataNode()的具体内容:

  • 删除集群中该节点的信息:metaClient.DeleteDataNode();
  • 删除该节点上的shards数据:TSDBStore.DeleteDatabase();
func (h *TCPHandler) handleRemoveDataNode(r *Request, conn net.Conn) {    msg := ""    func() {        if h.TSDBStore == nil {            msg = fmt.Sprintf("Datanode is not enabled on %s", h.Server.TCPAddr())            return        }        if nodeInfo, err := h.MetaClient.DataNodeByTCPHost(h.Server.TCPAddr()); err != nil {            msg = fmt.Sprintf("Datanode %s is not in cluster", h.Server.TCPAddr())        } else {            // update meta first            if err = h.MetaClient.DeleteDataNode(nodeInfo.ID); err != nil {                msg = err.Error()            }            // delete data            dbs := h.TSDBStore.Databases()            for _, db := range dbs {                if err := h.TSDBStore.DeleteDatabase(db); err != nil {                    msg = fmt.Sprintf("Remove data node meta successfully. But remove data directory failed. %v Please remove directory manully.", err)                    return                }            }        }    }()    if err := json.NewEncoder(conn).Encode(Response{msg}); err != nil {        h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err))    }}

删除节点中shards数据:删除shards文件以及WAL文件

// tsdb/store.go

// DeleteDatabase removes the named database from this store: it deletes the
// database's directory under s.path and its directory under the configured
// WAL dir, then drops the database's shards, database entry and index entry
// from the store's in-memory maps.
//
// It returns nil without doing anything when the database is not present in
// s.databases, and returns the os.RemoveAll error if either directory cannot
// be removed. Part of the body is elided ("......") in this excerpt.
func (s *Store) DeleteDatabase(name string) error {
	s.mu.RLock()
	if _, ok := s.databases[name]; !ok {
		s.mu.RUnlock()
		// no files locally, so nothing to do
		return nil
	}
	// Collect the shards that belong to this database.
	shards := s.filterShards(func(sh *Shard) bool {
		return sh.database == name
	})
	s.mu.RUnlock()
	......
	dbPath := filepath.Clean(filepath.Join(s.path, name))
	// Remove the data directory.
	if err := os.RemoveAll(dbPath); err != nil {
		return err
	}
	// Remove the WAL directory.
	if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {
		return err
	}
	// NOTE(review): the maps below are mutated after the read lock was
	// released and no write lock is visible in this excerpt — confirm that
	// s.mu.Lock() is taken in the elided part of the function.
	for _, sh := range shards {
		delete(s.shards, sh.id)
	}
	// Remove database from store list of databases
	delete(s.databases, name)
	// Remove shared index for database if using inmem index.
	delete(s.indexes, name)
	return nil
}