上篇我们实现了将冷数据整体刷盘的过程,但刷盘时对 meta 数据的编解码还没有实现

  注意:这里说的 meta 并不是 meta 文件的数据,而是 data 中的索引信息

meta编码
metadata.go 的 Marshal() 方法

分为这么几步

  • 保存每个 label 对应的 seriesIDList,同时保存其长度
  • 保存每个 series 的 seriesID 以及对应的 StartOffset 和 EndOffset
  • 保存每个 series 关联的 label 下标列表(排序后存储)
  • 保存 minTs 和 maxTs
  • 最后保存一个 signature,用于校验文件是否损坏

    func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) {  nowEncodingBuf := newEncodingBuf()  labelOrdered := make(map[string]int)  for index, labelToSids := range meta.Labels {      labelOrdered[labelToSids.Name] = index      nowEncodingBuf.MarshalUint16(uint16(len(labelToSids.Name)))      nowEncodingBuf.MarshalString(labelToSids.Name)      nowEncodingBuf.MarshalUint32(uint32(len(labelToSids.Sids)))      nowEncodingBuf.MarshalUint32(labelToSids.Sids...)  }  nowEncodingBuf.MarshalUint16(endBlock)  for index, series := range meta.Series {      nowEncodingBuf.MarshalUint16(uint16(len(series.Sid)))      nowEncodingBuf.MarshalString(series.Sid)      nowEncodingBuf.MarshalUint64(series.StartOffset, series.EndOffset)      labelList := meta.SeriesIDRelatedLabels[index]      nowEncodingBuf.MarshalUint32(uint32(labelList.Len()))      labelIndex := make([]uint32, 0, labelList.Len())      for _, labelName := range labelList {          labelIndex = append(labelIndex, uint32(labelOrdered[labelName.MarshalName()]))      }      sort.Slice(labelIndex, func(i, j int) bool {          return labelIndex[i] < labelIndex[j]      })      nowEncodingBuf.MarshalUint32(labelIndex...)  }  nowEncodingBuf.MarshalUint16(endBlock)  nowEncodingBuf.MarshalUint64(uint64(meta.MinTimestamp))  nowEncodingBuf.MarshalUint64(uint64(meta.MaxTimestamp))  nowEncodingBuf.MarshalString(signature)  return DoCompress(nowEncodingBuf.Bytes()), nil}

接下来看看解码器UnmarshalMeta

大致过程其实就是对 Marshal 方法的逆向还原

  • 通过我们保存的 signature 来校验文件是否完整
  • 通过额外的 offset 记录消费位点并逐步后移,解码出对应位置的数据,并将其构造成 meta 结构体

metadata.go

func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error {    data, err := DoDecompress(data)    if err != nil {        return fmt.Errorf("faild to decompress, err: %v", err)    }    if len(data) < len(signature) {        return fmt.Errorf("the data block is incomplete, data len: %d", len(data))    }    nowDecodingBuf := newDecodingBuf()    // 首先判断数据是否残缺    if strings.EqualFold(nowDecodingBuf.UnmarshalString(data[len(data)-len(signature):]), signature) {        return fmt.Errorf("the data block is incomplete, data: %s", nowDecodingBuf.UnmarshalString(data[len(data)-len(signature):]))    }    offset := 0    labels := make([]seriesWithLabel, 0)    for {        var labelName string        labelLen := nowDecodingBuf.UnmarshalUint16(data[offset : offset+uint16Size])        offset += uint16Size        if labelLen == endBlock {            break        }        labelName = nowDecodingBuf.UnmarshalString(data[offset : offset+int(labelLen)])        offset += int(labelLen)        sidCount := nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])        offset += uint32Size        sidList := make([]uint32, sidCount)        for i := 0; i < int(sidCount); i++ {            sidList[i] = nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])            offset += uint32Size        }        labels = append(labels, seriesWithLabel{            Name: labelName,            Sids: sidList,        })    }    meta.Labels = labels    seriesList := make([]metaSeries, 0)    for {        series := metaSeries{}        sidLen := nowDecodingBuf.UnmarshalUint16(data[offset : offset+uint16Size])        offset += uint16Size        if sidLen == endBlock {            break        }        series.Sid = nowDecodingBuf.UnmarshalString(data[offset : offset+int(sidLen)])        offset += int(sidLen)        series.StartOffset = nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size])        offset += uint64Size        series.EndOffset = 
nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size])        offset += uint64Size        labelCount := nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])        offset += uint32Size        labelList := make([]uint32, labelCount)        for i := 0; i < int(labelCount); i++ {            labelList[i] = nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])            offset += uint32Size        }        series.Labels = labelList        seriesList = append(seriesList, series)    }    meta.Series = seriesList    meta.MinTimestamp = int64(nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size]))    offset += uint64Size    meta.MaxTimestamp = int64(nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size]))    offset += uint64Size    return nowDecodingBuf.err}

