分布式时序数据库QTSDB的设计与实现

23次阅读

共计 5356 个字符,预计需要花费 14 分钟才能阅读完成。

奇技指南

现有的开源时序数据库 influxdb 只支持单机运行,在面临大量数据写入时,会出现查询慢,机器负载高,单机容量的限制。

为了解决这一问题,360 基础架构团队在单机 influxdb 的基础上,开发了集群版——QTSDB

QTSDB 简述

QTSDB 是一个分布式时间序列数据库,用于处理海量数据写入与查询。实现上,是基于开源单机时序数据库 influxdb 1.7 开发的分布式版本,除了具有 influxdb 本身的特性之外,还有容量扩展、副本容错等集群功能。

主要特点如下:

  • 为时间序列数据专门编写的高性能数据存储, 兼顾写入性能和磁盘空间占用;
  • 类 sql 查询语句, 支持多种统计聚合函数;
  • 自动清理过期数据;
  • 内置连续查询,自动完成用户预设的聚合操作;
  • Golang 编写,没有其它的依赖, 部署运维简单;
  • 节点动态水平扩展,支持海量数据存储;
  • 副本冗余设计,自动故障转移,支持高可用;
  • 优化数据写入,支持高吞吐量;

系统架构

逻辑存储层次结构

influxdb 架构层次最高是 database,database 下边根据数据保留时长不同分成了不同的 retension policy,形成了 database 下面的多个存储容器,因为时序数据库与时间维度关联,所以将相同保留时长的内容存放到一起,便于到期删除。除此之外,在 retension policy 之下,将 retension policy 的保留时长继续细分,每个时间段的数据存储在一个 shard group 中,这样当某个分段的 shard group 到期之后,会将其整个删掉,避免从存储引擎内部抠出部分数据。例如,在 database 之下的数据,可能是 30 天保留时长,可能是 7 天保留时长,他们将存放在不同的 retension policy 之下。假设将 7 天的数据继续按 1 天进行划分,就将他们分别存放到 7 个 shard group 中,当第 8 天的数据生成时,会新建一个 shard group 写入,并将第 1 天的 shard group 整个删除。

到此为止,同一个 retension policy 下,发来的当下时序数据只会落在当下的时间段,也就是只有最新的 shard group 有数据写入,为了提高并发量,一个 shard group 又分成了多个 shard,这些 shard 全局唯一,分布于所有物理节点上,每个 shard 对应一个 tsm 存储引擎,负责存储数据。

在请求访问数据时,通过请求的信息可以锁定某个 database 和 retension policy,然后根据请求中的时间段信息,锁定某个(些)shard group。对于写入的情况,每条写入的数据都对应一个 serieskey(这个概念后面会介绍),通过对 serieskey 进行哈希取模就能锁定一个 shard,进行写入。而 shard 是有副本的,在写入的时候会采用无主多写的策略同时写入到每个副本中。查询时,由于查询请求中没有 serieskey 的信息,所以只能将 shard group 内的 shard 都查询一遍,针对一个 shard,会在其副本中选择一个可用的物理节点进行访问。

那么一个 shard group 要有多少 shard 呢,为了达到最大并发量,又不过分干扰数据整体的有序性,在物理节点数和副本数确定后,一个 shard group 内的 shard 数量是机器数除以副本数,保障了当下的数据可以均匀写入到所有的物理节点之上,也不至于因为 shard 过多影响查询效率。例如,图上 data 集群有 6 个物理节点,用户指定双副本,那么就有 3 个 shard。

集群结构

整个系统分成三个部分:proxy、meta 集群、data 集群。proxy 负责接收请求,无状态,其前可接 lvs 支持水平扩展。meta 集群保存上面提到的逻辑存储层次及其与物理节点的对应关系,通过 raft 协议保障元数据的强一致,这里 meta 信息保存在内存中,日志和快照会持久化到磁盘。data 集群是真正的数据存储节点,数据以 shard 为单位存储于其上,每个 shard 都对应一个 tsm 存储引擎。

请求到来的时候,经过 lvs 锁定一台 proxy,proxy 先根据 database、retension policy 和时间段到 meta 集群查找 meta 信息,最终得到一个 shard 到物理节点的映射,然后将这个映射关系转换为物理节点到 shard 的映射返回给 proxy,最后根据这个映射关系,到 data 集群指定的物理节点中访问具体的 shard,至于 shard 之下的数据访问后边会介绍。

