关于后端:从零实现一个TSDB五

55次阅读

共计 5977 个字符,预计需要花费 15 分钟才能阅读完成。

上篇我们实现了将整个冷数据刷盘的过程,但刷盘时对 meta 数据的编解码还没有实现。

  注意:这里说的 meta 并不是 meta 文件的数据,而是
  data 中的索引信息。

meta 编码
metadata.go 中的 Marshal() 方法

分为以下几步:

  • 保存每个 label 对应的 seriesIDList,同时保存其长度
  • 保存每个 series 的 seriesID 及其数据在文件中的 StartOffset 和 EndOffset
  • 保存每个 series 的 labelList(以 label 序号升序存储)
  • 保存 minTs 和 maxTs
  • 最后保存一个 signature,用于检查文件是否损坏

    // Marshal encodes meta (the index section of a data file, not the meta
    // file) and returns it compressed with DoCompress. Layout before compression:
    //
    //	labels:  { uint16 len(name) | name | uint32 len(sids) | sids as uint32... }*  uint16 endBlock
    //	series:  { uint16 len(sid) | sid | uint64 StartOffset | uint64 EndOffset |
    //	           uint32 label count | ascending uint32 indexes into meta.Labels... }*  uint16 endBlock
    //	trailer: uint64 MinTimestamp | uint64 MaxTimestamp | signature
    //
    // A section ends at the first length prefix equal to endBlock. A label
    // name or series ID whose length does not fit in a uint16, or equals
    // endBlock, therefore cannot be encoded, and Marshal returns an error
    // instead of producing an undecodable block. A series that references a
    // label missing from meta.Labels is also an error, rather than being
    // silently encoded as index 0 (which would point at the wrong label).
    func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) {
    	buf := newEncodingBuf()

    	// putString16 writes s prefixed by its uint16 length, refusing lengths
    	// that would be truncated or mistaken for the endBlock terminator.
    	putString16 := func(what, s string) error {
    		n := uint16(len(s))
    		if int(n) != len(s) || n == endBlock {
    			return fmt.Errorf("%s of length %d cannot be encoded with a uint16 length prefix", what, len(s))
    		}
    		buf.MarshalUint16(n)
    		buf.MarshalString(s)
    		return nil
    	}

    	// Position of each label name in meta.Labels; series refer to labels by it.
    	labelOrdered := make(map[string]int, len(meta.Labels))
    	for index, labelToSids := range meta.Labels {
    		labelOrdered[labelToSids.Name] = index
    		if err := putString16("label name", labelToSids.Name); err != nil {
    			return nil, err
    		}
    		buf.MarshalUint32(uint32(len(labelToSids.Sids)))
    		buf.MarshalUint32(labelToSids.Sids...)
    	}
    	buf.MarshalUint16(endBlock)

    	for index, series := range meta.Series {
    		if err := putString16("series id", series.Sid); err != nil {
    			return nil, err
    		}
    		buf.MarshalUint64(series.StartOffset, series.EndOffset)

    		labelList := meta.SeriesIDRelatedLabels[index]
    		buf.MarshalUint32(uint32(labelList.Len()))
    		labelIndex := make([]uint32, 0, labelList.Len())
    		for _, label := range labelList {
    			name := label.MarshalName()
    			pos, ok := labelOrdered[name]
    			if !ok {
    				return nil, fmt.Errorf("series %q references label %q that is not in meta.Labels", series.Sid, name)
    			}
    			labelIndex = append(labelIndex, uint32(pos))
    		}
    		// Label indexes are stored in ascending order.
    		sort.Slice(labelIndex, func(i, j int) bool { return labelIndex[i] < labelIndex[j] })
    		buf.MarshalUint32(labelIndex...)
    	}
    	buf.MarshalUint16(endBlock)

    	buf.MarshalUint64(uint64(meta.MinTimestamp))
    	buf.MarshalUint64(uint64(meta.MaxTimestamp))
    	// Trailing signature lets Unmarshal detect truncated or corrupted blocks.
    	buf.MarshalString(signature)
    	return DoCompress(buf.Bytes()), nil
    }

接下来看看解码器 UnmarshalMeta

大致过程其实就是对 Marshal 方法的逆向还原:

  • 通过我们保存的 signature 来校验文件是否完整
  • 通过额外的 offset 记录当前读取位置(消费位点),依次解码出对应位置的数据,并将其构造成 meta 结构体

metadata.go

