在本机节点上,为每个远端节点分配一个NodeProcessor对象,负责该远端节点hinted handoff数据的写入和读取。
NodeProcessor定期读取本机队列中的数据,然后将其发送给远端节点,尝试写入远端shard。
// services/hh/node_processor.go

// run is the NodeProcessor's background loop (abridged excerpt: "......" and
// "..." mark code omitted by the article).
//
// Each time currInterval elapses it creates a rate limiter from
// n.RetryRateLimit and repeatedly calls SendWrite. The byte count returned by
// each SendWrite is fed to the limiter, and the loop sleeps for
// limiter.Delay() before the next send. NewRateLimiter/Delay are not shown;
// presumably Delay is the pause needed to stay within RetryRateLimit.
// The exit condition of the inner loop (e.g. on err) is elided.
func (n *NodeProcessor) run() {
	......
	for {
		// NOTE(review): a `select {` appears to have been elided here — a bare
		// `case` directly inside `for` is not valid Go.
		case <-time.After(currInterval):
			limiter := NewRateLimiter(n.RetryRateLimit) // rate limiting
			for {
				c, err := n.SendWrite()
				...
				limiter.Update(c)
				time.Sleep(limiter.Delay())
			}
	}
}
读取和发送过程
- 首先,ping远端节点,判断其是否活跃,如果不活跃,则直接返回;
- 然后,读取head segment中的block,读操作均从segment中的pos处开始;
- 接着,将读到的block数据反序列化,发送给远端节点;
- 最后,更新head segment中的pos;
// services/hh/node_processor.go// SendWrite attempts to sent the current block of hinted data to the target node. If successful,// it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF// when there is no more data or the node is inactive.func (n *NodeProcessor) SendWrite() (int, error) { //ping远端节点,判断是否沉闷 active, err := n.Active() if err != nil { return 0, err } //读head segment中:pos开始的block // Get the current block from the queue buf, err := n.queue.Current() //反序列化 // unmarshal the byte slice back to shard ID and points shardID, points, err := unmarshalWrite(buf) //写入远端节点 if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil { atomic.AddInt64(&n.stats.WriteNodeReqFail, 1) return 0, err } //更新head segment的pos,下次依然从pos读取 if err := n.queue.Advance(); err != nil { n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err)) } return len(buf), nil}
1. 判断节点是否活跃
先根据nodeID查询node信息(ip):
//services/hh/node_processor.go// Active returns whether this node processor is for a currently active node.func (n *NodeProcessor) Active() (bool, error) { nio, err := n.meta.DataNode(n.nodeID) if nio == nil { return false, nil } ping := n.Ping(nio) return ping == nil, nil}// Ping returns whether this node processor is for a currently active node.func (n *NodeProcessor) Ping(node *meta.NodeInfo) error { return n.ping(node.Host)}
ping的动作,实际是发送HTTP GET http://ip:8091/ping,远端返回204(No Content)即认为节点活跃:
//services/hh/node_processor.go// Ping returns whether this node processor is for a currently active node.func (n *NodeProcessor) ping(host string) error { url := "http://" + host + "/ping" request, _ := http.NewRequest("GET", url, nil) transport := http.Transport{ DisableKeepAlives: true, } client := &http.Client{ Transport: &transport, Timeout: time.Duration(3) * time.Second, } resp, err := client.Do(request) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode == http.StatusNoContent { return nil } return fmt.Errorf("ping %s fail", url)}
2. 读head segment中的block
从head segment读取:
- 先定位到segment的pos位置,pos即下一个要读取的block的起始偏移;
- 按照block格式,先读8字节的block len,再读block data;
//services/hh/queue.go// current returns byte slice that the current segment pointsfunc (l *segment) current() ([]byte, error) { //定位到pos if err := l.seekToCurrent(); err != nil { return nil, err } //先读block len // read the record size sz, err := l.readUint64() l.currentSize = int64(sz) //再读block data b := make([]byte, sz) if err := l.readBytes(b); err != nil { return nil, err } return b, nil}
如何定位到segment的pos位置?实际上就是对segment文件做Seek操作:
//services/hh/queue.gofunc (l *segment) seekToCurrent() error { return l.seek(int64(l.pos))}func (l *segment) seek(pos int64) error { n, err := l.file.Seek(pos, os.SEEK_SET) if err != nil { return err } if n != pos { return fmt.Errorf("bad seek. exp %v, got %v", 0, n) } return nil}
3. 将读到的block数据反序列化,写入远端节点
反序列化得到points,按照与序列化相同的格式:
- 先读8字节(大端序)的shardID;
- 然后按行(\n)逐个解析point;
// services/hh/node_processor.go

// unmarshalWrite decodes one hinted-handoff block. The first 8 bytes are the
// shard ID (big-endian uint64); the rest is parsed by models.ParsePoints.
// It returns an error for blocks shorter than 8 bytes.
// The first return value is the shard ID (callers bind it as shardID).
func unmarshalWrite(b []byte) (uint64, []models.Point, error) {
	if len(b) < 8 {
		return 0, nil, fmt.Errorf("too short: len = %d", len(b))
	}
	ownerID := binary.BigEndian.Uint64(b[:8])
	points, err := models.ParsePoints(b[8:])
	return ownerID, points, err
}

// models/points.go

