1. Overall flow

  • When a client writes, the rows are first written into an in-memory shard; if the shard write succeeds, the call returns to the client right away;
  • If the shard cannot hold the new rows, the shard's data plus the new rows are compressed and kept as an inmemoryPart (still in memory), and then the call returns to the client;
  • The data in the inmemoryParts is periodically merged by background goroutines into the part structure and persisted to disk (a simplified sketch of these steps follows this list);
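
Before looking at the real code, the three steps above can be condensed into a runnable toy model. This is a deliberately simplified sketch: rawRow, shard, inmemoryPart and partition here are illustrative stand-ins only, and the capacity value is made up; the actual lib/storage structures are quoted in the sections below.

package main

import "fmt"

// Hypothetical, heavily simplified types; the real ones live in lib/storage.
type rawRow struct {
    Timestamp int64
    Value     float64
}

type shard struct {
    rows []rawRow
}

type inmemoryPart struct {
    rows []rawRow // the real inmemoryPart stores compressed column data, not raw rows
}

type partition struct {
    shards      []shard
    smallParts  []*inmemoryPart
    next        int
    maxPerShard int
}

// addRows sketches the two in-memory stages: the fast path appends to a shard,
// the slow path turns the overflowing shard (plus the new rows) into an inmemoryPart.
func (p *partition) addRows(rows []rawRow) {
    s := &p.shards[p.next%len(p.shards)] // round-robin shard selection, see section 2
    p.next++
    if len(s.rows)+len(rows) <= p.maxPerShard {
        s.rows = append(s.rows, rows...) // fast path: rows stay in the shard
        return
    }
    toFlush := append(append([]rawRow{}, s.rows...), rows...)
    s.rows = s.rows[:0]
    p.smallParts = append(p.smallParts, &inmemoryPart{rows: toFlush})
    // Background goroutines later merge inmemoryParts into part directories on disk.
}

func main() {
    p := &partition{shards: make([]shard, 1), maxPerShard: 4}
    p.addRows(make([]rawRow, 3)) // fast path: fits into the shard
    p.addRows(make([]rawRow, 3)) // overflow: shard rows + new rows become an inmemoryPart
    fmt.Println(len(p.shards[0].rows), len(p.smallParts)) // prints: 0 1
}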

On-disk layout of the time series data:

  • partition: one directory per month, e.g. 2023_05;
  • part: a subdirectory under the partition, containing the t/v data (timestamps and values) and the index information;
# ls 2023_05/
117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD
# ls 2023_05/117_117_20230504085731.987_20230504085737.814_175ADBE68C628DDD/
index.bin  metaindex.bin  min_dedup_interval  timestamps.bin  values.bin

2. The in-memory shards

Number of shards:

  • The number of shards depends on the number of CPU cores;
  • For a 4C4G machine this works out to just 1 shard;
// The number of shards for rawRow entries per partition.
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 3) / 4

Number of rows each shard can hold:

  • Range: 1e4 ~ 500e3 (10k to 500k);
  • unsafe.Sizeof(rawRow{}) = 50;
  • For a 4C4G machine:

    • memory.Allowed() = 4G × 60%;
    • rawRowsShardsPerPartition = 1;
    • Rows per shard: result = 4G × 60% / 1 / 256 / 50 ≈ 180k (worked through in the sketch after the code below);
// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerShard() int {
    maxRawRowsPerPartitionOnce.Do(func() {
        n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{}))
        if n < 1e4 {
            n = 1e4
        }
        if n > 500e3 {
            n = 500e3
        }
        maxRawRowsPerPartition = n
    })
    return maxRawRowsPerPartition
}
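
To make the arithmetic concrete, here is a small back-of-the-envelope check of the formula for the 4C4G case. The inputs are the values quoted above and are assumptions: memory.Allowed() ≈ 60% of 4 GB, rawRowsShardsPerPartition = 1, and a rawRow size of 50 bytes.

package main

import "fmt"

func main() {
    allowed := 4_000_000_000 * 60 / 100 // memory.Allowed(): 60% of 4 GB (assumed)
    shards := 1                         // rawRowsShardsPerPartition on a 4-core machine
    rowSize := 50                       // unsafe.Sizeof(rawRow{}) as quoted above

    n := allowed / shards / 256 / rowSize
    if n < 1e4 { // lower bound from getMaxRawRowsPerShard
        n = 1e4
    }
    if n > 500e3 { // upper bound from getMaxRawRowsPerShard
        n = 500e3
    }
    fmt.Println(n) // 187500 -> roughly the 180k figure quoted above
}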

If there are N shards, which one does a write go to?

  • Writes rotate over the shards in order: round-robin

    • The first write goes to the first shard, the next one to the second shard, and so on (a small standalone demo follows the code below);
func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
    n := atomic.AddUint32(&rrss.shardIdx, 1)
    shards := rrss.shards
    idx := n % uint32(len(shards))
    shard := &shards[idx]
    shard.addRows(pt, rows)
}
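
For illustration, a minimal standalone demo of the same round-robin pattern: an atomically incremented counter taken modulo the shard count. This is toy code with an assumed shard count of 4, not the VictoriaMetrics implementation.

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    const numShards = 4 // assumed shard count for the demo
    var shardIdx uint32
    for i := 0; i < 6; i++ {
        n := atomic.AddUint32(&shardIdx, 1)
        idx := n % numShards
        fmt.Println("write", i, "-> shard", idx)
    }
    // Output: shards 1, 2, 3, 0, 1, 2 - successive writes rotate across all shards.
}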

3. The write path in code

First, the target partition is located from the rows' timestamps. To keep the analysis simple, only the case where all rows land in a single partition is considered:

  • One partition corresponds to one month (a sketch of the month-range check follows the code below);
// lib/storage/table.go
func (tb *table) AddRows(rows []rawRow) error {
    if len(rows) == 0 {
        return nil
    }
    // Verify whether all the rows may be added to a single partition.
    ptwsX := getPartitionWrappers()
    defer putPartitionWrappers(ptwsX)
    ptwsX.a = tb.GetPartitions(ptwsX.a[:0])
    ptws := ptwsX.a
    for i, ptw := range ptws {
        singlePt := true
        for j := range rows {
            if !ptw.pt.HasTimestamp(rows[j].Timestamp) {
                singlePt = false
                break
            }
        }
        if !singlePt {
            continue
        }
        ...
        // All rows fall into a single partition.
        // Fast path - add all the rows into the ptw.
        ptw.pt.AddRows(rows)
        tb.PutPartitions(ptws)
        return nil
    }
    ...
}
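
Conceptually, HasTimestamp is a range check against the partition's month boundaries (timestamps are in milliseconds). The sketch below only illustrates that idea and is not the actual lib/storage implementation; monthRange and hasTimestamp are hypothetical helpers.

package main

import (
    "fmt"
    "time"
)

// monthRange returns the millisecond timestamp range covered by the month containing t.
func monthRange(t time.Time) (minTs, maxTs int64) {
    start := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
    end := start.AddDate(0, 1, 0) // first instant of the next month
    return start.UnixMilli(), end.UnixMilli() - 1
}

// hasTimestamp mimics the idea of partition.HasTimestamp: a simple range test.
func hasTimestamp(ts, minTs, maxTs int64) bool {
    return ts >= minTs && ts <= maxTs
}

func main() {
    minTs, maxTs := monthRange(time.Date(2023, 5, 4, 8, 57, 31, 0, time.UTC)) // the 2023_05 partition
    fmt.Println(hasTimestamp(time.Date(2023, 5, 20, 0, 0, 0, 0, time.UTC).UnixMilli(), minTs, maxTs)) // true
    fmt.Println(hasTimestamp(time.Date(2023, 6, 1, 0, 0, 0, 0, time.UTC).UnixMilli(), minTs, maxTs))  // false
}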

Once the partition is determined, the rest of the flow takes place inside that partition:

  • The rows are first written into the partition's []shard;
// lib/storage/partition.go
// AddRows adds the given rows to the partition pt.
func (pt *partition) AddRows(rows []rawRow) {
    if len(rows) == 0 {
        return
    }
    // Validate all the rows.
    ...
    // Write into the []shard.
    pt.rawRows.addRows(pt, rows)
}

Writing inside a shard:

  • If the shard has enough free capacity for the rows, they are appended to the shard and the call returns to the client;
  • Otherwise, the rows already in the shard plus the new rows are collected into rowsToFlush, and the data in rowsToFlush is then written into inmemoryParts;
// lib/storage/partition.go
func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
    var rowsToFlush []rawRow
    rrs.mu.Lock()
    if cap(rrs.rows) == 0 {
        n := getMaxRawRowsPerShard()
        rrs.rows = make([]rawRow, 0, n)
    }
    maxRowsCount := cap(rrs.rows)
    capacity := maxRowsCount - len(rrs.rows)
    // There is still room in the shard.
    if capacity >= len(rows) {
        // Fast path - rows fit capacity.
        rrs.rows = append(rrs.rows, rows...)
    } else {
        // The shard has no room left:
        // collect the shard's rows plus the new rows into rowsToFlush.
        // Slow path - rows don't fit capacity.
        // Put rrs.rows and rows to rowsToFlush and convert it to a part.
        rowsToFlush = append(rowsToFlush, rrs.rows...)
        rowsToFlush = append(rowsToFlush, rows...)
        rrs.rows = rrs.rows[:0]
        rrs.lastFlushTime = fasttime.UnixTimestamp()
    }
    rrs.mu.Unlock()
    // Write the rows collected in rowsToFlush into inmemoryParts.
    pt.flushRowsToParts(rowsToFlush)
}

How the data in rowsToFlush is written into inmemoryParts:

  • Note that if rowsToFlush is empty, the loop body never runs and the function returns immediately;
  • The actual work is done by pt.addRowsPart(rowsPart);
// lib/storage/partition.go
func (pt *partition) flushRowsToParts(rows []rawRow) {
    maxRows := getMaxRawRowsPerShard()
    wg := getWaitGroup()
    for len(rows) > 0 {
        n := maxRows
        if n > len(rows) {
            n = len(rows)
        }
        wg.Add(1)
        go func(rowsPart []rawRow) {
            defer wg.Done()
            pt.addRowsPart(rowsPart) // the function that does the actual work
        }(rows[:n])
        rows = rows[n:]
    }
    wg.Wait()
    putWaitGroup(wg)
}

addRowsPart writes the rows into an inmemoryPart, wraps it in a partWrapper, and appends the partWrapper to partition.smallParts.

It then checks whether pt.smallParts exceeds 256 entries: if the count is <= 256 it returns immediately; otherwise it helps with merging parts.

// lib/storage/partition.go
func (pt *partition) addRowsPart(rows []rawRow) {
    // Write the rows into an inmemoryPart.
    mp := getInmemoryPart()
    mp.InitFromRows(rows) // the rows are compressed as they are written into the inmemoryPart
    ...
    // Construct a partWrapper.
    p, err := mp.NewPart()
    pw := &partWrapper{
        p:        p,
        mp:       mp,
        refCount: 1,
    }
    // Append the new partWrapper to the partition's smallParts.
    pt.smallParts = append(pt.smallParts, pw)
    // Check whether smallParts exceeds 256 entries.
    ok := len(pt.smallParts) <= maxSmallPartsPerPartition
    pt.partsLock.Unlock()
    if ok {
        return // smallParts <= 256, return immediately
    }
    // smallParts > 256, so help merging parts.
    // The added part exceeds available limit. Help merging parts.
    ...
    err = pt.mergeSmallParts(false)
    if err == nil {
        atomic.AddUint64(&pt.smallAssistedMerges, 1)
        return
    }
    ...
}

4. Summary

  • The fastest path:

    • The rows are written into one of the partition's shards and the call returns;
    • The rows in that shard are periodically flushed by background goroutines into an inmemoryPart;
  • The second-fastest path:

    • The target shard for the rows is full, so the shard's existing rows plus the new rows are written into an inmemoryPart, which is appended to partition.smallParts;
    • The rows in the inmemoryParts are periodically merged to disk by background goroutines and stored as part directories;
  • The slowest path:

    • On top of the previous path, pt.smallParts is found to exceed 256 entries, so the writing goroutine also assists with merging parts;