上篇我们实现了将整个冷数据刷盘的过程，但刷盘时对 meta 数据的编解码其实还没有实现。
注意：这里说的 meta 并不是 meta 文件的数据，而是 data 中的索引信息。
meta 编码：metadata.go 的 Marshal() 方法
分为以下几步：
- 保存每个 label 对应的 seriesIDList，同时保存 label 名的长度和 seriesID 的数量
- 保存每个 series 的 seriesID 及其对应的 StartOffset 和 EndOffset
- 保存每个 series 中的 labelList（以 label 下标的形式有序存储）
- 保存 minTs 和 maxTs
最初保留一个
signature
,用于查看文件是否损坏func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) { nowEncodingBuf := newEncodingBuf() labelOrdered := make(map[string]int) for index, labelToSids := range meta.Labels { labelOrdered[labelToSids.Name] = index nowEncodingBuf.MarshalUint16(uint16(len(labelToSids.Name))) nowEncodingBuf.MarshalString(labelToSids.Name) nowEncodingBuf.MarshalUint32(uint32(len(labelToSids.Sids))) nowEncodingBuf.MarshalUint32(labelToSids.Sids...) } nowEncodingBuf.MarshalUint16(endBlock) for index, series := range meta.Series { nowEncodingBuf.MarshalUint16(uint16(len(series.Sid))) nowEncodingBuf.MarshalString(series.Sid) nowEncodingBuf.MarshalUint64(series.StartOffset, series.EndOffset) labelList := meta.SeriesIDRelatedLabels[index] nowEncodingBuf.MarshalUint32(uint32(labelList.Len())) labelIndex := make([]uint32, 0, labelList.Len()) for _, labelName := range labelList { labelIndex = append(labelIndex, uint32(labelOrdered[labelName.MarshalName()])) } sort.Slice(labelIndex, func(i, j int) bool { return labelIndex[i] < labelIndex[j] }) nowEncodingBuf.MarshalUint32(labelIndex...) } nowEncodingBuf.MarshalUint16(endBlock) nowEncodingBuf.MarshalUint64(uint64(meta.MinTimestamp)) nowEncodingBuf.MarshalUint64(uint64(meta.MaxTimestamp)) nowEncodingBuf.MarshalString(signature) return DoCompress(nowEncodingBuf.Bytes()), nil}
接下来看看解码器 UnmarshalMeta，大致过程其实就是对 Marshal 方法的一个还原：
- 通过我们保存的 signature 来校验文件是否完整
- 通过额外维护的 offset 来移动消费位点，依次解码出对应位置的数据，并将其构造成 meta 结构体
metadata.go
func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error { data, err := DoDecompress(data) if err != nil { return fmt.Errorf("faild to decompress, err: %v", err) } if len(data) < len(signature) { return fmt.Errorf("the data block is incomplete, data len: %d", len(data)) } nowDecodingBuf := newDecodingBuf() // 首先判断数据是否残缺 if strings.EqualFold(nowDecodingBuf.UnmarshalString(data[len(data)-len(signature):]), signature) { return fmt.Errorf("the data block is incomplete, data: %s", nowDecodingBuf.UnmarshalString(data[len(data)-len(signature):])) } offset := 0 labels := make([]seriesWithLabel, 0) for { var labelName string labelLen := nowDecodingBuf.UnmarshalUint16(data[offset : offset+uint16Size]) offset += uint16Size if labelLen == endBlock { break } labelName = nowDecodingBuf.UnmarshalString(data[offset : offset+int(labelLen)]) offset += int(labelLen) sidCount := nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size]) offset += uint32Size sidList := make([]uint32, sidCount) for i := 0; i < int(sidCount); i++ { sidList[i] = nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size]) offset += uint32Size } labels = append(labels, seriesWithLabel{ Name: labelName, Sids: sidList, }) } meta.Labels = labels seriesList := make([]metaSeries, 0) for { series := metaSeries{} sidLen := nowDecodingBuf.UnmarshalUint16(data[offset : offset+uint16Size]) offset += uint16Size if sidLen == endBlock { break } series.Sid = nowDecodingBuf.UnmarshalString(data[offset : offset+int(sidLen)]) offset += int(sidLen) series.StartOffset = nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size]) offset += uint64Size series.EndOffset = nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size]) offset += uint64Size labelCount := nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size]) offset += uint32Size labelList := make([]uint32, labelCount) for i := 0; i < int(labelCount); i++ { labelList[i] = nowDecodingBuf.UnmarshalUint32(data[offset : 
offset+uint32Size]) offset += uint32Size } series.Labels = labelList seriesList = append(seriesList, series) } meta.Series = seriesList meta.MinTimestamp = int64(nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size])) offset += uint64Size meta.MaxTimestamp = int64(nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size])) offset += uint64Size return nowDecodingBuf.err}
以上就是存储的整个过程。接下来写查询，我们先写第一个接口 QueryLabelValues()：
根据 labelName 查询所有的 labelValue
tsdb.go
- 从内存中的有序 list 上迭代获取所有节点
- 将 node 转化为 segment，其实就是 meta 数据
- 通过 labelVs 获取所有的 value 值
func (db *TSDB) QueryLabelValues(label string, start, end int64) []string { temp := make(map[string]struct{}) for _, segment := range db.segments.Get(start, end) { segment := segment.Load() values := segment.QueryLabelValuse(label) for i := 0; i < len(values); i++ { temp[values[i]] = struct{}{} } } ret := make([]string, 0, len(temp)) for key := range temp { ret = append(ret, key) } sort.Strings(ret) return ret}
disk.go 的 Load() 方法：
- 获取当前 diskSegment 的句柄 fd，创建 DReader 对象
- 通过 DReader 读取当前文件数据，封装成 meta 返回
func (ds *diskSegment) Load() Segment { if ds.load { return ds } start := time.Now() reader := bytes.NewReader(ds.dataFd.Bytes()) dreader := &DReader{ reader: reader, } dataLen, metaLen, err := dreader.Read() if err != nil { logrus.Errorf("faild to read %s, err: %v", ds.dataFilename, err) return ds } metaBytes := make([]byte, metaLen) _, err = reader.ReadAt(metaBytes, uint64Size>>1+int64(dataLen)) if err != nil { logrus.Errorf("faild to read %s, metaData error: %v", ds.dataFilename, err) return ds } var meta Metadata if err = UnmarshaMeta(metaBytes, &meta); err != nil { logrus.Errorf("faild to unmarshal meta, error: %v", err) return ds } for _, label := range meta.Labels { key, value := UnmarshalLabelName(label.Name) if !strings.EqualFold(key, "") && strings.EqualFold(value, "") { ds.labelVs.Set(key, value) } } ds.indexMap = newDiskIndexMap(meta.Labels) ds.series = meta.Series ds.load = true logrus.Infof("load disk segment %s, time: %v", ds.dataFilename, time.Since(start)) return ds}func (dr *DReader) Read() (int64, int64, error) { // 读取data长度 diskDataLen := make([]byte, uint64Size) _, err := dr.reader.ReadAt(diskDataLen, 0) if err != nil { return 0, 0, err } nowDecodingBuf := newDecodingBuf() // nowDecodingBuf.UnmarshalUint64(diskDataLen) dataLen := nowDecodingBuf.UnmarshalUint64(diskDataLen) // 读取 meta长度 diskDataLen = make([]byte, uint64Size) _, err = dr.reader.ReadAt(diskDataLen, uint64Size) if err != nil { return 0, 0, err } nowDecodingBuf = newDecodingBuf() metaLen := nowDecodingBuf.UnmarshalUint64(diskDataLen) return int64(dataLen), int64(metaLen), nil}
然后直接从 segment 的 labelVs 中通过 label 获取 values，排序后返回。
测试结果 OK：
=== RUN   TestInsertRow
time="2022-09-26T21:16:23+08:00" level=info msg="data: [vm_node_azh0 vm_node_azh1 vm_node_azh2]\n"
--- PASS: TestInsertRow (1.06s)
github: https://github.com/azhsmesos/...
本文由 mdnice 多平台发布