关于influxdb:InfluxDB集群-hintedhandoff源码分析二points写入到本机队列

每个远端节点对应一个NodeProcessor,由NodeProcessor负责将远端写入失败的points,写入到本机队列:

// services/hh/node_processor.go
// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate
// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.
func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error {
    ....
    //序列化
    b := marshalWrite(shardID, points)
    //写入本机队列
    return n.queue.Append(b)
}

本机队列的格局

每个NodeProcess有1个queue,对应于$data/hh的一个目录:

  • 每个queue对应一个目录,默认最大1Gi;
  • queue内蕴含若干个segment,从tail写入,从head读出;
  • 每个segment对应一个文件,默认最大1Mi;
  • segment内蕴含若干个block,每个block含8byte length以及data组成;
  • segment开端蕴含1个8byte的footer,保留了读取的地位,从header读出时更新;

写入过程:序列化

写入队列的points,依照固定的格局序列化:

  • 8byte: shardId;
  • point.String() + “\n”;
  • point.String() + “\n”;
  • ……
func marshalWrite(shardID uint64, points []models.Point) []byte {
    b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, shardID)
    for _, p := range points {
        b = append(b, []byte(p.String())...)
        b = append(b, '\n')
    }
    return b
}

写入过程:写segment

从队列的tail segment写入:

// services/hh/queue.go
// Append appends a byte slice to the end of the queue
func (l *queue) Append(b []byte) error {
        if l.diskUsage()+int64(len(b)) > l.maxSize {
        return ErrQueueFull
    }
    // Append the entry to the tail, if the segment is full,
    // try to create new segment and retry the append
    if err := l.tail.append(b); err == ErrSegmentFull {
        segment, err := l.addSegment()
        l.tail = segment
        return l.tail.append(b)
    }
    return nil
}

写入segment的过程:

  • 首先,定位到文件的end-8byte地位;
  • 而后,依照block的格局,写block len,写block数据;
  • 而后,写入footer,即读pos;(未扭转)
  • 最初,将文件内容sync到磁盘;
// services/hh/queue.go
// append adds byte slice to the end of segment
func (l *segment) append(b []byte) error {
    //最大1Mi
    if l.size+int64(len(b)) > l.maxSize {
        return ErrSegmentFull
    }
    if err := l.seekEnd(-footerSize); err != nil {
        return err
    }
    //写入block len
    if err := l.writeUint64(uint64(len(b))); err != nil {
        return err
    }
    //写入block数据
    if err := l.writeBytes(b); err != nil {
        return err
    }
    //写入footer
    if err := l.writeUint64(uint64(l.pos)); err != nil {
        return err
    }
    if err := l.file.Sync(); err != nil {
        return err
    }
    if l.currentSize == 0 {
        l.currentSize = int64(len(b))
    }
    l.size += int64(len(b)) + 8 // uint64 for slice length
    return nil
}

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据