Client 通过 POST /write 向 influxdb 集群写入时序数据:
curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
influxdb 集群中的数据分 shard 在不同的节点上存储,client 写入时序数据时 (单条或批量):
- 有的数据需要写入当前节点;
- 有的数据需要写入远端节点;
- 只有当所有 shard 都写入成功时,才认为该写入请求成功。
整体流程:
- node1 在 8086 端口上接收 /write 请求,然后根据写入的数据,确定数据存储到 shard1 和 shard2 上;
- shard1 和 shard2 都写入成功,才算数据写入成功;
- 集群情况下,每个 shard 至少有 2 个 replica;假如有 2 个 replica,shard1 会在 node1 和 node2 上各存储一份副本;
- shard1 有 2 个 replica,shard1 是否写入成功与 request 中传入的 consistency 有关;
consistency: 写入的一致性级别
consistency 参数由 client 在 request 中传入,表示在 shard 有 N 个 replica 的情况下,如何判定该 shard 是否写入成功。
如果 client 没有传入 consistency 参数,server 端默认使用 ConsistencyLevelOne,即只要有一个 replica 写入成功,就向 client 返回成功。
consistency 参数:
- all: 所有的 replica 都写入成功,则返回成功;
- quorum: 大多数的 replica 写入成功,则返回成功;
- one: 任何一个 replica 写入成功,则返回成功;
- any: 任何一个 replica 写入成功,或者数据被写入 Hinted-Handoff 队列,则返回成功;
以 3 节点、2 replica 为例(required 表示判定写入成功所需的成功写入 replica 数):
Level | required |
---|---|
all | 2 |
quorum | 2 |
one | 1 |
any | 1 |
// cluster/points_writer.go
// writeToShard (excerpt): only the computation of how many replica writes are
// required for the requested consistency level is shown here; the rest of the
// function is elided.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
consistency models.ConsistencyLevel, points []models.Point) error {
// The required number of writes to achieve the requested consistency level
// Starts at the owner (replica) count, which is what remains in effect for any
// level not matched below (e.g. "all").
required := len(shard.Owners)
switch consistency {
case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
// A single acknowledgement is enough.
required = 1
case models.ConsistencyLevelQuorum:
// Strict majority of owners: 2 of 2, 2 of 3, 3 of 4, ...
required = required/2 + 1
}
......
}
数据写入:代码流程
POST /write 的处理入口:
// services/httpd/handler.go
// NewHandler returns a new instance of handler with routes.
//
// The handler is created with a no-op zap logger, a stderr logger prefixed
// "[httpd]", a new storage.Store, zeroed statistics and a request tracker,
// then the HTTP routes (including POST /write -> serveWrite) are registered.
func NewHandler(c Config) *Handler {
h := &Handler{mux: pat.New(),
Config: &c,
Logger: zap.NewNop(),
CLFLogger: log.New(os.Stderr, "[httpd]", 0),
Store: storage.NewStore(),
stats: &Statistics{},
requestTracker: NewRequestTracker(),
// Buffered channel with capacity 100; presumably used as a semaphore to cap
// concurrent work — confirm against its users.
sema: make(chan struct{}, 100),
}
// Register routes (most entries are elided in this excerpt).
h.AddRoutes([]Route{
......
Route{
"write", // Data-ingest route.
// Method and path, then a bool flag and writeLogEnabled — presumably
// gzip / request-logging switches (TODO confirm against the Route type),
// then the handler func.
"POST", "/write", true, writeLogEnabled, h.serveWrite,
},
......
}...)
return h
}
具体的写操作在 h.serveWrite():
// services/httpd/handler.go
// serveWrite receives incoming series data in line protocol format and writes it to the database.
//
// Query parameters: db (required, must exist), rp, precision and consistency
// (defaults to ConsistencyLevelOne when omitted). Responses:
//   - 204 when every point was parsed and written;
//   - 400 for a missing db, an unreadable body, an invalid consistency level,
//     unparsable points, or a client error reported by the points writer;
//   - 404 when the database does not exist;
//   - 500 for any other write failure.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
	......
	database := r.URL.Query().Get("db")
	if database == "" {
		h.httpError(w, "database is required", http.StatusBadRequest)
		return
	}
	if di := h.MetaClient.Database(database); di == nil {
		h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
		return
	}

	body := r.Body
	buf := bytes.NewBuffer(bs)
	if _, err := buf.ReadFrom(body); err != nil {
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	}

	points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
	// Nothing could be parsed, so there is nothing to write: report the parse error now.
	if parseError != nil && len(points) == 0 {
		h.httpError(w, parseError.Error(), http.StatusBadRequest)
		return
	}

	// Default consistency level when the client does not pass one.
	consistency := models.ConsistencyLevelOne
	if level := r.URL.Query().Get("consistency"); level != "" {
		var err error
		consistency, err = models.ParseConsistencyLevel(level)
		if err != nil {
			h.httpError(w, err.Error(), http.StatusBadRequest)
			return
		}
	}

	// Write the points. Every failure must be reported; only a nil error is a success.
	err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points)
	switch {
	case influxdb.IsClientError(err):
		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	case err != nil:
		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
		h.httpError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))

	// The parsable points were written, but some lines were invalid: tell the client.
	if parseError != nil {
		h.httpError(w, parseError.Error(), http.StatusBadRequest)
		return
	}
	h.writeHeader(w, http.StatusNoContent)
}
由 h.PointsWriter.WritePoints() 写入 points 数据,代码走到 cluster 包中的 PointsWriter.WritePoints():
// cluster/points_writer.go
// WritePoints writes points to the given database and retention policy at the
// requested consistency level. The user argument is not consulted here: the
// call is forwarded unchanged to WritePointsPrivileged.
func (w *PointsWriter) WritePoints(
	database, retentionPolicy string,
	consistencyLevel models.ConsistencyLevel,
	user meta.User,
	points []models.Point,
) error {
	return w.WritePointsPrivileged(
		database,
		retentionPolicy,
		consistencyLevel,
		points,
	)
}
写 points:
- 先查询 points 要存入的 shard,这需要通过 metaClient 读取元数据;
- 每个 shard 启动 1 个 goroutine 写 points 数据;
- 若某个 shard 写入出错,则返回 err;仅当所有 shard 都写入成功时,才返回成功;
// cluster/points_writer.go
// WritePointsPrivileged maps points to their shards and writes each shard
// concurrently (one goroutine per shard). It returns nil only if every shard
// write succeeds.
//
// It returns the MapShards error if the points cannot be mapped, ErrWriteFailed
// if the writer is closing, or the first shard error received.
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
	...
	shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
	if err != nil {
		// Without a mapping there is nothing to write to; shardMappings must not be used.
		return err
	}

	// Buffered to the number of shards so every writer goroutine can deliver its
	// result and exit even if this function returns early below.
	ch := make(chan error, len(shardMappings.Points))
	for shardID, points := range shardMappings.Points {
		// One goroutine per shard.
		go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
			ch <- w.writeToShard(shard, database, retentionPolicy, consistencyLevel, points)
		}(shardMappings.Shards[shardID], database, retentionPolicy, points)
	}
	...
	for range shardMappings.Points {
		select {
		case <-w.closing:
			return ErrWriteFailed
		case err := <-ch:
			// Any failing shard fails the whole request.
			if err != nil {
				return err
			}
		}
	}
	return nil
}
写 shard 的流程:
- 在写入 shard 时,因为 shard 有 N 个 replica,写入是否成功取决于 consistencyLevel 参数:
- 根据 shard owner 的数量和 consistency 计算 required;
- 最后统计写入成功的 replica 数 wrote,若 wrote >= required,则认为本次 shard 写入成功;
- 在集群场景下,某个 shard 可能归属于不同的 node,也就是有不同的 owner,每个 owner 都要写;
- 如果当前节点是 shard owner,使用 TSDBStore.WriteToShard() 执行本地写;
- 如果远端节点是 shard owner,则使用 ShardWriter.WriteShard() 执行远端写;
- 如果远端节点写入失败,则存入本机的 hinted-handoff 队列;
- 后面等远端节点恢复时,再将队列中的数据写入远端节点;
// cluster/points_writer.go
// writeToShard writes points to every owner (replica) of shard in parallel and
// returns nil as soon as enough owners have acknowledged the write to satisfy
// the requested consistency level.
//
// Local owners are written through TSDBStore, creating the shard on first use.
// Remote owners are written through ShardWriter. When a remote write fails, the
// points are queued in the local hinted-handoff queue, and that queued write
// counts as a success only for ConsistencyLevelAny.
//
// It returns ErrWriteFailed if the writer is closing or if fewer than the
// required number of owners succeeded.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
	consistency models.ConsistencyLevel, points []models.Point) error {
	// The required number of successful writes; defaults to every owner ("all").
	required := len(shard.Owners)
	...
	switch consistency {
	case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
		required = 1
	case models.ConsistencyLevelQuorum:
		required = required/2 + 1
	}
	...
	// Buffered to the owner count so every writer goroutine can deliver its
	// result and exit even after this function has returned early.
	ch := make(chan *AsyncWriteResult, len(shard.Owners))
	for _, owner := range shard.Owners {
		go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
			// This node owns a replica: write it locally.
			if w.Node.GetDataID() == owner.NodeID {
				err := w.TSDBStore.WriteToShard(shardID, points)
				// The shard may not exist locally yet: create it and retry once.
				if err == tsdb.ErrShardNotFound {
					err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
					...
					err = w.TSDBStore.WriteToShard(shardID, points)
				}
				ch <- &AsyncWriteResult{owner, err}
				return
			}

			// A remote node owns this replica.
			err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
			if err != nil {
				// The remote write failed: queue the points in the local
				// hinted-handoff queue so they can be replayed once the node recovers.
				if hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points); hherr != nil {
					ch <- &AsyncWriteResult{owner, hherr}
					return
				}
				// Only ConsistencyLevelAny accepts a queued write as a success;
				// for every other level the remote write error is reported.
				if consistency == models.ConsistencyLevelAny {
					err = nil
				}
			}
			ch <- &AsyncWriteResult{owner, err}
		}(shard.ID, owner, points)
	}

	var wrote int
	for range shard.Owners {
		select {
		case <-w.closing:
			return ErrWriteFailed
		case result := <-ch:
			// Failed replicas must not count toward the consistency level.
			if result.Err != nil {
				continue
			}
			wrote++
			// Enough successful writes for the requested consistency level.
			if wrote >= required {
				return nil
			}
		}
	}
	...
	return ErrWriteFailed
}