// Unmarshal decodes a compressed meta block produced by Marshal into meta.
//
// The decompressed block must end with exactly the signature Marshal appends.
// Otherwise an "incomplete" error is returned. Every length and count field is
// bounds-checked against the payload that precedes the signature. A truncated
// or corrupted block therefore yields an error, not a slice-out-of-range panic
// or an oversized allocation. meta is only modified after the whole payload
// has been decoded successfully.
func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error {
	data, err := DoDecompress(data)
	if err != nil {
		return fmt.Errorf("faild to decompress, err: %w", err)
	}
	if len(data) < len(signature) {
		return fmt.Errorf("the data block is incomplete, data len: %d", len(data))
	}

	nowDecodingBuf := newDecodingBuf()
	// First check that the block is complete. Marshal writes the signature
	// verbatim, so it must match exactly; any mismatch means truncation or
	// corruption.
	trailer := nowDecodingBuf.UnmarshalString(data[len(data)-len(signature):])
	if trailer != signature {
		return fmt.Errorf("the data block is incomplete, data: %s", trailer)
	}

	// Only the bytes before the signature are encoded fields.
	payload := data[:len(data)-len(signature)]
	offset := 0

	// take returns the next n payload bytes and advances offset.
	take := func(n int) ([]byte, error) {
		if n < 0 || n > len(payload)-offset {
			return nil, fmt.Errorf("the data block is incomplete, need %d bytes at offset %d, payload len: %d", n, offset, len(payload))
		}
		chunk := payload[offset : offset+n]
		offset += n
		return chunk, nil
	}
	takeUint32 := func() (uint32, error) {
		raw, err := take(uint32Size)
		if err != nil {
			return 0, err
		}
		return nowDecodingBuf.UnmarshalUint32(raw), nil
	}
	takeUint64 := func() (uint64, error) {
		raw, err := take(uint64Size)
		if err != nil {
			return 0, err
		}
		return nowDecodingBuf.UnmarshalUint64(raw), nil
	}
	// takeString16 reads a uint16-length-prefixed string. The bool is false
	// when the prefix is the endBlock section terminator instead of a length.
	takeString16 := func() (string, bool, error) {
		raw, err := take(uint16Size)
		if err != nil {
			return "", false, err
		}
		n := nowDecodingBuf.UnmarshalUint16(raw)
		if n == endBlock {
			return "", false, nil
		}
		if raw, err = take(int(n)); err != nil {
			return "", false, err
		}
		return nowDecodingBuf.UnmarshalString(raw), true, nil
	}
	// takeUint32List reads a uint32 count followed by that many uint32 values.
	// The count is validated against the remaining bytes before allocating.
	takeUint32List := func() ([]uint32, error) {
		count, err := takeUint32()
		if err != nil {
			return nil, err
		}
		if uint64(count) > uint64((len(payload)-offset)/uint32Size) {
			return nil, fmt.Errorf("the data block is incomplete, %d uint32 values at offset %d exceed payload len: %d", count, offset, len(payload))
		}
		raw, err := take(int(count) * uint32Size)
		if err != nil {
			return nil, err
		}
		list := make([]uint32, count)
		for i := range list {
			list[i] = nowDecodingBuf.UnmarshalUint32(raw[i*uint32Size : (i+1)*uint32Size])
		}
		return list, nil
	}

	// Section 1: label name -> series ID list, terminated by endBlock.
	labels := make([]seriesWithLabel, 0)
	for {
		name, ok, err := takeString16()
		if err != nil {
			return err
		}
		if !ok {
			break
		}
		sids, err := takeUint32List()
		if err != nil {
			return err
		}
		labels = append(labels, seriesWithLabel{
			Name: name,
			Sids: sids,
		})
	}

	// Section 2: series entries, terminated by endBlock.
	seriesList := make([]metaSeries, 0)
	for {
		sid, ok, err := takeString16()
		if err != nil {
			return err
		}
		if !ok {
			break
		}
		series := metaSeries{Sid: sid}
		if series.StartOffset, err = takeUint64(); err != nil {
			return err
		}
		if series.EndOffset, err = takeUint64(); err != nil {
			return err
		}
		if series.Labels, err = takeUint32List(); err != nil {
			return err
		}
		seriesList = append(seriesList, series)
	}

	// Trailer: time range of the block.
	minTs, err := takeUint64()
	if err != nil {
		return err
	}
	maxTs, err := takeUint64()
	if err != nil {
		return err
	}
	if nowDecodingBuf.err != nil {
		return nowDecodingBuf.err
	}

	meta.Labels = labels
	meta.Series = seriesList
	meta.MinTimestamp = int64(minTs)
	meta.MaxTimestamp = int64(maxTs)
	return nil
}

以上就是存储的整个过程。接下来写查询,我们先写第一个接口
QueryLabelValues()

根据 labelName 查询所有的 labelValue

tsdb.go

  1. 从内存中的有序 list 上迭代获取时间范围内的所有节点
  2. 将 node 转换成 segment(调用 Load() 加载其 meta 数据)
  3. 通过 labelVs 获取所有的 value 值