以上就是存储的整个过程,接下来写查询,我们先写第一个接口
QueryLabelValues()

根据 labelName 查询所有的 labelValue

tsdb.go

  1. 从内存有序 list 上迭代获取所有的节点
  2. 将node转化成为segment,其实就是meta数据
  3. 通过labelVs获取所有的value值
func (db *TSDB) QueryLabelValues(label string, start, end int64) []string {    temp := make(map[string]struct{})    for _, segment := range db.segments.Get(start, end) {        segment := segment.Load()        values := segment.QueryLabelValuse(label)        for i := 0; i < len(values); i++ {            temp[values[i]] = struct{}{}        }    }    ret := make([]string, 0, len(temp))    for key := range temp {        ret = append(ret, key)    }    sort.Strings(ret)    return ret}

disk.go 的 Load() 方法

  • 获取当前 diskSegment 的句柄 fd,创建 DReader 对象
  • 通过 DReader 读取当前文件数据,封装成 meta 返回

    func (ds *diskSegment) Load() Segment {  if ds.load {      return ds  }  start := time.Now()  reader := bytes.NewReader(ds.dataFd.Bytes())  dreader := &DReader{      reader: reader,  }  dataLen, metaLen, err := dreader.Read()  if err != nil {      logrus.Errorf("faild to read %s, err: %v", ds.dataFilename, err)      return ds  }  metaBytes := make([]byte, metaLen)  _, err = reader.ReadAt(metaBytes, uint64Size>>1+int64(dataLen))  if err != nil {      logrus.Errorf("faild to read %s, metaData error: %v", ds.dataFilename, err)      return ds  }  var meta Metadata  if err = UnmarshaMeta(metaBytes, &meta); err != nil {      logrus.Errorf("faild to unmarshal meta, error: %v", err)      return ds  }  for _, label := range meta.Labels {      key, value := UnmarshalLabelName(label.Name)      if !strings.EqualFold(key, "") && strings.EqualFold(value, "") {          ds.labelVs.Set(key, value)      }  }  ds.indexMap = newDiskIndexMap(meta.Labels)  ds.series = meta.Series  ds.load = true  logrus.Infof("load disk segment %s, time: %v", ds.dataFilename, time.Since(start))  return ds}func (dr *DReader) Read() (int64, int64, error) {  // 读取data长度  diskDataLen := make([]byte, uint64Size)  _, err := dr.reader.ReadAt(diskDataLen, 0)  if err != nil {      return 0, 0, err  }  nowDecodingBuf := newDecodingBuf()  // nowDecodingBuf.UnmarshalUint64(diskDataLen)  dataLen := nowDecodingBuf.UnmarshalUint64(diskDataLen)  // 读取 meta长度  diskDataLen = make([]byte, uint64Size)  _, err = dr.reader.ReadAt(diskDataLen, uint64Size)  if err != nil {      return 0, 0, err  }  nowDecodingBuf = newDecodingBuf()  metaLen := nowDecodingBuf.UnmarshalUint64(diskDataLen)  return int64(dataLen), int64(metaLen), nil}

然后直接从 segment 的 labelVs 中通过 label 获取 values,排序后返回

测试结果 OK

=== RUN   TestInsertRowtime="2022-09-26T21:16:23+08:00" level=info msg="data: [vm_node_azh0 vm_node_azh1 vm_node_azh2]\n"--- PASS: TestInsertRow (1.06s)

github: https://github.com/azhsmesos/...

本文由 mdnice 多平台发布