数据访问

语法格式

influxdb 的查询提供类似于关系数据库的查询方式,展示出来类似一个关系表:measurement,时序数据库的时间作为一个永恒的列,除此之外的列分成两类:

1、field

一类是 field,他们是时序数据最关键的数据部分,其值会随着时间的流动源源不断的追加,例如两台机器之间在每个时间点上的延迟。

2、tag

另一类是 tag,他们是一个 field 值的一些标记,所以都是字符串类型,并且取值范围很有限。例如某个时间点的延迟 field 值是 2ms,对应有两个标记属性,从哪台机器到哪台机器的延迟,因此可以设计两个 tag:from、to。

measurement 展示出来第一行是 key,剩下的可以看成 value,这样 tag 有 tagkey,tagvalue,field 有 fieldkey 和 fieldvalue。

数据读写

当收到一行写入数据时,会转化为如下的格式:

measurement+tagkey1+tagvalue1+tagkey2+tagvalue2+fieldkey+fieldvalue+time。

如果一行中存在多个 field 就会划分成多条这样的数据存储。influxdb 的存储引擎可以理解为一个 map,从 measurement 到 fieldkey 作为存储 key,后边的 fieldvalue 和 time 是存储 value,这些值会源源不断追加的,在存储引擎中,这些值会作为一列存储到一起,因为是随时间渐变的数据,将他们保存到一起可以提升压缩的效果。另外将存储 key 去掉 fieldkey 之后剩余部分就是上边提到的 serieskey。

上边提到,访问请求在集群中如何锁定 shard,这里介绍在一个 shard 内的访问。

influxdb 的查询类似于 sql 语法,但是跟 sql 语句的零散信息无法直接查询存储引擎,所以需要一些策略将 sql 语句转换成存储 key。influxdb 通过构建倒排索引来将 where 后的 tag 信息转换为所有相关的 serieskey 的集合,然后将每个 serieskey 拼接上 select 后边的 fieldkey 就组成了存储 key,这样就可以按列取出对应的数据了。

通过对 tsm 存储引擎中存储 key 内 serieskey 的分析,能够构建出倒排索引,新版本 influxdb 将倒排索引持久化到每个 shard 中,与存储数据的 tsm 存储引擎对应,叫做 tsi 存储引擎。倒排索引相当于一个三层的 map,map 的 key 是 measurment,值是一个二层的 map,这个二层的 map 的 key 是 tagkey,对应的值是一个一层的 map,这个一层 map 的 key 是 tagval,对应的值是一个 serieskey 的集合,这个集合中的每个 serieskey 字串都包含了 map 索引路径上的 measurement、tagkey 和 tagval。

这样可以分析查询 sql,用 from 后的 measurement 查询倒排索引三级 map 获得一个二级 map,然后再分析 where 之后多个过滤逻辑单元,以 tagkey1=tagval1 为例,将这两个信息作为二层 map 的 key,查到最终的值:serieskey 的集合,这个集合的每个 serieskey 字串都包含了 measurment、tagkey1 和 tagval1,他们是满足当下过滤逻辑单元的 serieskey。根据这些逻辑单元的与或逻辑,将其对应的 serieskey 的集合进行交并运算,最终根据 sql 的语义过滤出所有的符合其逻辑的 serieskey 的集合,然后将这些 serieskey 与 select 后边的 fieldkey 拼接起来,得到最终的存储·key,就可以读取数据了。

不带聚合函数的查询:如图,对于一个 serieskey,需要拼接众多的 fieldkey,进而取出多个列的数据,他们出来后面临的问题是怎么组合为一行的数据,influxdb 行列约束比较松散,不能单纯按照列内偏移确定行。Influxdb 把 serieskey 和 time 作为判断列数据为一行的依据,每一个 serieskey 对应的多列就汇集为一个以多行为粒度的数据流,多个 serieskey 对应的数据流按照一定顺序汇集为一个数据流,作为最终的结果集返回到客户端。

