TSDB
- 基于lsm的实现,memtable存储热数据(2h),磁盘存储冷数据
- 思考wisckey kv拆散,ssd并行写代替程序io
- lock-free
- 基于aep这类的Persistent Memory 代替wal
- 读写拆散
- 实现高效的内存查问数据结构(avltree、skiplist、红黑树)
- 基于可插拔式的压缩算法(ZSTD压缩、Snappy压缩)
- mmap内存拷贝
- 相似es的倒排索引
- 垂直写,程度查,序列分流,冷热读写
- 自定义Marshal编解码
- RoaingBitMap 优化
- 优化的正则查问(fastRegexMatcher)
想尝试写一个tsdb,当然实现较为简单,底层还是基于LSM的形式,如果不理解LSM的能够看下bitcask模型,或者理解下LSM论文,我集体简略实现过一个简略的LSM写操作,基于bitcask模型,代码量很少,能够简略理解下~:https://github.com/azhsmesos/...
TSDB: Time Series Database
- 数据点(Point): 时序数据的数据点是一个蕴含 (Timestamp:int64, Value:float64) 的二元组。
- 工夫线(Series): 不同标签(Label)的组合称为不同的工夫线
借用Prometheus
的格局就是这样:
series1:{"__name__": "cpu.idle", "host": "jvm_host"}
series1:{"__name__": "cpu.idle", "host": "jvm_host"}
数据模型定义
// Point 示意一个数据点 (ts, value) 二元组type Point struct { Ts int64 // in seconds Value float64}// Label 代表一个标签组合type Label struct { Name string Value string}// Row 一行时序数据 包含数据点和标签组合type Row struct { Metric string Labels LabelSet Point Point}// LabelSet 示意 Label 组合type LabelSet []Label
数据写入
时序数据库具备垂直写,程度查的个性,比方clickhouse,咱们常常将其用来和grafana一起做监控的底层存储,咱们很多时候的查问都是基于一段持续时间内的数据点,可能这个数据点的label都是同一个,也可能是多个。
序列分流
因为时序数据库的场景,导致它可能同一条label的数据在某个工夫序列上十分大,所以咱们思考序列分流,基于工夫维度创立不同的segment,而后基于该工夫维度做冷热拆散
开始开发
一、One day 写内存
因为咱们要兼顾LSM的形式,所以采纳的模型比较简单,间接以Row的模式忘chan中写数据,而后异步进行刷盘。
日志存储格局这样:
其中一个segment中蕴含很多个rows,而后由一个meta.json的数据文件形容该segment的时间跨度、数据大小等数据。
tsdb.go
办法以及相干对象定义
package tsdbimport ( "context" "errors" "github.com/sirupsen/logrus" "sync" "time")type options struct { metaSerializer MetaSerializer // 元数据自定义Marshal接口 bytesCompressor BytesCompressor // 数据长久化存储压缩接口 retention time.Duration // 数据保留时长 segmentDuration time.Duration // 一个segment的时长 writeTimeout time.Duration // 写超时 onlyMemoryMode bool enableOutdated bool // 是否能够写入过期数据(乱序写入) maxRowsPerSegment int64 // 每段的最大row的数量 dataPath string // Segment 长久化存储文件夹}type TSDB struct { segments *segmentList mutex sync.RWMutex ctx context.Context cancel context.CancelFunc queue chan []*Row wait sync.WaitGroup}// Point 一个数据点type Point struct { Timestamp int64 Value float64}// Label 一个标签组合type Label struct { Name string Value string}type LabelList []Labelvar ( timerPool sync.Pool defaultOpts = &options{ metaSerializer: newBinaryMetaSerializer(), bytesCompressor: newNoopBytesCompressor(), segmentDuration: 2 * time.Hour, // 默认两小时 retention: 7 * 24 * time.Hour, writeTimeout: 30 * time.Second, onlyMemoryMode: false, enableOutdated: true, maxRowsPerSegment: 19960412, // 该数字可自定义 dataPath: ".", })// Row 一行时序数据库,包含数据点和标签组合type Row struct { Metric string Labels LabelList Point Point}// InsertRows 插入rowsfunc (tsdb *TSDB) InsertRows(rows []*Row) error { timer := getTimer(defaultOpts.writeTimeout) select { case tsdb.queue <- rows: putTimer(timer) case <-timer.C: putTimer(timer) return errors.New("failed to insert rows to database, write overload") } return nil}func getTimer(duration time.Duration) *time.Timer { if value := timerPool.Get(); value != nil { t := value.(*time.Timer) if t.Reset(duration) { logrus.Error("active timer trapped to the pool") return nil } return t } return time.NewTimer(duration)}func putTimer(t *time.Timer) { if !t.Stop() { select { case <-t.C: default: } }}
list.go
内存中的排序链表
// List 排序链表构造type List interface { }
segment.go
segment格局定义
import "sync"type Segment interface { InsertRows(row []*Row)}type segmentList struct { mutex sync.RWMutex head Segment list List}
compressor.go
数据压缩文件定义
// noopBytesCompressor 默认不压缩type noopBytesCompressor struct {}// BytesCompressor 数据压缩接口type BytesCompressor interface { Compress(data []byte) []byte Decompress(data []byte) ([]byte, error)}func newNoopBytesCompressor() BytesCompressor { return &noopBytesCompressor{}}func (n *noopBytesCompressor) Compress(data []byte) []byte { return nil}func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) { return nil, nil}
metadata.go
形容segment的meta信息
type metaSeries struct { Sid string StartOffset uint64 EndOffset uint64 Labels []uint32}type seriesWithLabel struct { Name string Sids []uint32}type Metadata struct { MinTimestamp int64 MaxTimestamp int64 Series []metaSeries Labels []seriesWithLabel}type binaryMetaserializer struct {}// MetaSerializer 编解码Segment元数据type MetaSerializer interface { Marshal(Metadata) ([]byte, error) Unmarshal([]byte, *Metadata) error}func newBinaryMetaSerializer() MetaSerializer { return &binaryMetaserializer{}}func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) { return nil, nil}func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error { return nil}
能够看到,外围就是将数据以row数组的模式批量刷新到chan中去,而后由咱们的chan进行异步刷盘操作...
待定~
二、实现启动函数和压缩算法
数据压缩算法,目前是两个算法,ZSTD和Snappy两种,也是间接调用的库函数,实现比较简单
package tsdbimport ( "github.com/golang/snappy" "github.com/klauspost/compress/zstd")type BytesCompressorType int8const ( // NoopBytesCompressor 默认不压缩 NoopBytesCompressor BytesCompressorType = iota // ZSTDBytesCompressor 应用ZSTD压缩算法 ZSTDBytesCompressor // SnappyBytesCompressor 应用snappy压缩算法 SnappyBytesCompressor)// noopBytesCompressor 默认不压缩type noopBytesCompressor struct{}type zstdBytesCompressor struct{}type snappyBytesCompressor struct{}// BytesCompressor 数据压缩接口type BytesCompressor interface { Compress(data []byte) []byte Decompress(data []byte) ([]byte, error)}// 默认压缩算法func newNoopBytesCompressor() BytesCompressor { return &noopBytesCompressor{}}func (n *noopBytesCompressor) Compress(data []byte) []byte { return data}func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) { return data, nil}// ZSTD 压缩算法func newZSTDBytesCompressor() BytesCompressor { return &zstdBytesCompressor{}}func (n *zstdBytesCompressor) Compress(data []byte) []byte { encoder, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest)) return encoder.EncodeAll(data, make([]byte, 0, len(data)))}func (n *zstdBytesCompressor) Decompress(data []byte) ([]byte, error) { decoder, _ := zstd.NewReader(nil) return decoder.DecodeAll(data, nil)}// snappy 压缩算法func newSnappyBytesCompressor() BytesCompressor { return &snappyBytesCompressor{}}func (n *snappyBytesCompressor) Compress(data []byte) []byte { return snappy.Encode(nil, data)}func (n *snappyBytesCompressor) Decompress(data []byte) ([]byte, error) { return snappy.Decode(nil, data)}
能够看到,外围就是将数据以row数组的模式批量刷新到chan中去,而后由咱们的chan进行异步刷盘操作...
下一篇实现可插拔的压缩算法和启动tsdb的具体实现
本文由mdnice多平台公布