关于后端:从零实现一个TSDB一

36次阅读

共计 5653 个字符,预计需要花费 15 分钟才能阅读完成。

  • TSDB

    • 基于 lsm 的实现,memtable 存储热数据(2h),磁盘存储冷数据
    • 思考 wisckey kv 拆散,ssd 并行写代替程序 io
    • lock-free
    • 基于 aep 这类的 Persistent Memory 代替 wal
    • 读写拆散
    • 实现高效的内存查问数据结构(avltree、skiplist、红黑树)
    • 基于可插拔式的压缩算法(ZSTD 压缩、Snappy 压缩)
    • mmap 内存拷贝
    • 相似 es 的倒排索引
    • 垂直写,程度查,序列分流,冷热读写
    • 自定义 Marshal 编解码
    • RoaingBitMap 优化
    • 优化的正则查问(fastRegexMatcher)

想尝试写一个 tsdb,当然实现较为简单,底层还是基于 LSM 的形式,如果不理解 LSM 的能够看下 bitcask 模型,或者理解下 LSM 论文,我集体简略实现过一个简略的 LSM 写操作,基于 bitcask 模型,代码量很少,能够简略理解下~:https://github.com/azhsmesos/…

TSDB: Time Series Database

  • 数据点(Point): 时序数据的数据点是一个蕴含 (Timestamp:int64, Value:float64) 的二元组。
  • 工夫线(Series): 不同标签(Label)的组合称为不同的工夫线

借用 Prometheus 的格局就是这样:

series1:{"__name__": "cpu.idle", "host": "jvm_host"}

series1:{"__name__": "cpu.idle", "host": "jvm_host"}

数据模型定义

// Point 示意一个数据点 (ts, value) 二元组
type Point struct {
 Ts    int64 // in seconds
 Value float64
}

// Label 代表一个标签组合
type Label struct {
 Name  string
 Value string
}

// Row 一行时序数据 包含数据点和标签组合
type Row struct {
 Metric string
 Labels LabelSet
 Point  Point
}

// LabelSet 示意 Label 组合
type LabelSet []Label

数据写入

时序数据库具备垂直写,程度查的个性,比方 clickhouse,咱们常常将其用来和 grafana 一起做监控的底层存储,咱们很多时候的查问都是基于一段持续时间内的数据点,可能这个数据点的 label 都是同一个,也可能是多个。

序列分流

因为时序数据库的场景,导致它可能同一条 label 的数据在某个工夫序列上十分大,所以咱们思考序列分流,基于工夫维度创立不同的 segment,而后基于该工夫维度做冷热拆散

开始开发

一、One day 写内存

因为咱们要兼顾 LSM 的形式,所以采纳的模型比较简单,间接以 Row 的模式忘 chan 中写数据,而后异步进行刷盘。

日志存储格局这样:

其中一个 segment 中蕴含很多个 rows,而后由一个 meta.json 的数据文件形容该 segment 的时间跨度、数据大小等数据。

tsdb.go办法以及相干对象定义

package tsdb

import (
   "context"
   "errors"
   "github.com/sirupsen/logrus"
   "sync"
   "time"
)

type options struct {
   metaSerializer    MetaSerializer  // 元数据自定义 Marshal 接口
   bytesCompressor   BytesCompressor // 数据长久化存储压缩接口
   retention         time.Duration   // 数据保留时长
   segmentDuration   time.Duration   // 一个 segment 的时长
   writeTimeout      time.Duration   // 写超时
   onlyMemoryMode    bool
   enableOutdated    bool   // 是否能够写入过期数据(乱序写入)maxRowsPerSegment int64  // 每段的最大 row 的数量
   dataPath          string // Segment 长久化存储文件夹
}

type TSDB struct {
   segments *segmentList
   mutex    sync.RWMutex
   ctx      context.Context
   cancel   context.CancelFunc

   queue chan []*Row
   wait  sync.WaitGroup
}

// Point 一个数据点
type Point struct {
   Timestamp int64
   Value     float64
}

// Label 一个标签组合
type Label struct {
   Name  string
   Value string
}

type LabelList []Label

var (
   timerPool   sync.Pool
   defaultOpts = &options{metaSerializer:    newBinaryMetaSerializer(),
      bytesCompressor:   newNoopBytesCompressor(),
      segmentDuration:   2 * time.Hour, // 默认两小时
      retention:         7 * 24 * time.Hour,
      writeTimeout:      30 * time.Second,
      onlyMemoryMode:    false,
      enableOutdated:    true,
      maxRowsPerSegment: 19960412, // 该数字可自定义
      dataPath:          ".",
   }
)

// Row 一行时序数据库,包含数据点和标签组合
type Row struct {
   Metric string
   Labels LabelList
   Point  Point
}

