乐趣区

关于influxdb:InfluxDB集群-移除MetaNode源码分析

influxdb 集群中,client 在 node1 上执行 remove metaNode node3 的命令:

influxd_ctl remove-meta node3:8091

整体流程如下:

  • node1 收到 CLI 命令,向自己的 8084 端口发送 GET /remove-meta 请求,query 参数:metahttp=node3:8091(以及 force);
  • node1 向 node3 发送 GET http://node3:8091/ping,探测 node3 是否存活;
  • node1 执行 metaclient.DeleteMetaNode():Raft 中删除该节点信息;
  • node1 向 node3 发送 GET http://node3:8091/remove-meta,node3 做一些清理操作;

CLI 命令处理

命令行代码入口:

// cmd/influxd_ctl/cli/cli.go
// Run switches on cmd and, for "remove-meta", returns the result of
// do_remove_meta. The code that produces cmd and the remaining cases are
// elided in this excerpt.
func (c *CommandLine) Run() error {
    ....
    switch cmd {
    case "remove-meta":
        // Delegate to the remove-meta sub-command.
        return do_remove_meta(c)
    }
}

向自己的 8084 端口发送 GET /remove-meta:

// do_remove_meta implements the "remove-meta" sub-command: it parses the
// command-line flags, builds the "remove-meta" URL through c.getURL with the
// "metahttp" and "force" parameters, and issues an HTTP GET to it. Handling of
// the response is elided in this excerpt.
//
// NOTE(review): fs.Args() is indexed without a length check, so running the
// command without a positional argument would panic — confirm whether the
// arguments are validated before this point.
func do_remove_meta(c *CommandLine) error {
    // Read the command-line arguments with the flag package.
    fs := flag.NewFlagSet("", flag.ExitOnError)
    o := RemoveMetaOptions{}
    fs.BoolVar(&o.Force, "force", false, "force remove meta node")
    fs.Parse(c.CMD[1:])
    // The last positional argument is used as the HTTP address of the node to remove.
    httpAddr := fs.Args()[len(fs.Args())-1]
    force := "false"
    if o.Force {force = "true"}
    // Send remove-meta to this node's own port 8084, carrying the httpAddr of
    // the node to delete (and the force flag) as request parameters.
    url := c.getURL("remove-meta", map[string]string{"metahttp": httpAddr, "force": force})
    resp, err := http.Get(url)
    if err != nil {return err}
    // Release the response body when the function returns.
    defer resp.Body.Close()
    .....
}

admin_cluster 监听 8084 端口,负责集群的管理功能,/remove-meta 的 handler:

// services/admin_cluster/handler.go
// ServeHTTP routes requests arriving at the admin-cluster handler. In this
// excerpt only GET /remove-meta is routed: it is served by removeMetaNode
// wrapped through WrapHandler. Any other method or path falls through without
// this function writing a response.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != "GET" {
        return
    }
    if r.URL.Path == "/remove-meta" {
        wrapped := h.WrapHandler("remove-meta", h.removeMetaNode)
        wrapped.ServeHTTP(w, r)
    }
}

具体的处理:

  • ping 一下 node3,看是否存活;若不存活且未指定 force,则返回错误;
  • metaClient.DeleteMetaNode() 删除集群中该节点的信息;
  • 向 node3 发送 /remove-meta;
