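As covered in the previous article, deploying an InfluxDB cluster involves three commands:
- starting the influxd process;
- adding data nodes to the cluster;
- querying the cluster's node information;
This article walks through the source code to show how each of these steps is implemented.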
Starting the influxd process
Command:
# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3
The focus here is the -join parameter.
1. Reading the parameter
//cmd/influxd/run/command.go
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
	.......
	// Register the -join flag; its value is a comma-separated list of peers.
	fs.StringVar(&options.Join, "join", "", "")
	return options, nil
}
// Run parses the config from args and runs the server.
func (cmd *Command) Run(args ...string) error {
	// Propagate the top-level join options down to the meta config
	if options.Join != "" {
		config.Meta.JoinPeers = strings.Split(options.Join, ",")
	}
	....
}
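The flag handling above can be tried in isolation. The following is a minimal sketch, not the influxd code: parseJoin and the flag set name are hypothetical, and only the standard flag and strings packages are used. It also returns nil when -join is absent, so an empty flag never becomes a one-element list holding an empty string.

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"strings"
)

// parseJoin parses a -join flag from args and returns the peer list.
// An empty or missing flag yields nil rather than a slice holding "".
func parseJoin(args []string) ([]string, error) {
	fs := flag.NewFlagSet("influxd-sketch", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text off stderr in this sketch
	var join string
	fs.StringVar(&join, "join", "", "comma-separated list of peers")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	if join == "" {
		return nil, nil
	}
	return strings.Split(join, ","), nil
}

func main() {
	peers, err := parseJoin([]string{"-join", "ops1:8091,ops2:8091,ops3:8091"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(len(peers), peers)
}
```

Go's flag package accepts both -join and --join, which is why the command line above can mix the two styles.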
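2. Adding the peers to Raft
Using joinPeers as the stopping condition, the store keeps polling until it has the raftAddr of every peer, then passes them to the Raft library as the initial nodes: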
//services/meta/store.go
// open opens and initializes the raft store.
func (s *store) open(raftln net.Listener) error {
	joinPeers = s.config.JoinPeers
	var initializePeers []string
	// Poll until the known peers, including this node's own raftAddr,
	// match the number of peers configured via -join.
	for {
		peers := c.peers()
		if !Peers(peers).Contains(s.raftAddr) {
			peers = append(peers, s.raftAddr)
		}
		if len(s.config.JoinPeers)-len(peers) == 0 {
			initializePeers = peers
			break
		}
	}
	// Open the raft store.
	if err := s.openRaft(initializePeers, raftln); err != nil
	......
}
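The waiting logic in open can be sketched on its own. This is an illustration under stated assumptions, not the store's code: waitForPeers, fetch, and the node names are hypothetical, and unlike the loop above the sketch sleeps between polls and gives up after maxTries so that it always terminates.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// contains reports whether addr is already in peers.
func contains(peers []string, addr string) bool {
	for _, p := range peers {
		if p == addr {
			return true
		}
	}
	return false
}

// waitForPeers polls fetch until the peer set (plus self) is as large as
// the configured join list, or gives up after maxTries polls.
func waitForPeers(fetch func() []string, self string, want, maxTries int, interval time.Duration) ([]string, error) {
	for i := 0; i < maxTries; i++ {
		peers := append([]string(nil), fetch()...) // copy so fetch's slice is not mutated
		if !contains(peers, self) {
			peers = append(peers, self)
		}
		if len(peers) == want {
			return peers, nil
		}
		time.Sleep(interval)
	}
	return nil, errors.New("timed out waiting for join peers")
}

func main() {
	// Hypothetical discovery: node2 only becomes visible on the second poll.
	polls := 0
	fetch := func() []string {
		polls++
		if polls < 2 {
			return []string{"node1"}
		}
		return []string{"node1", "node2"}
	}
	peers, err := waitForPeers(fetch, "node3", 3, 5, 10*time.Millisecond)
	fmt.Println(peers, err)
}
```

Comparing counts rather than addresses mirrors the condition in the excerpt: the join list holds the -join addresses while the peer list holds raft addresses, so only their lengths are directly comparable.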
Adding data nodes
Command:
# influxd_ctl add-data ops3:8088
For example, to add node3 as a data node, run the command on node1:
First, node1 parses the command (the flow is effectively the same as an API call);
//services/admin_cluster/handler.go
// ServeHTTP responds to HTTP request to the handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/add-data":
			h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
		}
	}
}
Then node1 sends an AddDataNode TCP message to node3;
// add new data node to the cluster
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {
	ps := r.URL.Query()
	if len(ps["node"]) != 1 {
		http.Error(w, "", http.StatusBadRequest)
		return // stop after replying 400; ps["node"][0] below would panic on an empty list
	}
	node := ps["node"][0]
	req := Request{AddDataNode, "", 0}
	err := h.call(node, req)
	h.sendResponse(err, w)
}
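The routing and parameter check on node1 can be exercised without a running cluster. The sketch below is an assumption-laden stand-in rather than the admin_cluster handler: addDataHandler, status, and the join callback are hypothetical, and the 500 status on a failed join is a choice made here, since the excerpt does not show what sendResponse returns.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// addDataHandler routes GET /add-data and validates the "node" parameter.
// join is a hypothetical callback standing in for the TCP call to the node.
func addDataHandler(join func(node string) error) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet || r.URL.Path != "/add-data" {
			http.NotFound(w, r)
			return
		}
		nodes := r.URL.Query()["node"]
		if len(nodes) != 1 {
			http.Error(w, "exactly one node parameter is required", http.StatusBadRequest)
			return // returning here keeps nodes[0] below from being reached
		}
		if err := join(nodes[0]); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintln(w, "ok")
	})
}

// status issues a GET against h and returns the response status code.
func status(h http.Handler, target string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Code
}

func main() {
	h := addDataHandler(func(node string) error { return nil })
	fmt.Println(status(h, "/add-data?node=ops3:8088"))
	fmt.Println(status(h, "/add-data"))
}
```

Passing the join step in as a function keeps the HTTP layer testable with httptest alone, independent of any TCP connection to the target node.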
Sending the TCP message:
func (h *handler) call(node string, req Request) error {
	// Open a TCP connection to the target node.
	conn, err := tcp.Dial("tcp", node, MuxHeader)
	if err != nil {
		return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
	}
	defer conn.Close()
	// Send the request as JSON.
	if err = json.NewEncoder(conn).Encode(req); err != nil {
		return fmt.Errorf("encode and send request failed. %v", err)
	}
	// read response
	resp, err := h.readResponse(conn)
	....
	return nil
}
node3 receives and handles the AddDataNode message:
//services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {
	// Dispatch on the type of the request r read from conn.
	switch r.Type {
	case AddDataNode:
		h.handleAddDataNode(r, conn)
	}
}
func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {
	err := h.Server.DataServerJoin()
	...
}
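The request and response exchange between the two nodes can be sketched end to end. Everything named here is an assumption for illustration: the one-byte muxHeader value, the Request and Response shapes, and the "add-data-node" type string are not taken from the source, and an in-memory net.Pipe stands in for the real TCP connection.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
)

// muxHeader is a hypothetical one-byte tag identifying the target service.
const muxHeader byte = 9

// Request and Response are assumed shapes, not the article's actual types.
type Request struct {
	Type string `json:"type"`
}

type Response struct {
	OK    bool   `json:"ok"`
	Error string `json:"error,omitempty"`
}

// send writes the header byte and the JSON request, then reads the reply.
func send(conn net.Conn, req Request) error {
	if _, err := conn.Write([]byte{muxHeader}); err != nil {
		return fmt.Errorf("write header failed: %w", err)
	}
	if err := json.NewEncoder(conn).Encode(req); err != nil {
		return fmt.Errorf("encode and send request failed: %w", err)
	}
	var resp Response
	if err := json.NewDecoder(conn).Decode(&resp); err != nil {
		return fmt.Errorf("read response failed: %w", err)
	}
	if !resp.OK {
		return errors.New(resp.Error)
	}
	return nil
}

// handle checks the header byte, decodes one request, and dispatches on its type.
func handle(conn net.Conn) {
	defer conn.Close()
	var hdr [1]byte
	if _, err := io.ReadFull(conn, hdr[:]); err != nil || hdr[0] != muxHeader {
		return
	}
	var req Request
	if err := json.NewDecoder(conn).Decode(&req); err != nil {
		return
	}
	resp := Response{OK: true}
	switch req.Type {
	case "add-data-node":
		// A real handler would trigger the join with the meta cluster here.
	default:
		resp = Response{Error: "unknown request type: " + req.Type}
	}
	json.NewEncoder(conn).Encode(resp)
}

// roundTrip runs one request through an in-memory connection pair.
func roundTrip(req Request) error {
	client, server := net.Pipe()
	go handle(server)
	defer client.Close()
	return send(client, req)
}

func main() {
	fmt.Println(roundTrip(Request{Type: "add-data-node"}))
	fmt.Println(roundTrip(Request{Type: "bogus"}))
}
```

Reading the header with io.ReadFull before constructing the JSON decoder matters: a decoder buffers its input, so it must only see the bytes that follow the header.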
node3 uses its metaclient to send an add-node request to the meta cluster, carrying its own httpAddr (8091) and tcpAddr (8088); node3 keeps retrying until the request succeeds;
//cmd/influxd/run/server.go
func (s *Server) DataServerJoin() (err error) {
	n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
	// Retry once per second until the meta cluster accepts the node.
	for err != nil {
		time.Sleep(time.Second)
		n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
	}
}
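The retry-until-success pattern in DataServerJoin can be shown in a few lines. This sketch deliberately differs from the excerpt: retry and joinAfter are hypothetical helpers, and the loop is capped at maxAttempts so a permanently unreachable meta cluster produces an error instead of blocking forever.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls op until it succeeds or maxAttempts is reached, sleeping
// interval between attempts. It returns the number of attempts made.
func retry(maxAttempts int, interval time.Duration, op func() error) (int, error) {
	if maxAttempts < 1 {
		return 0, errors.New("maxAttempts must be at least 1")
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(interval)
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// joinAfter returns a stand-in for CreateDataNode that fails n times first.
func joinAfter(n int) func() error {
	return func() error {
		if n > 0 {
			n--
			return errors.New("meta cluster not ready")
		}
		return nil
	}
}

func main() {
	attempts, err := retry(5, 10*time.Millisecond, joinAfter(2))
	fmt.Println(attempts, err)
}
```

Whether a cap is appropriate depends on the deployment; the unbounded loop in the excerpt suits a node that has nothing useful to do until it has joined.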
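Querying cluster nodes
Command:
# influxd_ctl show-nodes
Query flow:
- first, the command line is parsed and the request is routed to the cluster's GET /nodes handler;
- then metaClient queries the information of all metaNodes and dataNodes in the cluster;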
//services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "GET":
		switch r.URL.Path {
		case "/nodes":
			h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r)
		}
		......
	}
}
func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) {
	// Errors from the meta client are discarded in this handler.
	metaNodes, _ := h.MetaClient.MetaNodes()
	dataNodes, _ := h.MetaClient.DataNodes()
	// Group the nodes by role and return them as JSON.
	nodes := make(map[string][]meta.NodeInfo)
	nodes["Meta"] = metaNodes
	nodes["Data"] = dataNodes
	w.Header().Add("Content-Type", "application/json")
	w.Write(MarshalJSON(nodes, true))
}
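A self-contained version of the /nodes response makes the output shape easier to see. The sketch rests on assumptions: NodeInfo here has only ID and Host fields and may not match meta.NodeInfo, nodeLister and staticNodes are hypothetical stand-ins for the meta client, and json.MarshalIndent replaces the MarshalJSON helper. Unlike the excerpt, it reports meta client errors instead of discarding them.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// NodeInfo is an assumed shape for illustration; meta.NodeInfo may differ.
type NodeInfo struct {
	ID   uint64 `json:"id"`
	Host string `json:"host"`
}

// nodeLister is a hypothetical interface standing in for the meta client.
type nodeLister interface {
	MetaNodes() ([]NodeInfo, error)
	DataNodes() ([]NodeInfo, error)
}

// serveNodes returns a handler that writes meta and data nodes as JSON.
func serveNodes(c nodeLister) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		metaNodes, err := c.MetaNodes()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		dataNodes, err := c.DataNodes()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		nodes := map[string][]NodeInfo{"Meta": metaNodes, "Data": dataNodes}
		body, err := json.MarshalIndent(nodes, "", "    ")
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.Write(body)
	}
}

// staticNodes is a fixed in-memory nodeLister used for the demo.
type staticNodes struct{ meta, data []NodeInfo }

func (s staticNodes) MetaNodes() ([]NodeInfo, error) { return s.meta, nil }
func (s staticNodes) DataNodes() ([]NodeInfo, error) { return s.data, nil }

// fetchNodes runs the handler once and decodes its JSON response.
func fetchNodes(c nodeLister) (map[string][]NodeInfo, error) {
	rec := httptest.NewRecorder()
	serveNodes(c)(rec, httptest.NewRequest(http.MethodGet, "/nodes", nil))
	if rec.Code != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %d", rec.Code)
	}
	var out map[string][]NodeInfo
	err := json.Unmarshal(rec.Body.Bytes(), &out)
	return out, err
}

func main() {
	c := staticNodes{
		meta: []NodeInfo{{ID: 1, Host: "ops1:8091"}},
		data: []NodeInfo{{ID: 2, Host: "ops3:8088"}},
	}
	nodes, err := fetchNodes(c)
	fmt.Println(len(nodes["Meta"]), len(nodes["Data"]), err)
}
```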