乐趣区

关于 influxdb:InfluxDB 集群 - hinted handoff 源码分析(三):points 发送到远端节点

在本机节点上,给每个远端节点分配一个 NodeProcessor 对象,负责数据的写入和数据的读取。

NodeProcessor 定期地读取本机队列中的数据,然后将其发送给远端节点,尝试写入远端 shard。

// services/hh/node_processor.go
// run is the processor's send loop (abridged excerpt; dotted lines mark code
// elided from the listing). After each currInterval wait it builds a rate
// limiter from n.RetryRateLimit and calls SendWrite repeatedly, feeding each
// returned count to the limiter and sleeping for the delay the limiter reports.
func (n *NodeProcessor) run() {
    ......
    for {case <-time.After(currInterval):
        limiter := NewRateLimiter(n.RetryRateLimit)    // rate limiting
        for {c, err := n.SendWrite()
            ...
            limiter.Update(c)
            time.Sleep(limiter.Delay())
        }
    }
}

读取和发送过程

  • 首先,ping 远端节点,判断其是否活跃,如果不活跃,则直接返回;
  • 然后,读 Head segment 中的 block,读操作均是从 segment 中的 pos 处开始;
  • 然后,将读到的 block 数据反序列化,发送给远端节点;
  • 最后,更新 head segment 中的 pos;
// services/hh/node_processor.go
// SendWrite attempts to send the current block of hinted data to the target node. If successful,
// it returns the number of bytes it sent and advances to the next block. Otherwise it returns
// io.EOF when the node is inactive, or the error reported by the step that failed (the
// activity check, reading the queue, decoding the block, or writing to the remote shard).
func (n *NodeProcessor) SendWrite() (int, error) {
	// Ping the remote node first; an inactive node cannot accept the write.
	active, err := n.Active()
	if err != nil {
		return 0, err
	}
	if !active {
		return 0, io.EOF
	}

	// Get the current block from the queue (the block at pos in the head segment).
	buf, err := n.queue.Current()
	if err != nil {
		return 0, err
	}

	// Unmarshal the byte slice back to shard ID and points.
	shardID, points, err := unmarshalWrite(buf)
	if err != nil {
		// A block that cannot be decoded will not decode on a retry either, so
		// move past it instead of stalling the queue on it.
		if advErr := n.queue.Advance(); advErr != nil {
			n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(advErr))
		}
		return 0, err
	}

	// Write the points to the remote node.
	if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil {
		atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
		return 0, err
	}

	// Move pos in the head segment past this block so the next call reads the
	// following one. The data has already been delivered, so a failure here is
	// logged rather than returned.
	if err := n.queue.Advance(); err != nil {
		n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err))
	}
	return len(buf), nil
}

1. 判断节点是否活跃

先根据 nodeId 查询 node 信息(ip):

//services/hh/node_processor.go
// Active reports whether the node this processor serves is currently active.
// It looks the node up by n.nodeID and then pings it. It returns (false, err)
// when the metadata lookup fails, (false, nil) when the node is unknown or the
// ping fails, and (true, nil) when the ping succeeds.
func (n *NodeProcessor) Active() (bool, error) {
	nio, err := n.meta.DataNode(n.nodeID)
	if err != nil {
		// Surface lookup failures instead of treating them as "node present".
		return false, err
	}
	if nio == nil {
		return false, nil
	}
	// A failed ping means "not active"; it is not an error for the caller.
	return n.Ping(nio) == nil, nil
}
// Ping pings the host of the given node via n.ping and returns that call's
// result unchanged.
func (n *NodeProcessor) Ping(node *meta.NodeInfo) error {
	host := node.Host
	return n.ping(host)
}

ping 的动作,实际是发送 HTTP GET http://ip:8091/ping:

//services/hh/node_processor.go
// ping issues an HTTP GET to http://<host>/ping with keep-alives disabled and a
// 3 second timeout. It returns nil only when the response status is
// 204 No Content; otherwise it returns the request/transport error or a
// "ping <url> fail" error for any other status.
func (n *NodeProcessor) ping(host string) error {
	url := "http://" + host + "/ping"
	request, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// A malformed host yields a nil request; handing that to client.Do
		// would panic, so report the error instead.
		return err
	}
	transport := http.Transport{DisableKeepAlives: true}
	client := &http.Client{
		Transport: &transport,
		Timeout:   3 * time.Second,
	}
	resp, err := client.Do(request)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusNoContent {
		return nil
	}
	return fmt.Errorf("ping %s fail", url)
}

