共计 5653 个字符,预计需要花费 15 分钟才能阅读完成。
TSDB
- 基于 lsm 的实现,memtable 存储热数据(2h),磁盘存储冷数据
- 思考 wisckey kv 拆散,ssd 并行写代替程序 io
- lock-free
- 基于 aep 这类的 Persistent Memory 代替 wal
- 读写拆散
- 实现高效的内存查问数据结构(avltree、skiplist、红黑树)
- 基于可插拔式的压缩算法(ZSTD 压缩、Snappy 压缩)
- mmap 内存拷贝
- 相似 es 的倒排索引
- 垂直写,程度查,序列分流,冷热读写
- 自定义 Marshal 编解码
- RoaingBitMap 优化
- 优化的正则查问(fastRegexMatcher)
想尝试写一个 tsdb,当然实现较为简单,底层还是基于 LSM 的形式,如果不理解 LSM 的能够看下 bitcask 模型,或者理解下 LSM 论文,我集体简略实现过一个简略的 LSM 写操作,基于 bitcask 模型,代码量很少,能够简略理解下~:https://github.com/azhsmesos/…
TSDB: Time Series Database
- 数据点(Point): 时序数据的数据点是一个蕴含 (Timestamp:int64, Value:float64) 的二元组。
- 工夫线(Series): 不同标签(Label)的组合称为不同的工夫线
借用 Prometheus
的格局就是这样:
series1:{"__name__": "cpu.idle", "host": "jvm_host"}
series1:{"__name__": "cpu.idle", "host": "jvm_host"}
数据模型定义
// Point 示意一个数据点 (ts, value) 二元组 | |
type Point struct { | |
Ts int64 // in seconds | |
Value float64 | |
} | |
// Label 代表一个标签组合 | |
type Label struct { | |
Name string | |
Value string | |
} | |
// Row 一行时序数据 包含数据点和标签组合 | |
type Row struct { | |
Metric string | |
Labels LabelSet | |
Point Point | |
} | |
// LabelSet 示意 Label 组合 | |
type LabelSet []Label |
数据写入
时序数据库具备垂直写,程度查的个性,比方 clickhouse,咱们常常将其用来和 grafana 一起做监控的底层存储,咱们很多时候的查问都是基于一段持续时间内的数据点,可能这个数据点的 label 都是同一个,也可能是多个。
序列分流
因为时序数据库的场景,导致它可能同一条 label 的数据在某个工夫序列上十分大,所以咱们思考序列分流,基于工夫维度创立不同的 segment,而后基于该工夫维度做冷热拆散
开始开发
一、One day 写内存
因为咱们要兼顾 LSM 的形式,所以采纳的模型比较简单,间接以 Row 的模式忘 chan 中写数据,而后异步进行刷盘。
日志存储格局这样:
其中一个 segment 中蕴含很多个 rows,而后由一个 meta.json 的数据文件形容该 segment 的时间跨度、数据大小等数据。
tsdb.go
办法以及相干对象定义
package tsdb | |
import ( | |
"context" | |
"errors" | |
"github.com/sirupsen/logrus" | |
"sync" | |
"time" | |
) | |
type options struct { | |
metaSerializer MetaSerializer // 元数据自定义 Marshal 接口 | |
bytesCompressor BytesCompressor // 数据长久化存储压缩接口 | |
retention time.Duration // 数据保留时长 | |
segmentDuration time.Duration // 一个 segment 的时长 | |
writeTimeout time.Duration // 写超时 | |
onlyMemoryMode bool | |
enableOutdated bool // 是否能够写入过期数据(乱序写入)maxRowsPerSegment int64 // 每段的最大 row 的数量 | |
dataPath string // Segment 长久化存储文件夹 | |
} | |
type TSDB struct { | |
segments *segmentList | |
mutex sync.RWMutex | |
ctx context.Context | |
cancel context.CancelFunc | |
queue chan []*Row | |
wait sync.WaitGroup | |
} | |
// Point 一个数据点 | |
type Point struct { | |
Timestamp int64 | |
Value float64 | |
} | |
// Label 一个标签组合 | |
type Label struct { | |
Name string | |
Value string | |
} | |
type LabelList []Label | |
var ( | |
timerPool sync.Pool | |
defaultOpts = &options{metaSerializer: newBinaryMetaSerializer(), | |
bytesCompressor: newNoopBytesCompressor(), | |
segmentDuration: 2 * time.Hour, // 默认两小时 | |
retention: 7 * 24 * time.Hour, | |
writeTimeout: 30 * time.Second, | |
onlyMemoryMode: false, | |
enableOutdated: true, | |
maxRowsPerSegment: 19960412, // 该数字可自定义 | |
dataPath: ".", | |
} | |
) | |
// Row 一行时序数据库,包含数据点和标签组合 | |
type Row struct { | |
Metric string | |
Labels LabelList | |
Point Point | |
} | |
// InsertRows 插入 rows | |
func (tsdb *TSDB) InsertRows(rows []*Row) error {timer := getTimer(defaultOpts.writeTimeout) | |
select { | |
case tsdb.queue <- rows: | |
putTimer(timer) | |
case <-timer.C: | |
putTimer(timer) | |
return errors.New("failed to insert rows to database, write overload") | |
} | |
return nil | |
} | |
func getTimer(duration time.Duration) *time.Timer {if value := timerPool.Get(); value != nil {t := value.(*time.Timer) | |
if t.Reset(duration) {logrus.Error("active timer trapped to the pool") | |
return nil | |
} | |
return t | |
} | |
return time.NewTimer(duration) | |
} | |
func putTimer(t *time.Timer) {if !t.Stop() { | |
select { | |
case <-t.C: | |
default: | |
} | |
} | |
} |
list.go
内存中的排序链表
// List 排序链表构造 | |
type List interface {} |
segment.go
segment 格局定义
import "sync" | |
type Segment interface {InsertRows(row []*Row) | |
} | |
type segmentList struct { | |
mutex sync.RWMutex | |
head Segment | |
list List | |
} |
compressor.go
数据压缩文件定义
// noopBytesCompressor 默认不压缩 | |
type noopBytesCompressor struct { | |
} | |
// BytesCompressor 数据压缩接口 | |
type BytesCompressor interface {Compress(data []byte) []byte | |
Decompress(data []byte) ([]byte, error) | |
} | |
func newNoopBytesCompressor() BytesCompressor {return &noopBytesCompressor{} | |
} | |
func (n *noopBytesCompressor) Compress(data []byte) []byte {return nil} | |
func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {return nil, nil} |
metadata.go
形容 segment 的 meta 信息
type metaSeries struct { | |
Sid string | |
StartOffset uint64 | |
EndOffset uint64 | |
Labels []uint32} | |
type seriesWithLabel struct { | |
Name string | |
Sids []uint32} | |
type Metadata struct { | |
MinTimestamp int64 | |
MaxTimestamp int64 | |
Series []metaSeries | |
Labels []seriesWithLabel} | |
type binaryMetaserializer struct { | |
} | |
// MetaSerializer 编解码 Segment 元数据 | |
type MetaSerializer interface {Marshal(Metadata) ([]byte, error) | |
Unmarshal([]byte, *Metadata) error | |
} | |
func newBinaryMetaSerializer() MetaSerializer {return &binaryMetaserializer{} | |
} | |
func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) {return nil, nil} | |
func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error {return nil} |
能够看到,外围就是将数据以 row 数组的模式批量刷新到 chan 中去,而后由咱们的 chan 进行异步刷盘操作 …
待定~
二、实现启动函数和压缩算法
数据压缩算法,目前是两个算法,ZSTD 和 Snappy 两种,也是间接调用的库函数,实现比较简单
package tsdb | |
import ( | |
"github.com/golang/snappy" | |
"github.com/klauspost/compress/zstd" | |
) | |
type BytesCompressorType int8 | |
const ( | |
// NoopBytesCompressor 默认不压缩 | |
NoopBytesCompressor BytesCompressorType = iota | |
// ZSTDBytesCompressor 应用 ZSTD 压缩算法 | |
ZSTDBytesCompressor | |
// SnappyBytesCompressor 应用 snappy 压缩算法 | |
SnappyBytesCompressor | |
) | |
// noopBytesCompressor 默认不压缩 | |
type noopBytesCompressor struct{} | |
type zstdBytesCompressor struct{} | |
type snappyBytesCompressor struct{} | |
// BytesCompressor 数据压缩接口 | |
type BytesCompressor interface {Compress(data []byte) []byte | |
Decompress(data []byte) ([]byte, error) | |
} | |
// 默认压缩算法 | |
func newNoopBytesCompressor() BytesCompressor {return &noopBytesCompressor{} | |
} | |
func (n *noopBytesCompressor) Compress(data []byte) []byte {return data} | |
func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {return data, nil} | |
// ZSTD 压缩算法 | |
func newZSTDBytesCompressor() BytesCompressor {return &zstdBytesCompressor{} | |
} | |
func (n *zstdBytesCompressor) Compress(data []byte) []byte {encoder, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest)) | |
return encoder.EncodeAll(data, make([]byte, 0, len(data))) | |
} | |
func (n *zstdBytesCompressor) Decompress(data []byte) ([]byte, error) {decoder, _ := zstd.NewReader(nil) | |
return decoder.DecodeAll(data, nil) | |
} | |
// snappy 压缩算法 | |
func newSnappyBytesCompressor() BytesCompressor {return &snappyBytesCompressor{} | |
} | |
func (n *snappyBytesCompressor) Compress(data []byte) []byte {return snappy.Encode(nil, data) | |
} | |
func (n *snappyBytesCompressor) Decompress(data []byte) ([]byte, error) {return snappy.Decode(nil, data) | |
} |
能够看到,外围就是将数据以 row 数组的模式批量刷新到 chan 中去,而后由咱们的 chan 进行异步刷盘操作 …
下一篇实现可插拔的压缩算法和启动 tsdb 的具体实现
本文由 mdnice 多平台公布