// InsertRows 插入 rows
func (tsdb *TSDB) InsertRows(rows []*Row) error {timer := getTimer(defaultOpts.writeTimeout)
   select {
   case tsdb.queue <- rows:
      putTimer(timer)
   case <-timer.C:
      putTimer(timer)
      return errors.New("failed to insert rows to database, write overload")
   }
   return nil
}

func getTimer(duration time.Duration) *time.Timer {if value := timerPool.Get(); value != nil {t := value.(*time.Timer)
      if t.Reset(duration) {logrus.Error("active timer trapped to the pool")
         return nil
      }
      return t
   }
   return time.NewTimer(duration)
}

func putTimer(t *time.Timer) {if !t.Stop() {
      select {
      case <-t.C:
      default:
      }
   }
}

list.go内存中的排序链表

// List 排序链表构造
type List interface {}

segment.gosegment 格局定义

import "sync"

type Segment interface {InsertRows(row []*Row)
}

type segmentList struct {
   mutex sync.RWMutex
   head  Segment
   list  List
}

compressor.go数据压缩文件定义

// noopBytesCompressor 默认不压缩
type noopBytesCompressor struct {
}

// BytesCompressor 数据压缩接口
type BytesCompressor interface {Compress(data []byte) []byte
   Decompress(data []byte) ([]byte, error)
}

func newNoopBytesCompressor() BytesCompressor {return &noopBytesCompressor{}
}

func (n *noopBytesCompressor) Compress(data []byte) []byte {return nil}

func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {return nil, nil}

metadata.go形容 segment 的 meta 信息

type metaSeries struct {
   Sid         string
   StartOffset uint64
   EndOffset   uint64
   Labels      []uint32}

type seriesWithLabel struct {
   Name string
   Sids []uint32}

type Metadata struct {
   MinTimestamp int64
   MaxTimestamp int64
   Series       []metaSeries
   Labels       []seriesWithLabel}

type binaryMetaserializer struct {
}

// MetaSerializer 编解码 Segment 元数据
type MetaSerializer interface {Marshal(Metadata) ([]byte, error)
   Unmarshal([]byte, *Metadata) error
}

func newBinaryMetaSerializer() MetaSerializer {return &binaryMetaserializer{}
}

func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) {return nil, nil}

func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error {return nil}

能够看到,外围就是将数据以 row 数组的模式批量刷新到 chan 中去,而后由咱们的 chan 进行异步刷盘操作 …

待定~

二、实现启动函数和压缩算法

数据压缩算法,目前是两个算法,ZSTD 和 Snappy 两种,也是间接调用的库函数,实现比较简单

package tsdb

import (
    "github.com/golang/snappy"
    "github.com/klauspost/compress/zstd"
)

type BytesCompressorType int8

const (
    // NoopBytesCompressor 默认不压缩
    NoopBytesCompressor BytesCompressorType = iota

    // ZSTDBytesCompressor 应用 ZSTD 压缩算法
    ZSTDBytesCompressor

    // SnappyBytesCompressor 应用 snappy 压缩算法
    SnappyBytesCompressor
)

// noopBytesCompressor 默认不压缩
type noopBytesCompressor struct{}
type zstdBytesCompressor struct{}
type snappyBytesCompressor struct{}

// BytesCompressor 数据压缩接口
type BytesCompressor interface {Compress(data []byte) []byte
    Decompress(data []byte) ([]byte, error)
}

//  默认压缩算法

func newNoopBytesCompressor() BytesCompressor {return &noopBytesCompressor{}
}

func (n *noopBytesCompressor) Compress(data []byte) []byte {return data}

func (n *noopBytesCompressor) Decompress(data []byte) ([]byte, error) {return data, nil}

// ZSTD 压缩算法

func newZSTDBytesCompressor() BytesCompressor {return &zstdBytesCompressor{}
}

func (n *zstdBytesCompressor) Compress(data []byte) []byte {encoder, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
    return encoder.EncodeAll(data, make([]byte, 0, len(data)))
}

func (n *zstdBytesCompressor) Decompress(data []byte) ([]byte, error) {decoder, _ := zstd.NewReader(nil)
    return decoder.DecodeAll(data, nil)
}

// snappy 压缩算法

func newSnappyBytesCompressor() BytesCompressor {return &snappyBytesCompressor{}
}

func (n *snappyBytesCompressor) Compress(data []byte) []byte {return snappy.Encode(nil, data)
}

func (n *snappyBytesCompressor) Decompress(data []byte) ([]byte, error) {return snappy.Decode(nil, data)
}

能够看到,外围就是将数据以 row 数组的模式批量刷新到 chan 中去,而后由咱们的 chan 进行异步刷盘操作 …

下一篇实现可插拔的压缩算法和启动 tsdb 的具体实现

本文由 mdnice 多平台公布

正文完
 0