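Inverted index
The index in the Prometheus TSDB is organized as an inverted index:
Every series is assigned an ID
- Looking up a series by its seriesId is the forward index; the lookup time complexity is O(1);
Building the label index
- Suppose the series with seriesId={2,5,10,29} all carry the label app="nginx";
- Then, for app="nginx", {2,5,10,29} is its inverted index (its postings list);
For example, take seriesId=5:
// seriesId=5{ __name__ = "request_total", pod="nginx-1", path="/api/v1/status", status="200", method="GET"}
Then:
- status="200": its inverted index is {1,2,5,......}
- method="GET": its inverted index is {2,3,4,5,6,9,......}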
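The relationship between the forward index and the inverted index can be sketched in a few lines of Go. This is a toy illustration, not the TSDB implementation: the names ToyIndex, Labels, Add and Lookup are hypothetical, and series labels are simplified to a plain map.

```go
package main

import (
	"fmt"
	"sort"
)

// Labels is a simplified label set (assumption: the real TSDB uses labels.Labels).
type Labels map[string]string

// ToyIndex is a hypothetical index used for illustration only.
type ToyIndex struct {
	series   map[uint64]Labels              // forward index: seriesId -> labels
	postings map[string]map[string][]uint64 // inverted index: name -> value -> seriesIds
}

func NewToyIndex() *ToyIndex {
	return &ToyIndex{
		series:   map[uint64]Labels{},
		postings: map[string]map[string][]uint64{},
	}
}

// Add registers a series in both indexes. Each seriesId is assumed to be added once.
func (ix *ToyIndex) Add(id uint64, ls Labels) {
	ix.series[id] = ls
	for name, value := range ls {
		if ix.postings[name] == nil {
			ix.postings[name] = map[string][]uint64{}
		}
		list := append(ix.postings[name][value], id)
		// Keep every postings list sorted by seriesId.
		sort.Slice(list, func(i, j int) bool { return list[i] < list[j] })
		ix.postings[name][value] = list
	}
}

// Lookup returns the sorted series IDs that carry name=value.
func (ix *ToyIndex) Lookup(name, value string) []uint64 {
	return ix.postings[name][value]
}

func main() {
	ix := NewToyIndex()
	ix.Add(5, Labels{"__name__": "request_total", "status": "200", "method": "GET"})
	ix.Add(2, Labels{"__name__": "request_total", "status": "200", "method": "POST"})
	ix.Add(3, Labels{"__name__": "request_total", "status": "500", "method": "GET"})

	fmt.Println(ix.Lookup("status", "200")) // [2 5]
	fmt.Println(ix.Lookup("method", "GET")) // [3 5]
}
```

Keeping each list sorted by seriesId is what makes the later merge and intersection steps cheap.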
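Overall source structure
In memory, headIndexReader organizes the labels of the in-memory block (the head) into an inverted index;
For on-disk blocks, blockIndexReader reads the index file in the block directory and organizes its labels into an inverted index;
Go has no inheritance: headIndexReader and blockIndexReader both implement the IndexReader interface, which provides:
- LabelNames(): returns all label keys;
- LabelValues(name): returns the values of a label key;
- Postings(): returns the []seriesId for a label key/value pair;
blockQuerier builds a different indexReader depending on the kind of block in order to read the label index. After obtaining the []seriesId through Postings(), blockQuerier uses a chunkReader to read the actual time series data (t/v).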
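The division of labor described above (an index reader that resolves labels to series IDs, followed by a chunk reader that returns samples) might look roughly like this. The interfaces here are heavily simplified assumptions: the real IndexReader returns iterators and errors, and memIndex, memChunks, Sample and Query are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// Sample is one (t, v) pair.
type Sample struct {
	T int64
	V float64
}

// IndexReader is a simplified stand-in for the TSDB interface of the same name.
type IndexReader interface {
	LabelNames() []string
	LabelValues(name string) []string
	Postings(name, value string) []uint64
}

// ChunkReader is a hypothetical stand-in that maps a series ID to its samples.
type ChunkReader interface {
	Samples(id uint64) []Sample
}

// memIndex is a toy in-memory index: name -> value -> seriesIds.
type memIndex map[string]map[string][]uint64

func (m memIndex) LabelNames() []string {
	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func (m memIndex) LabelValues(name string) []string {
	values := make([]string, 0, len(m[name]))
	for value := range m[name] {
		values = append(values, value)
	}
	sort.Strings(values)
	return values
}

func (m memIndex) Postings(name, value string) []uint64 {
	return m[name][value]
}

// memChunks is a toy chunk store: seriesId -> samples.
type memChunks map[uint64][]Sample

func (c memChunks) Samples(id uint64) []Sample { return c[id] }

// Query resolves name=value to series IDs, then reads the samples of each series.
func Query(ir IndexReader, cr ChunkReader, name, value string) map[uint64][]Sample {
	out := make(map[uint64][]Sample)
	for _, id := range ir.Postings(name, value) {
		out[id] = cr.Samples(id)
	}
	return out
}

func main() {
	ir := memIndex{"app": {"nginx": {2, 5}}}
	cr := memChunks{
		2: {{T: 1000, V: 1}},
		5: {{T: 1000, V: 7}, {T: 2000, V: 8}},
	}
	fmt.Println(ir.LabelNames(), ir.LabelValues("app"))
	for id, samples := range Query(ir, cr, "app", "nginx") {
		fmt.Println(id, samples)
	}
}
```

Because Query depends only on the two interfaces, the same code path works whether the index lives in memory or was read from a block on disk.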
In-memory inverted index
Data structures (note that values, the label key to label values map used below as h.values, is a field of Head, not of MemPostings):
// tsdb/index/postings.go
type MemPostings struct {
	mtx sync.RWMutex
	// map[labelName]map[labelValue]postingsList
	// labelName --> labelValue --> []posting
	m       map[string]map[string][]uint64
	ordered bool
}

// tsdb/head.go
// Head handles reads and writes of time series data within a time window.
type Head struct {
	......
	// label key --> set of label values
	values   map[string]stringset // Label names to possible values.
	postings *index.MemPostings   // Postings lists for terms.
}
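1-Inserting into the in-memory inverted index
The entry point is appending a sample:
- If the lset already exists in series, the existing series is returned directly;
Otherwise a new seriesId is allocated:
- the label key/value pairs are inserted into h.values;
- the label key/value pairs, together with the seriesId, are inserted into h.postings (the big map);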
// tsdb/head.go
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
	......
	s, created, err := a.head.getOrCreate(lset.Hash(), lset)
	......
}

func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
	s := h.series.getByHash(hash, lset)
	// The series already exists: return it directly.
	if s != nil {
		return s, false, nil
	}
	// Otherwise allocate the next seriesId.
	id := atomic.AddUint64(&h.lastSeriesID, 1)
	return h.getOrCreateWithID(id, hash, lset)
}
Inserting into h.values and h.postings:
// tsdb/head.go
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
	s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool)
	......
	// Insert the label key/value pairs into h.values.
	for _, l := range lset {
		valset, ok := h.values[l.Name]
		if !ok {
			valset = stringset{}
			h.values[l.Name] = valset
		}
		// Record the value under its key.
		valset.set(l.Value)
		......
	}
	// id is the seriesId.
	// Insert key/value/seriesId into h.postings.
	h.postings.Add(id, lset)
	return s, true, nil
}
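The get-or-create flow above can be reproduced in a self-contained toy. Everything here is an assumption made for illustration: toyHead, Label and key are hypothetical, a canonical string replaces lset.Hash(), and there is no locking or atomic counter, so the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Label is a simplified name/value pair.
type Label struct{ Name, Value string }

// toyHead is a hypothetical, single-goroutine stand-in for Head.
type toyHead struct {
	lastID   uint64
	series   map[string]uint64              // canonical label-set key -> seriesId
	values   map[string]map[string]struct{} // label name -> set of values
	postings map[string]map[string][]uint64 // label name -> value -> seriesIds
}

func newToyHead() *toyHead {
	return &toyHead{
		series:   map[string]uint64{},
		values:   map[string]map[string]struct{}{},
		postings: map[string]map[string][]uint64{},
	}
}

// key builds an order-independent string for a label set.
// It assumes names and values contain neither '=' nor ';'.
func key(lset []Label) string {
	sorted := append([]Label(nil), lset...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })
	var b strings.Builder
	for _, l := range sorted {
		b.WriteString(l.Name + "=" + l.Value + ";")
	}
	return b.String()
}

// getOrCreate returns the existing seriesId for lset, or allocates a new one
// and registers the labels in values and postings.
func (h *toyHead) getOrCreate(lset []Label) (uint64, bool) {
	k := key(lset)
	if id, ok := h.series[k]; ok {
		return id, false
	}
	h.lastID++
	id := h.lastID
	h.series[k] = id
	for _, l := range lset {
		if h.values[l.Name] == nil {
			h.values[l.Name] = map[string]struct{}{}
			h.postings[l.Name] = map[string][]uint64{}
		}
		h.values[l.Name][l.Value] = struct{}{}
		// IDs only grow, so appending keeps each postings list sorted.
		h.postings[l.Name][l.Value] = append(h.postings[l.Name][l.Value], id)
	}
	return id, true
}

func main() {
	h := newToyHead()
	fmt.Println(h.getOrCreate([]Label{{"app", "nginx"}, {"status", "200"}})) // 1 true
	fmt.Println(h.getOrCreate([]Label{{"status", "200"}, {"app", "nginx"}})) // 1 false
	fmt.Println(h.getOrCreate([]Label{{"app", "nginx"}, {"status", "500"}})) // 2 true
	fmt.Println(h.postings["app"]["nginx"])                                  // [1 2]
}
```

Because series IDs are handed out in increasing order, new IDs can simply be appended to a postings list without re-sorting it.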
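2-Querying the in-memory inverted index
Queries are served mainly by headIndexReader:
- LabelNames() returns all label names;
- LabelValues(name) returns the label values of a label name;
- Postings() returns the []seriesId for a key/value pair; the seriesId together with the chunkReader then yields the actual time series data (t/v);
LabelNames() and LabelValues(name) both read head.values: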
// tsdb/head.go
func (h *headIndexReader) LabelNames() ([]string, error) {
	labelNames := make([]string, 0, len(h.head.values))
	// Read h.head.values.
	for name := range h.head.values {
		if name == "" {
			continue
		}
		labelNames = append(labelNames, name)
	}
	sort.Strings(labelNames)
	return labelNames, nil
}

// tsdb/head.go
func (h *headIndexReader) LabelValues(name string) ([]string, error) {
	sl := make([]string, 0, len(h.head.values[name]))
	// Read h.head.values.
	for s := range h.head.values[name] {
		sl = append(sl, s)
	}
	return sl, nil
}
Postings() returns the []seriesId for a label key and one or more values:
// tsdb/head.go
// Postings returns the postings list iterator for the label pairs.
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
	res := make([]index.Postings, 0, len(values))
	for _, value := range values {
		res = append(res, h.head.postings.Get(name, value))
	}
	return index.Merge(res...), nil
}
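Postings() merges one postings list per requested value, which amounts to a union of sorted ID lists. The sketch below shows that idea on plain slices. It is not the library's implementation: the real index.Merge works on iterators, while mergeTwo and Merge here are hypothetical helpers that assume each input is sorted and free of duplicates.

```go
package main

import "fmt"

// mergeTwo returns the sorted union of two sorted, duplicate-free ID lists.
func mergeTwo(a, b []uint64) []uint64 {
	out := make([]uint64, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			out = append(out, a[i])
			i++
		case a[i] > b[j]:
			out = append(out, b[j])
			j++
		default: // same ID in both lists: emit it once
			out = append(out, a[i])
			i++
			j++
		}
	}
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}

// Merge folds mergeTwo over any number of postings lists.
func Merge(lists ...[]uint64) []uint64 {
	var out []uint64
	for _, l := range lists {
		out = mergeTwo(out, l)
	}
	return out
}

func main() {
	// e.g. the postings of status="200", status="201" and status="204"
	fmt.Println(Merge([]uint64{2, 5, 10}, []uint64{1, 2, 5}, []uint64{29})) // [1 2 5 10 29]
}
```

Folding pairwise is the simplest formulation; for many input lists a heap-based k-way merge would do less repeated copying.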
Inverted index in a block
Data structure:
// tsdb/index/index.go
type Reader struct {
	......
	// labelName --> labelValue + offset
	postings map[string][]postingOffset
	......
}
The inverted index of a block is obtained by reading the index file of the on-disk block.
Querying LabelNames():
- the actual read is implemented by Reader.LabelNames();
- blockIndexReader.LabelNames() ends up calling Reader.LabelNames();
// tsdb/index/index.go
// LabelNames returns all the unique label names present in the index.
func (r *Reader) LabelNames() ([]string, error) {
	labelNames := make([]string, 0, len(r.postings))
	// Read r.postings.
	for name := range r.postings {
		......
		labelNames = append(labelNames, name)
	}
	sort.Strings(labelNames)
	return labelNames, nil
}

// tsdb/block.go
func (r blockIndexReader) LabelNames() ([]string, error) {
	return r.b.LabelNames()
}
Querying LabelValues(name):
- the actual read is implemented by Reader.LabelValues(), which reads toc.PostingsTable;
- blockIndexReader.LabelValues() ends up calling Reader.LabelValues();
// tsdb/index/index.go
// LabelValues returns value tuples that exist for the given label name.
func (r *Reader) LabelValues(name string) ([]string, error) {
	......
	e, ok := r.postings[name]
	values := make([]string, 0, len(e)*symbolFactor)
	// Read toc.PostingsTable.
	d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
	d.Skip(e[0].off)
	lastVal := e[len(e)-1].value
	for d.Err() == nil {
		......
		s := yoloString(d.UvarintBytes()) // Label value.
		values = append(values, s)
	}
	return values, nil
}

// tsdb/block.go
func (r blockIndexReader) LabelValues(name string) ([]string, error) {
	st, err := r.ir.LabelValues(name)
	return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
Querying Postings():
- the actual read is implemented by Reader.Postings(), which reads toc.PostingsTable;
- blockIndexReader.Postings() ends up calling Reader.Postings();
// tsdb/index/index.go
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
	......
	e, ok := r.postings[name]
	res := make([]Postings, 0, len(values))
	for valueIndex < len(values) && values[valueIndex] < e[0].value {
		// Discard values before the start.
		valueIndex++
	}
	for valueIndex < len(values) {
		value := values[valueIndex]
		// Binary search for the first stored entry whose value is >= the requested value.
		i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
		d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
		d.Skip(e[i].off)
		for d.Err() == nil {
			......
			d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
			_, p, err := r.dec.Postings(d2.Get())
			res = append(res, p)
		}
	}
	return Merge(res...), nil
}

// tsdb/block.go
func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) {
	p, err := r.ir.Postings(name, values...)
	if err != nil {
		return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
	}
	return p, nil
}
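The lookup pattern in Reader.Postings() (a binary search over a sorted table of entries, followed by a forward scan from the chosen offset) can be illustrated with a toy table. This sketch assumes that only every step-th entry of the full table is kept in memory, which is one way to read the symbolFactor multiplier above; entry, sampled, buildSampled and lookup are hypothetical names, and slice positions stand in for byte offsets in the index file.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is one row of the full, value-sorted postings table (on disk in the real TSDB).
type entry struct {
	value    string
	postings []uint64
}

// sampled is an in-memory pointer to a position in the full table.
type sampled struct {
	value string
	off   int
}

// buildSampled keeps every step-th entry plus the last one.
func buildSampled(table []entry, step int) []sampled {
	var s []sampled
	for i := 0; i < len(table); i += step {
		s = append(s, sampled{table[i].value, i})
	}
	if last := len(table) - 1; last >= 0 && last%step != 0 {
		s = append(s, sampled{table[last].value, last})
	}
	return s
}

// lookup finds the postings of value: binary search in the sampled table,
// then a linear scan of the full table from the chosen offset.
func lookup(table []entry, s []sampled, value string) ([]uint64, bool) {
	if len(s) == 0 || value < s[0].value {
		return nil, false
	}
	i := sort.Search(len(s), func(i int) bool { return s[i].value >= value })
	if i == len(s) {
		return nil, false // value sorts after the last entry
	}
	if i > 0 && s[i].value != value {
		i-- // the target, if present, lies after the previous sampled entry
	}
	for off := s[i].off; off < len(table) && table[off].value <= value; off++ {
		if table[off].value == value {
			return table[off].postings, true
		}
	}
	return nil, false
}

func main() {
	table := []entry{
		{"200", []uint64{1, 2, 5}}, {"201", []uint64{7}}, {"204", []uint64{8}},
		{"301", []uint64{3}}, {"404", []uint64{4, 9}}, {"500", []uint64{6}},
		{"503", []uint64{10}},
	}
	s := buildSampled(table, 3)
	fmt.Println(lookup(table, s, "404"))
	fmt.Println(lookup(table, s, "418"))
}
```

Sampling trades a short linear scan for a much smaller in-memory table, which matters when a label has a very large number of values.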
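How Postings() is used during queries
Querying through Postings() follows a similar flow for the head and for on-disk blocks; only the indexReader differs.
1) Query entry point: load the in-memory block and the on-disk blocks, and build a blockQuerier for each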
// tsdb/db.go
func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
	var blocks []BlockReader
	var blockMetas []BlockMeta
	// On-disk blocks.
	for _, b := range db.blocks {
		if b.OverlapsClosedInterval(mint, maxt) {
			blocks = append(blocks, b)
			blockMetas = append(blockMetas, b.Meta())
		}
	}
	// In-memory block (the head).
	if maxt >= db.head.MinTime() {
		blocks = append(blocks, &RangeHead{
			head: db.head,
			mint: mint,
			maxt: maxt,
		})
	}
	blockQueriers := make([]storage.Querier, 0, len(blocks))
	for _, b := range blocks {
		q, err := NewBlockQuerier(b, mint, maxt)
		if err == nil {
			blockQueriers = append(blockQueriers, q)
			continue
		}
	}
	return &querier{
		blocks: blockQueriers,
	}, nil
}
As the code shows:
- the in-memory block is represented by the RangeHead struct;
- on-disk blocks are represented by the Block struct;
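The block selection in Querier() boils down to a time-range overlap test. The sketch below assumes that a block covers the half-open interval [MinTime, MaxTime) while the query range [mint, maxt] is closed; timeRange and selectBlocks are hypothetical names, not part of the TSDB.

```go
package main

import "fmt"

// timeRange is a hypothetical stand-in for a block's metadata.
// Assumption: the block covers the half-open interval [MinTime, MaxTime).
type timeRange struct{ MinTime, MaxTime int64 }

// overlapsClosedInterval reports whether the block overlaps the closed range [mint, maxt].
func (r timeRange) overlapsClosedInterval(mint, maxt int64) bool {
	return r.MinTime <= maxt && mint < r.MaxTime
}

// selectBlocks mirrors the selection step: pick the overlapping on-disk blocks,
// and include the head when the query reaches its minimum time.
func selectBlocks(blocks []timeRange, headMinTime, mint, maxt int64) ([]timeRange, bool) {
	var disk []timeRange
	for _, b := range blocks {
		if b.overlapsClosedInterval(mint, maxt) {
			disk = append(disk, b)
		}
	}
	return disk, maxt >= headMinTime
}

func main() {
	blocks := []timeRange{{0, 100}, {100, 200}, {200, 300}}
	fmt.Println(selectBlocks(blocks, 300, 150, 250)) // [{100 200} {200 300}] false
	fmt.Println(selectBlocks(blocks, 300, 250, 350)) // [{200 300}] true
}
```

A query that ends exactly at a block's MaxTime does not touch that block's successor only if the successor starts later, so the asymmetric comparison (<= on one side, < on the other) is what encodes the half-open assumption.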
2) Building the blockQuerier
// tsdb/querier.go
// NewBlockQuerier returns a querier against the reader.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
	// Build the IndexReader (a headIndexReader or a blockIndexReader, depending on b).
	indexr, err := b.Index()
	if err != nil {
		return nil, errors.Wrapf(err, "open index reader")
	}
	chunkr, err := b.Chunks()
	......
	return &blockQuerier{
		mint:       mint,
		maxt:       maxt,
		index:      indexr,
		chunks:     chunkr,
		tombstones: tombsr,
	}, nil
}
The most important line in the code above is:
// Build a different indexReader depending on the kind of block.
indexr, err := b.Index()
For the in-memory block (RangeHead), the result is a headIndexReader:
// tsdb/head.go
func (h *RangeHead) Index() (IndexReader, error) {
	return h.head.indexRange(h.mint, h.maxt), nil
}

func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
	if hmin := h.MinTime(); hmin > mint {
		mint = hmin
	}
	return &headIndexReader{head: h, mint: mint, maxt: maxt}
}
For an on-disk block (Block), the result is a blockIndexReader:
// tsdb/block.go
// Index returns a new IndexReader against the block data.
func (pb *Block) Index() (IndexReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockIndexReader{ir: pb.indexr, b: pb}, nil
}
3) blockQuerier uses the indexReader to look up postings
Querying the seriesSet:
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	......
	if sortSeries {
		base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
	} else {
		base, err = LookupChunkSeries(q.index, q.tombstones, ms...)
	}
	......
}

func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) {
	......
	// Eventually calls indexReader.Postings().
	p, err := PostingsForMatchers(ir, ms...)
	......
}
PostingsForMatchers eventually calls indexReader.Postings().
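For a selector made only of equality matchers, combining the per-matcher postings amounts to intersecting sorted ID lists. The sketch below shows that step on plain slices; it is an illustration rather than the actual PostingsForMatchers, which also has to deal with regular-expression and negative matchers. intersectTwo and Intersect are hypothetical helpers that assume sorted, duplicate-free input.

```go
package main

import "fmt"

// intersectTwo returns the IDs present in both sorted, duplicate-free lists.
func intersectTwo(a, b []uint64) []uint64 {
	var out []uint64
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			i++
		case a[i] > b[j]:
			j++
		default: // ID present in both lists
			out = append(out, a[i])
			i++
			j++
		}
	}
	return out
}

// Intersect folds intersectTwo over the postings list of each matcher.
func Intersect(lists ...[]uint64) []uint64 {
	if len(lists) == 0 {
		return nil
	}
	out := lists[0]
	for _, l := range lists[1:] {
		out = intersectTwo(out, l)
	}
	return out
}

func main() {
	status200 := []uint64{1, 2, 5}          // postings of status="200"
	methodGET := []uint64{2, 3, 4, 5, 6, 9} // postings of method="GET"
	fmt.Println(Intersect(status200, methodGET)) // [2 5]
}
```

With the example lists from the beginning of the article, the selector {status="200", method="GET"} resolves to series 2 and 5, and only those series are then read through the chunkReader.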
References
1.https://ganeshvernekar.com/bl...