// removeMetaNode handles GET /remove-meta on the admin-cluster endpoint.
//
// Query parameters:
//   - metahttp: HTTP address of the meta node to remove (required).
//   - force:    "true" to continue even when the node cannot be pinged.
//
// Steps:
//  1. Ping the target node. If the ping fails and force is not set, report an
//     error and stop.
//  2. Look the node up through MetaClient and delete it from the cluster
//     metadata.
//  3. Ask the target node to clean up its meta directory via GET /remove-meta.
//
// The outcome is always reported through h.sendResponse.
func (h *handler) removeMetaNode(w http.ResponseWriter, r *http.Request) {
    ps := r.URL.Query()

    // A missing parameter must yield an error response, not an index panic.
    if len(ps["metahttp"]) == 0 {
        h.sendResponse(fmt.Errorf("Missing metahttp parameter"), w)
        return
    }
    metahttp := ps["metahttp"][0]
    force := len(ps["force"]) == 1 && ps["force"][0] == "true"

    // Ping the meta server first to see whether the node being removed is alive.
    metaLive := true
    resp, err := http.Get("http://" + metahttp + "/ping")
    if err != nil {
        metaLive = false
        h.logger.Info("ping meta server failed", zap.Error(err))
    } else {
        // Only reachability matters here; close the body so the connection
        // is not leaked.
        resp.Body.Close()
    }
    if !force && !metaLive {
        h.sendResponse(fmt.Errorf("Meta node %s could not be connected", metahttp), w)
        return
    }

    err = func() error {
        // Remove the meta node and make it leave the raft cluster.
        nodeInfo := h.MetaClient.MetaNodeByAddr(metahttp)
        if nodeInfo == nil {
            return fmt.Errorf("Meta node %s does not exist in cluster", metahttp)
        }
        // Delete the meta node's metadata (Raft).
        if err := h.MetaClient.DeleteMetaNode(nodeInfo.ID); err != nil {
            return err
        }

        // Send remove-meta to the removed node so it deletes its meta directory.
        resp, err := http.Get("http://" + metahttp + "/remove-meta")
        if err != nil {
            return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %v", err)
        }
        defer resp.Body.Close()

        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %v", err)
        }
        // The remote handler answers with an error status when its cleanup
        // fails; surface that instead of reporting success.
        if resp.StatusCode < 200 || resp.StatusCode > 299 {
            return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %s", body)
        }
        return nil
    }()
    h.sendResponse(err, w)
}

node1 向 node3 发送 /ping

node1 发送 GET http://node3:8091/ping;
node3 的处理:

//services/httpd/handler.go
// servePing lets a client check that the server is running: it atomically
// increments the PingRequests counter and then writes the
// http.StatusNoContent header through h.writeHeader.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
    atomic.AddInt64(&h.stats.PingRequests, 1)
    h.writeHeader(w, http.StatusNoContent)
}

node1 删除 raft 中该节点信息

通过 metaClient.DeleteMetaNode() 删除集群中该节点的信息;

将删除节点命令封装成 1 个 Command_DeleteMetaNodeCommand,然后通过 retryUntilExec() 执行:

// services/meta/client.go
// DeleteMetaNode wraps the given node id in a DeleteMetaNodeCommand and
// submits it through c.retryUntilExec, returning whatever error that call
// yields.
func (c *Client) DeleteMetaNode(id uint64) error {
    payload := &internal.DeleteMetaNodeCommand{ID: proto.Uint64(id)}
    return c.retryUntilExec(
        internal.Command_DeleteMetaNodeCommand,
        internal.E_DeleteMetaNodeCommand_Command,
        payload,
    )
}

retryUntilExec() 函数执行逻辑

  • 向 metaServer 发送 POST /execute 执行 Command;
  • 若该 metaServer 是 Follower,则将 command 转发给 Leader;
  • 上述过程重试 10 次,直到成功;

Leader 处理 POST /execute 请求,执行 DeleteMetaNodeCommand:

// services/meta/handler.go
// ServeHTTP routes requests arriving at the meta handler. In this excerpt
// every POST request is served by serveExec, wrapped through WrapHandler under
// the name "execute"; other methods fall through without a response being
// written here.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method == "POST" {
        exec := h.WrapHandler("execute", h.serveExec)
        exec.ServeHTTP(w, r)
    }
}
// serveExec applies a command to the store and writes a protobuf-encoded
// internal.Response back to the client. The code that reads the request into
// body is elided in this excerpt.
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
    var resp *internal.Response
    // Apply the command to the store.
    if err := h.store.apply(body); err != nil {
        // Apply failed: pass the error text back to the client.
        resp = &internal.Response{OK:    proto.Bool(false),
            Error: proto.String(err.Error()),
        }
    } else {
        // Apply succeeded: return the store's current index.
        // NOTE(review): OK is set to false on the success path as well —
        // confirm whether clients only inspect Error/Index before changing it.
        resp = &internal.Response{OK:    proto.Bool(false),
            Index: proto.Uint64(h.store.index()),
        }
    }
    // Marshal the response and send it as an opaque byte stream.
    // NOTE(review): the Marshal error is not checked in this excerpt.
    b, err := proto.Marshal(resp)
    w.Header().Add("Content-Type", "application/octet-stream")
    w.Write(b)
}

数据更新通过 store.apply() 执行:

