关于influxdb:InfluxDB集群-write写入数据源码分析一

Client通过POST /write向influxdb集群写入时序数据:

curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'

influxdb集群中的数据分shard在不同的节点上存储,client写入时序数据时(单条或批量):

  • 有的数据需要写入当前节点;
  • 有的数据需要写入远端节点;
  • 写入时,只有所有shard都写入成功,才认为该写入请求成功。

整体流程:

  • node1在8086端口上接收/write请求,然后根据写入的数据,确定数据存储到shard1和shard2上;
  • shard1和shard2都写入成功,才算数据写入成功;
  • 集群情况下,每个shard至少有2个replica;假设有2个replica,则shard1在node1和node2上各存储一份副本;
  • shard1有2个replica,shard1是否写入成功与request中传入的consistency有关;

consistency: 写入的一致性级别

consistency参数由client在request中传入,表示在shard有N个replica的情况下,如何判断shard是否写入成功。

如果client没有传入consistency参数,server端默认使用ConsistencyLevelOne,即只要有一个replica写入成功,就向client返回成功。

consistency参数:

  • all: 所有的replica都写入成功,才返回成功;
  • quorum: 大多数的replica写入成功,则返回成功;
  • one: 任何一个replica写入成功,则返回成功;
  • any: 任何一个replica写入成功,或者数据被写入Hinted-Handoff队列,则返回成功;

以3节点、2 replica为例,各一致性级别需要写入成功的replica数(required)如下:

| Level  | required |
|--------|----------|
| all    | 2        |
| quorum | 2        |
| one    | 1        |
| any    | 1        |
// cluster/points_writer.go
// writeToShard (excerpt): derive how many replica writes are required before
// the write to this shard is considered successful, based on the requested
// consistency level and the number of shard owners (replicas).
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
    consistency models.ConsistencyLevel, points []models.Point) error {
    // The required number of writes to achieve the requested consistency level.
    // Any level not matched by the switch below (i.e. ConsistencyLevelAll)
    // keeps this default: every owner must acknowledge the write.
    required := len(shard.Owners)
    switch consistency {
    case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
        // A single acknowledged replica is enough.
        required = 1
    case models.ConsistencyLevelQuorum:
        // Strict majority of owners: 2 of 2, 2 of 3, 3 of 4, ...
        required = required/2 + 1
    }
    ......
}

数据写入:代码流程

POST /write的处理入口:

// services/httpd/handler.go
// NewHandler returns a new instance of handler with routes.
//
// It initializes the handler's fields (pat multiplexer, no-op zap logger,
// stderr CLF logger, storage store, statistics, request tracker and a
// buffered channel of capacity 100 in sema — presumably a concurrency
// limiter, confirm against its users) and then registers the HTTP routes.
// The route relevant to the write path is POST /write, dispatched to
// h.serveWrite.
func NewHandler(c Config) *Handler {
    h := &Handler{
        mux:            pat.New(),
        Config:         &c,
        Logger:         zap.NewNop(),
        CLFLogger:      log.New(os.Stderr, "[httpd] ", 0),
        Store:          storage.NewStore(),
        stats:          &Statistics{},
        requestTracker: NewRequestTracker(),
        sema:           make(chan struct{}, 100),
    }
    h.AddRoutes([]Route{
        ......
        Route{
            "write", // Data-ingest route.
            // NOTE(review): the bool and writeLogEnabled flags are defined by
            // the Route type (not shown here) — check its field list.
            "POST", "/write", true, writeLogEnabled, h.serveWrite,
        },
        ......
    }...)
    return h
}

具体的写操作在h.serveWrite()中:

// services/httpd/handler.go
// serveWrite receives incoming series data in line protocol format and writes it to the database.
//
// Query parameters:
//   - db (required): target database; 400 if missing, 404 if unknown.
//   - rp: retention policy, passed through to the points writer.
//   - precision: timestamp precision used when parsing line protocol.
//   - consistency: write consistency level; defaults to ConsistencyLevelOne.
//
// It responds 204 No Content only when the body was read, every line parsed
// and the points writer reported success. Client errors map to 400 and any
// other write error to 500, so failed writes are never reported as success.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
    ......
    database := r.URL.Query().Get("db")
    if database == "" {
        h.httpError(w, "database is required", http.StatusBadRequest)
        return
    }
    if di := h.MetaClient.Database(database); di == nil {
        h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
        return
    }

    // Read the whole body. Content-Length, when known, is only a capacity
    // hint; bytes.Buffer still grows as needed.
    var bs []byte
    if r.ContentLength > 0 {
        bs = make([]byte, 0, r.ContentLength)
    }
    buf := bytes.NewBuffer(bs)
    if _, err := buf.ReadFrom(r.Body); err != nil {
        h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    }

    points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
    // Nothing could be parsed: reject instead of reporting an empty "successful" write.
    if parseError != nil && len(points) == 0 {
        h.httpError(w, parseError.Error(), http.StatusBadRequest)
        return
    }

    // Default consistency when the client does not pass one.
    consistency := models.ConsistencyLevelOne
    if level := r.URL.Query().Get("consistency"); level != "" {
        var err error
        if consistency, err = models.ParseConsistencyLevel(level); err != nil {
            h.httpError(w, err.Error(), http.StatusBadRequest)
            return
        }
    }

    // Write the points.
    err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points)
    switch {
    case influxdb.IsClientError(err):
        atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
        h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    case err != nil:
        // Any other failure (e.g. the consistency level was not reached)
        // is a server-side error and must not be counted as written.
        atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
        h.httpError(w, err.Error(), http.StatusInternalServerError)
        return
    }
    atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))

    if parseError != nil {
        // The parsed points were written, but some lines were invalid:
        // report that to the client rather than a silent 204.
        h.httpError(w, parseError.Error(), http.StatusBadRequest)
        return
    }
    h.writeHeader(w, http.StatusNoContent)
}

