关于influxdb:InfluxDB集群-节点部署命令的源码分析

上文剖析到,InfluxDB集群的部署,波及到3个命令:

  • influxd过程的启动;
  • 增加集群的data-nodes;
  • 查问集群的节点信息;

本文联合源码,剖析每一步具体都是怎么实现的。

influxd过程启动

命令:

# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3

这里重点剖析-join参数。
1.读取参数

//cmd/influxd/run/command.go
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
    .......
    fs.StringVar(&options.Join, "join", "", "")
    return options, nil
}
// Run parses the config from args and runs the server.
func (cmd *Command) Run(args ...string) error {
    // Propogate the top-level join options down to the meta config
    if config.Join != "" {
        config.Meta.JoinPeers = strings.Split(options.Join, ",")
    }
    ....
}

2.将peers退出Raft
将joinPeers作为判断条件,找到所有的raftAddr,而后传入Raft-lib作为初始节点:

//services/meta/store.go
// open opens and initializes the raft store.
func (s *store) open(raftln net.Listener) error {
    joinPeers = s.config.JoinPeers
    var initializePeers []string
    for {
        peers := c.peers()
        if !Peers(peers).Contains(s.raftAddr) {
            peers = append(peers, s.raftAddr)
        }
        if len(s.config.JoinPeers)-len(peers) == 0 {
            initializePeers = peers
            break
        }
    }
    // Open the raft store.
    if err := s.openRaft(initializePeers, raftln); err != nil
    ......
}

增加data nodes

命令:

# influxd_ctl add-data ops3:8088

比方在node1上,执行增加node3为dataNode的命令:

首先在node1上进行命令解析(实际上跟API一样的流程);

//services/admin_cluster/handler.go
// ServeHTTP responds to HTTP request to the handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/add-data":
            h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
        }
    }    
}

而后向node3发送AddDataNode的tcp音讯;

// add new data node to the cluster
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {
    ps := r.URL.Query()
    if len(ps["node"]) != 1 {
        http.Error(w, "", http.StatusBadRequest)
    }
    node := ps["node"][0]
    req := Request{AddDataNode, "", 0}
    err := h.call(node, req)
    h.sendResponse(err, w)
}

发送TCP音讯:

func (h *handler) call(node string, req Request) error {
    conn, err := tcp.Dial("tcp", node, MuxHeader)
    if err != nil {
        return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
    }
    defer conn.Close()
    if err = json.NewEncoder(conn).Encode(req); err != nil {
        return fmt.Errorf("encode and send request failed. %v", err)
    }
    // read response
    resp, err := h.readResponse(conn)
    ....
    return nil
}

node3接管解决AddDataNode的音讯:

//services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {
    switch r.Type {
    case AddDataNode:
        h.handleAddDataNode(r, conn)
    }
}
func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {
    err := h.Server.DataServerJoin()
    ...
}

node3应用metaclient,带上本人的httpAddr(8091)和tcpAddr(8088),向meta集群发送增加节点的申请:node3会重试,直到胜利为止;

//cmd/influxd/run/server.go
func (s *Server) DataServerJoin() (err error) {
    n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    for err != nil {
        time.Sleep(time.Second)
        n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    }
}

查问集群节点

命令:

# influxd_ctl show-nodes

查问的过程:

  • 首先通过命令行解析,最终定位到cluster的Get /nodes代码;
  • 而后由metaClient查问集群的所有metaNodes和dataNodes信息;
//services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/nodes":
            h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r)
        }
        ......
    }
}
func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) {
    metaNodes, _ := h.MetaClient.MetaNodes()
    dataNodes, _ := h.MetaClient.DataNodes()

    nodes := make(map[string][]meta.NodeInfo)
    nodes["Meta"] = metaNodes
    nodes["Data"] = dataNodes

    w.Header().Add("Content-Type", "application/json")
    w.Write(MarshalJSON(nodes, true))
}

【腾讯云】云产品限时秒杀,爆款1核2G云服务器,首年50元

阿里云限时活动-2核2G-5M带宽-60G SSD-1000G月流量 ,特惠价99元/年(原价1234.2元/年,可以直接买3年),速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

You may also like...

发表评论

邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据