带聚合函数的查询:这种方式与上边的查询正好相反,这里是针对聚合函数参数 field,拼接上众多的 serieskey,当然最终目的都是一样,得到存储 key,多个存储 key 可以读取多个数据流,这些数据流面临两种处理,先将他们按照一定的顺序汇集为一个数据流,然后按照一定的策略圈定这个数据流内相邻的一些数据进行聚合计算,进而得到最终聚合后的值。这里的顺序和策略来自于 sql 语句中 group by 后的聚合方式。

多数据流的合并聚合方式,也同样适用于 shard 之上的查询结果。

对于写入就比较简单了,直接更新数据存储引擎和倒排索引就可以了。

整个流程

对于访问的整个流程上边都已经提到了,这里整体梳理一下:分成两个阶段,在 shard 之上的查询,在 shard 之下的查询。

首先访问请求通过 lvs 锁定到某个 proxy,proxy 到 meta 集群中查找 meta 信息,根据请求信息,锁定 database,retension policy 和 shard group,进而得到众多的 shard。

对于写入操作,根据写入时的 serieskey,锁定一个 shard 进行写入,由于 shard 存在多副本,需要同时将数据写入到多个副本。对于查询,无法通过请求信息得到 serieskey,因此需要查询所有的 shard,针对每个 shard 选择一个可用的副本,进行访问。

经过上边的处理就获得 shard 到物理节点的映射,然后将其反转为物理节点到 shard 的映射,返回给 proxy,proxy 就可以在 data 集群的某个节点访问对应的 shard 了。

在 shard 之下的写入访问,需要拆解 insert 语句,组合为存储键值对存入 tsm 存储引擎,然后根据组合的 serieskey 更新倒排索引。

在 shard 之下的查询访问,分析 sql 语句,查询倒排索引,获取其相关的 serieskey 集合,将其拼接 field,形成最终的存储 key,进行数据访问。然后将众多数据在 data 节点上进行 shard 之上的合并聚合,在 proxy 上进行 data 之上的合并聚合。

最终 proxy 将访问结果返回给客户端。

故障处理

策略

上边提到 influxdb 针对 shard 提供副本容错,当写入数据发送到 proxy,proxy 将数据以无主多写的形式发送到所有的 shard 副本。meta 集群以心跳的形式监控 data 节点是否在线,在读取的时候,针对同一 shard 会在在线的 data 节点中随机选择一个读取节点进行读取。

在写入时如果一个 data 节点不可用,则会写入到 proxy 的一个临时文件中,等网络恢复正常会将这些暂存的数据发送到指定节点。

处理

data 集群扩容

当有全新节点加入 data 集群,目前还不支持自动将现有数据进行迁移,不过也做了些努力,为了使当下写入数据尽快应用到新的节点,在新加入节点的时候,会将当下时间作为当下 shard group 的结尾时间,然后按照全新的 data 节点数量新建一个 shard group,这样当下数据量马上就能均分到各个 data 节点,而每个 shard group 相关的 meta 信息都存储在 meta 集群里,因此不会对之前数据的读取造成干扰。

data 节点短暂不可用

如果 data 节点处于短期不可用状态,包括短暂的网络故障后自恢复,或者硬件故障后运维人员干预,最终 data 节点还存有掉线前的数据,那么就可以以原来的身份加入到 data 集群。对于写入来说,不可用期间 proxy 会临时存放此 data 节点的数据,在 data 加入集群时会将这部分数据再次发送到 data 节点,保障数据最终一致。

data 节点长期不可用

如果 data 节点由于一些原因,不能或者不需要以原来的身份加入到集群,需要运维人员手动将原来不可用的 data 节点下线,那么这台机器可用时,可以以全新的 data 身份加入到集群中,这等同于集群的扩容。

总 结

QTSDB 集群实现为:写入时根据 serieskey 将数据写到指定 shard,而读取时无法预知 serieskey,因此需要查询每个 shard。将整个读取过程切分为两个阶段:在 data 节点上进行存储引擎的读取以及节点内部多 shard 的合并聚合,在 proxy 节点将多个 data 节点的数据汇总,进行后期的合并聚合,形成最终的结果集返回到客户端。

QTSDB 现有的集群功能还有不完善的地方,会在之后的使用中不断完善。

本文为 360 技术原创文章,转载请务必注明出处及文末二维码,谢谢~

关于 360 技术

360 技术是 360 技术团队打造的技术分享公众号,每天推送技术干货内容

更多技术信息欢迎关注“360 技术”微信公众号

正文完
 0