由h.PointsWriter.WritePoints()写入points数据,代码走到cluster.WritePoints():

// cluster/points_writer.go
// WritePoints is the entry point reached from the HTTP write handler. The user
// argument is not consulted on this path; database, retention policy,
// consistency level and points are forwarded unchanged to
// WritePointsPrivileged, whose result is returned as-is.
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
    _ = user // intentionally unused on this path
    err := w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points)
    return err
}

写points:

  • 先查询points要存入的shard,这需要通过metaClient读取元数据;
  • 每个shard启动1个goroutine写points数据;
  • 若某个shard写入出错,则返回err;仅当所有shard都写入成功时,才返回成功;
// cluster/points_writer.go
// WritePointsPrivileged maps points to their shards and writes every shard
// concurrently, one goroutine per shard.
//
// It returns the MapShards error if mapping fails, ErrWriteFailed if the
// writer starts closing while waiting, the first non-nil shard error
// otherwise, and nil only when every shard write succeeded.
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
    ...
    shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
    if err != nil {
        // Without a valid mapping there is nothing safe to write.
        return err
    }

    // Buffered to the number of shards so that every writer goroutine can
    // always deliver its result, even after an early return below.
    ch := make(chan error, len(shardMappings.Points))
    for shardID, points := range shardMappings.Points {
        // One goroutine per shard.
        go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
            ch <- w.writeToShard(shard, database, retentionPolicy, consistencyLevel, points)
        }(shardMappings.Shards[shardID], database, retentionPolicy, points)
    }
    ...
    for range shardMappings.Points {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case err := <-ch:
            // Fail fast: the first shard error fails the whole request.
            if err != nil {
                return err
            }
        }
    }
    return nil
}

写shard的流程:

  • 在写入shard时,因为shard有N个replica,是否写入成功取决于consistencyLevel参数:

    • 根据shard owner的数量和consistency计算required;
    • 最后若wrote >= required,则认为本次写入成功;
  • 在集群场景下,某个shard可能归属于不同的node,也就是有多个owner,每个owner都要写;
  • 如果当前节点是shard owner,则使用TSDBStore.WriteToShard()执行本地写;
  • 如果远端节点是shard owner,则使用shardWriter.WriteShard()执行远端写;

    • 如果远端节点写入失败,则存入本机的hinted-handoff队列;
    • 之后等远端节点恢复时,再将队列中的数据写入远端节点;
// cluster/points_writer.go
// writeToShard writes points to every owner (replica) of shard concurrently
// and returns nil as soon as `required` owners have acknowledged the write,
// where `required` is derived from the consistency level:
// all -> every owner, quorum -> len(owners)/2+1, one/any -> 1.
//
// Local owners are written through TSDBStore, creating the shard on demand.
// Remote owners are written through ShardWriter. On a remote failure the points
// are queued in the local hinted-handoff queue; that queued write counts as a
// successful replica write only for ConsistencyLevelAny.
//
// A replica that reports an error never counts toward `required`. If the
// level is not reached, the first replica error (or ErrWriteFailed) is returned.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
    consistency models.ConsistencyLevel, points []models.Point) error {
    required := len(shard.Owners)
    ...
    switch consistency {
    case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
        required = 1
    case models.ConsistencyLevelQuorum:
        required = required/2 + 1
    }
    ...
    // Buffered to the number of owners so no writer goroutine blocks after
    // this function returns early.
    ch := make(chan *AsyncWriteResult, len(shard.Owners))
    for _, owner := range shard.Owners {
        go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
            // This node owns the shard: write locally.
            if w.Node.GetDataID() == owner.NodeID {
                err := w.TSDBStore.WriteToShard(shardID, points)
                if err == tsdb.ErrShardNotFound {
                    // The shard does not exist locally yet: create it and retry once.
                    err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
                    ...
                    err = w.TSDBStore.WriteToShard(shardID, points)
                }
                ch <- &AsyncWriteResult{owner, err}
                return
            }
            // A remote node owns the shard.
            err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
            if err != nil {
                // Remote write failed: queue the points in the local
                // hinted-handoff queue so they can be replayed later.
                if hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points); hherr != nil {
                    ch <- &AsyncWriteResult{owner, hherr}
                    return
                }
                // Only ANY accepts a queued write as a successful replica
                // write; for the other levels the remote error stands.
                if consistency == models.ConsistencyLevelAny {
                    err = nil
                }
            }
            ch <- &AsyncWriteResult{owner, err}
        }(shard.ID, owner, points)
    }

    var wrote int
    var firstErr error
    for range shard.Owners {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case result := <-ch:
            if result.Err != nil {
                // A failed replica does not count toward the consistency
                // level; keep the first failure to report it if needed.
                if firstErr == nil {
                    firstErr = result.Err
                }
                continue
            }
            wrote++
            // Enough replicas acknowledged the write.
            if wrote >= required {
                return nil
            }
        }
    }
    if firstErr != nil {
        return firstErr
    }
    return ErrWriteFailed
}

【腾讯云】云产品限时秒杀,爆款1核2G云服务器,首年50元

阿里云限时活动-2核2G-5M带宽-60G SSD-1000G月流量 ,特惠价99元/年(原价1234.2元/年,可以直接买3年),速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

You may also like...

发表评论

邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据