// QueryLabelValues returns the distinct values of label across all segments
// returned by db.segments.Get(start, end), sorted ascending. Each segment is
// loaded via Load before being queried.
func (db *TSDB) QueryLabelValues(label string, start, end int64) []string {
	seen := make(map[string]struct{})
	for _, node := range db.segments.Get(start, end) {
		loaded := node.Load()
		for _, v := range loaded.QueryLabelValuse(label) {
			seen[v] = struct{}{}
		}
	}

	values := make([]string, 0, len(seen))
	for v := range seen {
		values = append(values, v)
	}
	// Map iteration order is random; sort for a stable result.
	sort.Strings(values)
	return values
}

disk.go 中的 Load() 方法

  • 获取当前 diskSegment 的文件句柄 fd,创建 DReader 对象
  • 通过 DReader 读取当前文件数据,解码成 meta 并加载到 segment 中返回

    // Load decodes this disk segment's index the first time it is called and
    // returns the segment; later calls return ds immediately (ds.load).
    //
    // DReader.Read supplies the data length and meta length from the header.
    // The meta bytes are read, decoded with UnmarshaMeta and used to fill
    // labelVs, indexMap and series. On any read or decode failure the error is
    // logged and ds is returned without being marked as loaded.
    func (ds *diskSegment) Load() Segment {
    	if ds.load {
    		return ds
    	}
    	start := time.Now()
    	reader := bytes.NewReader(ds.dataFd.Bytes())
    	dreader := &DReader{reader: reader}
    	dataLen, metaLen, err := dreader.Read()
    	if err != nil {
    		logrus.Errorf("faild to read %s, err: %v", ds.dataFilename, err)
    		return ds
    	}

    	// The header read by DReader.Read is two uint64 fields (offsets 0 and
    	// uint64Size), so the meta section starts after 2*uint64Size + dataLen.
    	// The previous `uint64Size>>1+dataLen` evaluated to 4+dataLen.
    	// NOTE(review): assumes the data section directly follows the header —
    	// confirm against the writer.
    	metaOffset := 2*uint64Size + dataLen
    	size := reader.Size()
    	if dataLen < 0 || metaLen < 0 || dataLen > size || metaLen > size-metaOffset {
    		logrus.Errorf("faild to read %s, invalid header: dataLen=%d metaLen=%d size=%d", ds.dataFilename, dataLen, metaLen, size)
    		return ds
    	}
    	metaBytes := make([]byte, metaLen)
    	if _, err = reader.ReadAt(metaBytes, metaOffset); err != nil {
    		logrus.Errorf("faild to read %s, metaData error: %v", ds.dataFilename, err)
    		return ds
    	}

    	var meta Metadata
    	if err = UnmarshaMeta(metaBytes, &meta); err != nil {
    		logrus.Errorf("faild to unmarshal meta, error: %v", err)
    		return ds
    	}
    	for _, label := range meta.Labels {
    		key, value := UnmarshalLabelName(label.Name)
    		// Record only complete key/value pairs; the former check stored a
    		// label only when its value was empty.
    		if key != "" && value != "" {
    			ds.labelVs.Set(key, value)
    		}
    	}
    	ds.indexMap = newDiskIndexMap(meta.Labels)
    	ds.series = meta.Series
    	ds.load = true
    	logrus.Infof("load disk segment %s, time: %v", ds.dataFilename, time.Since(start))
    	return ds
    }
    
    // Read decodes the data-file header. It reads the data length from the
    // uint64 at offset 0 and the meta length from the uint64 at offset
    // uint64Size. Both are returned as int64. The first ReadAt error is
    // returned unchanged, with zero lengths.
    func (dr *DReader) Read() (int64, int64, error) {
    	// field reads one uint64Size-byte header field at off and decodes it
    	// with a fresh decoding buffer.
    	field := func(off int64) (uint64, error) {
    		raw := make([]byte, uint64Size)
    		if _, err := dr.reader.ReadAt(raw, off); err != nil {
    			return 0, err
    		}
    		dec := newDecodingBuf()
    		return dec.UnmarshalUint64(raw), nil
    	}

    	dataLen, err := field(0)
    	if err != nil {
    		return 0, 0, err
    	}
    	metaLen, err := field(uint64Size)
    	if err != nil {
    		return 0, 0, err
    	}
    	return int64(dataLen), int64(metaLen), nil
    }

然后直接从 segment 的 labelVs 中通过 label 获取 values,去重、排序后返回

测试结果 OK

=== RUN   TestInsertRow
time="2022-09-26T21:16:23+08:00" level=info msg="data: [vm_node_azh0 vm_node_azh1 vm_node_azh2]\n"
--- PASS: TestInsertRow (1.06s)

github: https://github.com/azhsmesos/…

本文由 mdnice 多平台发布

正文完
 0