在本机节点上,为每个远端节点分配一个NodeProcessor对象,负责数据的写入和读取。

NodeProcessor定期地读取本机队列中的数据,然后将其发送给远端节点,尝试写入远端shard。

// services/hh/node_processor.go

// run is the processor's replay loop. This is an abridged excerpt: "......"
// and "..." mark code omitted from the original source, and the `case` below
// presumably sits inside an omitted `select` statement.
func (n *NodeProcessor) run() {
	......
	for {
	case <-time.After(currInterval):
		// Rate limiting: a fresh limiter is built from RetryRateLimit each time the interval fires.
		limiter := NewRateLimiter(n.RetryRateLimit)
		for {
			// Attempt to send one block; c is the number of bytes SendWrite reports as sent.
			c, err := n.SendWrite()
			...
			// Feed the byte count to the limiter, then sleep for the delay it asks for.
			limiter.Update(c)
			time.Sleep(limiter.Delay())
		}
	}
}

读取和发送过程

  • 首先,ping远端节点,判断其是否活跃,如果不活跃,则直接返回;
  • 然后,读head segment中的block,读操作均是从segment中的pos处开始;
  • 然后,将读到的block数据反序列化,发送给远端节点;
  • 最后,更新head segment中的pos;
// services/hh/node_processor.go

// SendWrite attempts to send the current block of hinted data to the target node. If successful,
// it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF
// when there is no more data or the node is inactive.
func (n *NodeProcessor) SendWrite() (int, error) {
	// Ping the remote node to determine whether it is active.
	// NOTE(review): this excerpt never reads `active`; presumably the full
	// source returns early when the node is inactive — confirm.
	active, err := n.Active()
	if err != nil {
		return 0, err
	}

	// Read the block that starts at pos in the head segment.
	// Get the current block from the queue
	// NOTE(review): the error returned here is not checked in this excerpt.
	buf, err := n.queue.Current()

	// Deserialize the block.
	// unmarshal the byte slice back to shard ID and points
	// NOTE(review): the error returned here is not checked in this excerpt.
	shardID, points, err := unmarshalWrite(buf)

	// Write the points to the remote node. On failure the WriteNodeReqFail
	// counter is incremented and the queue is NOT advanced.
	if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil {
		atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
		return 0, err
	}

	// Advance the queue so that the next call reads from the new position.
	// A failure to advance is only logged; the byte count is still returned.
	if err := n.queue.Advance(); err != nil {
		n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err))
	}
	return len(buf), nil
}

1. 判断节点是否活跃

先根据nodeId查询node信息(ip):

//services/hh/node_processor.go// Active returns whether this node processor is for a currently active node.func (n *NodeProcessor) Active() (bool, error) {    nio, err := n.meta.DataNode(n.nodeID)        if nio == nil {        return false, nil    }    ping := n.Ping(nio)    return ping == nil, nil}// Ping returns whether this node processor is for a currently active node.func (n *NodeProcessor) Ping(node *meta.NodeInfo) error {    return n.ping(node.Host)}

ping的动作,实际是发送HTTP GET http://ip:8091/ping,返回204(No Content)即认为节点活跃:

//services/hh/node_processor.go// Ping returns whether this node processor is for a currently active node.func (n *NodeProcessor) ping(host string) error {    url := "http://" + host + "/ping"    request, _ := http.NewRequest("GET", url, nil)    transport := http.Transport{        DisableKeepAlives: true,    }    client := &http.Client{        Transport: &transport,        Timeout:   time.Duration(3) * time.Second,    }    resp, err := client.Do(request)    if err != nil {        return err    }    defer resp.Body.Close()    if resp.StatusCode == http.StatusNoContent {        return nil    }    return fmt.Errorf("ping %s fail", url)}

2. 读head segment中的block

从head segment读取:

  • 先定位到segment的pos位置,pos即下一个要读取的block;
  • 按照block格式,先读block len,再读block data;
//services/hh/queue.go// current returns byte slice that the current segment pointsfunc (l *segment) current() ([]byte, error) {    //定位到pos    if err := l.seekToCurrent(); err != nil {        return nil, err    }    //先读block len    // read the record size    sz, err := l.readUint64()    l.currentSize = int64(sz)    //再读block data    b := make([]byte, sz)    if err := l.readBytes(b); err != nil {        return nil, err    }    return b, nil}

如何定位到segment的pos位置?实际上都是文件操作:

//services/hh/queue.gofunc (l *segment) seekToCurrent() error {    return l.seek(int64(l.pos))}func (l *segment) seek(pos int64) error {    n, err := l.file.Seek(pos, os.SEEK_SET)    if err != nil {        return err    }    if n != pos {        return fmt.Errorf("bad seek. exp %v, got %v", 0, n)    }    return nil}

3. 将读到的block数据反序列化,写入远端节点

反序列化得到points,按照与序列化相同的格式:

  • 先读8byte的shardId;
  • 然后逐行(\n)解析point;
//services/hh/node_processor.go

