As analyzed in the previous article, deploying an InfluxDB cluster involves three commands:

  • starting the influxd process;
  • adding data nodes to the cluster;
  • querying the cluster's node information.

This article walks through the source code to show how each step is implemented.

Starting the influxd process

Command:

# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3

Here we focus on the -join parameter.
1. Reading the parameter

// cmd/influxd/run/command.go
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
    .......
    fs.StringVar(&options.Join, "join", "", "")
    return options, nil
}

// Run parses the config from args and runs the server.
func (cmd *Command) Run(args ...string) error {
    // Propagate the top-level join options down to the meta config
    if options.Join != "" {
        config.Meta.JoinPeers = strings.Split(options.Join, ",")
    }
    ....
}
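The sketch below makes the two steps concrete: read -join, then split it into a peer list. It is a minimal standalone program built on Go's standard flag package. The helper parseJoinPeers is hypothetical and is not InfluxDB's code. Unlike the excerpt, it also trims whitespace and drops empty entries.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// parseJoinPeers is a hypothetical helper: it reads the -join flag and
// splits its comma-separated value into a list of peer addresses.
func parseJoinPeers(args []string) ([]string, error) {
	fs := flag.NewFlagSet("influxd", flag.ContinueOnError)
	join := fs.String("join", "", "comma-separated meta peers (host:port)")
	// Declared only so the example command line parses.
	_ = fs.String("config", "", "path to the config file")
	_ = fs.String("hostname", "", "hostname override")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	if *join == "" {
		return nil, nil // no -join given: nothing to propagate
	}
	var peers []string
	for _, p := range strings.Split(*join, ",") {
		if p = strings.TrimSpace(p); p != "" {
			peers = append(peers, p)
		}
	}
	return peers, nil
}

func main() {
	peers, err := parseJoinPeers([]string{
		"-config", "/etc/influxdb/influxdb.conf",
		"-join", "ops1:8091,ops2:8091,ops3:8091",
		"--hostname", "ops3", // the flag package accepts one or two dashes
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(peers) // [ops1:8091 ops2:8091 ops3:8091]
}
```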

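2. Adding the peers to Raft
Using joinPeers, the node repeatedly fetches the current Raft peer list and adds its own raftAddr if it is missing. Once the number of peers equals the number of configured JoinPeers, it passes that list to the Raft library as the initial peer set: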

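// services/meta/store.go
// open opens and initializes the raft store.
func (s *store) open(raftln net.Listener) error {
    joinPeers := s.config.JoinPeers
    // c is a meta client pointed at joinPeers (its construction is elided)
    ......
    var initializePeers []string
    for {
        // ask the join peers for the current Raft peer list
        peers := c.peers()
        if !Peers(peers).Contains(s.raftAddr) {
            peers = append(peers, s.raftAddr)
        }
        // stop once the peer count matches the configured -join list
        if len(s.config.JoinPeers)-len(peers) == 0 {
            initializePeers = peers
            break
        }
        // (logging and waiting before the next attempt are elided)
        ......
    }

    // Open the raft store.
    if err := s.openRaft(initializePeers, raftln); err != nil {
        return err
    }
    ......
}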

Adding data nodes

Command:

# influxd_ctl add-data ops3:8088

For example, suppose the command that adds node3 as a data node is run on node1.

node1 first parses the command (in practice this follows the same flow as an HTTP API request):

// services/admin_cluster/handler.go
// ServeHTTP responds to HTTP request to the handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/add-data":
            h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
        }
    }
}

It then sends an AddDataNode message to node3 over TCP:

// add new data node to the cluster
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {
    ps := r.URL.Query()
    if len(ps["node"]) != 1 {
        http.Error(w, "", http.StatusBadRequest)
        return // without this, a missing node parameter would panic on ps["node"][0]
    }
    node := ps["node"][0]
    req := Request{AddDataNode, "", 0}
    err := h.call(node, req)
    h.sendResponse(err, w)
}

Sending the TCP message:

func (h *handler) call(node string, req Request) error {
    conn, err := tcp.Dial("tcp", node, MuxHeader)
    if err != nil {
        return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
    }
    defer conn.Close()

    if err = json.NewEncoder(conn).Encode(req); err != nil {
        return fmt.Errorf("encode and send request failed. %v", err)
    }

    // read response
    resp, err := h.readResponse(conn)
    ....
    return nil
}

node3 receives and handles the AddDataNode message:

// services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {
    // (decoding the Request r from conn is elided)
    ......
    switch r.Type {
    case AddDataNode:
        h.handleAddDataNode(r, conn)
    }
}

func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {
    err := h.Server.DataServerJoin()
    ...
}

node3 uses its meta client to ask the meta cluster to add it as a data node, passing its own httpAddr (8091) and tcpAddr (8088). It retries once per second until the request succeeds:

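// cmd/influxd/run/server.go
func (s *Server) DataServerJoin() (err error) {
    // register this node's HTTP and TCP addresses with the meta cluster
    n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    // retry once per second until the meta cluster accepts the request
    for err != nil {
        time.Sleep(time.Second)
        n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    }
    ...... // subsequent handling of n is omitted in this excerpt
    return nil
}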

Querying cluster nodes

Command:

# influxd_ctl show-nodes

The query proceeds as follows:

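  • The command line is parsed and the request is eventually routed to the cluster's GET /nodes handler;
  • the metaClient then fetches information about all metaNodes and dataNodes in the cluster.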

// services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/nodes":
            h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r)
        }
        ......
    }
}

func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) {
    metaNodes, _ := h.MetaClient.MetaNodes()
    dataNodes, _ := h.MetaClient.DataNodes()
    nodes := make(map[string][]meta.NodeInfo)
    nodes["Meta"] = metaNodes
    nodes["Data"] = dataNodes
    w.Header().Add("Content-Type", "application/json")
    w.Write(MarshalJSON(nodes, true))
}