关于influxdb:InfluxDB集群-移除DataNode源码分析

2次阅读

共计 3484 个字符,预计需要花费 9 分钟才能阅读完成。

influxdb 集群中,client 在 node1 上执行 remove dataNode node3 的命令:

influxd_ctl remove-data node3:8088

整体流程如下:

  • node1 收到 CLI 命令,向本人的 8084 发送 GET /remove-data 申请,request body: {“node”:”node3:8088″};
  • node1 通过 metaClient 向集群发送 DeleteDataNode();
  • node3 收到 /remove-data 申请后:

    • 先告诉集群删除数据节点:metaClient.DeleteDataNode();
    • 再删除该节点上的 shards 数据;

CLI 命令解决

命令行解析的代码入口:

// cmd/influxd_ctl/cli/cli.go
func (c *CommandLine) Run() error {
    switch cmd {
    case "remove-data":
        return do_remove_data(c)
    }
}

向本机的集群治理端口 8084,发送 GET /remove-data:

func do_remove_data(c *CommandLine) error {
    var node string
    force := "false"
    ....
    node = fs.Args()[len(fs.Args())-1]
    if o.Force {force = "true"}
    // 向本人的 8084 发送 HTTP remove-data
    url := c.getURL("remove-data", map[string]string{"node": node, "force": force})
    resp, err := http.Get(url)
    if err != nil {return err}
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    ...
    return nil
}

node1 上 /remove-data 的解决:

// services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/remove-data":
            h.WrapHandler("remove-data", h.removeDataNode).ServeHTTP(w, r)
        }
    }
}

handler 对 /remove-data 的解决:

  • 向 node3 发送 TCP 音讯:RemoveDataNode;
  • 删除集群中该节点信息:metaClient.DeleteDataNode();
func (h *handler) removeDataNode(w http.ResponseWriter, r *http.Request) {ps := r.URL.Query()
    if len(ps["node"]) != 1 {http.Error(w, "", http.StatusBadRequest)
    }
    var force bool
    if len(ps["force"]) == 1 && ps["force"][0] == "true" {force = true}
    node := ps["node"][0]
    req := Request{RemoveDataNode, "", 0}
    err := h.call(node, req)
    if err != nil && force {if nodeInfo, merr := h.MetaClient.DataNodeByTCPHost(node); merr != nil {h.sendResponse(merr, w)
        } else if merr = h.MetaClient.DeleteDataNode(nodeInfo.ID); merr != nil {h.sendResponse(merr, w)
        }
        h.sendResponse(nil, w)
    } else {h.sendResponse(err, w)
    }
}

metaClient.DeleteDataNode() 会向 Leader 发送 Command,而后 Leader 更新状态机 fsm 中的 dataNodes 信息。

node3 解决 TCP RemoveDataNode 音讯

TCPHandler:

// services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {r := &Request{}
    err := json.NewDecoder(conn).Decode(r)
    switch r.Type {
    case RemoveDataNode:
        h.handleRemoveDataNode(r, conn)
    }
}

handleRemoveDataNode() 的具体内容:

  • 删除集群中该节点的信息:metaClient.DeleteDataNode();
  • 删除该节点中 shards 信息:TSDB.DeleteDatabase();
func (h *TCPHandler) handleRemoveDataNode(r *Request, conn net.Conn) {
    msg := ""
    func() {
        if h.TSDBStore == nil {msg = fmt.Sprintf("Datanode is not enabled on %s", h.Server.TCPAddr())
            return
        }
        if nodeInfo, err := h.MetaClient.DataNodeByTCPHost(h.Server.TCPAddr()); err != nil {msg = fmt.Sprintf("Datanode %s is not in cluster", h.Server.TCPAddr())
        } else {
            // update meta first
            if err = h.MetaClient.DeleteDataNode(nodeInfo.ID); err != nil {msg = err.Error()
            }
            // delete data
            dbs := h.TSDBStore.Databases()
            for _, db := range dbs {if err := h.TSDBStore.DeleteDatabase(db); err != nil {msg = fmt.Sprintf("Remove data node meta successfully. But remove data directory failed. %v Please remove directory manully.", err)
                    return
                }
            }
        }
    }()
    if err := json.NewEncoder(conn).Encode(Response{msg}); err != nil {h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err))
    }
}

删除节点中 shards 数据:删除 shards 文件以及 WAL 文件

// tsdb/store.go
func (s *Store) DeleteDatabase(name string) error {s.mu.RLock()
    if _, ok := s.databases[name]; !ok {s.mu.RUnlock()
        // no files locally, so nothing to do
        return nil
    }
    // 找到负责的 shard
    shards := s.filterShards(func(sh *Shard) bool {return sh.database == name})
    s.mu.RUnlock()
    ......
    dbPath := filepath.Clean(filepath.Join(s.path, name))

    // 删除数据目录
    if err := os.RemoveAll(dbPath); err != nil {return err}
    // 删除 WAL
    if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {return err}
    for _, sh := range shards {delete(s.shards, sh.id)
    }
    // Remove database from store list of databases
    delete(s.databases, name)
    // Remove shared index for database if using inmem index.
    delete(s.indexes, name)
    return nil
}
正文完
 0