A client writes time-series data to an InfluxDB cluster via `POST /write`:

```bash
curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
```
Data in an InfluxDB cluster is split into shards that are stored on different nodes. When a client writes time-series data (a single point or a batch):

- some of the data needs to be written on the current (local) node;
- some of the data needs to be written on remote nodes;
- the write request is considered successful only when every target shard has been written successfully.
Overall flow:

- node1 receives the /write request on port 8086, then determines from the incoming data that it should be stored in shard1 and shard2 (see the routing sketch below);
- the write counts as successful only if both shard1 and shard2 are written successfully;
- in a cluster, each shard has at least 2 replicas; assuming 2 replicas here, shard1 is stored as two copies, one on node1 and one on node2;
- since shard1 has 2 replicas, whether a write to shard1 is considered successful depends on the consistency level passed in the request.
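Before looking at the code, it helps to see how a point is routed to a shard at all. Roughly, a point is first placed into a shard group by its timestamp, and then into one shard of that group by hashing its series key. The sketch below only illustrates this idea with hypothetical types and helpers (shardGroup, groupFor, shardFor); the real mapping is done by w.MapShards() in the code shown later in this article.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// shardGroup is a simplified, hypothetical stand-in for a shard group in the
// cluster metadata: it covers a time range and lists the shards of the group.
type shardGroup struct {
	start, end time.Time
	shardIDs   []uint64
}

// shardFor picks one shard of the group by hashing the series key
// (measurement + tag set), so all points of one series land in the same shard.
func (g shardGroup) shardFor(seriesKey string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(seriesKey))
	return g.shardIDs[h.Sum64()%uint64(len(g.shardIDs))]
}

// groupFor finds the shard group whose time range contains the point's timestamp.
func groupFor(groups []shardGroup, ts time.Time) (shardGroup, bool) {
	for _, g := range groups {
		if !ts.Before(g.start) && ts.Before(g.end) {
			return g, true
		}
	}
	return shardGroup{}, false
}

func main() {
	groups := []shardGroup{{
		start:    time.Date(2015, 6, 8, 0, 0, 0, 0, time.UTC),
		end:      time.Date(2015, 6, 15, 0, 0, 0, 0, time.UTC),
		shardIDs: []uint64{1, 2},
	}}

	ts := time.Unix(1434055562, 0).UTC() // timestamp from the curl example above
	if g, ok := groupFor(groups, ts); ok {
		fmt.Println("write goes to shard", g.shardFor("cpu_load_short,host=server01,region=us-west"))
	}
}
```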
consistency: the write consistency level

The consistency parameter is passed by the client in the request; when a shard has N replicas, it determines how the server decides whether the shard write succeeded.
If the client does not pass the consistency parameter, the server defaults to ConsistencyLevelOne, i.e. success is returned to the client as soon as one replica is written OK.
Values of the consistency parameter (an example request passing it follows this list):

- all: return success only when all replicas are written successfully;
- quorum: return success when a majority of the replicas are written successfully;
- one: return success when any single replica is written successfully;
- any: return success when any single replica is written successfully, or the data has been written to the hinted-handoff cache.
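The client passes the level as a query parameter on the write request. The example below is the same request as above, now asking for quorum (illustrative only; the effect of the parameter depends on running the clustered write path described in this article):

```bash
curl -i -XPOST 'http://localhost:8086/write?db=mydb&consistency=quorum' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
```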
Taking 3 nodes and 2 replicas per shard as an example:
Level | required
---|---
all | 2
quorum | 2
one | 1
any | 1
```go
// cluster/points_writer.go
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, consistency models.ConsistencyLevel, points []models.Point) error {
	// The required number of writes to achieve the requested consistency level
	required := len(shard.Owners)
	switch consistency {
	case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
		required = 1
	case models.ConsistencyLevelQuorum:
		required = required/2 + 1
	}
	......
}
```
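As a sanity check on the table above, here is a small standalone sketch of the same calculation (requiredAcks is a hypothetical helper for illustration, not part of InfluxDB):

```go
package main

import "fmt"

// requiredAcks mirrors the switch in writeToShard above: given the number of
// shard owners (replicas) and a consistency level, it returns how many
// successful replica writes are required.
func requiredAcks(owners int, consistency string) int {
	switch consistency {
	case "any", "one":
		return 1
	case "quorum":
		return owners/2 + 1
	default: // "all"
		return owners
	}
}

func main() {
	// With 2 replicas per shard this prints the values from the table above:
	// all=2 quorum=2 one=1 any=1
	for _, level := range []string{"all", "quorum", "one", "any"} {
		fmt.Printf("%s=%d ", level, requiredAcks(2, level))
	}
	fmt.Println()
}
```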
Data write: code flow

The entry point for handling POST /write:
```go
// services/httpd/handler.go
// NewHandler returns a new instance of handler with routes.
func NewHandler(c Config) *Handler {
	h := &Handler{
		mux:            pat.New(),
		Config:         &c,
		Logger:         zap.NewNop(),
		CLFLogger:      log.New(os.Stderr, "[httpd] ", 0),
		Store:          storage.NewStore(),
		stats:          &Statistics{},
		requestTracker: NewRequestTracker(),
		sema:           make(chan struct{}, 100),
	}

	h.AddRoutes([]Route{
		......
		Route{
			"write", // Data-ingest route.
			"POST", "/write", true, writeLogEnabled, h.serveWrite,
		},
		......
	}...)

	return h
}
```
The actual write handling is in h.serveWrite():
```go
// services/httpd/handler.go
// serveWrite receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
	......
	database := r.URL.Query().Get("db")
	if database == "" {
		h.httpError(w, "database is required", http.StatusBadRequest)
		return
	}
	if di := h.MetaClient.Database(database); di == nil {
		h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
		return
	}

	body := r.Body
	buf := bytes.NewBuffer(bs)
	_, err := buf.ReadFrom(body)
	points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))

	level := r.URL.Query().Get("consistency")
	// default consistency
	consistency := models.ConsistencyLevelOne
	if level != "" {
		var err error
		consistency, err = models.ParseConsistencyLevel(level)
	}

	// write the points
	if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	}

	atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
	h.writeHeader(w, http.StatusNoContent)
}
```
h.PointsWriter.WritePoints() writes the points; the code ends up in cluster.WritePoints():
```go
// cluster/points_writer.go
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
	return w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points)
}
```
Writing the points:

- first look up which shards the points should be stored in, which requires reading metadata through the metaClient;
- start one goroutine per shard to write that shard's points;
- if any shard write returns an error, return that error; the write succeeds only when all shards are written successfully.
```go
// cluster/points_writer.go
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
	...
	shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})

	ch := make(chan error, len(shardMappings.Points))
	for shardID, points := range shardMappings.Points {
		// start one goroutine per shard to do the write
		go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
			ch <- w.writeToShard(shard, database, retentionPolicy, consistencyLevel, points)
		}(shardMappings.Shards[shardID], database, retentionPolicy, points)
	}
	...
	for range shardMappings.Points {
		select {
		case <-w.closing:
			return ErrWriteFailed
		// if any shard returns an error, the function returns that error
		case err := <-ch:
			if err != nil {
				return err
			}
		}
	}
	return nil
}
```
The shard-write flow:

When writing to a shard, because the shard has N replicas, whether the write succeeds depends on the consistencyLevel parameter:

- compute required from the shard owners and the consistency level;
- in the end, the shard write is considered successful once wrote >= required;
- in a cluster, a shard may be owned by different nodes, i.e. it has several owners, and every owner must be written;
- if the current node is a shard owner, the local write is done with TSDBStore.WriteToShard();
- if a remote node is a shard owner, the remote write is done with ShardWriter.WriteShard();
- if the remote write fails, the data is stored in the local hinted-handoff queue;
- later, when the remote node recovers, the queued data is written to it (a sketch of that replay follows the code below).
```go
// cluster/points_writer.go
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, consistency models.ConsistencyLevel, points []models.Point) error {
	required := len(shard.Owners)
	...
	switch consistency {
	case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
		required = 1
	case models.ConsistencyLevelQuorum:
		required = required/2 + 1
	}
	...
	ch := make(chan *AsyncWriteResult, len(shard.Owners))
	for _, owner := range shard.Owners {
		go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
			// the current node is a shard owner: write the shard locally
			if w.Node.GetDataID() == owner.NodeID {
				err := w.TSDBStore.WriteToShard(shardID, points)
				if err == tsdb.ErrShardNotFound {
					err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
					...
					err = w.TSDBStore.WriteToShard(shardID, points)
				}
				ch <- &AsyncWriteResult{owner, err}
				return
			}

			// a remote node owns the shard: write the shard remotely
			err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
			if err != nil {
				// the remote write failed: enqueue into the local hinted-handoff (hh) queue
				hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
				.....
			}
			ch <- &AsyncWriteResult{owner, err}
		}(shard.ID, owner, points)
	}

	var wrote int
	for range shard.Owners {
		select {
		case result := <-ch:
			wrote++
			// number of successful writes >= required
			if wrote >= required {
				return nil
			}
		}
	}
	...
	return ErrWriteFailed
}
```
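Note that writeToShard only enqueues a failed remote write into the hinted-handoff queue; replaying the queue once the remote node comes back is done by a separate background service that is not shown in this walkthrough. The sketch below is a minimal, conceptual drain loop under that assumption; all types and names here (queuedWrite, drain, writeRemote) are hypothetical and are not the actual hinted-handoff service.

```go
package main

import (
	"fmt"
	"time"
)

// queuedWrite is a hypothetical entry in a hinted-handoff queue: the target
// shard and node plus the serialized points that failed to reach the node.
type queuedWrite struct {
	shardID uint64
	nodeID  uint64
	payload []byte
}

// drain retries queued writes until the remote node accepts them, then drops
// the entry. The real hinted-handoff service also persists the queue on disk
// and enforces size/age limits; this sketch only shows the retry idea.
func drain(queue <-chan queuedWrite, writeRemote func(shardID, nodeID uint64, payload []byte) error, retryInterval time.Duration) {
	for qw := range queue {
		for {
			if err := writeRemote(qw.shardID, qw.nodeID, qw.payload); err == nil {
				break // remote node is reachable again; this entry is done
			}
			time.Sleep(retryInterval) // node still down: back off and retry
		}
	}
}

func main() {
	queue := make(chan queuedWrite, 1)
	queue <- queuedWrite{shardID: 1, nodeID: 2, payload: []byte("cpu value=0.64")}
	close(queue)

	// A fake remote writer that always succeeds, just to exercise the loop.
	drain(queue, func(shardID, nodeID uint64, payload []byte) error {
		fmt.Printf("replayed %d bytes to shard %d on node %d\n", len(payload), shardID, nodeID)
		return nil
	}, time.Second)
}
```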