作者介绍:胡梦宇,知乎外围架构平台开发工程师,大数据基础架构方向,次要工作内容是负责知乎外部大数据组件的二次开发和数据平台建设。
前言
一年前,知乎的大数据架构与 TiDB 首次相遇,那时咱们将 Hive MetaStore 的元数据库迁徙到了 TiDB,失去了超过单机数据库一个量级的性能晋升。在见识过分布式 NewSQL 数据库 TiDB 的威力后,咱们对它寄予厚望,将它利用到了大数据架构的其余场景下,如:Hive 大查问报警,NameNode RPC 减速。
Hive 大查问报警
背景
在知乎外部,Hive 次要被利用与两个场景:1. ETL 外围链路工作 2. Adhoc 即席查问。在 ETL 场景下,Hive SQL 工作都比拟固定而且稳固,然而在 Adhoc 场景下,用户提交的 Hive SQL 比拟随机多变。在用户对 SQL 没有做好优化的状况下,启动的 MapReduce 工作会扫描过多的数据,不仅使得工作运行较慢,还会对 HDFS 造成微小压力,影响集群的稳定性,这种状况在季度末或者年底呈现得极为频繁,有些用户会扫描一季度甚至一整年的数据,这样的查问一旦呈现,便会导致集群资源缓和,进而影响 ETL 工作,导致报表提早产出。
SQL 大查问实时报警零碎简介
针对以上痛点,咱们开发了大 SQL 查问实时报警零碎,在用户提交 SQL 时,会做以下事件:
- 解析 SQL 的执行打算,转化成须要扫描的表门路以及分区门路;
- 汇总所有分区门路的大小,计算出扫描数据总量;
- 判断扫描分区总量是否超过阈值,如果超过阈值,在企业微信告诉用户。
上面详解每一步的具体实现。
从执行打算拿到 Hive 扫描的 HDFS 门路
这一步咱们利用 Hive Server 的 Hook 机制,在每条 SQL 被解析实现后,向 Kafka 输入一条审计日志,审计日志的格局如下:
{
"operation": "QUERY",
"user": "hdfs",
"time": "2021-07-12 15:43:16.022",
"ip": "127.0.0.1",
"hiveServerIp": "127.0.0.1",
"inputPartitionSize": 2,
"sql": "select count(*) from test_table where pdate in ('2021-07-01','2021-07-02')",
"hookType": "PRE_EXEC_HOOK",
"currentDatabase": "default",
"sessionId": "5e18ff6e-421d-4868-a522-fc3d342c3551",
"queryId": "hive_20210712154316_fb366800-2cc9-4ba3-83a7-815c97431063",
"inputTableList": ["test_table"],
"outputTableList": [],
"inputPaths": [
"/user/hdfs/tables/default.db/test_table/2021-07-01",
"/user/hdfs/tables/default.db/test_table/2021-07-02"
],
"app.owner": "humengyu"
}
这里咱们次要关注以下几个字段:
字段 | 含意 |
---|---|
operation | SQL 的类型,如 QUERY, DROP 等 |
user | 提交 SQL 的用户,在知乎外部是组账号 |
sql | 提交的 SQL 内容 |
inputPaths | 扫描的 HDFS 门路 |
app.owner | 提交 SQL 的集体账号 |
汇总分区的大小
汇总分区大小须要晓得 inputPaths
字段里每一个 HDFS 门路的目录大小,这里有以下几种解决方案:
计划 | 长处 | 毛病 |
---|---|---|
调用 HDFS API 实时获取 | 后果精确 | 须要调用 getContentSummary 办法,比拟消耗 NameNode 性能,等待时间比拟久。 |
利用 Hive MetaStore 的分区统计信息 | 速度较快 | 后果可能不准,有些表通过其余计算引擎如 Flink,Spark 间接写入 HDFS 目录,没有及时更新统计信息; |
利用 HDFS 的 fsimage 解析出所有 Hive 目录大小,存入 TiDB | 速度较快 | 后果具备 T+1 的提早,当天的分区无奈统计大小。 |
思考到应用场景,大 SQL 查问大部分状况下都是扫描了几个月甚至几年的数据,一两天的分区信息疏忽能够承受,咱们抉择了第三种计划:每天将 HDFS 的 fsimage 解析,并且计算出每个 Hive 目录的大小,再将后果存入 TiDB。因为咱们在其余场景也会用到 fsimage 的信息,所以这里咱们不仅仅只存储了 Hive 目录,而是存储了整个 HDFS 的目录状况,近百亿条数据。很显著,在如此大的数据量下,还波及到数据索引相干,TiDB 是一个很好的抉择。
实时报警
咱们将审计日志实时发送至 Kafka,再用 Flink 实时去生产 Kafka 内的审计日志,利用 KafkaTableSource 和 Json Format 将 Kafka 作为流表,再利用 JdbcLookupTableSource 将 TiDB 作为维表,便可轻松计算出每条 SQL 扫描的数据量再进行报警判断。
最初达成的成果如下:
NameNode PRC 减速
背景
故事的起因是这样的,在有一段时间内,常常有用户反馈 Hive 查询卡住没有反馈,短的卡十几分钟,长的卡几小时,非常奇怪,通过定位发现是 Hive 外部在调用 getInputSummary 办法时,有一把全局锁,在某一个查问较大时,调用这个办法会破费较长的工夫,导致其余的查问线程在期待这把锁的开释。通过浏览源码发现,getInputSummary 办法是能够并发去执行的,它外部其实就是在调用 HDFS 客户端的 getContentSummary 办法,咱们将锁去掉,不再应用全局锁的性能,而是采纳了相似线程池的形式,让它能够以一个较高的并发度去执行。然而这样会带来一些问题,HDFS 客户端的 getContentSummary 办法相似于文件系统的 du 操作,如果并发度过高,会显著影响 NameNode 性能。不仅仅只有 Hive,其余的计算引擎也会调用 getContentSummary 办法,因而,优化这个办法十分必要。
缓存 ContentSummary 信息
知乎在 2019 年 HDFS 就曾经拆分了 Federation, 采取的是 Router Base Federation 的计划,引入了 NameNode 的代理组件 Router. 咱们只有在 Router 层给 HDFS 的 ContentSummary 做一层缓存,在客户端发动调用时,如果缓存命中,则从缓存读取,如果缓存未命中,则从 NameNode 申请。通过外部探讨,缓存计划有以下几种:
计划 | 长处 | 毛病 |
---|---|---|
客户端第一次申请 Router 时,从 NameNode 返回,更新缓存;第二次申请时,先拿缓存,并且判断目录的批改工夫,如果期间发未产生批改,则返回缓存,如果产生了批改,从 NameNode 返回,更新缓存。 | 对于不常批改的目录,只须要申请一次 NameNode。 | 对于第一次申请仍然须要去拜访 NameNode;只能缓存没有子目录的目录,因为子目录的变更下层目录无奈感知。 |
每天利用 fsimage 产出一份全目录的 ContentSummary 信息缓存至 TiDB,在客户端申请时,走第一种计划的逻辑。 | 大部分目录的第一次申请都不必走 NameNode。 | 仍然只能缓存没有子目录的目录,因为子目录的变更下层目录无奈感知。 |
咱们抉择了第二种计划,因为 ContentSummary 信息在咱们之前做 Hive SQL 大查问报警的时候曾经产出,所以接入进来非常不便。在接入 TiDB 做缓存,并且给申请门路建索引当前,对于个别状况下的 getContentSummary 申请,提早能保障在 10ms 以下,而对于没有 TiDB 缓存的 NameNode,这个工夫可能会破费几分钟甚至几十分钟。
瞻望
本次咱们利用 TiDB 的超大存储和索引性能,缓存了 HDFS 的元信息,满足了知乎外部的一些场景,后续咱们会继续改良和扩大此场景:比方缓存 HDFS 文件信息能够做成实时缓存,利用 Edit log 订阅文件变更,而后和 TiDB 外面的存量 fsimage 进行合并,产出低提早的 NameNode 快照,用于一些在线的剖析等。