2. 读 head segment 中的 block

从 head segment 读取:

  • 先定位到 segment 的 pos 位置,pos 即下一个要读取的 block;
  • 按照 block 格式,先读 block len,再读 block data;
//services/hh/queue.go
// current returns the block that the segment's read position (pos) points at.
// It seeks to pos, reads the block's uint64 length, records it in
// l.currentSize, and then reads that many bytes of block data. Any seek or
// read error is returned with a nil slice.
func (l *segment) current() ([]byte, error) {
	// Position the file at pos.
	if err := l.seekToCurrent(); err != nil {
		return nil, err
	}

	// Read the record size first.
	sz, err := l.readUint64()
	if err != nil {
		// Without a valid size there is nothing to read; leave currentSize
		// untouched so it is not overwritten with garbage.
		return nil, err
	}
	l.currentSize = int64(sz)

	// Then read the block data.
	b := make([]byte, sz)
	if err := l.readBytes(b); err != nil {
		return nil, err
	}
	return b, nil
}

如何定位到 segment 的 pos 位置?实际上都是文件操作:

//services/hh/queue.go
// seekToCurrent seeks the segment to its current read offset, l.pos.
func (l *segment) seekToCurrent() error {
	offset := int64(l.pos)
	return l.seek(offset)
}
// seek moves the segment file's offset to pos, measured from the start of the
// file. It returns the error from the underlying Seek, or a "bad seek" error
// when the file reports a different offset than the one requested.
func (l *segment) seek(pos int64) error {
	n, err := l.file.Seek(pos, os.SEEK_SET)
	if err != nil {
		return err
	}
	if n != pos {
		// Report the offset that was actually requested; the message used to
		// print a hard-coded 0 as the expected value.
		return fmt.Errorf("bad seek. exp %v, got %v", pos, n)
	}
	return nil
}

3. 将读到的 block 数据反序列化,写入远端节点

反序列化得到 points,按照与序列化相同的格式:

  • 先读 8byte 的 shardId;
  • 然后逐行 (\n) 解析 point;
//services/hh/node_processor.go
// unmarshalWrite decodes a queued block: an 8-byte big-endian ID (SendWrite
// uses it as the shard ID) followed by the point data, which is handed to
// models.ParsePoints. Blocks shorter than the 8-byte header are rejected with
// a "too short" error.
func unmarshalWrite(b []byte) (uint64, []models.Point, error) {
	const headerLen = 8
	if len(b) < headerLen {
		return 0, nil, fmt.Errorf("too short: len = %d", len(b))
	}

	header, payload := b[:headerLen], b[headerLen:]
	shardID := binary.BigEndian.Uint64(header)
	points, err := models.ParsePoints(payload)
	return shardID, points, err
}
//models/points.go
// ParsePoints parses buf via ParsePointsWithPrecision, passing the current UTC
// time as the default time and "n" as the precision.
func ParsePoints(buf []byte) ([]Point, error) {
	now := time.Now().UTC()
	return ParsePointsWithPrecision(buf, now, "n")
}
// ParsePointsWithPrecision parses buf one line at a time: each line is taken
// with scanLine, its leading whitespace is skipped, and the remainder is passed
// to parsePoint together with defaultTime and precision.
//
// Lines that are empty after the leading whitespace are skipped. A line that
// fails to parse is left out of the result; parsing continues with the
// remaining lines, and the first parse error is returned alongside the points
// that did parse.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
	// One point per line at most, so the newline count bounds the capacity.
	points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)

	var (
		pos      int
		block    []byte
		firstErr error
	)
	for pos < len(buf) {
		pos, block = scanLine(buf, pos)
		pos++

		start := skipWhitespace(block, 0)
		if start >= len(block) {
			// Nothing is left on this line, so there is no point to parse.
			continue
		}

		pt, err := parsePoint(block[start:], defaultTime, precision)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		points = append(points, pt)
	}
	return points, firstErr
}

