每个远端节点对应一个 NodeProcessor,由 NodeProcessor 负责将远端写入失败的 points,写入到本机队列:
// services/hh/node_processor.go
// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate
// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.
func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error {
....
// 序列化
b := marshalWrite(shardID, points)
// 写入本机队列
return n.queue.Append(b)
}
本机队列的格局
每个 NodeProcess 有 1 个 queue,对应于 $data/hh 的一个目录:
- 每个 queue 对应一个目录,默认最大 1Gi;
- queue 内蕴含若干个 segment,从 tail 写入,从 head 读出;
- 每个 segment 对应一个文件,默认最大 1Mi;
- segment 内蕴含若干个 block,每个 block 含 8byte length 以及 data 组成;
- segment 开端蕴含 1 个 8byte 的 footer,保留了读取的地位,从 header 读出时更新;
写入过程:序列化
写入队列的 points,依照固定的格局序列化:
- 8byte: shardId;
- point.String() + “\n”;
- point.String() + “\n”;
- ……
func marshalWrite(shardID uint64, points []models.Point) []byte {b := make([]byte, 8)
binary.BigEndian.PutUint64(b, shardID)
for _, p := range points {b = append(b, []byte(p.String())...)
b = append(b, '\n')
}
return b
}
写入过程:写 segment
从队列的 tail segment 写入:
// services/hh/queue.go
// Append appends a byte slice to the end of the queue
func (l *queue) Append(b []byte) error {if l.diskUsage()+int64(len(b)) > l.maxSize {return ErrQueueFull}
// Append the entry to the tail, if the segment is full,
// try to create new segment and retry the append
if err := l.tail.append(b); err == ErrSegmentFull {segment, err := l.addSegment()
l.tail = segment
return l.tail.append(b)
}
return nil
}
写入 segment 的过程:
- 首先,定位到文件的 end-8byte 地位;
- 而后,依照 block 的格局,写 block len,写 block 数据;
- 而后,写入 footer,即读 pos;(未扭转)
- 最初,将文件内容 sync 到磁盘;
// services/hh/queue.go
// append adds byte slice to the end of segment
func (l *segment) append(b []byte) error {
// 最大 1Mi
if l.size+int64(len(b)) > l.maxSize {return ErrSegmentFull}
if err := l.seekEnd(-footerSize); err != nil {return err}
// 写入 block len
if err := l.writeUint64(uint64(len(b))); err != nil {return err}
// 写入 block 数据
if err := l.writeBytes(b); err != nil {return err}
// 写入 footer
if err := l.writeUint64(uint64(l.pos)); err != nil {return err}
if err := l.file.Sync(); err != nil {return err}
if l.currentSize == 0 {l.currentSize = int64(len(b))
}
l.size += int64(len(b)) + 8 // uint64 for slice length
return nil
}