共计 7262 个字符,预计需要花费 19 分钟才能阅读完成。
简介:本文次要探讨 influxdb 在遇到写入的数据呈现高基数 Cardinality 问题时,一些可行的解决方案。
作者 | 徐建伟(竹影)
前序
随着挪动端倒退走向饱和,当初整个 IT 行业都期待着“万物互联”的物联网时代。在物联网场景中,往往有许多各类不同的终端设备,布署在不同的地位,去采集各种数据,比方某一区域有 10 万个 loT 设施,每个 loT 设施每 5 秒发送一次数据。那么每年会产生 6307 亿 个数据点。而这些数据都是程序产生的,并且 loT 设施产生数据的格局全副是统一的,并且没有删除和批改的需要。针对这样按时海量写入无更新场景,时序数据库应运而生。
时序数据库在假设没有数据插入和更新需要,数据结构稳固的前提下,极限谋求疾速写入,高压缩,疾速检索数据。时序数据的 Label(tag)会建设索引,以进步查问性能,以便你能够疾速找到与所有指定标签匹配的值。如果 Label(tag)值的数量过多时(高基数 Cardinality 问题),索引会呈现各种各样的问题,本文次要探讨 influxdb 在遇到写入的数据呈现高基数 Cardinality 问题时,一些可行的解决方案。
高基数 Cardinality 问题(工夫线膨胀)
时序数据库次要存储的是 metric 数据,每一条数据称为一个样本(sample),样本由以下三局部组成:
指标(工夫线 time-series):metric name 和形容以后样本特色的 labelsets;
工夫戳(timestamp):一个准确到毫秒的工夫戳;
样本值(value):示意以后样本的值。
<————– time-series=”” ——–=””><-timestamp —–=””> <-value->
node_cpu{cpu=“cpu0”,mode=“idle”} @1627339366586 70
node_cpu{cpu=“cpu0”,mode=“sys”} @1627339366586 5
node_cpu{cpu=“cpu0”,mode=“user”} @1627339366586 25
通常状况下,time-series 中的 lablelsets 是无限的,可枚举的,比方下面的例子 model 可选值为 idle,sys,user。
prometheus 官网文档中对于 Label 的倡议:
CAUTION: Remember that every unique combination of key-value label pairs represents a new time series, which can dramatically increase the amount of data stored. Do not use labels to store dimensions with high cardinality (many different label values), such as user IDs, email addresses, or other unbounded sets of values.
时序数据库的设计时,也是假设在工夫线低基数的前提下。然而随着 metric 的宽泛应用,在很多场景下无奈避免出现工夫线膨胀。
比方,在云原生场景下 tag 呈现 pod/container ID 之类,也有些 tag 呈现 userId,甚至有些 tag 是 url,而这些 tag 组合时,工夫线膨胀得十分厉害。
这个矛盾呈现是必然的,怎么解决呢?是写入数据方调整写入数据时,管制写入的 time-series 的数量,还是时序数据库去更改设计来实用这种场景?这个问题没有完满的解决方案,咱们须要做出均衡。
从理论状况登程,如果工夫线膨胀后,时序数据库不会呈现不可用,性能也不会呈现指数级别降落。也就是说工夫线不收缩时,性能优良。工夫线膨胀后,性能能达到良好或者及格就好。
那怎么让时序数据库在工夫线膨胀的状况下性能还能良好呢?接下来咱们通过 influxdb 的源码来探讨这个问题。
工夫线的解决逻辑
influxdb 的 tsm 构造,次要的逻辑处理过程相似 lsm。数据上报后,会增加到 cache 和日志文件(wal)。为了放慢检索速度或者压缩比例,会对上报的数据进行 compaction(数据文件合并,从新构建索引)。
索引波及到三个方面:
TSI(Time Series Index)检索 Measurement,tag,tagval,time
TSM(Time-Structured Merge Tree)用来检索 time-series -> value
Series Segment Index 用来检索 time-series key <–> time-series Id
具体 influxdb 的索引实现能够参照官网文章。
(https://github.com/influxdata…
![上传中 …]()
当工夫线膨胀后,TSI 和 TSM 的检索性能降落并不重大,问题次要是呈现在 Series Segment Index 里。
这节咱们会探讨 influxdb 的工夫线文件的正排索引(time-series key ->id, id->time-series key):
SeriesFile 是 Database(bucket)级别的。
SeriesIndex 次要解决 key->Id, key->id 的索引映射。
SeriesSegment 次要寄存的是 Series 的 Id 和 key。
SeriesIndex 外面是寄存 Series 的 Id 和 key 等索引。(能够了解是两个 hashmap)
keyIDMap 通过 key 来查找对应的 Id。
idOffsetMap 通过 Id 查到到 offset,通过这个 offset(对应 SeriesSegment 的地位)来查找 SeriesSegment 文件获取 key。
具体的代码(influxdb 2.0.7)如下:
tsdb/series_partition.go:30 // SeriesPartition represents a subset of series file data. type SeriesPartition struct {... segments []*SeriesSegment index *SeriesIndex seq uint64 // series id sequence .... } tsdb/series_index.go:36 // SeriesIndex represents an index of key-to-id & id-to-offset mappings. type SeriesIndex struct {path string ... data []byte // mmap data keyIDData []byte // key/id mmap data idOffsetData []byte // id/offset mmap data // In-memory data since rebuild. keyIDMap *rhh.HashMap idOffsetMap map[uint64]int64 tombstones map[uint64]struct{}}
对 series key 进行检索时,会先在内存 map 中查找,而后在磁盘的 map 上查找,具体的实现代码如下:
tsdb/series_index.go:185 func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 {// 内存 map 查找 if v := idx.keyIDMap.Get(key); v != nil {if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) {return id} } if len(idx.data) == 0 {return 0} hash := rhh.HashKey(key) for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {// 磁盘 map 查找 offset elem := idx.keyIDData[(pos * SeriesIndexElemSize):] elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) if elemOffset == 0 {return 0} // 通过 offset 获取对于的 id elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize) elemHash := rhh.HashKey(elemKey) if d > rhh.Dist(elemHash, pos, idx.capacity) {return 0} else if elemHash == hash && bytes.Equal(elemKey, key) {id := binary.BigEndian.Uint64(elem[8:]) if idx.IsDeleted(id) {return 0} return id } } }
这里补充一个知识点,将内存 hashmap 转成磁盘 hashmap 的实现。咱们都晓得 hashmap 的存储是数组,influfxdb 中的实现是通过 mmap 形式映射磁盘空间(见 SeriesIndex 的 keyIDData),而后通过 hash 拜访数组地址,采纳的 Robin Hood Hashing,合乎内存局部性原理(查找逻辑的代码如上 series_index.go 中)。将 Robin Hood Hashtable 纯手动移植磁盘 hashtable, 开发人员还是花了不少心理。
那内存 map 和磁盘 map 是如何生成的,为什么须要两个 map?
influxdb 的做法是将新增的 series key 先放到内存 hashmap 外面,当内存 hashmap 增长大于阈值时,将内存 hashmap 和磁盘 hashmap 进行 merge(遍历所有 SeriesSegment,过滤曾经删除的 series key)生成一个新的磁盘 hashmap,这个过程叫做 compaction。compation 完结后内存 hashmap 被清空,而后持续寄存新增的 series key。
tsdb/series_partition.go:200 // Check if we've crossed the compaction threshold. if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) && p.compactionLimiter.TryTake() { p.compacting = true log, logEnd := logger.NewOperation(context.TODO(), p.Logger,"Series partition compaction","series_partition_compaction", zap.String("path", p.path)) p.wg.Add(1) go func() { defer p.wg.Done() defer p.compactionLimiter.Release() compactor := NewSeriesPartitionCompactor() compactor.cancel = p.closing if err := compactor.Compact(p); err != nil {log.Error("series partition compaction failed", zap.Error(err)) } logEnd() // Clear compaction flag. p.mu.Lock() p.compacting = false p.mu.Unlock()}()}
tsdb/series_partition.go:569 func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {hdr := NewSeriesIndexHeader() hdr.Count = seriesN hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor) // Allocate space for maps. keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) // Reindex all partitions. var entryN int for _, segment := range segments {errDone := errors.New("done") if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {... // Save max series identifier processed. hdr.MaxSeriesID, hdr.MaxOffset = id, offset // Ignore entry if tombstoned. if index.IsDeleted(id) {return nil} // Insert into maps. c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) }); err == errDone {break} else if err != nil {return err} }
这样设计有两个缺点:
做 compaction 时,当 io 拜访 SeriesSegments 文件,内存加载所有的 series key,会构建一个新的 hashtable,而后将这个 hashtable mmap 存储到磁盘,当 series key 超过几千万或者更多时,会呈现内存不够,oom 问题。
做 compaction 时, 对于曾经删除的 series key(tombstone 标记)做了过滤,不生成 series index,然而 SeriesSegment 中曾经删除 series key 只有做了 tombstone 标记,不会做物理删除,这样会导致 SeriesSegment 始终收缩,在理论生产环境一个 partition 下的所有 segmeng 文件超过几十 G,做 compaction 时,会产生大量 io 拜访。
可行的解决方案
1、减少 partition 或者 database
influxdb 的正排索引是 database 级别的,有两个形式能够缩小 compaction 时的内存,一个是减少 partition 数量或者将多个 Measurement 划到不同的 database 外面。
但这样做的问题是,曾经存在数据的 influxdb 不好调整两个数据。
2、批改工夫线存储策略
咱们晓得 hash 索引是 O1 的查问,效率十分高,然而对于增长性的数据,存在扩容问题。那咱们做个折中的抉择。当 partition 大于某个阈值时,将 hash 索引变成 b+tree 索引。b+tree 对于数据收缩性能降落无限,更适宜高基数问题,而且不再须要全局的 compaction。
3、将 series key 的正排索引下沉到 shard 级别
influxdb 外面每个 shard 都是有工夫区间的,某个工夫区间内的工夫线数据并不大。比方 database 外面保留的是 180 天 的 series key,而 shard 个别只有一天甚至 1 个小时的跨度,两者寄存的 series key 存在 1~ 2 个数量级的差距。另外将 series key 正排索引下沉到 shard 级别对删除操作更敌对,当 shard 过期删除时,会将以后 shard 的所有 series key 和其余 shard 做 diff,当 series key 不存在时再去删除 series key。
4、依据 Measurement 批改工夫线存储策略
在理论生产环境中,工夫线膨胀和 Measurement 有很大关系,个别是多数的 Measurement 存在工夫线膨胀问题,然而绝大部分的 Measurement 不存在工夫线爆炸的问题。
咱们能够对做 series key 的正排索引的 compaction 时,能够增加 Measurement 工夫线统计,如果某个 Measurement 的工夫线膨胀时,能够将这个 Measurement 的所有 series key 切换到 B+ tree。而不收缩的 series key 持续保留走 hash 索引。这样计划性能比第二个计划更好,开发成本会更高一些。
目前高基数问题次要体现在 series key 正排索引。集体感觉短期先做第二个计划适度到第四个计划的形式。这样能够比拟好的解决工夫线增长的问题,性能降落不多,老本不高。第三个计划改变比拟大,设计更正当,能够作为一个长期修复计划。
总结
本文次要通过 influxdb 来解说时序数据库的高基数 Cardinality 问题,以及可行的计划。metric 的维度爆炸导致数据线膨胀问题,很多同学都认为这是对时序数据库的误用或者是滥用。然而信息数据爆炸的明天,让数据维度收敛,不发散老本十分高,甚至远高于数据存储老本。
集体感觉须要对这个问题进行分而治之的形式,晋升时序数据库对维度爆炸的容忍度。换句话说,呈现工夫线膨胀后,时序数据库不会呈现解体状况,对工夫线未收缩的 metric 持续高效运行,而呈现工夫线膨胀的 metic 能够呈现性能降落,单不会线性降落。晋升对工夫线膨胀的容忍度,管制工夫线膨胀的爆炸半径,将会成为时序数据库的外围能力。
原文链接
本文为阿里云原创内容,未经容许不得转载。