1. Overall Flow
- When a client writes, the rows are first written into an in-memory shard; if the write into the shard succeeds, the call returns to the client immediately;
- if the shard is full after the write, the shard's data is compressed and saved as an inmemoryPart (still in memory), and then the call returns to the client;
- the data in the inmemoryParts is periodically merged by background goroutines into the part structure and persisted to disk (see the simplified sketch after this list);
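The sketch below is a minimal, self-contained Go model of this three-tier flow. It is not VictoriaMetrics code: the types, the fixed shard capacity, and the stubbed-out compression and merge steps are illustrative assumptions only.

package main

import "fmt"

// rawRow stands in for a single time series sample.
type rawRow struct {
    Timestamp int64
    Value     float64
}

// inmemoryPart stands in for a compressed, still-in-memory block of rows.
type inmemoryPart struct{ rows []rawRow }

// shard buffers raw rows until it runs out of capacity.
type shard struct {
    rows     []rawRow
    capacity int
}

// partition owns the shards and the small in-memory parts.
type partition struct {
    shards     []shard
    smallParts []*inmemoryPart
}

// addRows mimics the write path: append to a shard if the rows fit,
// otherwise flush the shard's rows plus the new rows into an inmemoryPart.
func (pt *partition) addRows(shardIdx int, rows []rawRow) {
    s := &pt.shards[shardIdx]
    if len(s.rows)+len(rows) <= s.capacity {
        // Fastest path: the rows fit into the shard.
        s.rows = append(s.rows, rows...)
        return
    }
    // Slower path: move everything into a new inmemoryPart
    // (the compression step is omitted here).
    mp := &inmemoryPart{rows: append(append([]rawRow{}, s.rows...), rows...)}
    pt.smallParts = append(pt.smallParts, mp)
    s.rows = s.rows[:0]
}

func main() {
    pt := &partition{shards: make([]shard, 1)}
    pt.shards[0].capacity = 4 // tiny capacity so the flush is easy to trigger

    for i := 0; i < 6; i++ {
        pt.addRows(0, []rawRow{{Timestamp: int64(i), Value: float64(i)}})
    }
    fmt.Println("rows buffered in shard:", len(pt.shards[0].rows)) // 1
    fmt.Println("inmemoryParts created: ", len(pt.smallParts))     // 1
    // In the real storage, background goroutines would later merge the
    // inmemoryParts into part directories on disk.
}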
On-disk layout of the time series data:
- partition: one directory per month, e.g. 2023_05;
- part: a subdirectory under the partition, containing the tv data (timestamps/values) and the index information;
# ls 2023_05/
117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD
# ls 2023_05/117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD/
index.bin  metaindex.bin  min_dedup_interval  timestamps.bin  values.bin
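Since a partition is simply one month, mapping a sample's millisecond timestamp to its partition directory name is straightforward. The helper below only illustrates the naming convention seen above; it is not a function from lib/storage.

package main

import (
    "fmt"
    "time"
)

// partitionNameFromTimestamp returns the month-based partition directory
// name (e.g. "2023_05") for a millisecond-precision timestamp.
func partitionNameFromTimestamp(tsMillis int64) string {
    t := time.UnixMilli(tsMillis).UTC()
    return fmt.Sprintf("%04d_%02d", t.Year(), t.Month())
}

func main() {
    // 2023-05-04 08:57:31.987 UTC, i.e. the time range of the part listed above.
    fmt.Println(partitionNameFromTimestamp(1683190651987)) // 2023_05
}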
2. The In-Memory Shards
Number of shards:
- the number of shards is determined by the number of CPU cores;
- for a 4-core/4GB machine there is exactly 1 shard (worked examples follow the code below);
// The number of shards for rawRow entries per partition.
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 3) / 4
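Plugging a few CPU counts into this formula shows why a 4-core machine ends up with a single shard (a tiny standalone computation, not VictoriaMetrics code):

package main

import "fmt"

func main() {
    // rawRowsShardsPerPartition = (AvailableCPUs + 3) / 4, using integer division.
    for _, cpus := range []int{1, 4, 8, 16, 32} {
        fmt.Printf("CPUs=%2d -> shards=%d\n", cpus, (cpus+3)/4)
    }
    // CPUs= 1 -> shards=1
    // CPUs= 4 -> shards=1
    // CPUs= 8 -> shards=2
    // CPUs=16 -> shards=4
    // CPUs=32 -> shards=8
}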
Number of rows each shard can hold:
- range: 10k ~ 500k;
- unsafe.Sizeof(rawRow{}) = 50
For a 4-core/4GB machine:
- memory.Allowed() = 4G × 60%;
- rawRowsShardsPerPartition = 1;
- rows per shard: (4G × 60%) / 1 / 256 / 50 = 187,500, i.e. roughly 180k (reproduced after the code below);
// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerShard() int {
    maxRawRowsPerPartitionOnce.Do(func() {
        n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{}))
        if n < 1e4 {
            n = 1e4
        }
        if n > 500e3 {
            n = 500e3
        }
        maxRawRowsPerPartition = n
    })
    return maxRawRowsPerPartition
}
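As a sanity check, the 4C4G estimate above can be reproduced by running the same arithmetic with the assumed inputs (memory.Allowed() taken as 60% of a decimal 4G, one shard, 50 bytes per rawRow):

package main

import "fmt"

func main() {
    const (
        allowedMem = 4_000_000_000 * 60 / 100 // memory.Allowed(): 60% of 4G, as in the estimate above
        shards     = 1                        // rawRowsShardsPerPartition for 4 CPUs
        rawRowSize = 50                       // unsafe.Sizeof(rawRow{})
    )
    n := allowedMem / shards / 256 / rawRowSize
    if n < 1e4 {
        n = 1e4
    }
    if n > 500e3 {
        n = 500e3
    }
    fmt.Println(n) // 187500 rows per shard, i.e. the ~180k figure quoted above
}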
With N shards, which shard does a write go to?
Writes are rotated across the shards in round-robin order:
- the first write goes to the first shard, the next one to the second, and so on (a small demo follows the code below);
func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
    n := atomic.AddUint32(&rrss.shardIdx, 1)
    shards := rrss.shards
    idx := n % uint32(len(shards))
    shard := &shards[idx]
    shard.addRows(pt, rows)
}
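The atomic-counter-plus-modulo pattern can be exercised in isolation. This small demo (not VictoriaMetrics code) shows 1000 writes spreading evenly across 4 shards:

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    const numShards = 4
    var shardIdx uint32
    counts := make([]int, numShards)

    // Simulate 1000 addRows calls; each call picks the next shard round-robin.
    for i := 0; i < 1000; i++ {
        n := atomic.AddUint32(&shardIdx, 1)
        idx := n % numShards
        counts[idx]++
    }
    fmt.Println(counts) // [250 250 250 250]
}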
3. The Write Code Path
First, the target partition is located from the rows' timestamps. To keep the analysis simple, only the case where all rows are written to a single partition is considered:
- one partition corresponds to one month (a sketch of the timestamp check follows the code below);
// lib/storage/table.go
func (tb *table) AddRows(rows []rawRow) error {
    if len(rows) == 0 {
        return nil
    }

    // Verify whether all the rows may be added to a single partition.
    ptwsX := getPartitionWrappers()
    defer putPartitionWrappers(ptwsX)

    ptwsX.a = tb.GetPartitions(ptwsX.a[:0])
    ptws := ptwsX.a
    for i, ptw := range ptws {
        singlePt := true
        for j := range rows {
            if !ptw.pt.HasTimestamp(rows[j].Timestamp) {
                singlePt = false
                break
            }
        }
        if !singlePt {
            continue
        }
        ...
        // All rows fall into a single partition.
        // Fast path - add all the rows into the ptw.
        ptw.pt.AddRows(rows)
        tb.PutPartitions(ptws)
        return nil
    }
    ...
}
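HasTimestamp is essentially a range check against the partition's month. The sketch below is only an assumption about its behavior based on the description above; the type, field, and method names are illustrative and not taken from lib/storage.

package main

import "fmt"

// timeRange mirrors the idea of a partition's month boundaries in
// millisecond precision.
type timeRange struct {
    MinTimestamp int64
    MaxTimestamp int64
}

// hasTimestamp sketches the check pt.HasTimestamp performs:
// a row belongs to the partition iff its timestamp falls inside the month.
func (tr *timeRange) hasTimestamp(ts int64) bool {
    return ts >= tr.MinTimestamp && ts <= tr.MaxTimestamp
}

func main() {
    may2023 := timeRange{
        MinTimestamp: 1682899200000, // 2023-05-01 00:00:00.000 UTC
        MaxTimestamp: 1685577599999, // 2023-05-31 23:59:59.999 UTC
    }
    fmt.Println(may2023.hasTimestamp(1683190651987)) // true  (2023-05-04)
    fmt.Println(may2023.hasTimestamp(1672531200000)) // false (2023-01-01)
}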
Once the partition is determined, the rest of the flow takes place inside that partition:
- the rows are first written into the partition's []shard;
// lib/storage/partition.go

// AddRows adds the given rows to the partition pt.
func (pt *partition) AddRows(rows []rawRow) {
    if len(rows) == 0 {
        return
    }

    // Validate all the rows.
    ...

    // Write the rows into the []shard.
    pt.rawRows.addRows(pt, rows)
}
Writing inside a shard:
- if the shard has enough free capacity for the rows, they are appended to the shard and the call returns to the client;
- otherwise, the shard's existing rows plus the new rows are collected into rowsToFlush, and the data in rowsToFlush is then written into an inmemoryPart;
// lib/storage/partition.go
func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
    var rowsToFlush []rawRow

    rrs.mu.Lock()
    if cap(rrs.rows) == 0 {
        n := getMaxRawRowsPerShard()
        rrs.rows = make([]rawRow, 0, n)
    }
    maxRowsCount := cap(rrs.rows)
    capacity := maxRowsCount - len(rrs.rows)
    // Is there still free room in the shard?
    if capacity >= len(rows) {
        // Fast path - rows fit capacity.
        rrs.rows = append(rrs.rows, rows...)
    } else {
        // The shard is out of room:
        // collect the shard's rows and the new rows into rowsToFlush.
        // Slow path - rows don't fit capacity.
        // Put rrs.rows and rows to rowsToFlush and convert it to a part.
        rowsToFlush = append(rowsToFlush, rrs.rows...)
        rowsToFlush = append(rowsToFlush, rows...)
        rrs.rows = rrs.rows[:0]
        rrs.lastFlushTime = fasttime.UnixTimestamp()
    }
    rrs.mu.Unlock()

    // Write the rows collected in rowsToFlush into inmemoryParts.
    pt.flushRowsToParts(rowsToFlush)
}
Writing the rowsToFlush data into inmemoryParts:
- note that when rowsToFlush is empty, the loop body never executes, so the call is effectively an immediate return;
- the actual work is done by pt.addRowsPart(rowsPart);
// lib/storage/partition.go
func (pt *partition) flushRowsToParts(rows []rawRow) {
    maxRows := getMaxRawRowsPerShard()
    wg := getWaitGroup()
    for len(rows) > 0 {
        n := maxRows
        if n > len(rows) {
            n = len(rows)
        }
        wg.Add(1)
        go func(rowsPart []rawRow) {
            defer wg.Done()
            pt.addRowsPart(rowsPart) // the function that does the actual work
        }(rows[:n])
        rows = rows[n:]
    }
    wg.Wait()
    putWaitGroup(wg)
}
The rows are written into an inmemoryPart, then a partWrapper is constructed around it and appended to partition.smallParts.
It then checks whether pt.smallParts exceeds 256 entries: if the count is <= 256, the function returns right away; otherwise the writing goroutine helps merge parts.
// lib/storage/partition.go
func (pt *partition) addRowsPart(rows []rawRow) {
    // Write the rows into an inmemoryPart.
    mp := getInmemoryPart()
    mp.InitFromRows(rows) // the rows are compressed while being written into the inmemoryPart
    ...

    // Construct a partWrapper.
    p, err := mp.NewPart()
    pw := &partWrapper{
        p:        p,
        mp:       mp,
        refCount: 1,
    }

    // Append the new partWrapper to the partition's smallParts.
    pt.partsLock.Lock()
    pt.smallParts = append(pt.smallParts, pw)
    // Check whether the count exceeds 256.
    ok := len(pt.smallParts) <= maxSmallPartsPerPartition
    pt.partsLock.Unlock()
    if ok {
        return // smallParts <= 256: return right away
    }

    // smallParts > 256: help merging parts.
    // The added part exceeds available limit. Help merging parts.
    ...
    err = pt.mergeSmallParts(false)
    if err == nil {
        atomic.AddUint64(&pt.smallAssistedMerges, 1)
        return
    }
    ...
}
4. Summary
The fastest path:
- the rows are written into one of the partition's shards, then the call returns;
- the rows in the shard are later flushed by background goroutines into an inmemoryPart;
The second-fastest path:
- the target shard is full, so the shard's rows plus the new rows are written into an inmemoryPart, which is appended to partition.smallParts;
- the rows in the inmemoryParts are periodically merged to disk by background goroutines and stored as part directories;
The slowest path:
- on top of the previous path, pt.smallParts is found to exceed 256 entries, so the writing goroutine helps perform the merge;