// unmarshalWrite splits a queued block into its leading 8-byte big-endian ID
// and the points parsed from the remaining bytes. It returns an error when b
// is shorter than 8 bytes, or the parse error from models.ParsePoints.
// NOTE(review): the ID is named ownerID here but the caller shown in the
// article (SendWrite) uses it as a shard ID — confirm which is intended.
func unmarshalWrite(b []byte) (uint64, []models.Point, error) {
	if len(b) < 8 {
		return 0, nil, fmt.Errorf("too short: len = %d", len(b))
	}
	ownerID := binary.BigEndian.Uint64(b[:8])
	points, err := models.ParsePoints(b[8:])
	return ownerID, points, err
}

//models/points.go

// ParsePoints parses buf via ParsePointsWithPrecision, passing the current UTC
// time as the default time and "n" as the precision.
func ParsePoints(buf []byte) ([]Point, error) {
	return ParsePointsWithPrecision(buf, time.Now().UTC(), "n")
}

// ParsePointsWithPrecision parses buf one line at a time into points.
// This is an abridged excerpt: the declarations of pos and block and the
// handling of the parsePoint error are not shown.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
	// Pre-size the result using the number of '\n' bytes in buf, plus one.
	points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
	for pos < len(buf) {
		// Take the next line, then step past the position scanLine returned.
		pos, block = scanLine(buf, pos)
		pos++
		// Skip leading whitespace before parsing the line as a point.
		start := skipWhitespace(block, 0)
		pt, err := parsePoint(block[start:], defaultTime, precision)
		points = append(points, pt)
	}
	return points, nil
}

使用ShardWriter将points写入远端节点:

//cluster/shard_writer.go

// WriteShard writes time series points to a shard
//
// It dials ownerID, builds a WriteShardRequest for shardID, sends it as a TLV
// message and reads the TLV response. This is an abridged excerpt: "...."
// marks omitted code, and several results assigned below (the dial error, ok,
// sgi, the marshal error) are not checked in the lines shown.
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
	c, err := w.dial(ownerID)
	conn, ok := c.(*pooledConn)

	// Determine the location of this shard and whether it still exists
	db, rp, sgi := w.MetaClient.ShardOwner(shardID)

	// Build write request.
	var request WriteShardRequest
	request.SetShardID(shardID)
	request.SetDatabase(db)
	request.SetRetentionPolicy(rp)
	request.AddPoints(points)

	// Serialize the points.
	// Marshal into protocol buffers.
	buf, err := request.MarshalBinary()

	// Write the request to conn in TLV format, under a write deadline of w.timeout.
	// Write request.
	conn.SetWriteDeadline(time.Now().Add(w.timeout))
	if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {
		// Mark the connection unusable after a failed write.
		conn.MarkUnusable()
		return err
	}

	// Read the response, under a read deadline of w.timeout.
	// Read the response.
	conn.SetReadDeadline(time.Now().Add(w.timeout))
	_, buf, err = ReadTLV(conn)
	if err != nil {
		// Mark the connection unusable after a failed read.
		conn.MarkUnusable()
		return err
	}

	....
	return nil
}

4. 更新head segment的pos

更新head segment的pos,下次继续从pos读取:

  • 先尝试移动head segment: head.advance();
  • 若head segment读完,则继续读取下一个segment:
//services/hh/queue.go

// Advance moves the head point to the next byte slice in the queue
//
// It holds the queue mutex for the whole call and returns ErrNotOpen when the
// queue has no head segment. This is an abridged excerpt: "...." marks code
// omitted from the original source.
func (l *queue) Advance() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.head == nil {
		return ErrNotOpen
	}

	err := l.head.advance()
	if err == io.EOF {
		....
		// The head segment has been fully read: drop it and continue with the next one.
		if err := l.trimHead(); err != nil {
			return err
		}
	}
	....
	return nil
}

尝试移动head segment:

  • 首先定位到footer,写入新的pos,然后同步到disk:
//services/hh/queue.go

// advance advances the current value pointer
//
// The new position is written into the segment's footer and synced to disk
// before the in-memory pos is updated. This is an abridged excerpt: "...."
// marks code omitted from the original source.
func (l *segment) advance() error {
	// Seek to footerSize bytes before the end of the file.
	if err := l.seekEnd(-footerSize); err != nil {
		return err
	}

	// Write the new pos: skip the current block's data (currentSize) plus 8
	// bytes — presumably the block's uint64 length prefix.
	pos := l.pos + l.currentSize + 8
	if err := l.writeUint64(uint64(pos)); err != nil {
		return err
	}

	// Sync to disk.
	if err := l.file.Sync(); err != nil {
		return err
	}
	l.pos = pos

	// Seek back to the (new) pos.
	if err := l.seekToCurrent(); err != nil {
		return err
	}
	....
	return nil
}

若head segment读完,则继续读取下一个segment:

  • 如果读head segment时遇到io.EOF,则关闭并删除head对应的segment文件,再把下一个segment作为新的head:
//services/hh/queue.gofunc (l *queue) trimHead() error {    if len(l.segments) > 1 {        l.segments = l.segments[1:]        if err := l.head.close(); err != nil {            return err        }        //删除segment文件        if err := os.Remove(l.head.path); err != nil {            return err        }        //下一个segment作为head        l.head = l.segments[0]    }    return nil}