• TSDB

    • 基于lsm的实现,memtable存储热数据(2h),磁盘存储冷数据
    • 思考wisckey kv拆散,ssd并行写代替程序io
    • lock-free
    • 基于aep这类的Persistent Memory 代替wal
    • 读写拆散
    • 实现高效的内存查问数据结构(avltree、skiplist、红黑树)
    • 基于可插拔式的压缩算法(ZSTD压缩、Snappy压缩)
    • mmap内存拷贝
    • 相似es的倒排索引
    • 垂直写,程度查,序列分流,冷热读写
    • 自定义Marshal编解码
    • RoaingBitMap 优化
    • 优化的正则查问(fastRegexMatcher)

想尝试写一个tsdb,当然实现较为简单,底层还是基于LSM的形式,如果不理解LSM的能够看下bitcask模型,或者理解下LSM论文,我集体简略实现过一个简略的LSM写操作,基于bitcask模型,代码量很少,能够简略理解下~:https://github.com/azhsmesos/...

TSDB: Time Series Database

  • 数据点(Point): 时序数据的数据点是一个蕴含 (Timestamp:int64, Value:float64) 的二元组。
  • 工夫线(Series): 不同标签(Label)的组合称为不同的工夫线

借用Prometheus的格局就是这样:

series1:{"__name__": "cpu.idle", "host": "jvm_host"}

series1:{"__name__": "cpu.idle", "host": "jvm_host"}

数据模型定义

// Point 示意一个数据点 (ts, value) 二元组type Point struct { Ts    int64 // in seconds Value float64}// Label 代表一个标签组合type Label struct { Name  string Value string}// Row 一行时序数据 包含数据点和标签组合type Row struct { Metric string Labels LabelSet Point  Point}// LabelSet 示意 Label 组合type LabelSet []Label

数据写入

时序数据库具备垂直写,程度查的个性,比方clickhouse,咱们常常将其用来和grafana一起做监控的底层存储,咱们很多时候的查问都是基于一段持续时间内的数据点,可能这个数据点的label都是同一个,也可能是多个。

序列分流

因为时序数据库的场景,导致它可能同一条label的数据在某个工夫序列上十分大,所以咱们思考序列分流,基于工夫维度创立不同的segment,而后基于该工夫维度做冷热拆散

开始开发

一、One day 写内存

因为咱们要兼顾LSM的形式,所以采纳的模型比较简单,间接以Row的模式忘chan中写数据,而后异步进行刷盘。

日志存储格局这样:

其中一个segment中蕴含很多个rows,而后由一个meta.json的数据文件形容该segment的时间跨度、数据大小等数据。

tsdb.go办法以及相干对象定义

