上文讲到,每个shard有N个replica,也就是N个replica owner,通常是不同的节点,否则就起不到高可用的作用了。

# influxd_ctl show-shards
ID      GroupID Database        RetentionPolcy  Replicas        Owners                          StartTime                       EndTime
44      44      prometheus      one_week        2               node3:8088,node1:8088 2021-09-26 08:00:00 +0800 CST   2021-09-27 08:00:00 +0800 CST

写shard时,要写入所有的replica owner,可能是本机节点,也可能是远端节点。

// cluster/points_writer.go
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
        ...
    ch := make(chan *AsyncWriteResult, len(shard.Owners))
    for _, owner := range shard.Owners {
        go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
            //当前节点写shard
            if w.Node.GetDataID() == owner.NodeID {
                err := w.TSDBStore.WriteToShard(shardID, points)
                if err == tsdb.ErrShardNotFound {
                    err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
                    ...
                    err = w.TSDBStore.WriteToShard(shardID, points)
                }
                ch <- &AsyncWriteResult{owner, err}
                return
            }
            //远端节点写shard
            err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
            ch <- &AsyncWriteResult{owner, err}
        }(shard.ID, owner, points)
    }
    var wrote int
    for range shard.Owners {
        select {
        case result := <-ch:
            wrote++
            // 写入成功的次数 >= required
            if wrote >= required {
                return nil
            }
        }
    }
    ...
    return ErrWriteFailed
}

本地节点写shard

本地节点写shard,最终调用的是存储引擎的WritePoints:

// tsdb/store.go
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
    ...
    sh := s.shards[shardID]
    ....
    return sh.WritePoints(points)
}
// tsdb/shard.go
func (s *Shard) WritePoints(points []models.Point) error {
    ...
    engine, err := s.engineNoLock()
    ....
    if err := engine.WritePoints(points); err != nil {
    }
    ...
}

存储引擎WritePoints():

  • 先写到cache;
  • 再写到WAL;
// tsdb/engine/tsm1/engine.go
func (e *Engine) WritePoints(points []models.Point) error {
    ......
    // first try to write to the cache
    if err := e.Cache.WriteMulti(values); err != nil {
        return err
    }
    if e.WALEnabled {
        if _, err := e.WAL.WriteMulti(values); err != nil {
            return err
        }
    }
    return seriesErr
}

远端节点写shard

向远端节点发送WriteShardRequestMessage消息:

  • 先dial远端节点(TCP),这里用connection pool的形式;
  • 将points用protobuf序列化,以TLV的封包格式,向远端节点8088发送TCP消息WriteShardRequestMessage;
  • 读远端节点的TCP response,返回;
// cluster/shard_writer.go
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
    c, err := w.dial(ownerID)
    ...
    conn, ok := c.(*pooledConn)
    db, rp, sgi := w.MetaClient.ShardOwner(shardID)
    ...
    var request WriteShardRequest
    request.AddPoints(points)
    ....
    buf, err := request.MarshalBinary()
    conn.SetWriteDeadline(time.Now().Add(w.timeout))
    // 写request,发送到远端节点
    err := WriteTLV(conn, writeShardRequestMessage, buf)
    ..
    conn.SetReadDeadline(time.Now().Add(w.timeout))
    // 读response
    _, buf, err = ReadTLV(conn)
    ...
    var response WriteShardResponse
    err := response.UnmarshalBinary(buf)
    ...
}

这里的TLV格式:(Tag/Length/Value)

  • 1个byte: 消息类型,如writeShardRequestMessage
  • 4个byte: request body的length;
  • N个byte: request protobuf序列化的字节内容;
// cluster/service.go
func WriteTLV(w io.Writer, typ byte, buf []byte) error {
    if err := WriteType(w, typ); err != nil {
        return err
    }
    if err := WriteLV(w, buf); err != nil {
        return err
    }
    return nil
}

远端节点处理WriteShardRequestMessage消息:

  • 远端节点开启8088 TCP监听,处理流式数据;
  • 先读1byte,读出消息类型,比如WriteShardRequestMessage;
  • 再读出body长度和内容,将body用Protobuf反序列化得到points;
  • 将points写入本地的TSDB;

处理TCP连接:

// cluster/service.go
func (s *Service) serve() {
    ....
    for {
        conn, err := s.Listener.Accept()
        go func() {
            defer s.wg.Done()
            s.handleConn(conn)
        }()
    }
}

读request中的消息类型和消息内容,处理该请求;

// cluster/service.go
func (s *Service) handleConn(conn net.Conn) {
    ....
    for {
        // Read type-length-value.
        typ, err := ReadType(conn)
        switch typ {
        case writeShardRequestMessage:
            buf, err := ReadLV(conn)
            err = s.processWriteShardRequest(buf)
            s.writeShardResponse(conn, err)
        }
    }
}

将body用protobuf反序列化,写入本地TSDB:

// cluster/service.go
func (s *Service) processWriteShardRequest(buf []byte) error {
    var req WriteShardRequest
    if err := req.UnmarshalBinary(buf); err != nil {
        return err
    }
    points := req.Points()
    err := s.TSDBStore.WriteToShard(req.ShardID(), points)
    ....
}