In an InfluxDB cluster, each shard has N replicas, and the replicas normally live on different nodes; a write to a shard must be applied to all of its replicas.
When a write to a remote replica fails, the data is first stored in the local hinted-handoff queue; the local node then periodically replays the queue's contents to the remote node, so the replicas become eventually consistent.
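Before diving into the source, here is a minimal, self-contained sketch of the mechanism (hypothetical names, not InfluxDB's actual types): a write that fails to reach a replica is appended to a per-node queue, and a background loop retries the head of the queue until the remote node accepts it.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a stand-in for one queued block: a shard ID plus its points.
type entry struct {
	shardID uint64
	points  []string // stand-in for []models.Point in line protocol
}

func main() {
	var queue []entry // per-remote-node hinted-handoff queue
	attempts := 0

	// sendRemote fails twice and then succeeds, simulating a node outage.
	sendRemote := func(e entry) error {
		attempts++
		if attempts <= 2 {
			return errors.New("node unreachable")
		}
		fmt.Printf("replayed shard %d: %v\n", e.shardID, e.points)
		return nil
	}

	// Write path: the remote write fails, so the points are parked locally.
	e := entry{shardID: 1, points: []string{"cpu value=0.5"}}
	if err := sendRemote(e); err != nil {
		queue = append(queue, e)
	}

	// Drain loop: keep retrying the head block until delivery succeeds,
	// then advance. NodeProcessor.run/SendWrite below do the same thing
	// against an on-disk queue, with pings and rate limiting added.
	for len(queue) > 0 {
		if err := sendRemote(queue[0]); err != nil {
			continue // the real code sleeps per a rate limiter here
		}
		queue = queue[1:] // advance past the delivered block
	}
}
```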
The hinted-handoff code entry point
For each shard, the points must be written to every one of its shard owners:
- a write to a remote node may fail (for example, because of a network error); in that case the points originally destined for that node are handed to HintedHandoff.WriteShard(), which enqueues them in the local hh queue;
```go
// cluster/points_writer.go

// writeToShards writes points to a shard and ensures a write consistency level has been met. If the write
// partially succeeds, ErrPartialWrite is returned.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, consistency models.ConsistencyLevel, points []models.Point) error {
	......
	ch := make(chan *AsyncWriteResult, len(shard.Owners))

	// fan the write out to every shard owner
	for _, owner := range shard.Owners {
		go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
			....
			// write the shard to this owner
			err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
			if err != nil {
				// the remote write failed, so enqueue the points in the hinted-handoff queue
				hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
				....
				// for ConsistencyLevelAny, a successful handoff counts as a successful write
				if consistency == models.ConsistencyLevelAny {
					ch <- &AsyncWriteResult{owner, nil}
					return
				}
			}
			ch <- &AsyncWriteResult{owner, err}
		}(shard.ID, owner, points)
	}
	......
}
```
Writing data into the hinted-handoff queue
Under the local node's $data/hh directory, a $id subdirectory is created for each remote node; it buffers the data that failed to be written to that node:
- each remote node's failed writes are handled by a single NodeProcessor;
- the NodeProcessor caches the points that failed to reach the remote node in that local directory (a sketch of pathforNode, which picks the directory, follows the code below);
```go
// services/hh/service.go

// WriteShard queues the points write for shardID to node ownerID to handoff queue
func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error {
	.......
	processor, ok := s.processors[ownerID]
	if !ok {
		if err := func() error {
			// double-check before creating a new processor for this node
			processor, ok = s.processors[ownerID]
			if !ok {
				processor = NewNodeProcessor(ownerID, s.pathforNode(ownerID), s.shardWriter, s.MetaClient, &s.cfg)
				processor.Logger = s.Logger
				if err := processor.Open(); err != nil {
					return err
				}
				s.processors[ownerID] = processor
			}
			return nil
		}(); err != nil {
			return err
		}
	}

	// cache the write in the local per-node directory
	if err := processor.WriteShard(shardID, points); err != nil {
		return err
	}

	return nil
}
```
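pathforNode itself is not shown in this post. Given the $data/hh/$id layout described above, it presumably just maps a node ID to that node's subdirectory; a sketch consistent with that layout, assuming the hh base directory is exposed as s.cfg.Dir and the usual fmt/path/filepath imports:

```go
// pathforNode returns the directory that buffers writes destined for the
// given remote node, i.e. $data/hh/$id. A sketch, assuming s.cfg.Dir holds
// the hh base directory.
func (s *Service) pathforNode(nodeID uint64) string {
	return filepath.Join(s.cfg.Dir, fmt.Sprintf("%d", nodeID))
}
```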
Data written to the local queue is serialized in a specific format (the inverse, unmarshalWrite, is sketched after the code below):
- 8 bytes: shardID;
- point.String() + "\n";
- point.String() + "\n";
- .....
```go
// services/hh/node_processor.go

// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate
// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.
func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error {
	.....
	b := marshalWrite(shardID, points)
	return n.queue.Append(b)
}

func marshalWrite(shardID uint64, points []models.Point) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, shardID)
	for _, p := range points {
		b = append(b, []byte(p.String())...)
		b = append(b, '\n')
	}
	return b
}
```
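marshalWrite's inverse, unmarshalWrite, is used later by SendWrite but is not quoted in this post. Reconstructed from the format above, it plausibly reads the 8-byte big-endian shard ID and hands the remaining newline-separated line protocol to models.ParsePoints; a sketch, not the verbatim source:

```go
// unmarshalWrite splits a queued block back into its shard ID and points.
// A sketch reconstructed from the serialization format above.
func unmarshalWrite(b []byte) (uint64, []models.Point, error) {
	if len(b) < 8 {
		return 0, nil, fmt.Errorf("block too short: %d bytes", len(b))
	}
	shardID := binary.BigEndian.Uint64(b[:8])
	// the rest of the block is newline-separated line protocol
	points, err := models.ParsePoints(b[8:])
	return shardID, points, err
}
```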
The NodeProcessor periodically sends points to the remote node
The NodeProcessor periodically drains the queue and sends its contents to the node whose writes failed:
```go
// services/hh/node_processor.go

func (n *NodeProcessor) run() {
	......
	for {
		select {
		case <-time.After(currInterval):
			// rate-limit the replay so it does not overwhelm the recovered node
			limiter := NewRateLimiter(n.RetryRateLimit)
			for {
				c, err := n.SendWrite()
				...
				limiter.Update(c)
				time.Sleep(limiter.Delay())
			}
		}
	}
}
```
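The NewRateLimiter/Update/Delay trio throttles the replay so that catch-up traffic does not swamp the node that just came back. A sketch of the idea (not the exact services/hh source): record how many bytes have been sent since the window opened, and sleep just long enough that the average rate stays at or below the configured bytes-per-second limit.

```go
// RateLimiter keeps the replay's average throughput at or below a
// bytes-per-second limit. A sketch of the idea, not the exact source.
type RateLimiter struct {
	limit int64     // maximum bytes per second; <= 0 means unlimited
	count int64     // bytes sent since the window opened
	start time.Time // when the window opened
}

func NewRateLimiter(limit int64) *RateLimiter {
	return &RateLimiter{limit: limit, start: time.Now()}
}

// Update records that n more bytes were sent.
func (r *RateLimiter) Update(n int) { r.count += int64(n) }

// Delay returns how long the caller should sleep so that the average
// rate since start does not exceed the limit.
func (r *RateLimiter) Delay() time.Duration {
	if r.limit <= 0 {
		return 0
	}
	// how long the transfer *should* have taken at the configured rate
	expected := time.Duration(float64(r.count) / float64(r.limit) * float64(time.Second))
	if elapsed := time.Since(r.start); expected > elapsed {
		return expected - elapsed
	}
	return 0
}
```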
The processing steps (the on-disk queue behind Current/Advance is sketched after the code below):
- first, ping the remote node; if it is unreachable, return a failure;
- next, read the current block from the local queue and deserialize it;
- next, send the data to the remote node through the shardWriter;
- finally, advance the read position of the local queue;
```go
// services/hh/node_processor.go

// SendWrite attempts to send the current block of hinted data to the target node. If successful,
// it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF
// when there is no more data or the node is inactive.
func (n *NodeProcessor) SendWrite() (int, error) {
	// ping the node to check whether it is reachable
	active, err := n.Active()
	if err != nil {
		return 0, err
	}
	if !active {
		return 0, io.EOF
	}

	// read the current block from the queue
	buf, err := n.queue.Current()
	if err != nil {
		return 0, err
	}

	// unmarshal the byte slice back to shard ID and points
	shardID, points, err := unmarshalWrite(buf)
	if err != nil {
		...
	}

	// send the points to the remote node
	if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil {
		atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
		return 0, err
	}
	......

	// advance the queue's read position past the delivered block
	if err := n.queue.Advance(); err != nil {
		n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err))
	}

	return len(buf), nil
}
```
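The queue behind Append/Current/Advance is a disk-backed FIFO of segment files under $data/hh/$id (services/hh/queue.go in the source). The key property is that Current reads the head block without consuming it, and Advance moves the head forward only after delivery succeeded, so an interrupted replay re-reads the same block rather than losing it. An in-memory stand-in with the same semantics, as a sketch:

```go
// Queue models the hinted-handoff queue's interface: Append adds a block at
// the tail, Current returns the head block without removing it, and Advance
// discards the head only after the caller has delivered it. The real queue
// persists blocks in segment files on disk; this sketch keeps them in memory.
type Queue struct {
	blocks [][]byte // each block: 8-byte shardID + line-protocol points
}

// Append adds a serialized block to the tail of the queue.
func (q *Queue) Append(b []byte) error {
	q.blocks = append(q.blocks, b)
	return nil
}

// Current returns the block at the head, or io.EOF if the queue is empty.
func (q *Queue) Current() ([]byte, error) {
	if len(q.blocks) == 0 {
		return nil, io.EOF
	}
	return q.blocks[0], nil
}

// Advance discards the head block after it has been delivered.
func (q *Queue) Advance() error {
	if len(q.blocks) == 0 {
		return io.EOF
	}
	q.blocks = q.blocks[1:]
	return nil
}
```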