使用 shardWriter 将 points 写入远端节点:

//cluster/shard_writer.go
// WriteShard writes time series points to a shard over a connection dialed for
// ownerID: it builds a WriteShardRequest, marshals it, sends it as a TLV
// message and then reads a TLV response, each under a w.timeout deadline.
// NOTE(review): this is an abridged listing — the "...." line marks elided
// code, and several results (err from dial, ok from the type assertion, sgi,
// err from MarshalBinary) are not checked in the lines shown.
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {c, err := w.dial(ownerID)
    conn, ok := c.(*pooledConn)
    // Determine the location of this shard and whether it still exists
    db, rp, sgi := w.MetaClient.ShardOwner(shardID)    
    // Build write request.
    var request WriteShardRequest
    request.SetShardID(shardID)
    request.SetDatabase(db)
    request.SetRetentionPolicy(rp)
    request.AddPoints(points)
    // Serialize the request, points included.
    // Marshal into protocol buffers.
    buf, err := request.MarshalBinary()
    // Send the serialized request over conn in TLV format.
    // Write request.
    conn.SetWriteDeadline(time.Now().Add(w.timeout))
    if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {conn.MarkUnusable()
        return err
    }
    // Wait for the remote node's reply.
    // Read the response.
    conn.SetReadDeadline(time.Now().Add(w.timeout))
    _, buf, err = ReadTLV(conn)
    if err != nil {conn.MarkUnusable()
        return err
    }    
    ....
    return nil
}

4. 更新 head segment 的 pos

更新 head segment 的 pos,下次继续从 pos 读取:

  • 先尝试移动 head segment: head.advance();
  • 若 head segment 读完,则继续下一个 segment:
//services/hh/queue.go
// Advance moves the head point to the next byte slice in the queue.
// It holds l.mu for the whole call, returns ErrNotOpen when the queue has no
// head segment, and trims the head segment when head.advance reports io.EOF.
// NOTE(review): abridged listing — the "...." lines mark elided code.
func (l *queue) Advance() error {l.mu.Lock()
    defer l.mu.Unlock()
    if l.head == nil {return ErrNotOpen}
    err := l.head.advance()
    if err == io.EOF {
        ....
        // The head segment has been read to the end: drop it and continue with the next one.
        if err := l.trimHead(); err != nil {return err}
    }
    ....
    return nil
}

尝试移动 head segment:

  • 首先定位到 footer,写入新的 pos,然后同步到 disk:
//services/hh/queue.go
// advance advances the current value pointer: it writes the new pos at the
// footer location, syncs the file, updates l.pos, and seeks to the new pos.
// NOTE(review): abridged listing — the "...." line marks elided code.
func (l *segment) advance() error {
    // Seek to the footer (footerSize back from the end; the original note says end minus 8 bytes).
    if err := l.seekEnd(-footerSize); err != nil {return err}
    // Write the new pos: the old pos plus the size of the block just read plus
    // 8 — presumably the block's uint64 length prefix; confirm against the writer.
    pos := l.pos + l.currentSize + 8
    if err := l.writeUint64(uint64(pos)); err != nil {return err}
    // Sync to disk.
    if err := l.file.Sync(); err != nil {return err}
    // The in-memory pos is only updated after the write has been synced.
    l.pos = pos
    // Seek to the new pos.
    if err := l.seekToCurrent(); err != nil {return err}
    ....
    return nil
}

若 head segment 读完,则继续下一个 segment:

  • 如果 head segment 读到 io.EOF,则删除 head,head=head.next:
//services/hh/queue.go
// trimHead discards the current head segment when more than one segment
// remains: the first entry is dropped from l.segments, the old head is closed,
// its file is removed, and the new first segment becomes the head. With a
// single segment (or none) it does nothing. An error from close or os.Remove
// is returned as-is; l.segments has already been shortened at that point.
func (l *queue) trimHead() error {
	if len(l.segments) <= 1 {
		return nil
	}

	l.segments = l.segments[1:]

	old := l.head
	if err := old.close(); err != nil {
		return err
	}

	// Remove the segment file.
	if err := os.Remove(old.path); err != nil {
		return err
	}

	// The next segment becomes the head.
	l.head = l.segments[0]
	return nil
}
退出移动版