In an InfluxDB cluster, a shard has N replicas, and the replicas usually live on different nodes; writing a shard means writing to all of its replicas.
When the write to a remote replica fails, the data is first stored in the local hinted-handoff queue; the local node then periodically ships the queued data to the remote node, so the replicas eventually become consistent.
Hinted-handoff code entry point
For each shard, points must be written to every shardOwner:
- the write to a remote node may fail (for example due to network issues); in that case the points that were meant for that node are passed to HintedHandoff.WriteShard(), which appends them to the local hh queue;
// cluster/points_writer.go
// writeToShards writes points to a shard and ensures a write consistency level has been met. If the write
// partially succeeds, ErrPartialWrite is returned.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
    consistency models.ConsistencyLevel, points []models.Point) error {
    ......
    ch := make(chan *AsyncWriteResult, len(shard.Owners))
    // write the points to every shard owner
    for _, owner := range shard.Owners {
        go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
            ....
            // write the shard to the remote owner
            err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
            if err != nil {
                // on failure, enqueue the points into the local hh queue
                hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
                ....
                if consistency == models.ConsistencyLevelAny {
                    ch <- &AsyncWriteResult{owner, nil}
                    return
                }
            }
            ch <- &AsyncWriteResult{owner, err}
        }(shard.ID, owner, points)
    }
    ......
}
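The elided tail of writeToShard is where the consistency level is actually enforced: it drains ch and counts successful acknowledgements until enough owners have confirmed the write. The snippet below is only a self-contained sketch of that counting pattern; the names (waitForConsistency, result, the error strings) are illustrative, not InfluxDB's own.

package main

import (
    "errors"
    "fmt"
    "time"
)

// result plays the role of AsyncWriteResult above: one entry per shard owner.
type result struct {
    nodeID uint64
    err    error
}

// waitForConsistency drains per-owner write results from ch and returns nil as
// soon as `required` of them succeeded; otherwise it reports a timeout or a
// partial write (which is where ErrPartialWrite would surface).
func waitForConsistency(ch <-chan result, owners, required int, timeout time.Duration) error {
    deadline := time.After(timeout)
    wrote := 0
    for i := 0; i < owners; i++ {
        select {
        case <-deadline:
            return errors.New("timeout waiting for write acks")
        case r := <-ch:
            if r.err != nil {
                continue // this owner's points went to the hh queue instead
            }
            wrote++
            if wrote >= required {
                return nil
            }
        }
    }
    return errors.New("partial write")
}

func main() {
    ch := make(chan result, 3)
    ch <- result{1, nil}
    ch <- result{2, errors.New("connection refused")}
    ch <- result{3, nil}
    // quorum of 2 out of 3 owners
    fmt.Println(waitForConsistency(ch, 3, 2, time.Second)) // <nil>
}

Note that with ConsistencyLevelAny the goroutine in the excerpt already reports nil after queueing to hh, so even a fully offline owner does not fail the write.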
Writing data into the hinted-handoff queue
Under the local node's $data/hh directory, one $id directory is created per remote node; it caches the data that failed to be written to that node (the path mapping is sketched below):
- each remote node gets one NodeProcessor that handles its failed writes;
- the NodeProcessor caches the points that failed to reach the remote node under that local directory;
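pathforNode, which shows up in the code below, is not part of this excerpt; conceptually it only maps a node ID to that node's directory under the hh data dir. A minimal sketch of that mapping (the directory passed in main is just an example):

package main

import (
    "fmt"
    "path/filepath"
)

// pathForNode maps a remote node ID to its hinted-handoff directory,
// e.g. $data/hh/3 for node 3; the real implementation in services/hh may differ.
func pathForNode(dir string, nodeID uint64) string {
    return filepath.Join(dir, fmt.Sprintf("%d", nodeID))
}

func main() {
    fmt.Println(pathForNode("/var/lib/influxdb/hh", 3)) // /var/lib/influxdb/hh/3
}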
// services/hh/service.go
// WriteShard queues the points write for shardID to node ownerID to handoff queue
func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error {
    .......
    processor, ok := s.processors[ownerID]
    if !ok {
        if err := func() error {
            processor, ok = s.processors[ownerID]
            if !ok {
                processor = NewNodeProcessor(ownerID, s.pathforNode(ownerID), s.shardWriter, s.MetaClient, &s.cfg)
                processor.Logger = s.Logger
                if err := processor.Open(); err != nil {
                    return err
                }
                s.processors[ownerID] = processor
            }
            return nil
        }(); err != nil {
            return err
        }
    }

    // cache the points in the local per-node directory
    if err := processor.WriteShard(shardID, points); err != nil {
        return err
    }
    return nil
}
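The anonymous function wrapping the processor creation looks odd at first; the elided parts of WriteShard presumably guard s.processors with a lock, and the closure re-checks the map so that two concurrent writes to the same node create only one NodeProcessor. A generic sketch of that double-checked pattern (the RWMutex, field and method names here are assumptions, not the actual hh service code):

package main

import (
    "fmt"
    "sync"
)

type processor struct{ nodeID uint64 }

type service struct {
    mu         sync.RWMutex
    processors map[uint64]*processor
}

// getOrCreate returns the processor for nodeID, creating it at most once even
// when called concurrently.
func (s *service) getOrCreate(nodeID uint64) *processor {
    // fast path: most calls find an existing processor under the read lock
    s.mu.RLock()
    p, ok := s.processors[nodeID]
    s.mu.RUnlock()
    if ok {
        return p
    }

    // slow path: re-check under the write lock, because another goroutine may
    // have created the processor between the RUnlock and the Lock
    s.mu.Lock()
    defer s.mu.Unlock()
    if p, ok := s.processors[nodeID]; ok {
        return p
    }
    p = &processor{nodeID: nodeID}
    s.processors[nodeID] = p
    return p
}

func main() {
    s := &service{processors: map[uint64]*processor{}}
    fmt.Println(s.getOrCreate(3) == s.getOrCreate(3)) // true: created only once
}

In the real code the closure likely also exists so that a deferred unlock releases the lock before the queue append, which touches disk.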
Data written to the local queue is serialized in a fixed format:
- 8 bytes: shardID;
- point.String() + "\n";
- point.String() + "\n";
- …..
// services/hh/node_processor.go
// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate
// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.
func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error {
    .....
    b := marshalWrite(shardID, points)
    return n.queue.Append(b)
}
func marshalWrite(shardID uint64, points []models.Point) []byte {
    b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, shardID)
    for _, p := range points {
        b = append(b, []byte(p.String())...)
        b = append(b, '\n')
    }
    return b
}
NodeProcessor periodically sends points to the remote node
The NodeProcessor periodically pushes the data in its queue to the node whose write failed:
// services/hh/node_processor.go
func (n *NodeProcessor) run() {
    ......
    for {
        select {
        ......
        case <-time.After(currInterval):
            limiter := NewRateLimiter(n.RetryRateLimit) // rate-limit the replay traffic
            for {
                c, err := n.SendWrite()
                ...
                limiter.Update(c)
                time.Sleep(limiter.Delay())
            }
        }
    }
}
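NewRateLimiter is not shown in this excerpt; its job is simply to turn the number of bytes just sent into a pause, so that replay traffic stays at or below RetryRateLimit. A self-contained sketch of such a limiter (the Update/Delay method names match the calls above, the internals are an assumption):

package main

import (
    "fmt"
    "time"
)

// rateLimiter spreads writes out so that roughly `limit` bytes/second are sent.
type rateLimiter struct {
    limit int64 // bytes per second; <= 0 means unlimited
    sent  int64 // bytes sent since the last delay was taken
}

func newRateLimiter(limit int64) *rateLimiter { return &rateLimiter{limit: limit} }

// Update records that n more bytes were sent.
func (r *rateLimiter) Update(n int) { r.sent += int64(n) }

// Delay returns how long to sleep so the observed rate stays under the limit,
// and resets the byte counter.
func (r *rateLimiter) Delay() time.Duration {
    if r.limit <= 0 {
        return 0
    }
    d := time.Duration(float64(r.sent) / float64(r.limit) * float64(time.Second))
    r.sent = 0
    return d
}

func main() {
    l := newRateLimiter(1024 * 1024) // ~1 MiB/s
    l.Update(256 * 1024)
    fmt.Println(l.Delay()) // ~250ms
}

Without this throttle, a node coming back after a long outage would be hit with the entire backlog at once.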
Processing steps:
- first, ping the remote node; if it is unreachable, return an error;
- next, read a block from the local queue and deserialize it;
- then, send the data to the remote node via the shardWriter;
- finally, advance the read position of the local queue;
// services/hh/node_processor.go
// SendWrite attempts to send the current block of hinted data to the target node. If successful,
// it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF
// when there is no more data or the node is inactive.
func (n *NodeProcessor) SendWrite() (int, error) {
    // check that the remote node is reachable
    active, err := n.Active()
    if err != nil {
        return 0, err
    }

    // read the current block from the queue
    buf, err := n.queue.Current()

    // unmarshal the byte slice back to shard ID and points
    shardID, points, err := unmarshalWrite(buf)
    if err != nil {
        ...
    }

    // write the shard to the remote node
    if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil {
        atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
        return 0, err
    }
    ......

    // advance the queue's read position
    if err := n.queue.Advance(); err != nil {
        n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err))
    }

    return len(buf), nil
}
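As far as the NodeProcessor is concerned, the queue only needs three operations: Append on the write path, and Current/Advance on the replay path. The real hh queue is a segmented, on-disk queue; the in-memory sketch below only illustrates the Current-then-Advance consumption contract, where a block is dropped only after the remote write succeeded:

package main

import (
    "errors"
    "fmt"
)

// queue captures the three operations NodeProcessor relies on; this in-memory
// stand-in is not the persistent segment-file queue InfluxDB actually uses.
type queue struct {
    blocks [][]byte
}

var errEOF = errors.New("queue is empty")

// Append adds one marshalled write (shard ID + points) to the tail.
func (q *queue) Append(b []byte) error {
    q.blocks = append(q.blocks, b)
    return nil
}

// Current returns the oldest unsent block without removing it, so a failed
// send can simply be retried later.
func (q *queue) Current() ([]byte, error) {
    if len(q.blocks) == 0 {
        return nil, errEOF
    }
    return q.blocks[0], nil
}

// Advance drops the block returned by Current once it was sent successfully.
func (q *queue) Advance() error {
    if len(q.blocks) == 0 {
        return errEOF
    }
    q.blocks = q.blocks[1:]
    return nil
}

func main() {
    q := &queue{}
    q.Append([]byte("block-1"))
    q.Append([]byte("block-2"))
    b, _ := q.Current()
    fmt.Printf("%s\n", b) // block-1; Advance only after the remote write succeeds
    q.Advance()
    b, _ = q.Current()
    fmt.Printf("%s\n", b) // block-2
}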