共计 7668 个字符,预计需要花费 20 分钟才能阅读完成。
倒排索引
prometheus tsdb 中的 index 以倒排索引的形式组织:
-
给每个 series 调配 1 个 id
- 用 seriesId 查问 series,这是前向索引,查问工夫复杂度 =O(1);
-
结构 label 的索引
- 若 seriesId={2,5,10,29}都含有 label: app=’nginx’;
- 那么,对于 app=’nginx”, {2,5,10,29}就是它的倒排索引;
举例来说,对于 seriesId=5:
// seriesId=5
{
__name__ = "request_total",
pod="nginx-1",
path="/api/v1/status",
status="200",
method="GET"
}
那么,对于:
- status=”200″: 它的倒排索引 ={1,2,5,……}
- method=”GET”: 它的倒排索引 ={2,3,4,5,6,9,……}
整体源码框架
内存中,应用 headIndexReader,将内存 block 中的 label 组织成倒排索引;
block 中,应用 blockIndexReader,读取 block 目录中的 index 文件,将其中的 label 组织倒排索引;
headIndexReader 和 blockIndexReader 均继承自 indexReader,提供了:
- LabelNames(): 查问所有的 Label key;
- LabelValues(name):查问 label key 对应的 values;
- Postings():查问 label key/value 对应的[]seriesId;
blockQuerier 依据不同的 block,结构不同的 indexReader 来读取 Label 索引;blockQuerier 应用 Postings()失去[]seriesId 后,再应用 chunkReader 最终读取到时序数据(t/v)。
内存中的倒排索引
数据结构:
// tsdb/index/postings.go
type MemPostings struct {
mtx sync.RWMutex
// label key --> []labelValue
values map[string]stringset // Label names to possible values.
// map[labelName]map[labelValue]postingsList
// labelName --> labelValue --> []posting
m map[string]map[string][]uint64
ordered bool
}
// tsdb/head.go
// Head handles reads and writes of time series data within a time window.
type Head struct {
......
postings *index.MemPostings // Postings lists for terms.
}
1- 内存倒排索引的插入
入口是插入时序数据:
- 如果 lset 曾经在 series 中了,则间接返回;
-
否则获取一个 seriesId:
- 将 label key/value 插入到 h.values;
- 将 label key/value 和 seriesId 插入到 h.postings 中(大 map);
// tsdb/head.go
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
......
s, created, err := a.head.getOrCreate(lset.Hash(), lset)
......
}
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {s := h.series.getByHash(hash, lset)
// 曾经有了,间接返回
if s != nil {return s, false, nil}
id := atomic.AddUint64(&h.lastSeriesID, 1)
return h.getOrCreateWithID(id, hash, lset)
}
插入到 h.values 和 h.postings:
// tsdb/head.go
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool)
......
// 将 label key/value 插入 h.values
for _, l := range lset {valset, ok := h.values[l.Name]
if !ok {valset = stringset{}
h.values[l.Name] = valset
}
// 插入 key,value
valset.set(l.Value)
......
}
// id=seriesId
// 将 key/value/seriesId 插入 h.postings
h.postings.Add(id, lset)
return s, true, nil
}
2- 内存倒排索引的查问
次要在 headIndexReader 中进行:
- 通过 LableNames()查问所有的 lableName;
- 通过 LabelValues(name)查问 labelName 对应的 labelValues;
- 通过 postings 查问到 key、value 对应的[]seriesId,最终应用 seriesId+chunkReader 查问最终的时序数据(t/v);
LableNames()和 LabelValues(name)都是应用 head.values:
// tsdb/head.go
func (h *headIndexReader) LabelNames() ([]string, error) {labelNames := make([]string, 0, len(h.head.values))
// 读 h.head.values
for name := range h.head.values {
if name == "" {continue}
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
// tsdb/head.go
func (h *headIndexReader) LabelValues(name string) ([]string, error) {sl := make([]string, 0, len(h.head.values[name]))
// 读 h.head.values
for s := range h.head.values[name] {sl = append(sl, s)
}
return sl, nil
}
Postings()提供了查问 key/values 对应的[]seriesId 的性能:
// tsdb/head.go
// Postings returns the postings list iterator for the label pairs.
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {res := make([]index.Postings, 0, len(values))
for _, value := range values {res = append(res, h.head.postings.Get(name, value))
}
return index.Merge(res...), nil
}
block 中的倒排索引
数据结构:
// tsdb/index/index.go
type Reader struct {
......
// labelName--> labelValue + offset
postings map[string][]postingOffset
......
}
block 中的倒排索引,是 read 磁盘 block 中的 index 文件失去。
查问 LabelNames():
- 具体读取由 Reader.LableNames()实现;
- blockIndexReader.LabelNames()最终调用 Reader.LabelNames();
// tsdb/index/index.go
// LabelNames returns all the unique label names present in the index.
func (r *Reader) LabelNames() ([]string, error) {labelNames := make([]string, 0, len(r.postings))
// 读 r.postings
for name := range r.postings {
......
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
// tsdb/block.go
func (r blockIndexReader) LabelNames() ([]string, error) {return r.b.LabelNames()
}
查问 LabelValues(name):
- 具体读取由 Reader.LabelValues 读 toc.PostingsTable 实现;
- blockIndexReader.LabelValues()最终调用 Reader.LabelValues();
// tsdb/index/index.go
// LabelValues returns value tuples that exist for the given label name.
func (r *Reader) LabelValues(name string) ([]string, error) {
......
e, ok := r.postings[name]
values := make([]string, 0, len(e)*symbolFactor)
// 读 toc.PostingsTable
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
d.Skip(e[0].off)
lastVal := e[len(e)-1].value
for d.Err() == nil {
......
s := yoloString(d.UvarintBytes()) //Label value.
values = append(values, s)
}
return values, nil
}
// tsdb/block.go
func (r blockIndexReader) LabelValues(name string) ([]string, error) {st, err := r.ir.LabelValues(name)
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
查问 Postings():
- 具体读取由 Reader.Postings()读 toc.PostingsTable 实现;
- blockIndexReader.Postings()最终调用 Reader.Postings();
// tsdb/index/index.go
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
.....
e, ok := r.postings[name]
res := make([]Postings, 0, len(values))
for valueIndex < len(values) && values[valueIndex] < e[0].value {
// Discard values before the start.
valueIndex++
}
for valueIndex < len(values) {value := values[valueIndex]
i := sort.Search(len(e), func(i int) bool {return e[i].value >= value })
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
d.Skip(e[i].off)
for d.Err() == nil {
......
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d2.Get())
res = append(res, p)
}
}
return Merge(res...), nil
}
// tsdb/block.go
func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) {p, err := r.ir.Postings(name, values...)
if err != nil {return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return p, nil
}
Postings()在查问时应用
内存和 block 应用 Postings()进行查问的流程相似,只是应用不同的 indexReader。
1) 查问入口:加载内存 block 和磁盘 block,结构出 blockQuerier
// tsdb/db.go
func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {var blocks []BlockReader
// 磁盘 block
for _, b := range db.blocks {if b.OverlapsClosedInterval(mint, maxt) {blocks = append(blocks, b)
blockMetas = append(blockMetas, b.Meta())
}
}
// 内存 block
if maxt >= db.head.MinTime() {
blocks = append(blocks, &RangeHead{
head: db.head,
mint: mint,
maxt: maxt,
})
}
blockQueriers := make([]storage.Querier, 0, len(blocks))
for _, b := range blocks {q, err := NewBlockQuerier(b, mint, maxt)
if err == nil {blockQueriers = append(blockQueriers, q)
continue
}
}
return &querier{blocks: blockQueriers,}, nil
}
能够看出:
- 对于内存 block,应用 RangeHead 构造;
- 对于磁盘 block,应用 Block 构造;
2) 结构 blockQuerier
// tsdb/querier.go
// NewBlockQuerier returns a querier against the reader.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
// 结构 headIndexReader
indexr, err := b.Index()
if err != nil {return nil, errors.Wrapf(err, "open index reader")
}
chunkr, err := b.Chunks()
.....
return &blockQuerier{
mint: mint,
maxt: maxt,
index: indexr,
chunks: chunkr,
tombstones: tombsr,
}, nil
}
上述代码中,最重要的是:
// 依据不同的 block 结构出不同的 indexReader
indexr, err := b.Index()
对于内存 block(RangeHead): 最终结构的是 headIndexReader
// tsdb/head.go
func (h *RangeHead) Index() (IndexReader, error) {return h.head.indexRange(h.mint, h.maxt), nil
}
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {if hmin := h.MinTime(); hmin > mint {mint = hmin}
return &headIndexReader{head: h, mint: mint, maxt: maxt}
}
对于磁盘 block(Block): 最终结构的是 blockIndexReader
// tsdb/block.go
// Index returns a new IndexReader against the block data.
func (pb *Block) Index() (IndexReader, error) {if err := pb.startRead(); err != nil {return nil, err}
return blockIndexReader{ir: pb.indexr, b: pb}, nil
}
3) BlockQuerier 应用 indexReader 查问 postings 信息
查问 seriesSet
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
......
if sortSeries {base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
} else {base, err = LookupChunkSeries(q.index, q.tombstones, ms...)
}
......
}
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) {
......
// 最终会调用 indexReader.Postings()
p, err := PostingsForMatchers(ir, ms...)
......
}
PostingsForMatchers 最终会调用到 indexReader.Postings()。
参考
1.https://ganeshvernekar.com/bl…