乐趣区

关于influxdb:InfluxDB集群-write写入数据源码分析二

上文讲到,每个 shard 有 N 个 replica,也就是 N 个 replica owner,通常是不同的节点,否则就起不到高可用的作用了。

# influxd_ctl show-shards
ID      GroupID Database        RetentionPolcy  Replicas        Owners                          StartTime                       EndTime
44      44      prometheus      one_week        2               node3:8088,node1:8088 2021-09-26 08:00:00 +0800 CST   2021-09-27 08:00:00 +0800 CST

写 shard 时,要写入所有的 replica owner,可能是本机节点,也可能是远端节点。

// cluster/points_writer.go
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,    
    ...
    ch := make(chan *AsyncWriteResult, len(shard.Owners))
    for _, owner := range shard.Owners {go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
            // 以后节点写 shard
            if w.Node.GetDataID() == owner.NodeID {err := w.TSDBStore.WriteToShard(shardID, points)
                if err == tsdb.ErrShardNotFound {err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
                    ...
                    err = w.TSDBStore.WriteToShard(shardID, points)
                }
                ch <- &AsyncWriteResult{owner, err}
                return
            }
            // 远端节点写 shard
            err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)                        
            ch <- &AsyncWriteResult{owner, err}
        }(shard.ID, owner, points)
    }
    var wrote int
    for range shard.Owners {
        select {
        case result := <-ch:
            wrote++
            // 写入胜利的次数 >= required
            if wrote >= required {return nil}
        }
    }
    ...
    return ErrWriteFailed
}    

本地节点写 shard

本地节点写 shard,最终调用的是存储引擎的 WritePoints:

// tsdb/store.go
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
    ...
    sh := s.shards[shardID]
    ....
    return sh.WritePoints(points)
}
// tsdb/shard.go
func (s *Shard) WritePoints(points []models.Point) error {
    ...
    engine, err := s.engineNoLock()
    ....
    if err := engine.WritePoints(points); err != nil { }
    ...
}

存储引擎 WritePoints():

  • 先写到 cache;
  • 再写到 WAL;
// tsdb/engine/tsm1/engine.go
func (e *Engine) WritePoints(points []models.Point) error {
    ......
    // first try to write to the cache
    if err := e.Cache.WriteMulti(values); err != nil {return err}
    if e.WALEnabled {if _, err := e.WAL.WriteMulti(values); err != nil {return err}
    }
    return seriesErr
}

远端节点写 shard

向远端节点发送 WriteShardRequestMessage 音讯:

  • 先 dial 远端节点 (TCP),这里用 connection pool 的形式;
  • 将 points 用 protobuf 序列化,以 TLV 的封包格局,向远端节点 8088 发送 TCP 音讯 WriteShardRequestMessage;
  • 读远端节点的 TCP response,返回;
// cluster/shard_writer.go
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {c, err := w.dial(ownerID)
    ...
    conn, ok := c.(*pooledConn)
    db, rp, sgi := w.MetaClient.ShardOwner(shardID)
    ...
    var request WriteShardRequest
    request.AddPoints(points)
    ....
    buf, err := request.MarshalBinary()
    conn.SetWriteDeadline(time.Now().Add(w.timeout))
    // 写 request,发送到近程节点
    err := WriteTLV(conn, writeShardRequestMessage, buf)
    ..
    conn.SetReadDeadline(time.Now().Add(w.timeout))
    // 读 response
    _, buf, err = ReadTLV(conn)
    ...
    var response WriteShardResponse
    err := response.UnmarshalBinary(buf)
    ...
}

这里的 TLV 格局:(Tag/Length/Value)

  • 1 个 byte: 音讯类型,如 writeShardRequestMessage
  • 4 个 byte: request body 的 length;
  • N 个 byte: request protobuf 序列化的字节内容;
// cluster/service.go
func WriteTLV(w io.Writer, typ byte, buf []byte) error {if err := WriteType(w, typ); err != nil {return err}
    if err := WriteLV(w, buf); err != nil {return err}
    return nil
}

近程节点解决 WriteShardRequestMessage 音讯:

  • 远端节点开启 8088 TCP 监听,解决流式数据;
  • 先读 1byte,读出音讯类型,比方 WriteShardRequestMessage;
  • 再读出 body 长度和内容,将 body 用 Protobuf 反序列化失去 points;
  • 将 points 写入本地的 TSDB;

解决 TCP 连贯:

// cluster/service.go
func (s *Service) serve() {
    ....
    for {conn, err := s.Listener.Accept()
        go func() {defer s.wg.Done()
            s.handleConn(conn)
        }()}
}

读 request 中的音讯类型和音讯内容,解决该申请;

// cluster/service.go
func (s *Service) handleConn(conn net.Conn) {
    ....
    for {
        // Read type-length-value.
        typ, err := ReadType(conn)
        switch typ {
        case writeShardRequestMessage:
            buf, err := ReadLV(conn)
            err = s.processWriteShardRequest(buf)
            s.writeShardResponse(conn, err)
        }
    }
}

将 body 用 protobuf 反序列化,写入本地 TSDB:

// cluster/service.go
func (s *Service) processWriteShardRequest(buf []byte) error {
    var req WriteShardRequest
    if err := req.UnmarshalBinary(buf); err != nil {return err}
    points := req.Points()
    err := s.TSDBStore.WriteToShard(req.ShardID(), points)
    ....
}
退出移动版