上文讲到,每个shard有N个replica,也就是N个replica owner,通常是不同的节点,否则就起不到高可用的作用了。
```
# influxd_ctl show-shards
ID   GroupID  Database    RetentionPolcy  Replicas  Owners                  StartTime                       EndTime
44   44       prometheus  one_week        2         node3:8088,node1:8088   2021-09-26 08:00:00 +0800 CST   2021-09-27 08:00:00 +0800 CST
```
写shard时,要写入所有的replica owner,可能是本机节点,也可能是远端节点。
// cluster/points_writer.go

// writeToShard (excerpt) writes points to every owner (replica) of shard
// concurrently, one goroutine per owner. An owner that is this node is written
// through w.TSDBStore; any other owner is written over the network through
// w.ShardWriter. It returns nil as soon as `required` results have been
// counted; otherwise, after the elided code, it returns ErrWriteFailed.
//
// NOTE(review): `required` is defined in elided code; presumably it is derived
// from the write consistency level — confirm.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, ...
	// One buffer slot per owner: each goroutine sends exactly once, so no
	// sender blocks even when this function returns early below.
	ch := make(chan *AsyncWriteResult, len(shard.Owners))
	for _, owner := range shard.Owners {
		// shardID, owner and points are passed as arguments so that each
		// goroutine works on its own copy rather than the shared loop variable.
		go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
			// This node owns the replica: write the shard locally.
			if w.Node.GetDataID() == owner.NodeID {
				err := w.TSDBStore.WriteToShard(shardID, points)
				// The shard does not exist on this node yet: create it, then
				// write again.
				if err == tsdb.ErrShardNotFound {
					err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
					...
					err = w.TSDBStore.WriteToShard(shardID, points)
				}
				ch <- &AsyncWriteResult{owner, err}
				return
			}
			// Another node owns the replica: send the points to it remotely.
			err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
			ch <- &AsyncWriteResult{owner, err}
		}(shard.ID, owner, points)
	}
	// Collect at most one result per owner.
	var wrote int
	for range shard.Owners {
		select {
		case result := <-ch:
			// NOTE(review): as excerpted, every result is counted, including
			// results carrying a non-nil error. The intent (per the original
			// comment) is to count successful writes only, so presumably the
			// elided code skips failed results here — confirm.
			wrote++
			// Number of successful writes >= required: the write succeeds.
			if wrote >= required {
				return nil
			}
		}
	}
	...
	return ErrWriteFailed
}
本地节点写shard
本地节点写shard,最终调用的是存储引擎的WritePoints:
// tsdb/store.go

// WriteToShard (excerpt) looks up the shard with the given ID in s.shards and
// delegates the write to that shard's WritePoints.
//
// NOTE(review): the lookup's miss handling is elided; the caller in
// cluster/points_writer.go reacts to tsdb.ErrShardNotFound, so presumably that
// error is returned here for an unknown shardID — confirm.
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
	...
	sh := s.shards[shardID]
	....
	return sh.WritePoints(points)
}

// tsdb/shard.go

// WritePoints (excerpt) obtains the shard's storage engine via engineNoLock
// and hands the points to the engine's WritePoints.
func (s *Shard) WritePoints(points []models.Point) error {
	...
	engine, err := s.engineNoLock()
	....
	// NOTE(review): the error-handling body of this branch is elided in the
	// excerpt.
	if err := engine.WritePoints(points); err != nil {
	}
	...
}
存储引擎WritePoints():
- 先写到cache;
- 再写到WAL;
// tsdb/engine/tsm1/engine.go

// WritePoints (excerpt) stores the prepared values first in the in-memory
// cache and then, when e.WALEnabled is set, in the WAL. It returns the first
// cache or WAL error; otherwise it returns seriesErr.
//
// NOTE(review): `values` and `seriesErr` are produced by elided code
// (presumably converted from `points` and from series indexing) — confirm.
func (e *Engine) WritePoints(points []models.Point) error {
	......
	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
		return err
	}
	// Nothing in this excerpt rolls back the cache if the WAL write below
	// fails: the values stay cached while the error is returned.
	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err
		}
	}
	return seriesErr
}
远端节点写shard
向远端节点发送WriteShardRequestMessage消息:
- 先dial远端节点(TCP),这里用connection pool的形式;
- 将points用protobuf序列化,以TLV的封包格式,向远端节点的8088端口发送TCP消息WriteShardRequestMessage;
- 读远端节点的TCP response,返回;
// cluster/shard_writer.go

