InfluxDB Cluster: Source Code Analysis of /write Data Ingestion (Part 1)


A client writes time-series data to an InfluxDB cluster via POST /write:

curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
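
The same write can also be issued from Go with just the standard library; a minimal sketch (the consistency query parameter added here is optional and is explained below):

package main

import (
    "fmt"
    "net/http"
    "strings"
)

func main() {
    line := "cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000"
    // consistency is optional; when it is absent the server defaults to "one"
    resp, err := http.Post("http://localhost:8086/write?db=mydb&consistency=quorum",
        "text/plain", strings.NewReader(line))
    if err != nil {
        fmt.Println("write failed:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println(resp.Status) // expect "204 No Content" on success
}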

Data in an InfluxDB cluster is split into shards stored on different nodes. When a client writes time-series data (a single point or a batch):

  • some points need to be written to the current (local) node;
  • some points need to be written to remote nodes;
  • the write request is considered successful only when all target shards are written successfully.

Overall flow:

  • node1 receives the /write request on port 8086, then determines from the incoming data that it should be stored on shard1 and shard2;
  • the write only counts as successful when both shard1 and shard2 are written successfully;
  • in a cluster, every shard has at least 2 replicas; assuming 2 replicas, shard1 keeps one copy on node1 and one on node2;
  • with shard1 having 2 replicas, whether the write to shard1 succeeds depends on the consistency level passed in the request;

consistency: the write consistency level

The consistency parameter is passed by the client in the request; when a shard has N replicas, it determines how the server decides whether the shard write succeeded.

If the client does not pass a consistency parameter, the server defaults to ConsistencyLevelOne, i.e. as soon as one replica is written OK, success is returned to the client.

Possible values of consistency:

  • all: success is returned only when every replica is written successfully;
  • quorum: success is returned when a majority of the replicas are written successfully;
  • one: success is returned when any single replica is written successfully;
  • any: success is returned when any single replica is written successfully, or when the data is written to the hinted-handoff cache;

Taking 3 nodes and 2 replicas per shard as an example:

Level    required
all      2
quorum   2
one      1
any      1

// cluster/points_writer.go
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
    consistency models.ConsistencyLevel, points []models.Point) error {
    // The required number of writes to achieve the requested consistency level
    required := len(shard.Owners)
    switch consistency {
    case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
        required = 1
    case models.ConsistencyLevelQuorum:
        required = required/2 + 1
    }
    ......
}

Data write: the code flow

The handler entry point for POST /write:

// services/httpd/handler.go
// NewHandler returns a new instance of handler with routes.
func NewHandler(c Config) *Handler {
    h := &Handler{
        mux:            pat.New(),
        Config:         &c,
        Logger:         zap.NewNop(),
        CLFLogger:      log.New(os.Stderr, "[httpd]", 0),
        Store:          storage.NewStore(),
        stats:          &Statistics{},
        requestTracker: NewRequestTracker(),
        sema:           make(chan struct{}, 100),
    }
    h.AddRoutes([]Route{
        ......
        Route{
            "write", // Data-ingest route.
            "POST", "/write", true, writeLogEnabled, h.serveWrite,
        },
        ......
    }...)
    return h
}

The actual write handling is in h.serveWrite():

// services/httpd/handler.go
// serveWrite receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
    ......
    database := r.URL.Query().Get("db")
    if database == "" {h.httpError(w, "database is required", http.StatusBadRequest)
        return
    }
    if di := h.MetaClient.Database(database); di == nil {h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
        return
    }    
    body := r.Body
    buf := bytes.NewBuffer(bs)
    _, err := buf.ReadFrom(body)
    
    points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
    
    level := r.URL.Query().Get("consistency")
    // default consistency level when the client passes none
    consistency := models.ConsistencyLevelOne
    if level != "" {
        var err error
        consistency, err = models.ParseConsistencyLevel(level)        
    }
    // write the points
    if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
        atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
        h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    }
    atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
    h.writeHeader(w, http.StatusNoContent)
}
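
serveWrite relies on models.ParsePointsWithPrecision() to turn the line-protocol body into models.Point values. A minimal standalone sketch of that call (the import path assumes the open-source 1.x tree; a cluster fork may use its own module path; "n" means nanosecond precision):

package main

import (
    "fmt"
    "time"

    "github.com/influxdata/influxdb/models"
)

func main() {
    body := []byte("cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000")
    // the second argument is the default timestamp for points without one,
    // the third is the precision taken from the query string in serveWrite
    points, err := models.ParsePointsWithPrecision(body, time.Now().UTC(), "n")
    if err != nil {
        fmt.Println("parse error:", err)
        return
    }
    for _, p := range points {
        fmt.Println(p.String(), p.Time().UTC())
    }
}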

The points are written by h.PointsWriter.WritePoints(), which leads to cluster.WritePoints():

// cluster/points_writer.go
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
    return w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points)
}

Writing the points:

  • first look up which shards the points should be stored in, which requires reading metadata via the metaClient (a simplified sketch of this mapping follows the code below);
  • start one goroutine per shard to write that shard's points;
  • if any shard write fails, return the error; success is returned only when all shards are written successfully;
// cluster/points_writer.go
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
    ...
    shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
    
    ch := make(chan error, len(shardMappings.Points))
    for shardID, points := range shardMappings.Points {
        // start one goroutine per shard to write its points
        go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
            ch <- w.writeToShard(shard, database, retentionPolicy, consistencyLevel, points)
        }(shardMappings.Shards[shardID], database, retentionPolicy, points)
    }
    ...
    for range shardMappings.Points {
        select {
        case <-w.closing:
            return ErrWriteFailed
        // if any shard returns an error, the function returns that error
        case err := <-ch:
            if err != nil {
                return err
            }
        }
    }
    return nil
}    
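
w.MapShards() itself is not shown in this part. Conceptually it finds the shard group whose time range covers each point's timestamp and then hashes the point's series key to pick one shard within that group. Below is a heavily simplified sketch of that idea only; the field names (StartTime, EndTime, Shards) and HashID() follow the meta/models packages of the 1.x tree, the import paths are an assumption, and shard-group creation, retention-policy lookup and error handling are all elided:

package sketch

import (
    "github.com/influxdata/influxdb/models"
    "github.com/influxdata/influxdb/services/meta"
)

// mapPointsToShards is an illustration only: pick the shard group whose time range
// covers the point's timestamp, then use the series-key hash to choose one shard
// inside that group.
func mapPointsToShards(groups []meta.ShardGroupInfo, points []models.Point) map[uint64][]models.Point {
    mapping := make(map[uint64][]models.Point)
    for _, p := range points {
        for i := range groups {
            g := &groups[i]
            // the group covers the half-open time range [StartTime, EndTime)
            if !p.Time().Before(g.StartTime) && p.Time().Before(g.EndTime) {
                sh := g.Shards[p.HashID()%uint64(len(g.Shards))] // series-key hash picks the shard
                mapping[sh.ID] = append(mapping[sh.ID], p)
                break
            }
        }
    }
    return mapping
}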

The flow of writing a shard:

  • when writing a shard, since the shard has N replicas, whether the write succeeds depends on the consistencyLevel parameter:

    • required is computed from the number of shard owners and the consistency level;
    • finally, if wrote >= required, the shard write is considered successful;
  • in a cluster, a shard may belong to several nodes, i.e. have several owners, and every owner has to be written;
  • if the current node is a shard owner, TSDBStore.WriteToShard() performs a local write;
  • if a remote node is a shard owner, ShardWriter.WriteShard() performs a remote write;

    • if the remote write fails, the data is stored in the local hinted-handoff queue;
    • later, when the remote node recovers, the queued data is written to it again (a conceptual sketch of this replay follows the code below);
// cluster/points_writer.go
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
    consistency models.ConsistencyLevel, points []models.Point) error {
    required := len(shard.Owners)
    ...
    switch consistency {
    case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
        required = 1
    case models.ConsistencyLevelQuorum:
        required = required/2 + 1
    }
    ...
    ch := make(chan *AsyncWriteResult, len(shard.Owners))
    for _, owner := range shard.Owners {
        go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
            // the current node is an owner: write the shard locally
            if w.Node.GetDataID() == owner.NodeID {
                err := w.TSDBStore.WriteToShard(shardID, points)
                if err == tsdb.ErrShardNotFound {
                    err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
                    ...
                    err = w.TSDBStore.WriteToShard(shardID, points)
                }
                ch <- &AsyncWriteResult{owner, err}
                return
            }
            // a remote node is the owner: write the shard remotely
            err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
            if err != nil {
                // the remote write failed, so queue the data in the local hinted-handoff queue
                hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
                .....
            }
            ch <- &AsyncWriteResult{owner, err}
        }(shard.ID, owner, points)
    }
    var wrote int
    for range shard.Owners {
        select {
        case result := <-ch:
            // per-owner error handling on result is elided here; wrote counts successful writes
            wrote++
            // enough owners have acknowledged the write: wrote >= required
            if wrote >= required {
                return nil
            }
        }
    }
    ...
    return ErrWriteFailed
}
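
The hinted-handoff service itself is not covered in this part. Conceptually, every failed remote write is queued per target node, and a background processor keeps retrying it until the remote node comes back. The sketch below only illustrates that retry loop: hhEntry, the channel-based queue, and the injected writeShard function are hypothetical stand-ins; the real hh service persists its queue on disk, and in practice writeShard would be w.ShardWriter.WriteShard.

package sketch

import (
    "time"

    "github.com/influxdata/influxdb/models"
)

// hhEntry is a hypothetical stand-in for one queued remote write.
type hhEntry struct {
    shardID uint64
    nodeID  uint64
    points  []models.Point
}

// replayLoop keeps retrying queued writes until the remote owner accepts them.
// Disk persistence, size limits, and the real retry/back-off policy are elided.
func replayLoop(queue <-chan hhEntry, writeShard func(shardID, nodeID uint64, points []models.Point) error) {
    for e := range queue {
        for {
            if err := writeShard(e.shardID, e.nodeID, e.points); err == nil {
                break
            }
            time.Sleep(10 * time.Second) // wait while the remote node is still down
        }
    }
}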
