共计 3524 个字符,预计需要花费 9 分钟才能阅读完成。
上文讲到,每个 shard 有 N 个 replica,也就是 N 个 replica owner,通常是不同的节点,否则就起不到高可用的作用了。
# influxd_ctl show-shards
ID GroupID Database RetentionPolcy Replicas Owners StartTime EndTime
44 44 prometheus one_week 2 node3:8088,node1:8088 2021-09-26 08:00:00 +0800 CST 2021-09-27 08:00:00 +0800 CST
写 shard 时,要写入所有的 replica owner,可能是本机节点,也可能是远端节点。
// cluster/points_writer.go
// writeToShard fans the given points out to every owner of shard, one
// goroutine per owner, and then counts results as they arrive. It returns nil
// as soon as the count reaches required; otherwise, after the elided code, it
// returns ErrWriteFailed. The rest of the parameter list, and the origin of
// points and required, are elided in this excerpt.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
...
// Buffered with one slot per owner so no writer goroutine blocks on send.
ch := make(chan *AsyncWriteResult, len(shard.Owners))
for _, owner := range shard.Owners {go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
// The current node owns this replica: write to the local store.
if w.Node.GetDataID() == owner.NodeID {err := w.TSDBStore.WriteToShard(shardID, points)
// The shard does not exist locally yet: create it, then retry the write.
if err == tsdb.ErrShardNotFound {err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
...
err = w.TSDBStore.WriteToShard(shardID, points)
}
ch <- &AsyncWriteResult{owner, err}
return
}
// Another node owns this replica: write to it through the ShardWriter.
err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
ch <- &AsyncWriteResult{owner, err}
}(shard.ID, owner, points)
}
var wrote int
// Receive exactly one result per owner.
for range shard.Owners {
select {
case result := <-ch:
// NOTE(review): as excerpted, wrote is incremented without inspecting result;
// presumably the full source skips results that carry an error. Confirm.
wrote++
// The number of counted writes has reached required.
if wrote >= required {return nil}
}
}
...
return ErrWriteFailed
}
本地节点写 shard
本地节点写 shard,最终调用的是存储引擎的 WritePoints:
// tsdb/store.go
// WriteToShard looks up the shard with the given ID in s.shards and forwards
// points to that shard's WritePoints method, returning its error. Any locking
// and the missing-shard check are elided in this excerpt.
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
...
sh := s.shards[shardID]
....
return sh.WritePoints(points)
}
// tsdb/shard.go
// WritePoints obtains the shard's engine through engineNoLock and hands
// points to the engine's WritePoints. Error handling and the return value are
// elided in this excerpt.
func (s *Shard) WritePoints(points []models.Point) error {
...
engine, err := s.engineNoLock()
....
if err := engine.WritePoints(points); err != nil { }
...
}
存储引擎 WritePoints():
- 先写到 cache;
- 再写到 WAL;
// tsdb/engine/tsm1/engine.go
// WritePoints writes values to the engine's cache and then, when WALEnabled
// is set, to the WAL. A failure in either write is returned immediately;
// otherwise seriesErr is returned. The code that derives values and seriesErr
// from points is elided in this excerpt.
func (e *Engine) WritePoints(points []models.Point) error {
......
// first try to write to the cache
if err := e.Cache.WriteMulti(values); err != nil {return err}
// Then write the same values to the WAL, if it is enabled.
if e.WALEnabled {if _, err := e.WAL.WriteMulti(values); err != nil {return err}
}
return seriesErr
}
远端节点写 shard
向远端节点发送 WriteShardRequestMessage 消息:
- 先 dial 远端节点 (TCP),这里用 connection pool 的形式;
- 将 points 用 protobuf 序列化,以 TLV 的封包格式,向远端节点 8088 发送 TCP 消息 WriteShardRequestMessage;
- 读远端节点的 TCP response,返回;
// cluster/shard_writer.go
// WriteShard sends points to the node identified by ownerID: it dials the
// node, marshals a WriteShardRequest, writes it as a TLV frame tagged
// writeShardRequestMessage, then reads a TLV reply and unmarshals it into a
// WriteShardResponse. Error checks and the return value are elided in this
// excerpt.
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {c, err := w.dial(ownerID)
...
// The dialed connection is asserted to be a *pooledConn.
conn, ok := c.(*pooledConn)
db, rp, sgi := w.MetaClient.ShardOwner(shardID)
...
var request WriteShardRequest
request.AddPoints(points)
....
buf, err := request.MarshalBinary()
// Bound the write by w.timeout.
conn.SetWriteDeadline(time.Now().Add(w.timeout))
// Write the request, sending it to the remote node.
err := WriteTLV(conn, writeShardRequestMessage, buf)
..
// Bound the read by w.timeout.
conn.SetReadDeadline(time.Now().Add(w.timeout))
// Read the response.
_, buf, err = ReadTLV(conn)
...
var response WriteShardResponse
err := response.UnmarshalBinary(buf)
...
}
这里的 TLV 格式:(Type/Length/Value)
- 1 个 byte: 消息类型,如 writeShardRequestMessage
- 4 个 byte: request body 的 length;
- N 个 byte: request protobuf 序列化的字节内容;
// cluster/service.go
// WriteTLV writes one type-length-value message to w: first the type byte
// via WriteType, then buf via WriteLV. It returns the first error encountered,
// or nil when both writes succeed.
func WriteTLV(w io.Writer, typ byte, buf []byte) error {if err := WriteType(w, typ); err != nil {return err}
if err := WriteLV(w, buf); err != nil {return err}
return nil
}
远端节点处理 WriteShardRequestMessage 消息:
- 远端节点开启 8088 TCP 监听,处理流式数据;
- 先读 1byte,读出消息类型,比如 WriteShardRequestMessage;
- 再读出 body 长度和内容,将 body 用 Protobuf 反序列化得到 points;
- 将 points 写入本地的 TSDB;
处理 TCP 连接:
// cluster/service.go
// serve accepts connections from s.Listener in an endless loop and handles
// each one in its own goroutine via handleConn. Each goroutine defers
// s.wg.Done; the corresponding Add and the handling of the Accept error are
// not visible in this excerpt.
func (s *Service) serve() {
....
for {conn, err := s.Listener.Accept()
go func() {defer s.wg.Done()
s.handleConn(conn)
}()}
}
读 request 中的消息类型和消息内容,处理该请求;
// cluster/service.go
// handleConn reads messages from conn in a loop. Each message starts with a
// type read by ReadType; for writeShardRequestMessage the body is read with
// ReadLV, passed to processWriteShardRequest, and the resulting error is
// reported back on conn through writeShardResponse. Loop exit and error
// handling are elided in this excerpt.
func (s *Service) handleConn(conn net.Conn) {
....
for {
// Read type-length-value.
typ, err := ReadType(conn)
switch typ {
case writeShardRequestMessage:
buf, err := ReadLV(conn)
err = s.processWriteShardRequest(buf)
s.writeShardResponse(conn, err)
}
}
}
将 body 用 protobuf 反序列化,写入本地 TSDB:
// cluster/service.go
// processWriteShardRequest decodes buf into a WriteShardRequest and writes
// the request's points to the shard named by req.ShardID() through
// s.TSDBStore. A decode failure is returned immediately; handling of the
// write error is elided in this excerpt.
func (s *Service) processWriteShardRequest(buf []byte) error {
var req WriteShardRequest
if err := req.UnmarshalBinary(buf); err != nil {return err}
points := req.Points()
err := s.TSDBStore.WriteToShard(req.ShardID(), points)
....
}
正文完