// WriteShard (excerpt) sends points for shardID to the remote node ownerID
// and waits for its reply. Steps:
//   - obtain a connection to the owner via w.dial (asserted to *pooledConn);
//   - serialize the points into a WriteShardRequest;
//   - write it as a TLV frame tagged writeShardRequestMessage;
//   - read the TLV reply and unmarshal it into a WriteShardResponse.
//
// Both the write and the read are bounded by w.timeout through connection
// deadlines.
//
// NOTE(review): `err :=` on the WriteTLV and UnmarshalBinary lines redeclares
// `err`, which is already declared in the same scope (by `c, err :=`). As
// written this does not compile ("no new variables on left side of :="). The
// real source presumably uses `err =` or `if err := ...; err != nil` — confirm.
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
	c, err := w.dial(ownerID)
	...
	conn, ok := c.(*pooledConn)
	db, rp, sgi := w.MetaClient.ShardOwner(shardID)
	...
	var request WriteShardRequest
	request.AddPoints(points)
	....
	buf, err := request.MarshalBinary()
	conn.SetWriteDeadline(time.Now().Add(w.timeout))
	// Write the request, sending it to the remote node.
	err := WriteTLV(conn, writeShardRequestMessage, buf)
	..
	conn.SetReadDeadline(time.Now().Add(w.timeout))
	// Read the response; the message-type byte is discarded.
	_, buf, err = ReadTLV(conn)
	...
	var response WriteShardResponse
	err := response.UnmarshalBinary(buf)
	...
}
这里的TLV格式:(Tag/Length/Value)
- 1个byte: 消息类型,如writeShardRequestMessage;
- 4个byte: request body的length;
- N个byte: request protobuf序列化的字节内容;
// cluster/service.gofunc WriteTLV(w io.Writer, typ byte, buf []byte) error { if err := WriteType(w, typ); err != nil { return err } if err := WriteLV(w, buf); err != nil { return err } return nil}
远端节点处理WriteShardRequestMessage消息:
- 远端节点开启8088 TCP监听,处理流式数据;
- 先读1byte,读出消息类型,比如WriteShardRequestMessage;
- 再读出body长度和内容,将body用Protobuf反序列化得到points;
- 将points写入本地的TSDB;
处理TCP连接:
// cluster/service.go

// serve (excerpt) accepts TCP connections on s.Listener in an endless loop and
// handles each accepted connection in its own goroutine via handleConn.
func (s *Service) serve() {
	....
	for {
		// NOTE(review): `err` from Accept is not used in this excerpt (Go would
		// reject it as declared and not used); presumably the elided code checks
		// it and stops the loop when the listener is closed — confirm.
		conn, err := s.Listener.Accept()
		// NOTE(review): the matching s.wg.Add for this Done is not shown;
		// presumably it is in elided code before the goroutine starts — confirm.
		go func() {
			defer s.wg.Done()
			s.handleConn(conn)
		}()
	}
}
读取request中的消息类型和消息内容,处理该请求;
// cluster/service.go

// handleConn (excerpt) reads TLV-framed messages from conn in a loop, so one
// connection can carry many requests. For a writeShardRequestMessage it reads
// the payload, applies it through processWriteShardRequest, and replies with
// writeShardResponse.
func (s *Service) handleConn(conn net.Conn) {
	....
	for {
		// Read type-length-value.
		// NOTE(review): this `err` is not checked in the excerpt. As shown, a
		// read failure (e.g. the peer closing the connection) would not end the
		// loop; presumably the elided code returns on error — confirm.
		typ, err := ReadType(conn)
		switch typ {
		case writeShardRequestMessage:
			// NOTE(review): `:=` here declares a new `err` that shadows the outer
			// one. The ReadLV error is then overwritten by the next line before
			// it is ever inspected, so a failed payload read is not reported as such.
			buf, err := ReadLV(conn)
			err = s.processWriteShardRequest(buf)
			s.writeShardResponse(conn, err)
		}
	}
}
将body用protobuf反序列化,写入本地TSDB:
// cluster/service.go

// processWriteShardRequest (excerpt) decodes buf into a WriteShardRequest and
// writes the request's points into the local store for the shard named in the
// request. It returns the decode error immediately if unmarshaling fails; the
// handling of the store write's error is elided.
func (s *Service) processWriteShardRequest(buf []byte) error {
	var req WriteShardRequest
	if err := req.UnmarshalBinary(buf); err != nil {
		return err
	}
	points := req.Points()
	err := s.TSDBStore.WriteToShard(req.ShardID(), points)
	....
}