关于influxdb:InfluxDB集群-hintedhandoff源码分析二points写入到本机队列

96次阅读

共计 1932 个字符,预计需要花费 5 分钟才能阅读完成。

每个远端节点对应一个 NodeProcessor,由 NodeProcessor 负责将远端写入失败的 points,写入到本机队列:

// services/hh/node_processor.go
// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate
// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.
func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error {
    ....
    // 序列化
    b := marshalWrite(shardID, points)
    // 写入本机队列
    return n.queue.Append(b)
}

本机队列的格局

每个 NodeProcess 有 1 个 queue,对应于 $data/hh 的一个目录:

  • 每个 queue 对应一个目录,默认最大 1Gi;
  • queue 内蕴含若干个 segment,从 tail 写入,从 head 读出;
  • 每个 segment 对应一个文件,默认最大 1Mi;
  • segment 内蕴含若干个 block,每个 block 含 8byte length 以及 data 组成;
  • segment 开端蕴含 1 个 8byte 的 footer,保留了读取的地位,从 header 读出时更新;

写入过程:序列化

写入队列的 points,依照固定的格局序列化:

  • 8byte: shardId;
  • point.String() + “\n”;
  • point.String() + “\n”;
  • ……
func marshalWrite(shardID uint64, points []models.Point) []byte {b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, shardID)
    for _, p := range points {b = append(b, []byte(p.String())...)
        b = append(b, '\n')
    }
    return b
}

写入过程:写 segment

从队列的 tail segment 写入:

// services/hh/queue.go
// Append appends a byte slice to the end of the queue
func (l *queue) Append(b []byte) error {if l.diskUsage()+int64(len(b)) > l.maxSize {return ErrQueueFull}
    // Append the entry to the tail, if the segment is full,
    // try to create new segment and retry the append
    if err := l.tail.append(b); err == ErrSegmentFull {segment, err := l.addSegment()
        l.tail = segment
        return l.tail.append(b)
    }
    return nil
}

写入 segment 的过程:

  • 首先,定位到文件的 end-8byte 地位;
  • 而后,依照 block 的格局,写 block len,写 block 数据;
  • 而后,写入 footer,即读 pos;(未扭转)
  • 最初,将文件内容 sync 到磁盘;
// services/hh/queue.go
// append adds byte slice to the end of segment
func (l *segment) append(b []byte) error {
    // 最大 1Mi
    if l.size+int64(len(b)) > l.maxSize {return ErrSegmentFull}
    if err := l.seekEnd(-footerSize); err != nil {return err}
    // 写入 block len
    if err := l.writeUint64(uint64(len(b))); err != nil {return err}
    // 写入 block 数据
    if err := l.writeBytes(b); err != nil {return err}
    // 写入 footer
    if err := l.writeUint64(uint64(l.pos)); err != nil {return err}
    if err := l.file.Sync(); err != nil {return err}
    if l.currentSize == 0 {l.currentSize = int64(len(b))
    }
    l.size += int64(len(b)) + 8 // uint64 for slice length
    return nil
}

正文完
 0