一. 整体流程
- client 写入时,会首先写入内存的 shard,若 shard 写入胜利,则间接返回 client;
- 若写入后 shard 已满,则将 shard 内的数据压缩后保留为 inmemoryPart(依然在内存中),而后返回 client:
- inmemoryPart 的数据,被后盾的 goroutine,定期的 merge 为 part 构造,保留到 disk 中;
时序数据在磁盘中的存储:
- partition: 一个月 1 个目录,比方 2023_05:
- part: 作为 partition 下的子目录,外面蕴含 tv 和索引信息;
# ls 2023_05/
117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD
# # ls 2023_05/117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD/
index.bin metaindex.bin min_dedup_interval timestamps.bin values.bin
二. 内存 shard
shards 的个数:
- shards 个数跟 CPU 核数无关;
- 对 4C4G 的机器来说,就 1 个 shard;
// The number of shards for rawRow entries per partition.
//
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 3) / 4
每个 shard 保留的 rows 个数:
- 范畴 =1w~50w;
- unsafe.Sizeof(rawRow{})=50
-
对 4C4G 机器来说:
- memory.Allowed() = 4G×60%;
- rowRowsShardsPerPartition=1;
- 每个 shard 保留的 rows 个数:后果 =(4G×60% / 1 / 256 / 50) = 18w;
// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerShard() int {maxRawRowsPerPartitionOnce.Do(func() {n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{}))
if n < 1e4 {n = 1e4}
if n > 500e3 {n = 500e3}
maxRawRowsPerPartition = n
})
return maxRawRowsPerPartition
}
若存在 N 个 shards,写入时保留到哪个 shard?
-
按程序轮转写:roundRobin
- 第一次写第一个,下一次写第二个 …
func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {n := atomic.AddUint32(&rrss.shardIdx, 1)
shards := rrss.shards
idx := n % uint32(len(shards))
shard := &shards[idx]
shard.addRows(pt, rows)
}
三. 写入的代码
首先,依据 rows 工夫找指标 partition,为简化剖析,仅思考一个 partition 写入的状况:
- 一个 partition 就是一个 month;
// lib/storage/table.go
func (tb *table) AddRows(rows []rawRow) error {if len(rows) == 0 {return nil}
// Verify whether all the rows may be added to a single partition.
ptwsX := getPartitionWrappers()
defer putPartitionWrappers(ptwsX)
ptwsX.a = tb.GetPartitions(ptwsX.a[:0])
ptws := ptwsX.a
for i, ptw := range ptws {
singlePt := true
for j := range rows {if !ptw.pt.HasTimestamp(rows[j].Timestamp) {
singlePt = false
break
}
}
if !singlePt {continue}
...
// 所有 rows 都在一个 partition
// Fast path - add all the rows into the ptw.
ptw.pt.AddRows(rows)
tb.PutPartitions(ptws)
return nil
}
...
}
确定 partition 后,前面的流程即在 partion 内写入:
- rows 会被先写入 partition 内的 []shard;
// lib/storage/partition.go
// AddRows adds the given rows to the partition pt.
func (pt *partition) AddRows(rows []rawRow) {if len(rows) == 0 {return}
// Validate all the rows.
...
// 写入到 []shard
pt.rawRows.addRows(pt, rows)
}
shard 内的写入:
- 若 shard 内有足够的空位写入 rows,则写入 shard 并返回 client 了;
- 否则,将 shard 内已有的 rows 和新 rows 保留在 rowsToFlush 中;而后将 rowsToFlush 中的数据写入 inmemoryPart;
// lib/storage/partition.go
func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {var rowsToFlush []rawRow
rrs.mu.Lock()
if cap(rrs.rows) == 0 {n := getMaxRawRowsPerShard()
rrs.rows = make([]rawRow, 0, n)
}
maxRowsCount := cap(rrs.rows)
capacity := maxRowsCount - len(rrs.rows)
// 还有空位
if capacity >= len(rows) {
// Fast path - rows fit capacity.
rrs.rows = append(rrs.rows, rows...)
} else {
// shard 中没有空位了
// 将 shard 中的 rows 和新 rows 保留在 rowsToFlush 中
// Slow path - rows don't fit capacity.
// Put rrs.rows and rows to rowsToFlush and convert it to a part.
rowsToFlush = append(rowsToFlush, rrs.rows...)
rowsToFlush = append(rowsToFlush, rows...)
rrs.rows = rrs.rows[:0]
rrs.lastFlushTime = fasttime.UnixTimestamp()}
rrs.mu.Unlock()
// 将 rowsToFlush 内的 rows 写入 inmemoryPart
pt.flushRowsToParts(rowsToFlush)
}
将 rowsToFlush 的数据写入 inmemoryPart 的过程:
- 这里能够看到,若 rowsToFlush 为空的话,函数就间接返回了;
- 具体工作由 pt.addRowsPart(rowsPart) 执行;
// lib/storage/partition.go
func (pt *partition) flushRowsToParts(rows []rawRow) {maxRows := getMaxRawRowsPerShard()
wg := getWaitGroup()
for len(rows) > 0 {
n := maxRows
if n > len(rows) {n = len(rows)
}
wg.Add(1)
go func(rowsPart []rawRow) {defer wg.Done()
pt.addRowsPart(rowsPart) // 执行工作的函数
}(rows[:n])
rows = rows[n:]
}
wg.Wait()
putWaitGroup(wg)
}
将 rows 内容写入 inmemoryPart,而后结构一个 partWrapper,将 partWrapper 保留到 partition.smallParts 中;
与此同时,判断 pt.smallParts 是否超过 256 个,若 <=256 则间接返回;否则帮忙执行 merge part;
// lib/storage/partition.go
func (pt *partition) addRowsPart(rows []rawRow) {
// 将 rows 的内容写入 inmemoryPart
mp := getInmemoryPart()
mp.InitFromRows(rows) // 将 rows 写入 inmemoryPart 时会对 rows 数据进行压缩
...
// 结构一个 partWrapper
p, err := mp.NewPart()
pw := &partWrapper{
p: p,
mp: mp,
refCount: 1,
}
// 将新的 partWrapper 保留在 partition 的 smallParts 中
pt.smallParts = append(pt.smallParts, pw)
// 断定是否超过 256
ok := len(pt.smallParts) <= maxSmallPartsPerPartition
pt.partsLock.Unlock()
if ok {return // smallParts <= 256,间接返回}
// 若 smallParts > 256,则帮忙执行 merge
// The added part exceeds available limit. Help merging parts.
...
err = pt.mergeSmallParts(false)
if err == nil {atomic.AddUint64(&pt.smallAssistedMerges, 1)
return
}
....
}
四. 总结
-
最快的流程:
- rows 写入到 partition 写的某个 shard,而后返回;
- shard 内的 rows 被 goroutine 定期的保留到 inmemoryPart;
-
次快的流程:
- rows 写入的指标 shard 满了,将 shard rows 和新 rows 存入 inmemoryPart,保留到 partition.smallParts 中;
- inmemoryPart 内的 rows 被 goroutine 定期的 merge 到磁盘,保留为 part 目录;
-
最慢的流程:
- 在上一个流程的根底上,发现 pt.smallParts 超过 256 个,帮忙执行 merge;