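As discussed in the previous article, deploying an InfluxDB cluster involves three commands:
- starting the influxd process;
- adding data nodes to the cluster;
- querying the cluster's node information.
This article walks through the source code to show how each step is implemented.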
Starting the influxd process
Command:
# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3
The focus here is the -join parameter.
1. Reading the parameter
// cmd/influxd/run/command.go
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
	.......
	fs.StringVar(&options.Join, "join", "", "")
	return options, nil
}

// Run parses the config from args and runs the server.
func (cmd *Command) Run(args ...string) error {
	// ...... (options and config are obtained earlier; elided)
	// Propagate the top-level join options down to the meta config
	if options.Join != "" {
		config.Meta.JoinPeers = strings.Split(options.Join, ",")
	}
	....
}
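The flag handling above can be tried in isolation. The sketch below is a minimal stand-alone version that assumes simplified `Options` and `MetaConfig` types (hypothetical, not the real InfluxDB structs): it registers `-join` on a standard-library `flag.FlagSet` and splits the value on commas into `JoinPeers`, the same way `Run` does.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// Options holds the command-line options; only Join is modelled here.
type Options struct {
	Join string
}

// MetaConfig is a hypothetical stand-in for the meta section of the config.
type MetaConfig struct {
	JoinPeers []string
}

// parseFlags registers -join on a FlagSet and parses args, in the spirit
// of Command.ParseFlags.
func parseFlags(args ...string) (Options, error) {
	var options Options
	fs := flag.NewFlagSet("influxd", flag.ContinueOnError)
	fs.StringVar(&options.Join, "join", "", "comma-separated list of meta peers")
	if err := fs.Parse(args); err != nil {
		return Options{}, err
	}
	return options, nil
}

// applyJoin copies the -join value into the meta config, split on commas.
// An empty -join leaves JoinPeers untouched.
func applyJoin(options Options, meta *MetaConfig) {
	if options.Join != "" {
		meta.JoinPeers = strings.Split(options.Join, ",")
	}
}

func main() {
	opts, err := parseFlags("-join", "ops1:8091,ops2:8091,ops3:8091")
	if err != nil {
		panic(err)
	}
	var meta MetaConfig
	applyJoin(opts, &meta)
	fmt.Println(meta.JoinPeers) // [ops1:8091 ops2:8091 ops3:8091]
}
```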
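2. Adding the peers to Raft
Using joinPeers as the exit condition, the node keeps collecting the raftAddr of every known peer (adding its own if missing); once the count matches the number of configured join peers, it passes that list to the Raft library as the initial peers: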
// services/meta/store.go
// open opens and initializes the raft store.
func (s *store) open(raftln net.Listener) error {
	joinPeers := s.config.JoinPeers
	var initializePeers []string
	// c is a meta client pointed at joinPeers (its creation is elided)
	for {
		peers := c.peers()
		if !Peers(peers).Contains(s.raftAddr) {
			peers = append(peers, s.raftAddr)
		}
		if len(s.config.JoinPeers)-len(peers) == 0 {
			initializePeers = peers
			break
		}
		// ......
	}
	// Open the raft store.
	if err := s.openRaft(initializePeers, raftln); err != nil {
		......
	}
}
Adding data nodes
Command:
# influxd_ctl add-data ops3:8088
For example, to add node3 as a data node, run the command on node1.
node1 first parses the command (this follows the same path as an API request):
// services/admin_cluster/handler.go
// ServeHTTP responds to HTTP requests to the handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/add-data":
			h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
		}
	}
}
It then sends an AddDataNode TCP message to node3:
// add new data node to the cluster
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {
	ps := r.URL.Query()
	if len(ps["node"]) != 1 {
		http.Error(w, "", http.StatusBadRequest)
		return // without this, ps["node"][0] below panics when node is missing
	}
	node := ps["node"][0]
	req := Request{AddDataNode, "", 0}
	err := h.call(node, req)
	h.sendResponse(err, w)
}
Sending the TCP message:
func (h *handler) call(node string, req Request) error {
	conn, err := tcp.Dial("tcp", node, MuxHeader)
	if err != nil {
		return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
	}
	defer conn.Close()
	if err = json.NewEncoder(conn).Encode(req); err != nil {
		return fmt.Errorf("encode and send request failed. %v", err)
	}
	// read response
	resp, err := h.readResponse(conn)
	....
	return nil
}
node3 receives and handles the AddDataNode message:
// services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {
	// ...... (reading the request r from conn is elided)
	switch r.Type {
	case AddDataNode:
		h.handleAddDataNode(r, conn)
	}
}

func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {
	err := h.Server.DataServerJoin()
	...
}
Using its metaclient, node3 asks the meta cluster to add it as a node, sending its own httpAddr (8091) and tcpAddr (8088). It retries once per second until the request succeeds:
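// cmd/influxd/run/server.go
func (s *Server) DataServerJoin() (err error) {
	n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
	// retry once per second until the meta cluster accepts the node
	for err != nil {
		time.Sleep(time.Second)
		n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
	}
	// ...... (use of the returned node n is elided)
	return nil
}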
Querying cluster nodes
Command:
# influxd_ctl show-nodes
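The query works as follows:
- the command line is parsed and the request ends up in the cluster's GET /nodes handler;
- the metaClient then fetches the information of all metaNodes and dataNodes in the cluster: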
// services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/nodes":
			h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r)
		}
		......
	}
}

func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) {
	metaNodes, _ := h.MetaClient.MetaNodes()
	dataNodes, _ := h.MetaClient.DataNodes()
	nodes := make(map[string][]meta.NodeInfo)
	nodes["Meta"] = metaNodes
	nodes["Data"] = dataNodes
	w.Header().Add("Content-Type", "application/json")
	w.Write(MarshalJSON(nodes, true))
}