influxdb集群中,client在node1上执行remove metaNode node3的命令:

influxd_ctl remove-meta node3:8091

整体流程如下:

  • node1收到CLI命令,向自己的8084端口发送GET /remove-meta的请求,请求参数: {"metahttp": "node3:8091"};
  • node1向node3发送GET http://node3:8091/ping,探测node3是否存活;
  • node1执行metaclient.DeleteMetaNode():Raft中删除该节点信息;
  • node1向node3发送GET http://node3:8091/remove-meta,node3做一些清理操作;

CLI命令处理

命令行代码入口:

// cmd/influxd_ctl/cli/cli.go

// Run dispatches on the CLI command name. Only the "remove-meta" case is
// shown in this excerpt; it delegates to do_remove_meta and returns its error.
func (c *CommandLine) Run() error {
	....
	switch cmd {
	case "remove-meta":
		return do_remove_meta(c)
	}
}

向自己的8084端口发送GET /remove-meta:

// do_remove_meta implements the "remove-meta" CLI command: it parses the
// command-line flags, builds the "remove-meta" URL through c.getURL and
// issues an HTTP GET against it. The remainder of the function is elided in
// this excerpt.
func do_remove_meta(c *CommandLine) error {
	// Parse the command-line arguments with the flag package.
	fs := flag.NewFlagSet("", flag.ExitOnError)
	o := RemoveMetaOptions{}
	fs.BoolVar(&o.Force, "force", false, "force remove meta node")
	fs.Parse(c.CMD[1:])
	// The last positional argument is taken as the HTTP address of the meta
	// node to remove.
	// NOTE(review): fs.Args() is indexed without a length check, so this
	// panics when no positional argument is supplied.
	httpAddr := fs.Args()[len(fs.Args())-1]
	force := "false"
	if o.Force {
		force = "true"
	}
	// Send remove-meta to the local admin endpoint, passing the httpAddr of
	// the node to delete and the force flag as request parameters.
	url := c.getURL("remove-meta", map[string]string{"metahttp": httpAddr, "force": force})
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	.....
}

admin_cluster监听8084端口,负责集群的管理功能,/remove-meta的handler:

// services/admin_cluster/handler.go

// ServeHTTP routes incoming admin requests by HTTP method and URL path. In
// this excerpt only GET /remove-meta is shown; it is wrapped with
// h.WrapHandler and served by h.removeMetaNode.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/remove-meta":
			h.WrapHandler("remove-meta", h.removeMetaNode).ServeHTTP(w, r)
		}
	}
}

具体的处理:

  • ping一下node3,看是否存活,若已不存活且未指定force,则返回错误;
  • metaClient.DeleteMetaNode()删除集群中该节点的信息;
  • 向node3发送/remove-meta;