// Apply decodes the command carried in the raft log entry and dispatches it by
// command type. Only the DeleteMetaNodeCommand case is shown; the remaining
// cases, the definition of s and the handling of the result are elided in
// this excerpt.
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
    var cmd internal.Command
    // A log entry that cannot be decoded is treated as fatal.
    // NOTE(review): the panic text says "marshal" although the failing call
    // is Unmarshal.
    if err := proto.Unmarshal(l.Data, &cmd); err != nil {panic(fmt.Errorf("cannot marshal command: %x", l.Data))
    }
    // Run the handler that matches the command type and capture its result.
    err := func() interface{} {switch cmd.GetType() {
        case internal.Command_DeleteMetaNodeCommand:
            return fsm.applyDeleteMetaNodeCommand(&cmd, s)
        }
    }()
    ...
}

具体到 fsm.applyDeleteMetaNodeCommand():更新 fsm 中 metaNodes 的信息:

// applyDeleteMetaNodeCommand removes a meta node from the FSM data.
//
// It extracts the DeleteMetaNodeCommand from cmd, works on a clone of
// fsm.data, calls s.leave for the node, deletes the node from the clone and
// only then swaps the clone in as fsm.data.
//
// It returns nil on success, ErrNodeNotFound when the id is unknown, or the
// error from extension decoding, s.leave or DeleteMetaNode.
func (fsm *storeFSM) applyDeleteMetaNodeCommand(cmd *internal.Command, s *store) interface{} {
    ext, err := proto.GetExtension(cmd, internal.E_DeleteMetaNodeCommand_Command)
    if err != nil {
        // Report the decoding failure instead of panicking on the type
        // assertion below.
        return err
    }
    v := ext.(*internal.DeleteMetaNodeCommand)

    // Mutate a copy so fsm.data is left untouched if any step fails.
    other := fsm.data.Clone()
    node := other.MetaNode(v.GetID())
    if node == nil {
        return ErrNodeNotFound
    }

    // The node leaves; raft.ErrNotLeader is tolerated here.
    if err := s.leave(node); err != nil && err != raft.ErrNotLeader {
        return err
    }
    if err := other.DeleteMetaNode(v.GetID()); err != nil {
        return err
    }
    fsm.data = other
    return nil
}

node1 向 node3 发送 /remove-meta

node3 接收 /remove-meta 请求后,做一些资源清理操作:

// services/meta/handler.go
// ServeHTTP routes requests arriving at the meta handler. In this excerpt only
// GET /remove-meta is routed: it is served by serveRemoveMeta wrapped through
// WrapHandler. Anything else falls through without a response being written
// here.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch {
    case r.Method == "GET" && r.URL.Path == "/remove-meta":
        h.WrapHandler("remove-meta", h.serveRemoveMeta).ServeHTTP(w, r)
    }
}

资源清理操作:

  • 删除本机的 meta 目录;
  • 新建本机的 meta 目录;
  • 保留本机的节点信息 node.json 文件;
// serveRemoveMeta handles GET /remove-meta on the node being removed. The
// original comment notes that admin_cluster has already removed the node from
// the cluster data, so this handler only resets local state:
//
//  1. delete the meta directory (h.s.config.Dir) and everything in it;
//  2. recreate it as an empty directory with mode 0755;
//  3. write the node file again through h.s.Node.Save.
//
// On success it writes "OK"; on failure it responds through h.httpError with
// http.StatusInternalServerError.
func (h *handler) serveRemoveMeta(w http.ResponseWriter, r *http.Request) {
    err := func() error {
        // Delete the meta directory.
        if err := os.RemoveAll(h.s.config.Dir); err != nil {
            return fmt.Errorf("remove meta dir failed: %v", err)
        }
        // Recreate the meta directory. The mode must be the octal literal
        // 0755; decimal 755 is octal 01363 and yields the wrong permissions.
        if err := os.Mkdir(h.s.config.Dir, 0755); err != nil {
            return fmt.Errorf("create empty meta dir failed: %v", err)
        }
        // Keep node.json: the node file should be kept because the data node
        // still needs it.
        if err := h.s.Node.Save(); err != nil {
            return fmt.Errorf("save node file failed: %v", err)
        }
        return nil
    }()
    if err != nil {
        h.httpError(err, w, http.StatusInternalServerError)
        return
    }
    if _, err := w.Write([]byte("OK")); err != nil {
        h.logger.Info("Write response error", zap.Error(err))
    }
}
退出移动版