每个远端节点对应一个NodeProcessor,由NodeProcessor负责将远端写入失败的points,写入到本机队列:

// services/hh/node_processor.go// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error {    ....    //序列化    b := marshalWrite(shardID, points)    //写入本机队列    return n.queue.Append(b)}

本机队列的格局

每个NodeProcess有1个queue,对应于$data/hh的一个目录:

  • 每个queue对应一个目录,默认最大1Gi;
  • queue内蕴含若干个segment,从tail写入,从head读出;
  • 每个segment对应一个文件,默认最大1Mi;
  • segment内蕴含若干个block,每个block含8byte length以及data组成;
  • segment开端蕴含1个8byte的footer,保留了读取的地位,从header读出时更新;

写入过程:序列化

写入队列的points,依照固定的格局序列化:

  • 8byte: shardId;
  • point.String() + "\n";
  • point.String() + "\n";
  • ......
func marshalWrite(shardID uint64, points []models.Point) []byte {    b := make([]byte, 8)    binary.BigEndian.PutUint64(b, shardID)    for _, p := range points {        b = append(b, []byte(p.String())...)        b = append(b, '\n')    }    return b}

写入过程:写segment

从队列的tail segment写入:

// services/hh/queue.go// Append appends a byte slice to the end of the queuefunc (l *queue) Append(b []byte) error {        if l.diskUsage()+int64(len(b)) > l.maxSize {        return ErrQueueFull    }    // Append the entry to the tail, if the segment is full,    // try to create new segment and retry the append    if err := l.tail.append(b); err == ErrSegmentFull {        segment, err := l.addSegment()        l.tail = segment        return l.tail.append(b)    }    return nil}

写入segment的过程:

  • 首先,定位到文件的end-8byte地位;
  • 而后,依照block的格局,写block len,写block数据;
  • 而后,写入footer,即读pos;(未扭转)
  • 最初,将文件内容sync到磁盘;
// services/hh/queue.go// append adds byte slice to the end of segmentfunc (l *segment) append(b []byte) error {    //最大1Mi    if l.size+int64(len(b)) > l.maxSize {        return ErrSegmentFull    }    if err := l.seekEnd(-footerSize); err != nil {        return err    }    //写入block len    if err := l.writeUint64(uint64(len(b))); err != nil {        return err    }    //写入block数据    if err := l.writeBytes(b); err != nil {        return err    }    //写入footer    if err := l.writeUint64(uint64(l.pos)); err != nil {        return err    }    if err := l.file.Sync(); err != nil {        return err    }    if l.currentSize == 0 {        l.currentSize = int64(len(b))    }    l.size += int64(len(b)) + 8 // uint64 for slice length    return nil}