// ParsePoints parses newline-separated points from buf. It uses the current
// UTC time as the default timestamp and precision "n" (presumably
// nanoseconds).
func ParsePoints(buf []byte) ([]Point, error) {
	return ParsePointsWithPrecision(buf, time.Now().UTC(), "n")
}

// ParsePointsWithPrecision splits buf into lines with scanLine, skips each
// line's leading whitespace, and parses the remainder with parsePoint.
//
// NOTE(review): abridged excerpt — pos and block are not declared here and the
// err returned by parsePoint is never checked, so this does not compile as
// shown; the full implementation was elided by the article.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
	// Pre-size for one point per '\n', plus one for a final unterminated line.
	points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
	for pos < len(buf) {
		pos, block = scanLine(buf, pos)
		pos++ // presumably steps past the line terminator — confirm against scanLine
		start := skipWhitespace(block, 0)
		pt, err := parsePoint(block[start:], defaultTime, precision)
		points = append(points, pt)
	}
	return points, nil
}
使用ShardWriter将points写入远端节点:
// cluster/shard_writer.go

// WriteShard writes time series points to a shard on node ownerID.
//
// It dials the owner node and resolves the shard's database and retention
// policy from the meta client. It then sends a protobuf-encoded
// WriteShardRequest framed as TLV and reads the TLV response. Both the write
// and the read are bounded by w.timeout. On a write or read error the
// connection is marked unusable before the error is returned.
//
// NOTE(review): abridged excerpt ("...." marks elided code). As shown, these
// are not checked:
//   - the error from w.dial
//   - the ok from the *pooledConn assertion
//   - the error from request.MarshalBinary
//   - whether sgi is nil (the shard may no longer exist)
// conn is also never closed or returned to its pool. Presumably these are
// handled in the elided parts; confirm against the full source.
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
	c, err := w.dial(ownerID)
	conn, ok := c.(*pooledConn)

	// Determine the location of this shard and whether it still exists
	db, rp, sgi := w.MetaClient.ShardOwner(shardID)

	// Build write request.
	var request WriteShardRequest
	request.SetShardID(shardID)
	request.SetDatabase(db)
	request.SetRetentionPolicy(rp)
	request.AddPoints(points) // serialize the points

	// Marshal into protocol buffers.
	buf, err := request.MarshalBinary()

	// Write the request to conn framed as TLV.
	// Write request.
	conn.SetWriteDeadline(time.Now().Add(w.timeout))
	if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {
		conn.MarkUnusable()
		return err
	}

	// Read the response.
	conn.SetReadDeadline(time.Now().Add(w.timeout))
	_, buf, err = ReadTLV(conn)
	if err != nil {
		conn.MarkUnusable()
		return err
	}
	....
	return nil
}
4. 更新head segment的pos
更新head segment的pos,下次继续从pos读取:
- 先尝试移动head segment的pos: head.advance();
- 若head segment已读完,则继续读取下一个segment:
// services/hh/queue.go

// Advance moves the queue's read position past the current block.
//
// It holds l.mu for the whole operation and returns ErrNotOpen if the queue
// has no head segment. When l.head.advance returns io.EOF (presumably: the
// head segment has no more records), trimHead is called. trimHead drops the
// head segment and continues with the next one when there is more than one
// segment.
//
// Abridged excerpt: "...." marks code omitted by the article. This includes
// how non-EOF errors from l.head.advance are handled.
func (l *queue) Advance() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.head == nil {
		return ErrNotOpen
	}

	err := l.head.advance()
	if err == io.EOF {
		....
		// The head segment is exhausted: drop it and continue with the next one.
		if err := l.trimHead(); err != nil {
			return err
		}
	}
	....
	return nil
}
尝试移动head segment的pos:
- 首先定位到footer(文件末尾8字节),写入新的pos,然后同步(fsync)到磁盘:
// services/hh/queue.go

// advance moves the segment's read position past the current record and
// persists the new position in the segment footer (the last footerSize bytes
// of the file).
//
// The footer is written and fsynced before the in-memory l.pos is updated.
// The in-memory position therefore only moves once the new position has been
// synced to disk.
//
// Abridged excerpt: "...." marks code omitted by the article. Presumably this
// includes the io.EOF result that queue.Advance checks for once the segment is
// exhausted.
func (l *segment) advance() error {
	// Seek to the footer: footerSize bytes before the end of the file
	// (8 bytes, per the article).
	if err := l.seekEnd(-footerSize); err != nil {
		return err
	}

	// New position = old position + 8-byte length prefix + record payload
	// (currentSize was recorded by current()).
	pos := l.pos + l.currentSize + 8
	if err := l.writeUint64(uint64(pos)); err != nil {
		return err
	}

	// Flush the footer to disk before trusting the new position in memory.
	if err := l.file.Sync(); err != nil {
		return err
	}
	l.pos = pos

	// Reposition the file at the new current record.
	if err := l.seekToCurrent(); err != nil {
		return err
	}
	....
	return nil
}
若head segment已读完,则继续读取下一个segment:
- 如果head segment读到io.EOF,则关闭并删除head segment文件,令head=head.next(仅当还有后续segment时):
//services/hh/queue.gofunc (l *queue) trimHead() error { if len(l.segments) > 1 { l.segments = l.segments[1:] if err := l.head.close(); err != nil { return err } //删除segment文件 if err := os.Remove(l.head.path); err != nil { return err } //下一个segment作为head l.head = l.segments[0] } return nil}