关于influxdb:InfluxDB集群-节点部署命令的源码分析

98次阅读

共计 3220 个字符,预计需要花费 9 分钟才能阅读完成。

上文剖析到,InfluxDB 集群的部署,波及到 3 个命令:

  • influxd 过程的启动;
  • 增加集群的 data-nodes;
  • 查问集群的节点信息;

本文联合源码,剖析每一步具体都是怎么实现的。

influxd 过程启动

命令:

# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3

这里重点剖析 -join 参数。
1. 读取参数

//cmd/influxd/run/command.go
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
    .......
    fs.StringVar(&options.Join, "join", "","")
    return options, nil
}
// Run parses the config from args and runs the server.
func (cmd *Command) Run(args ...string) error {
    // Propogate the top-level join options down to the meta config
    if config.Join != "" {config.Meta.JoinPeers = strings.Split(options.Join, ",")
    }
    ....
}

2. 将 peers 退出 Raft
将 joinPeers 作为判断条件,找到所有的 raftAddr,而后传入 Raft-lib 作为初始节点:

//services/meta/store.go
// open opens and initializes the raft store.
func (s *store) open(raftln net.Listener) error {
    joinPeers = s.config.JoinPeers
    var initializePeers []string
    for {peers := c.peers()
        if !Peers(peers).Contains(s.raftAddr) {peers = append(peers, s.raftAddr)
        }
        if len(s.config.JoinPeers)-len(peers) == 0 {
            initializePeers = peers
            break
        }
    }
    // Open the raft store.
    if err := s.openRaft(initializePeers, raftln); err != nil
    ......
}

增加 data nodes

命令:

# influxd_ctl add-data ops3:8088

比方在 node1 上,执行增加 node3 为 dataNode 的命令:

首先在 node1 上进行命令解析 (实际上跟 API 一样的流程);

//services/admin_cluster/handler.go
// ServeHTTP responds to HTTP request to the handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/add-data":
            h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
        }
    }    
}

而后向 node3 发送 AddDataNode 的 tcp 音讯;

// add new data node to the cluster
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {ps := r.URL.Query()
    if len(ps["node"]) != 1 {http.Error(w, "", http.StatusBadRequest)
    }
    node := ps["node"][0]
    req := Request{AddDataNode, "", 0}
    err := h.call(node, req)
    h.sendResponse(err, w)
}

发送 TCP 音讯:

func (h *handler) call(node string, req Request) error {conn, err := tcp.Dial("tcp", node, MuxHeader)
    if err != nil {return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
    }
    defer conn.Close()
    if err = json.NewEncoder(conn).Encode(req); err != nil {return fmt.Errorf("encode and send request failed. %v", err)
    }
    // read response
    resp, err := h.readResponse(conn)
    ....
    return nil
}

node3 接管解决 AddDataNode 的音讯:

//services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {
    switch r.Type {
    case AddDataNode:
        h.handleAddDataNode(r, conn)
    }
}
func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {err := h.Server.DataServerJoin()
    ...
}

node3 应用 metaclient,带上本人的 httpAddr(8091) 和 tcpAddr(8088),向 meta 集群发送增加节点的申请:node3 会重试,直到胜利为止;

//cmd/influxd/run/server.go
func (s *Server) DataServerJoin() (err error) {n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    for err != nil {time.Sleep(time.Second)
        n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    }
}

查问集群节点

命令:

# influxd_ctl show-nodes

查问的过程:

  • 首先通过命令行解析,最终定位到 cluster 的 Get /nodes 代码;
  • 而后由 metaClient 查问集群的所有 metaNodes 和 dataNodes 信息;
//services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/nodes":
            h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r)
        }
        ......
    }
}
func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) {metaNodes, _ := h.MetaClient.MetaNodes()
    dataNodes, _ := h.MetaClient.DataNodes()

    nodes := make(map[string][]meta.NodeInfo)
    nodes["Meta"] = metaNodes
    nodes["Data"] = dataNodes

    w.Header().Add("Content-Type", "application/json")
    w.Write(MarshalJSON(nodes, true))
}

正文完
 0