共计 4624 个字符,预计需要花费 12 分钟才能阅读完成。
本文介绍了 SmartNews 利用 Flink 减速 Hive 日表的生产,将 Flink 无缝地集成到以 Airflow 和 Hive 为主的批处理零碎的实际。具体介绍过程中遇到的技术挑战和应答计划,以供社区分享。次要内容为:
- 我的项目背景
- 问题的定义
- 我的项目的指标
- 技术选型
- 技术挑战
- 整体计划及挑战应答
- 我的项目成绩和瞻望
- 后记
一、我的项目背景
SmartNews 是一家机器学习驱动的互联网公司。自 2012 年于日本东京成立,并在美国和中国设有办公室。通过 8 年多的倒退,SmartNews 曾经成长为日本排名第一,美国成长最快的新闻类利用,笼罩寰球超过 150 多个国家市场。据 2019 年初统计,SmartNews 的 iOS 和 Android 版本寰球累计下载量曾经超过 5000 万次。
SmartNews 在过来 9 年的工夫,基于 Airflow, Hive, EMR 等技术栈构建了大量的数据集。随着数据量的增长,这些离线表的解决工夫在逐步拉长。另外,随着业务方迭代节奏的放慢,对表的实时性也提出了更高的要求。因而,SmartNews 外部发动了 Speedy Batch 的我的项目,以放慢现有离线表生产效率。
本次分享便是 Speedy Batch 我的项目中的一个例子,减速用户行为 (actions) 表的实际。
APP 端上报的用户行为日志,每日通过 Hive 作业生成日表,这个表是许多其余表的源头,至关重要。这个作业须要运行 3 个小时,进而拉高了许多上游表的提早 (Latency),显著影响数据科学家、产品经理等用户的应用体验。因而咱们须要对这些作业进行提速,让各个表能更早可用。
公司业务基本上都在私有云上,服务器的原始日志以文件模式上传至云存储,按日分区;目前的作业用 Airflow 调度到 EMR 上运行,生成 Hive 日表,数据存储在云存储。
二、问题的定义
1. 输出
新闻服务器每隔 30 秒上传一个原始日志文件,文件上传至相应日期和小时的云存储目录。
2. 输入
原始日志通过 ETL 解决之后,按日 (dt) 和行为 (action) 两级分区输入。action 品种约 300 个,不固定,常有增减。
3. 用户
对这个表的应用是宽泛的,多路径的。有从 Hive 里查问,也有从 Presto,Jupyter 和 Spark 里查问,咱们甚至不能确定以上就是全副的拜访路径。
三、我的项目的指标
- 将 actions 表的时延从 3 小时缩短至 30 分钟;
对上游用户放弃通明。通明又分两个方面:
- 性能方面:用户无需批改任何代码,做到齐全无感
- 性能方面:新我的项目产生的表,不应该导致上游读取时的性能降落
四、技术选型
在本我的项目之前,共事曾经对该作业做了多轮次改良,成果不是很显著。
尝试过的计划包含减少资源,投入更多的机器,但遇到了云存储的 IOPS 限度:每个 prefix 最多反对 3000 个并发读写,这个问题在输入阶段尤为显著,即多个 reducer 同时向同一个 action 子目录输入的时候,容易碰到这个限度。另外还尝试了按小时预处理,而后到每日凌晨再合并成日表,但合并过程亦耗时较多,整体时延还是在 2.5 小时左右,成果不够显著。
鉴于服务器端的日志是近实时上传至云存储,团队提出了流式解决的思路,摒弃了批作业期待一天、解决 3 小时的模式,而是把计算扩散在一整天,进而升高当天完结后的解决用时。团队对 Flink 有比拟好的背景,加上 Flink 近期对 Hive 的改良较多,因而决定采纳基于 Flink 的计划。
五、技术挑战
挑战是多方面的。
1. 输入 RC 文件格式
以后 Hive 表的文件格式为 RCFile,为了保障对用户的通明,咱们只能在现有的 Hive 表上做 in-place 的 upgrade,也就是咱们得重用以后表,那么 Flink 输入的文件格式也得合乎 RCFile 格局,因为一张 Hive 表只能有一个格局。
RCFile 属于 bulk format (绝对应的是 row format),在每次 checkpoint 时必须一次性输入。如果咱们抉择 5 分钟一次 checkpoint,那么每个 action 每 5 分钟必须输入一个文件,这会大量减少后果文件数,进而影响上游的读取性能。特地是对于低频 action,文件数会上百倍的减少。咱们理解了 Flink 的文件合并性能,但那是在一个 checkpoint 内多个 sink 数据的合并,这并不能解决咱们的问题,咱们须要的是跨 checkpoint 的文件合并。
团队思考过以 row format (e.g. CSV) 输入,而后实现自定义的 Hive SerDe,使之兼容 RCFile 和 CSV。但很快咱们放弃了这个构想,因为那样的话,须要为每个查问场景实现这个 Hybrid 的 SerDe,例如须要为 Presto 实现,为 Spark 实现,等等。
- 一方面咱们没法投入这么多资源;
另一方面那种计划也是用户有感的,毕竟用户还是须要装置这个自定义的 SerDe。
咱们之前提出了生成一个新格局的表,但也因为对用户不够通明而被否决。
2. Partition 的可感知性和完整性
如何让上游作业能感知到当天这个 partition 曾经 ready?actions 表分两级 partition, dt 和 action。action 属于 Hive 的 dynamic partition,数量多且不固定。以后 Airflow 上游作业是期待 insert_actions 这个 Hive 工作实现后,再开始执行的。这个没问题,因为 insert_actions 完结时,所有 action 的 partition 都曾经 ready 了。但对于 Flink 作业来说,没有完结的信号,它只能往 Hive 外面提交一个个的 partition,如 dt=2021-05-29/action=refresh。因为 action 数量多,提交 partition 的过程可能继续数分钟,因而咱们也不能让 Airflow 作业去感知 dt 级别的 partition,那样很可能在只有局部 action 的状况下触发上游。
3. 流式读取云存储文件
我的项目的输出是一直上传的云存储文件,并非来自 MQ (message queue)。Flink 反对 FileStreamingSource,能够流式的读入文件,但那是基于定时 list 目录以发现新的文件。但这个计划不适宜咱们的场景,因为咱们的目录太大,云存储 list 操作根本无法实现。
4. Exactly Once 保障
鉴于 actions 表的重要性,用户无奈承受任何的数据失落或者反复,因而整个计划须要保障恰好一次的解决。
六、整体计划及挑战应答
1. 输入 RCFile 并且防止小文件
咱们最终抉择的计划是分两步走,第一个 Flink 作业以 json (row format) 格局输入,而后用另外一个 Flink 作业去做 Json 到 RC 格局的转化。以此解决 Flink 不能欢快的输入适合大小 RC 文件的问题。
输入 json 的两头后果,这样咱们能够通过 Rolling Policy 管制输入文件的大小,能够跨多个 checkpoint 攒成足够大,或者工夫足够长,而后再输入到云存储。这里 Flink 其实利用的是云存储的 Multi Part Upload (MPU) 的性能,即每次 checkpoint Flink 也是把以后 checkpoint 攒下来的数据上传至 云存储,但输入的不是文件,而是一个 part。最初当多个 part 达到大小或者工夫要求,就能够调用云存储的接口将多个 part 合并成一个文件,这个合并操作在云存储端实现,利用端无需再次读取这个 part 到本地合并而后再上传。而 Bulk format 均须要一次性全局解决,因而无奈分段上传而后合并,必须一次性全副上传。
当第二个作业感知到一个新的 json 文件上传后,加载它,转化成 RCFile,而后上传到最终的门路。这个过程带来的提早较小,一个文件能够管制在 10s 以内,这是能够承受的。
2. 优雅的感知输出文件
输出端,没有采纳 Flink 的 FileStreamingSource,而是采纳云存储的 event notification 来感知新文件的产生,承受到这个告诉后再被动去加载文件。
3. Partition 的可感知性和完整性
输入端,咱们输入 dt 级别的 success file,来让上游牢靠地感知日表的 ready。咱们实现自定义的 StreamingFileWriter,使之输入 partitionCreated 和 partitionInactive 的信号,并且通过实现自定义的 PartitionCommitter,来基于上述信号判断日表的完结。
其机制如下,每个云存储 writer 开始写某个 action,会收回一个 partitionCreated 信号,当它完结时又收回 partitionInactive 信号。PartitionCommitter 判断某一天之内是否所有的 partittion 都 inactive 了,如果是,则一天的数据都解决了,输入 dt 级别的 success file,在 Airflow 通过感知这个文件来判断 Flink 是否实现了日表的解决。
4. Exactly Once
云存储的 event notification 提供 At Least once 保障。Flink 作业内对文件级别进行去重,作业采纳 Exactly Once 的 checkpoint 设定,云存储文件输入基于 MPU 机制等价于反对 truncate,因而云存储输入等价于幂等,因而等价于端到端的 Exactly Once。
七、我的项目成绩和瞻望
我的项目曾经上线,时延维持在 34 分钟高低,其中包含 15 分钟的期待早退文件。
- 第一个 Flink 作业须要 8 分钟左右实现 checkpoint 和输入,json 转 rc 作业须要 12 分钟实现全副解决。咱们能够把这个工夫持续压缩,然而综合时效性和老本,咱们抉择以后的状态。
- json 转 rc 作业耗时比当初的料想的要大,因为上游作业最初一个 checkpoint 输入太多的文件,导致整体耗时长,这个能够通过减少作业的并发度线性的降落。
- 输入的文件数比批作业输入的文件数有所增加,减少 50% 左右。这是流式解决于批处理的劣势,流式解决须要在工夫达到时就输入一个文件,而此时文件大小未必达到预期。好在这个水平的文件数减少不显著影响上游的性能。
- 做到了上游的齐全通明,整个上线前后,没有收到任何用户异样反馈。
该我的项目让咱们在生产环境验证了利用流式解决框架 Flink 来无缝染指批处理零碎,实现用户无感的部分改良。未来咱们将利用同样的技术,去减速更多其余的 Hive 表的生产,并且宽泛提供更细粒度 Hive 示意的生产,例如小时级。另一方面,咱们将摸索利用 data lake 来治理批流一体的数据,实现技术栈的逐渐收敛。
八、后记
因为采纳齐全不同的计算框架,且须要与批处理零碎齐全保持一致,团队踩过不少的坑,限于篇幅,无奈一一列举。因而咱们筛选几个有代表的问题留给读者思考:
- 为了验证新作业产出的后果与原来 Hive 产出统一,咱们须要比照两者的输入。那么,如何能力高效的比拟两个 Hive 表的一致性呢?特地是每天有百亿级数据,每条有数百个字段,当然也蕴含简单类型 (array, map, array<map> 等)。
- 两个 Flink 作业的 checkpoint 模式都必须是 Exactly Once 吗?哪个能够不是,哪个必须是?
- StreamFileWriter 只有在 checkpoint 时才承受到 partitionCreated 和 partitionInactive 信号,那么咱们能够在它的 snapshotState() 函数外面输入给上游 ( 上游会保留到 state) 吗?
- 最初一问:你们有更好的计划可供咱们参考吗?