TSDB
- 基于lsm的实现,memtable存储热数据(2h),磁盘存储冷数据
- 思考wisckey kv拆散,ssd并行写代替程序io
- lock-free
- 基于aep这类的Persistent Memory 代替wal
- 读写拆散
- 实现高效的内存查问数据结构(avltree、skiplist、红黑树)
- 基于可插拔式的压缩算法(ZSTD压缩、Snappy压缩)
- mmap内存拷贝
- 相似es的倒排索引
- 垂直写,程度查,序列分流,冷热读写
- 自定义Marshal编解码
- RoaingBitMap 优化
- 优化的正则查问(fastRegexMatcher)
想尝试写一个tsdb,当然实现较为简单,底层还是基于LSM的形式,如果不理解LSM的能够看下bitcask模型,或者理解下LSM论文,我集体简略实现过一个简略的LSM写操作,基于bitcask模型,代码量很少,能够简略理解下~:https://github.com/azhsmesos/…
TSDB: Time Series Database
- 数据点(Point): 时序数据的数据点是一个蕴含 (Timestamp:int64, Value:float64) 的二元组。
- 工夫线(Series): 不同标签(Label)的组合称为不同的工夫线
借用Prometheus
的格局就是这样:
series1:{"__name__": "cpu.idle", "host": "jvm_host"}
series1:{"__name__": "cpu.idle", "host": "jvm_host"}
数据模型定义
// Point 示意一个数据点 (ts, value) 二元组
type Point struct {
Ts int64 // in seconds
Value float64
}
// Label 代表一个标签组合
type Label struct {
Name string
Value string
}
// Row 一行时序数据 包含数据点和标签组合
type Row struct {
Metric string
Labels LabelSet
Point Point
}
// LabelSet 示意 Label 组合
type LabelSet []Label
数据写入
时序数据库具备垂直写,程度查的个性,比方clickhouse,咱们常常将其用来和grafana一起做监控的底层存储,咱们很多时候的查问都是基于一段持续时间内的数据点,可能这个数据点的label都是同一个,也可能是多个。
序列分流
因为时序数据库的场景,导致它可能同一条label的数据在某个工夫序列上十分大,所以咱们思考序列分流,基于工夫维度创立不同的segment,而后基于该工夫维度做冷热拆散
开始开发
一、One day 写内存
因为咱们要兼顾LSM的形式,所以采纳的模型比较简单,间接以Row的模式忘chan中写数据,而后异步进行刷盘。
日志存储格局这样:
其中一个segment中蕴含很多个rows,而后由一个meta.json的数据文件形容该segment的时间跨度、数据大小等数据。
tsdb.go
办法以及相干对象定义
package tsdb
import (
"context"
"errors"
"github.com/sirupsen/logrus"
"sync"
"time"
)
type options struct {
metaSerializer MetaSerializer // 元数据自定义Marshal接口
bytesCompressor BytesCompressor // 数据长久化存储压缩接口
retention time.Duration // 数据保留时长
segmentDuration time.Duration // 一个segment的时长
writeTimeout time.Duration // 写超时
onlyMemoryMode bool
enableOutdated bool // 是否能够写入过期数据(乱序写入)
maxRowsPerSegment int64 // 每段的最大row的数量
dataPath string // Segment 长久化存储文件夹
}
type TSDB struct {
segments *segmentList
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
queue chan []*Row
wait sync.WaitGroup
}
// Point 一个数据点
type Point struct {
Timestamp int64
Value float64
}
// Label 一个标签组合
type Label struct {
Name string
Value string
}
type LabelList []Label
var (
timerPool sync.Pool
defaultOpts = &options{
metaSerializer: newBinaryMetaSerializer(),
bytesCompressor: newNoopBytesCompressor(),
segmentDuration: 2 * time.Hour, // 默认两小时
retention: 7 * 24 * time.Hour,
writeTimeout: 30 * time.Second,
onlyMemoryMode: false,
enableOutdated: true,
maxRowsPerSegment: 19960412, // 该数字可自定义
dataPath: ".",
}
)
// Row 一行时序数据库,包含数据点和标签组合
type Row struct {
Metric string
Labels LabelList
Point Point
}
// InsertRows 插入rows
func (tsdb *TSDB) InsertRows(rows []*Row) error {
timer := getTimer(defaultOpts.writeTimeout)
select {
case tsdb.queue <- rows:
putTimer(timer)
case <-timer.C:
putTimer(timer)
return errors.New("failed to insert rows to database, write overload")
}
return nil
}
func getTimer(duration time.Duration) *time.Timer {
if value := timerPool.Get(); value != nil {
t := value.(*time.Timer)
if t.Reset(duration) {
logrus.Error("active timer trapped to the pool")
return nil
}
return t
}
return time.NewTimer(duration)
}
func putTimer(t *time.Timer) {
if !t.Stop() {
select {
case <-t.C:
default:
}
}
}
list.go
内存中的排序链表
// List 排序链表构造
type List interface {
}
segment.go
segment格局定义
import "sync"
type Segment interface {
InsertRows(row []*Row)
}
type segmentList struct {
mutex sync.RWMutex
head Segment
list List
}
compressor.go
数据压缩文件定义
// noopBytesCompressor 默认不压缩
type noopBytesCompressor struct {
}
// BytesCompressor 数据压缩接口
type BytesCompressor interface {
Compress(data []byte) []byte
Decompress(data []byte) ([]byte, error)
}
func newNoopBytesCompressor() BytesCompressor {
return &noopBytesCompressor{}
}
func (n *noopBytesCompressor) Compress(data []byte) []byte {
return nil
}
func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {
return nil, nil
}
metadata.go
形容segment的meta信息
type metaSeries struct {
Sid string
StartOffset uint64
EndOffset uint64
Labels []uint32
}
type seriesWithLabel struct {
Name string
Sids []uint32
}
type Metadata struct {
MinTimestamp int64
MaxTimestamp int64
Series []metaSeries
Labels []seriesWithLabel
}
type binaryMetaserializer struct {
}
// MetaSerializer 编解码Segment元数据
type MetaSerializer interface {
Marshal(Metadata) ([]byte, error)
Unmarshal([]byte, *Metadata) error
}
func newBinaryMetaSerializer() MetaSerializer {
return &binaryMetaserializer{}
}
func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) {
return nil, nil
}
func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error {
return nil
}
能够看到,外围就是将数据以row数组的模式批量刷新到chan中去,而后由咱们的chan进行异步刷盘操作…
待定~
二、实现启动函数和压缩算法
数据压缩算法,目前是两个算法,ZSTD和Snappy两种,也是间接调用的库函数,实现比较简单
package tsdb
import (
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)
type BytesCompressorType int8
const (
// NoopBytesCompressor 默认不压缩
NoopBytesCompressor BytesCompressorType = iota
// ZSTDBytesCompressor 应用ZSTD压缩算法
ZSTDBytesCompressor
// SnappyBytesCompressor 应用snappy压缩算法
SnappyBytesCompressor
)
// noopBytesCompressor 默认不压缩
type noopBytesCompressor struct{}
type zstdBytesCompressor struct{}
type snappyBytesCompressor struct{}
// BytesCompressor 数据压缩接口
type BytesCompressor interface {
Compress(data []byte) []byte
Decompress(data []byte) ([]byte, error)
}
// 默认压缩算法
func newNoopBytesCompressor() BytesCompressor {
return &noopBytesCompressor{}
}
func (n *noopBytesCompressor) Compress(data []byte) []byte {
return data
}
func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {
return data, nil
}
// ZSTD 压缩算法
func newZSTDBytesCompressor() BytesCompressor {
return &zstdBytesCompressor{}
}
func (n *zstdBytesCompressor) Compress(data []byte) []byte {
encoder, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
return encoder.EncodeAll(data, make([]byte, 0, len(data)))
}
func (n *zstdBytesCompressor) Decompress(data []byte) ([]byte, error) {
decoder, _ := zstd.NewReader(nil)
return decoder.DecodeAll(data, nil)
}
// snappy 压缩算法
func newSnappyBytesCompressor() BytesCompressor {
return &snappyBytesCompressor{}
}
func (n *snappyBytesCompressor) Compress(data []byte) []byte {
return snappy.Encode(nil, data)
}
func (n *snappyBytesCompressor) Decompress(data []byte) ([]byte, error) {
return snappy.Decode(nil, data)
}
能够看到,外围就是将数据以row数组的模式批量刷新到chan中去,而后由咱们的chan进行异步刷盘操作…
下一篇实现可插拔的压缩算法和启动tsdb的具体实现
本文由mdnice多平台公布
发表回复