package tsdbimport (   "context"   "errors"   "github.com/sirupsen/logrus"   "sync"   "time")type options struct {   metaSerializer    MetaSerializer  // 元数据自定义Marshal接口   bytesCompressor   BytesCompressor // 数据长久化存储压缩接口   retention         time.Duration   // 数据保留时长   segmentDuration   time.Duration   // 一个segment的时长   writeTimeout      time.Duration   // 写超时   onlyMemoryMode    bool   enableOutdated    bool   // 是否能够写入过期数据(乱序写入)   maxRowsPerSegment int64  // 每段的最大row的数量   dataPath          string // Segment 长久化存储文件夹}type TSDB struct {   segments *segmentList   mutex    sync.RWMutex   ctx      context.Context   cancel   context.CancelFunc   queue chan []*Row   wait  sync.WaitGroup}// Point 一个数据点type Point struct {   Timestamp int64   Value     float64}// Label 一个标签组合type Label struct {   Name  string   Value string}type LabelList []Labelvar (   timerPool   sync.Pool   defaultOpts = &options{      metaSerializer:    newBinaryMetaSerializer(),      bytesCompressor:   newNoopBytesCompressor(),      segmentDuration:   2 * time.Hour, // 默认两小时      retention:         7 * 24 * time.Hour,      writeTimeout:      30 * time.Second,      onlyMemoryMode:    false,      enableOutdated:    true,      maxRowsPerSegment: 19960412, // 该数字可自定义      dataPath:          ".",   })// Row 一行时序数据库,包含数据点和标签组合type Row struct {   Metric string   Labels LabelList   Point  Point}// InsertRows 插入rowsfunc (tsdb *TSDB) InsertRows(rows []*Row) error {   timer := getTimer(defaultOpts.writeTimeout)   select {   case tsdb.queue <- rows:      putTimer(timer)   case <-timer.C:      putTimer(timer)      return errors.New("failed to insert rows to database, write overload")   }   return nil}func getTimer(duration time.Duration) *time.Timer {   if value := timerPool.Get(); value != nil {      t := value.(*time.Timer)      if t.Reset(duration) {         logrus.Error("active timer trapped to the pool")         return nil      }      return t   }   return time.NewTimer(duration)}func putTimer(t *time.Timer) {   if !t.Stop() {      select {      case <-t.C:      default:      }   }}

list.go内存中的排序链表

// List 排序链表构造type List interface {   }

segment.gosegment格局定义

import "sync"type Segment interface {   InsertRows(row []*Row)}type segmentList struct {   mutex sync.RWMutex   head  Segment   list  List}

compressor.go数据压缩文件定义

// noopBytesCompressor 默认不压缩type noopBytesCompressor struct {}// BytesCompressor 数据压缩接口type BytesCompressor interface {   Compress(data []byte) []byte   Decompress(data []byte) ([]byte, error)}func newNoopBytesCompressor() BytesCompressor {   return &noopBytesCompressor{}}func (n *noopBytesCompressor) Compress(data []byte) []byte {   return nil}func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {   return nil, nil}

metadata.go形容segment的meta信息

type metaSeries struct {   Sid         string   StartOffset uint64   EndOffset   uint64   Labels      []uint32}type seriesWithLabel struct {   Name string   Sids []uint32}type Metadata struct {   MinTimestamp int64   MaxTimestamp int64   Series       []metaSeries   Labels       []seriesWithLabel}type binaryMetaserializer struct {}// MetaSerializer 编解码Segment元数据type MetaSerializer interface {   Marshal(Metadata) ([]byte, error)   Unmarshal([]byte, *Metadata) error}func newBinaryMetaSerializer() MetaSerializer {   return &binaryMetaserializer{}}func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) {   return nil, nil}func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error {   return nil}

能够看到,外围就是将数据以row数组的模式批量刷新到chan中去,而后由咱们的chan进行异步刷盘操作...

待定~

二、实现启动函数和压缩算法

数据压缩算法,目前是两个算法,ZSTD和Snappy两种,也是间接调用的库函数,实现比较简单

package tsdbimport (    "github.com/golang/snappy"    "github.com/klauspost/compress/zstd")type BytesCompressorType int8const (    // NoopBytesCompressor 默认不压缩    NoopBytesCompressor BytesCompressorType = iota    // ZSTDBytesCompressor 应用ZSTD压缩算法    ZSTDBytesCompressor    // SnappyBytesCompressor 应用snappy压缩算法    SnappyBytesCompressor)// noopBytesCompressor 默认不压缩type noopBytesCompressor struct{}type zstdBytesCompressor struct{}type snappyBytesCompressor struct{}// BytesCompressor 数据压缩接口type BytesCompressor interface {    Compress(data []byte) []byte    Decompress(data []byte) ([]byte, error)}//  默认压缩算法func newNoopBytesCompressor() BytesCompressor {    return &noopBytesCompressor{}}func (n *noopBytesCompressor) Compress(data []byte) []byte {    return data}func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {    return data, nil}// ZSTD 压缩算法func newZSTDBytesCompressor() BytesCompressor {    return &zstdBytesCompressor{}}func (n *zstdBytesCompressor) Compress(data []byte) []byte {    encoder, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))    return encoder.EncodeAll(data, make([]byte, 0, len(data)))}func (n *zstdBytesCompressor) Decompress(data []byte) ([]byte, error) {    decoder, _ := zstd.NewReader(nil)    return decoder.DecodeAll(data, nil)}// snappy 压缩算法func newSnappyBytesCompressor() BytesCompressor {    return &snappyBytesCompressor{}}func (n *snappyBytesCompressor) Compress(data []byte) []byte {    return snappy.Encode(nil, data)}func (n *snappyBytesCompressor) Decompress(data []byte) ([]byte, error) {    return snappy.Decode(nil, data)}

能够看到,外围就是将数据以row数组的模式批量刷新到chan中去,而后由咱们的chan进行异步刷盘操作...

下一篇实现可插拔的压缩算法和启动tsdb的具体实现

本文由mdnice多平台公布