共计 3220 个字符,预计需要花费 9 分钟才能阅读完成。
上文分析到,InfluxDB 集群的部署,涉及到 3 个命令:
- influxd 进程的启动;
- 添加集群的 data-nodes;
- 查询集群的节点信息;
本文结合源码,分析每一步具体都是怎么实现的。
influxd 进程启动
命令:
# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3
这里重点分析 -join 参数。
1. 读取参数
//cmd/influxd/run/command.go | |
func (cmd *Command) ParseFlags(args ...string) (Options, error) { | |
....... | |
fs.StringVar(&options.Join, "join", "","") | |
return options, nil | |
} | |
// Run parses the config from args and runs the server. | |
func (cmd *Command) Run(args ...string) error { | |
// Propogate the top-level join options down to the meta config | |
if config.Join != "" {config.Meta.JoinPeers = strings.Split(options.Join, ",") | |
} | |
.... | |
} |
2. 将 peers 加入 Raft
将 joinPeers 作为判断条件,找到所有的 raftAddr,然后传入 Raft-lib 作为初始节点:
//services/meta/store.go | |
// open opens and initializes the raft store. | |
func (s *store) open(raftln net.Listener) error { | |
joinPeers = s.config.JoinPeers | |
var initializePeers []string | |
for {peers := c.peers() | |
if !Peers(peers).Contains(s.raftAddr) {peers = append(peers, s.raftAddr) | |
} | |
if len(s.config.JoinPeers)-len(peers) == 0 { | |
initializePeers = peers | |
break | |
} | |
} | |
// Open the raft store. | |
if err := s.openRaft(initializePeers, raftln); err != nil | |
...... | |
} |
添加 data nodes
命令:
# influxd_ctl add-data ops3:8088
比如在 node1 上,执行添加 node3 为 dataNode 的命令:
首先在 node1 上进行命令解析 (实际上跟 API 一样的流程);
//services/admin_cluster/handler.go | |
// ServeHTTP responds to HTTP request to the handler. | |
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
switch r.Method { | |
case "GET": | |
switch r.URL.Path { | |
case "/add-data": | |
h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r) | |
} | |
} | |
} |
然后向 node3 发送 AddDataNode 的 TCP 消息;
// add new data node to the cluster | |
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {ps := r.URL.Query() | |
if len(ps["node"]) != 1 {http.Error(w, "", http.StatusBadRequest) | |
} | |
node := ps["node"][0] | |
req := Request{AddDataNode, "", 0} | |
err := h.call(node, req) | |
h.sendResponse(err, w) | |
} |
发送 TCP 消息:
func (h *handler) call(node string, req Request) error {conn, err := tcp.Dial("tcp", node, MuxHeader) | |
if err != nil {return fmt.Errorf("tcp dial to node %s failed. %v", node, err) | |
} | |
defer conn.Close() | |
if err = json.NewEncoder(conn).Encode(req); err != nil {return fmt.Errorf("encode and send request failed. %v", err) | |
} | |
// read response | |
resp, err := h.readResponse(conn) | |
.... | |
return nil | |
} |
node3 接收并处理 AddDataNode 的消息:
//services/admin_cluster/tcphandler.go | |
func (h *TCPHandler) handleConn(conn net.Conn) { | |
switch r.Type { | |
case AddDataNode: | |
h.handleAddDataNode(r, conn) | |
} | |
} | |
func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {err := h.Server.DataServerJoin() | |
... | |
} |
node3 使用 metaclient,带上自己的 httpAddr(8091) 和 tcpAddr(8088),向 meta 集群发送添加节点的请求;node3 会不断重试,直到成功为止:
//cmd/influxd/run/server.go | |
func (s *Server) DataServerJoin() (err error) {n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr()) | |
for err != nil {time.Sleep(time.Second) | |
n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr()) | |
} | |
} |
查询集群节点
命令:
# influxd_ctl show-nodes
查询的过程:
- 首先通过命令行解析,最终定位到 cluster 的 Get /nodes 代码;
- 然后由 metaClient 查询集群的所有 metaNodes 和 dataNodes 信息;
//services/admin_cluster/handler.go | |
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
switch r.Method { | |
case "GET": | |
switch r.URL.Path { | |
case "/nodes": | |
h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r) | |
} | |
...... | |
} | |
} | |
func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) {metaNodes, _ := h.MetaClient.MetaNodes() | |
dataNodes, _ := h.MetaClient.DataNodes() | |
nodes := make(map[string][]meta.NodeInfo) | |
nodes["Meta"] = metaNodes | |
nodes["Data"] = dataNodes | |
w.Header().Add("Content-Type", "application/json") | |
w.Write(MarshalJSON(nodes, true)) | |
} |
正文完