在本机节点上,会为每个远端节点分配一个 NodeProcessor 对象,负责数据的写入和数据的读取。
NodeProcessor 定期地读取本机队列中的数据,然后将其发送给远端节点,尝试写入远端 shard。
// services/hh/node_processor.go
// run is the NodeProcessor's main loop, shown abridged ("......" and "..." mark
// code elided from this excerpt). Each time currInterval elapses, it repeatedly
// calls SendWrite to push queued blocks to the remote node, throttled by a rate
// limiter built from n.RetryRateLimit.
// NOTE(review): the excerpt shows a bare `case` directly inside `for {`; the
// enclosing `select` is presumably part of the elided code — confirm.
func (n *NodeProcessor) run() {
......
for {case <-time.After(currInterval):
// A fresh limiter is created on every tick, so throttling state does not
// carry over from one interval to the next.
limiter := NewRateLimiter(n.RetryRateLimit) // rate limiting
// Keep sending blocks; the exit condition lives in the elided "..." below
// (presumably on an error such as io.EOF from SendWrite — verify).
for {c, err := n.SendWrite()
...
// c is the number of bytes SendWrite just sent; feed it to the limiter and
// sleep for the delay it computes, presumably bounding the resend rate.
limiter.Update(c)
time.Sleep(limiter.Delay())
}
}
}
读取和发送过程
- 首先,ping 远端节点,判断其是否活跃;如果不活跃,则直接返回;
- 然后,读取 head segment 中的 block,读操作总是从 segment 的 pos 处开始;
- 接着,将读到的 block 数据反序列化,发送给远端节点;
- 最后,更新 head segment 中的 pos;
// services/hh/node_processor.go
// SendWrite attempts to send the current block of hinted data to the target node.
// If successful, it returns the number of bytes it sent and advances the queue to
// the next block. It returns io.EOF when the node is inactive; when there is no
// more data it returns the queue's error (presumably io.EOF — confirm against
// queue.Current).
//
// Steps:
//  1. ping the remote node; if it is not active, return immediately;
//  2. read the block at the head segment's current position;
//  3. decode it into a shard ID and points and write them to the remote node;
//  4. advance the head segment's position so the next call reads the next block.
func (n *NodeProcessor) SendWrite() (int, error) {
	// Ping the remote node to find out whether it is active.
	active, err := n.Active()
	if err != nil {
		return 0, err
	}
	if !active {
		// Nothing can be delivered right now; report EOF as documented above.
		return 0, io.EOF
	}

	// Read the block that starts at the head segment's current position.
	buf, err := n.queue.Current()
	if err != nil {
		return 0, err
	}

	// Decode the block back into a shard ID and points.
	shardID, points, err := unmarshalWrite(buf)
	if err != nil {
		// The block on disk will never decode successfully. Skip past it so a
		// single corrupt block cannot stall the rest of the queue forever, then
		// report the decode error.
		if aerr := n.queue.Advance(); aerr != nil {
			n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(aerr))
		}
		return 0, err
	}

	// Write the points to the shard on the remote node.
	if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil {
		atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
		return 0, err
	}

	// Move the head segment's position forward so the next call reads the next
	// block. A failure here is only logged: the write itself already succeeded,
	// so at worst the block may be re-sent on a later call.
	if err := n.queue.Advance(); err != nil {
		n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err))
	}
	return len(buf), nil
}
1. 判断节点是否活跃
先根据 nodeId 查询 node 信息(ip):
//services/hh/node_processor.go
// Active reports whether the remote node this processor targets is currently
// reachable. It returns (false, nil) when the meta lookup yields no node or the
// node does not answer the ping. It returns a non-nil error only when the meta
// lookup itself fails.
func (n *NodeProcessor) Active() (bool, error) {
	// Look up the node's info (including its host address) by ID.
	nio, err := n.meta.DataNode(n.nodeID)
	if err != nil {
		return false, err
	}
	if nio == nil {
		return false, nil
	}
	// An unreachable node is "inactive", not an error, so the ping error is
	// deliberately not propagated.
	return n.Ping(nio) == nil, nil
}
// Ping checks whether node answers its /ping endpoint. It returns nil when the
// node is reachable and healthy, and an error otherwise. A nil node is also
// reported as an error.
func (n *NodeProcessor) Ping(node *meta.NodeInfo) error {
	if node == nil {
		return fmt.Errorf("ping: nil node")
	}
	return n.ping(node.Host)
}
ping 的动作实际上是发送 HTTP GET http://ip:8091/ping,远端返回 204 (No Content) 即视为活跃:
//services/hh/node_processor.go
// ping sends an HTTP GET to http://<host>/ping. It returns nil when the node
// answers 204 No Content. A transport error, a timeout (3s), or any other
// status code is returned as an error.
func (n *NodeProcessor) ping(host string) error {
	url := "http://" + host + "/ping"
	// A malformed host makes NewRequest fail. Passing the resulting nil request
	// to client.Do would panic, so the error must be checked.
	request, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	// Keep-alives are disabled so this one-shot probe does not leave an idle
	// connection behind in the per-call transport.
	client := &http.Client{
		Transport: &http.Transport{DisableKeepAlives: true},
		Timeout:   3 * time.Second,
	}
	resp, err := client.Do(request)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusNoContent {
		return nil
	}
	return fmt.Errorf("ping %s fail", url)
}
2. 读 head segment 中的 block
从 head segment 读取:
- 先定位到 segment 的 pos 位置,pos 即下一个要读取的 block 的起始偏移;
- 按照 block 格式,先读 block len,再读 block data;
//services/hh/queue.go
// current reads and returns the record stored at the segment's current read
// position (l.pos). A record is a uint64 length prefix followed by that many
// bytes of data. The record length is stored in l.currentSize so that advance
// can later step over the record.
func (l *segment) current() ([]byte, error) {
	// Position the file at the current read offset.
	if err := l.seekToCurrent(); err != nil {
		return nil, err
	}

	// Read the record length prefix. On failure sz is meaningless and must not
	// be used to size the buffer or recorded as the current size.
	sz, err := l.readUint64()
	if err != nil {
		return nil, err
	}
	// NOTE(review): sz comes straight from disk and is not bounded; a corrupt
	// prefix can trigger a huge allocation (or a make() panic). Consider
	// validating it against the segment's size limit.
	l.currentSize = int64(sz)

	// Read the record payload.
	b := make([]byte, sz)
	if err := l.readBytes(b); err != nil {
		return nil, err
	}
	return b, nil
}
如何定位到 segment 的 pos 位置?实际上就是对 segment 文件做 seek 操作:
//services/hh/queue.go
// seekToCurrent moves the segment file to the current read offset, l.pos.
func (l *segment) seekToCurrent() error {
	offset := int64(l.pos)
	return l.seek(offset)
}
// seek moves the segment file's offset to pos bytes from the start of the file.
// It fails if the underlying Seek fails or lands on a different offset than
// requested.
func (l *segment) seek(pos int64) error {
	n, err := l.file.Seek(pos, io.SeekStart)
	if err != nil {
		return err
	}
	if n != pos {
		// Report the offset that was actually requested, not a constant 0.
		return fmt.Errorf("bad seek. exp %v, got %v", pos, n)
	}
	return nil
}
3. 将读到的 block 数据反序列化,写入远端节点
反序列化得到 points,按照与序列化相同的格式:
- 先读 8 字节(大端序)的 shardId;
- 然后逐行 (\n) 解析 point;
//services/hh/node_processor.go
// unmarshalWrite decodes one queued block. The first 8 bytes hold the shard ID
// in big-endian order. The remainder is newline-separated point data, which is
// handed to models.ParsePoints. Blocks shorter than 8 bytes are rejected.
func unmarshalWrite(b []byte) (uint64, []models.Point, error) {
	const idLen = 8
	if len(b) < idLen {
		return 0, nil, fmt.Errorf("too short: len = %d", len(b))
	}
	header, body := b[:idLen], b[idLen:]
	shardID := binary.BigEndian.Uint64(header)
	points, err := models.ParsePoints(body)
	return shardID, points, err
}
//models/points.go
// ParsePoints parses the points in buf with nanosecond ("n") precision. The
// current UTC time is passed as the default time.
func ParsePoints(buf []byte) ([]Point, error) {
	now := time.Now().UTC()
	return ParsePointsWithPrecision(buf, now, "n")
}
// ParsePointsWithPrecision parses buf one '\n'-separated line at a time,
// producing one point per non-blank line. Each line is parsed with defaultTime
// and precision.
//
// Empty and whitespace-only lines are skipped. Lines that fail to parse are
// also skipped; the successfully parsed points are still returned, together
// with an error reporting how many lines failed and the first failure.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
	// At most one point per line, so the newline count bounds the capacity.
	points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)

	var (
		pos      int    // offset of the next unread byte in buf
		block    []byte // the current line
		failed   int    // number of lines that failed to parse
		firstErr error  // annotated error for the first failing line
	)
	for pos < len(buf) {
		pos, block = scanLine(buf, pos)
		// Presumably steps over the '\n' that ended the line.
		pos++

		start := skipWhitespace(block, 0)
		if start >= len(block) {
			// Empty or whitespace-only line: nothing to parse.
			continue
		}

		pt, err := parsePoint(block[start:], defaultTime, precision)
		if err != nil {
			failed++
			if firstErr == nil {
				firstErr = fmt.Errorf("unable to parse '%s': %w", block[start:], err)
			}
			continue
		}
		points = append(points, pt)
	}

	if failed > 0 {
		return points, fmt.Errorf("%d line(s) failed to parse; first: %w", failed, firstErr)
	}
	return points, nil
}
使用 shardWriter 将 points 写入远端节点:
//cluster/shard_writer.go
// WriteShard writes time series points to the shard identified by shardID on
// the remote data node ownerID. It:
//   - looks up the shard's database and retention policy;
//   - builds a WriteShardRequest;
//   - sends it as a TLV frame over a connection from w.dial;
//   - reads the TLV response.
// Handling of the response is elided in this excerpt ("....").
//
// NOTE(review): in the visible code several results are never checked:
//   - the error from w.dial is overwritten by MarshalBinary's error before it is
//     inspected, and ok from the type assertion is ignored; if the dial fails,
//     conn is presumably a nil *pooledConn and the later conn.SetWriteDeadline
//     call would likely panic;
//   - sgi is never used, so a write for a shard whose group no longer exists is
//     not detected here (presumably such writes should be dropped) — confirm;
//   - the error from request.MarshalBinary is not checked before buf is sent;
//   - conn is not released back to its pool on the success path, unless that
//     happens in the elided code.
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {c, err := w.dial(ownerID)
conn, ok := c.(*pooledConn)
// Determine the location of this shard and whether it still exists
db, rp, sgi := w.MetaClient.ShardOwner(shardID)
// Build write request.
var request WriteShardRequest
request.SetShardID(shardID)
request.SetDatabase(db)
request.SetRetentionPolicy(rp)
request.AddPoints(points)
// Serialize the request, including the points.
// Marshal into protocol buffers.
buf, err := request.MarshalBinary()
// Send the serialized request to conn as a TLV frame, bounded by w.timeout.
// Write request.
conn.SetWriteDeadline(time.Now().Add(w.timeout))
// On a write failure the connection is marked unusable, presumably so the pool
// discards it rather than reusing it.
if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {conn.MarkUnusable()
return err
}
// Wait (up to w.timeout) for the remote node's reply.
// Read the response.
conn.SetReadDeadline(time.Now().Add(w.timeout))
_, buf, err = ReadTLV(conn)
if err != nil {conn.MarkUnusable()
return err
}
....
return nil
}
4. 更新 head segment 的 pos
更新 head segment 的 pos,下次继续从新的 pos 读取:
- 先尝试移动 head segment 的读位置: head.advance();
- 若 head segment 已读完,则继续读下一个 segment:
//services/hh/queue.go
// Advance moves the queue's read position past the current block. It advances
// the head segment. When the head segment reports io.EOF (fully read), the head
// is trimmed so reading continues with the next segment. It returns ErrNotOpen
// if the queue has no head segment. Parts of the body are elided in this
// excerpt ("....").
// NOTE(review): errors from l.head.advance() other than io.EOF are not handled
// in the visible code — presumably in the elided part; confirm.
func (l *queue) Advance() error {l.mu.Lock()
// The queue mutex is held for the whole operation, including trimHead.
defer l.mu.Unlock()
if l.head == nil {return ErrNotOpen}
err := l.head.advance()
if err == io.EOF {
....
// The head segment has been fully read: delete it and move on to the next one.
if err := l.trimHead(); err != nil {return err}
}
....
return nil
}
尝试移动 head segment 的读位置:
- 首先定位到文件末尾的 footer,写入新的 pos,然后同步到磁盘:
//services/hh/queue.go
// advance moves the segment's read position (l.pos) past the current record and
// persists the new position in the segment file's footer. The record size comes
// from l.currentSize, which current() sets when it reads the record. Part of the
// body is elided in this excerpt ("...."). Judging from the caller
// (queue.Advance), advance presumably returns io.EOF once the segment is fully
// consumed — confirm.
func (l *segment) advance() error {
// Seek to the footer, footerSize bytes before the end of the file.
if err := l.seekEnd(-footerSize); err != nil {return err}
// New position = old position + 8-byte length prefix + record payload size.
pos := l.pos + l.currentSize + 8
// Write the new position into the footer.
if err := l.writeUint64(uint64(pos)); err != nil {return err}
// Flush to disk before updating the in-memory position, presumably so the
// persisted footer never lags behind the in-memory l.pos.
if err := l.file.Sync(); err != nil {return err}
l.pos = pos
// Re-position the file at the new read offset.
if err := l.seekToCurrent(); err != nil {return err}
....
return nil
}
若 head segment 已读完,则继续读下一个 segment:
- 如果 head segment 读到 io.EOF,则删除 head,并令 head = head.next:
//services/hh/queue.go
// trimHead drops the fully consumed head segment. It removes the segment from
// the segment list, makes the next segment the head, then closes the old
// segment and deletes its file. The last remaining segment is never trimmed, so
// the queue always keeps a head segment.
//
// The in-memory state (l.segments / l.head) is updated before any file
// operation. A failure to close or delete the old file therefore cannot leave
// l.head pointing at a closed segment that is no longer in l.segments.
func (l *queue) trimHead() error {
	if len(l.segments) <= 1 {
		return nil
	}

	old := l.head
	l.segments = l.segments[1:]
	l.head = l.segments[0]

	// Try to delete the file even if close fails, so a consumed segment is not
	// left on disk; report the first error encountered.
	err := old.close()
	if rmErr := os.Remove(old.path); rmErr != nil && err == nil {
		err = rmErr
	}
	return err
}