func (h *handler) removeMetaNode(w http.ResponseWriter, r *http.Request) {    ps := r.URL.Query()    var metahttp string    metahttp = ps["metahttp"][0]    var force bool    if len(ps["force"]) == 1 && ps["force"][0] == "true" {        force = true    }    metaLive := true    // 先ping一下看移除的节点否活着    // ping meta server first    url := "http://" + metahttp + "/ping"    resp, err := http.Get(url)    if err != nil {        metaLive = false        h.logger.Info("ping meta server failed", zap.Error(err))    }     if !force && !metaLive {        h.sendResponse(fmt.Errorf("Meta node %s could not be connected", metahttp), w)        return    }    err = func() error {        // remove meta node and leave from raft cluster        nodeInfo := h.MetaClient.MetaNodeByAddr(metahttp)        if nodeInfo == nil {            return fmt.Errorf("Meta node %s does not exist in cluster", metahttp)        }        // 删除该meta节点的元信息(Raft)        if err := h.MetaClient.DeleteMetaNode(nodeInfo.ID); err != nil {            return err        }        // 向该节点的8091发送remote-meta        // delete directory        url := "http://" + metahttp + "/remove-meta"        resp, err := http.Get(url)        if err != nil {            return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %v", err)        }        body, err := ioutil.ReadAll(resp.Body)        return nil    }()    h.sendResponse(err, w)}

node1向node3发送/ping

node1发送GET http://node3:8091/ping;
node3的处理:

//services/httpd/handler.go// servePing returns a simple response to let the client know the server is running.func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {    atomic.AddInt64(&h.stats.PingRequests, 1)    h.writeHeader(w, http.StatusNoContent)}

node1删除raft中该节点信息

通过metaClient.DeleteMetaNode()删除集群中该节点的信息;

将删除节点命令封装成1个Command_DeleteMetaNodeCommand,然后通过retryUntilExec()执行:

// services/meta/client.gofunc (c *Client) DeleteMetaNode(id uint64) error {    cmd := &internal.DeleteMetaNodeCommand{        ID: proto.Uint64(id),    }    return c.retryUntilExec(internal.Command_DeleteMetaNodeCommand, internal.E_DeleteMetaNodeCommand_Command, cmd)}

retryUntilExec()函数执行逻辑:

  • 向metaServer发送POST /execute执行Command;
  • 若该metaServer是Follower,则将command转发给Leader;
  • 上述过程最多重试10次,直到成功;

Leader处理POST /execute请求,执行DeleteMetaNodeCommand:

// services/meta/handler.gofunc (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {    switch r.Method {    case "POST":        h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r)    }}func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {    var resp *internal.Response    if err := h.store.apply(body); err != nil {        resp = &internal.Response{            OK:    proto.Bool(false),            Error: proto.String(err.Error()),        }    } else {        resp = &internal.Response{            OK:    proto.Bool(false),            Index: proto.Uint64(h.store.index()),        }    }    b, err := proto.Marshal(resp)    w.Header().Add("Content-Type", "application/octet-stream")    w.Write(b)}

数据更新通过store.apply()执行:

// Apply decodes the data of a raft log entry into an internal.Command and
// dispatches on the command type. Only the DeleteMetaNodeCommand case is
// shown in this excerpt; the rest of the function is elided.
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
	var cmd internal.Command
	// A log entry that cannot be decoded is treated as fatal: panic with the
	// raw entry bytes printed in hex.
	if err := proto.Unmarshal(l.Data, &cmd); err != nil {
		panic(fmt.Errorf("cannot marshal command: %x", l.Data))
	}
	err := func() interface{} {
		switch cmd.GetType() {
		case internal.Command_DeleteMetaNodeCommand:
			// NOTE(review): s is not defined in this excerpt; presumably the
			// owning *store — confirm against the full file.
			return fsm.applyDeleteMetaNodeCommand(&cmd, s)
		}
	}()
	...
}

具体到fsm.applyDeleteMetaNodeCommand():更新fsm中metaNodes的信息:

func (fsm *storeFSM) applyDeleteMetaNodeCommand(cmd *internal.Command, s *store) interface{} {    ext, _ := proto.GetExtension(cmd, internal.E_DeleteMetaNodeCommand_Command)    v := ext.(*internal.DeleteMetaNodeCommand)    //更新other    other := fsm.data.Clone()    node := other.MetaNode(v.GetID())    if node == nil {        return ErrNodeNotFound    }    //node来到    if err := s.leave(node); err != nil && err != raft.ErrNotLeader {        return err    }    if err := other.DeleteMetaNode(v.GetID()); err != nil {        return err    }    fsm.data = other    return nil}

node1向node3发送/remove-meta

node3接收/remove-meta请求,做一些资源清理操作:

// services/meta/handler.go

// ServeHTTP routes incoming meta-service requests by HTTP method and URL
// path. In this excerpt only GET /remove-meta is shown; it is wrapped with
// h.WrapHandler and served by h.serveRemoveMeta.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/remove-meta":
			h.WrapHandler("remove-meta", h.serveRemoveMeta).ServeHTTP(w, r)
		}
	}
}

资源清理操作:

  • 删除本机的meta目录;
  • 新建本机的meta目录;
  • 保留本机的节点信息node.json文件;
func (h *handler) serveRemoveMeta(w http.ResponseWriter, r *http.Request) {    // admin_cluster have removed meta node from data    // remove meta dir    err := func() error {        // 删除meta目录        // remove contents in meta dir except the node file        if err := os.RemoveAll(h.s.config.Dir); err != nil {            return fmt.Errorf("remove meta dir failed: %v", err)        }        // 新建meta目录        if err := os.Mkdir(h.s.config.Dir, 755); err != nil {            return fmt.Errorf("create empty meta dir failed: %v", err)        }        // 保留node.json        // node file should be kept. Data node still needs it.        if err := h.s.Node.Save(); err != nil {            return fmt.Errorf("save node file failed: %v", err)        }        return nil    }()    if err != nil {        h.httpError(err, w, http.StatusInternalServerError)    } else {        if _, err := w.Write([]byte("OK")); err != nil {            h.logger.Info("Write response error", zap.Error(err))        }    }}