共计 3484 个字符,预计需要花费 9 分钟才能阅读完成。
influxdb 集群中,client 在 node1 上执行 remove dataNode node3 的命令:
influxd_ctl remove-data node3:8088
整体流程如下:
- node1 收到 CLI 命令,向本人的 8084 发送 GET /remove-data 申请,request body: {“node”:”node3:8088″};
- node1 通过 metaClient 向集群发送 DeleteDataNode();
-
node3 收到 /remove-data 申请后:
- 先告诉集群删除数据节点:metaClient.DeleteDataNode();
- 再删除该节点上的 shards 数据;
CLI 命令解决
命令行解析的代码入口:
// cmd/influxd_ctl/cli/cli.go
func (c *CommandLine) Run() error {
switch cmd {
case "remove-data":
return do_remove_data(c)
}
}
向本机的集群治理端口 8084,发送 GET /remove-data:
func do_remove_data(c *CommandLine) error {
var node string
force := "false"
....
node = fs.Args()[len(fs.Args())-1]
if o.Force {force = "true"}
// 向本人的 8084 发送 HTTP remove-data
url := c.getURL("remove-data", map[string]string{"node": node, "force": force})
resp, err := http.Get(url)
if err != nil {return err}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
...
return nil
}
node1 上 /remove-data 的解决:
// services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
switch r.URL.Path {
case "/remove-data":
h.WrapHandler("remove-data", h.removeDataNode).ServeHTTP(w, r)
}
}
}
handler 对 /remove-data 的解决:
- 向 node3 发送 TCP 音讯:RemoveDataNode;
- 删除集群中该节点信息:metaClient.DeleteDataNode();
func (h *handler) removeDataNode(w http.ResponseWriter, r *http.Request) {ps := r.URL.Query()
if len(ps["node"]) != 1 {http.Error(w, "", http.StatusBadRequest)
}
var force bool
if len(ps["force"]) == 1 && ps["force"][0] == "true" {force = true}
node := ps["node"][0]
req := Request{RemoveDataNode, "", 0}
err := h.call(node, req)
if err != nil && force {if nodeInfo, merr := h.MetaClient.DataNodeByTCPHost(node); merr != nil {h.sendResponse(merr, w)
} else if merr = h.MetaClient.DeleteDataNode(nodeInfo.ID); merr != nil {h.sendResponse(merr, w)
}
h.sendResponse(nil, w)
} else {h.sendResponse(err, w)
}
}
metaClient.DeleteDataNode() 会向 Leader 发送 Command,而后 Leader 更新状态机 fsm 中的 dataNodes 信息。
node3 解决 TCP RemoveDataNode 音讯
TCPHandler:
// services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {r := &Request{}
err := json.NewDecoder(conn).Decode(r)
switch r.Type {
case RemoveDataNode:
h.handleRemoveDataNode(r, conn)
}
}
handleRemoveDataNode() 的具体内容:
- 删除集群中该节点的信息:metaClient.DeleteDataNode();
- 删除该节点中 shards 信息:TSDB.DeleteDatabase();
func (h *TCPHandler) handleRemoveDataNode(r *Request, conn net.Conn) {
msg := ""
func() {
if h.TSDBStore == nil {msg = fmt.Sprintf("Datanode is not enabled on %s", h.Server.TCPAddr())
return
}
if nodeInfo, err := h.MetaClient.DataNodeByTCPHost(h.Server.TCPAddr()); err != nil {msg = fmt.Sprintf("Datanode %s is not in cluster", h.Server.TCPAddr())
} else {
// update meta first
if err = h.MetaClient.DeleteDataNode(nodeInfo.ID); err != nil {msg = err.Error()
}
// delete data
dbs := h.TSDBStore.Databases()
for _, db := range dbs {if err := h.TSDBStore.DeleteDatabase(db); err != nil {msg = fmt.Sprintf("Remove data node meta successfully. But remove data directory failed. %v Please remove directory manully.", err)
return
}
}
}
}()
if err := json.NewEncoder(conn).Encode(Response{msg}); err != nil {h.Logger.Info("Encode admin tcp resposne failed.", zap.Error(err))
}
}
删除节点中 shards 数据:删除 shards 文件以及 WAL 文件
// tsdb/store.go
func (s *Store) DeleteDatabase(name string) error {s.mu.RLock()
if _, ok := s.databases[name]; !ok {s.mu.RUnlock()
// no files locally, so nothing to do
return nil
}
// 找到负责的 shard
shards := s.filterShards(func(sh *Shard) bool {return sh.database == name})
s.mu.RUnlock()
......
dbPath := filepath.Clean(filepath.Join(s.path, name))
// 删除数据目录
if err := os.RemoveAll(dbPath); err != nil {return err}
// 删除 WAL
if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {return err}
for _, sh := range shards {delete(s.shards, sh.id)
}
// Remove database from store list of databases
delete(s.databases, name)
// Remove shared index for database if using inmem index.
delete(s.indexes, name)
return nil
}
正文完