在数据中台建设过程中,一个典型的数据集成场景是将 MQ (Message Queue,例如 Kafka、RocketMQ 等)的数据导入到 Hive 中,以供上游数仓建设以及指标统计。因为 MQ-Hive 是数仓建设第一层,因而对数据的准确性以及实时性要求比拟高。
本文次要围绕 MQ-Hive 场景,针对目前字节跳动内已有解决方案的痛点,提出基于 Flink 的实时解决方案,并介绍新计划在字节跳动外部的应用现状。
已有计划及痛点
字节跳动内已有解决方案如下图所示,次要分了两个步骤:
- 通过 Dump 服务将 MQ 的数据写入到 HDFS 文件
- 再通过 Batch ETL 将 HDFS 数据导入到 Hive 中,并增加 Hive 分区
痛点
- 工作链较长,原始数据须要通过屡次转换最终能力进入 Hive
- 实时性比拟差,Dump Service、Batch ETL 提早都会导致最终数据产出提早
- 存储、计算开销大,MQ 数据反复存储和计算
- 基于原生 Java 打造,数据流量持续增长后,存在单点故障和机器负载不平衡等问题
- 运维老本较高,架构上无奈复用公司内 Hadoop/Flink/Yarn 等现有基础设施
- 不反对异地容灾
基于 Flink 实时解决方案
劣势
针对目前公司传统解决方案的痛点,咱们提出基于 Flink 的实时解决方案,将 MQ 的数据实时写入到 Hive,并反对事件工夫以及 Exactly Once 语义。相比老计划,新计划劣势如下所示:
- 基于流式引擎 Flink 开发,反对 Exactly Once 语义
- 实时性更高,MQ 数据间接进入 Hive,无两头计算环节
- 缩小两头存储,整个流程数据只会落地一次
- 撑持 Yarn 部署模式,不便用户迁徙
- 资源管理弹性,不便扩容以及运维
- 反对双机房容灾
整体架构
整体架构如下图所示,次要包含 DTS(Data Transmission Service) Source、DTS Core、DTS Sink 三大模块,具体性能如下:
- DTS Source 接入不同 MQ 数据源,反对 Kafka、RocketMQ 等
- DTS Sink 将数据输入到指标数据源,反对 HDFS、Hive 等
- DTS Core 贯通整个数据同步流程,通过 Source 读取源端数据,通过 DTS Framework 解决,最初通过 Sink 将数据输入到指标端。
- DTS Framework 集成类型零碎、文件切分、Exactly Once、工作信息采集、事件工夫、脏数据收集等外围性能
- 反对 Yarn 部署模式,资源调度、治理比拟弹性
Exactly Once
Flink 框架通过 Checkpoint 机制,可能提供 Exactly Once 或者 At Least Once 语义。为了实现 MQ-Hive 全链路反对 Exactly-once 语义,还须要 MQ Source、Hive Sink 端反对 Exactly Once 语义。本文通过 Checkpoint + 2PC 协定实现,具体过程如下:
- 数据写入时,Source 端从上游 MQ 拉取数据并发送到 Sink 端;Sink 端将数据写入到长期目录中
- Checkpoint Snapshot 阶段,Source 端将 MQ Offset 保留到 State 中;Sink 端敞开写入的文件句柄,并保留以后 Checkpoint ID 到 State 中;
- Checkpoint Complete 阶段,Source 端 Commit MQ Offset;Sink 端将长期目录中的数据挪动到正式目录下
- Checkpoint Recover 阶段,加载最新一次胜利的 Checkpoint 目录并复原 State 信息,其中 Source 端将 State 中保留的 MQ Offset 作为起始地位;Sink 端复原最新一次胜利的 Checkpoint ID,并将长期目录的数据挪动到正式目录下
■ 实现优化
在理论应用场景中,特地是大并发场景下,HDFS 写入提早容易有毛刺,因为个别 Task Snapshot 超时或者失败,导致整个 Checkpoint 失败的问题会比拟显著。因而针对 Checkpoint 失败,进步零碎的容错性以及稳定性就比拟重要。这里充分利用 Checkpoint ID 严格枯燥递增的个性,每一次做 Checkpoint 时,以后 Checkpoint ID 肯定比以前大,因而在 Checkpoint Complete 阶段,能够提交小于等于以后 Checkpoint ID 的长期数据。具体优化策略如下:
- Sink 端长期目录为{dump_path}/{next_cp_id},这里 next_cp_id 的定义是以后最新的 cp_id + 1
- Checkpoint Snapshot 阶段,Sink 端保留以后最新 cp_id 到 State,同时更新 next_cp_id 为 cp_id + 1
- Checkpoint Complete 阶段,Sink 端将长期目录中所有小于等于以后 cp_id 的数据挪动到正式目录下
- Checkpoint Recover 阶段,Sink 端复原最新一次胜利的 cp_id,并将长期目录中小于等于以后 cp_id 的数据挪动到正式目录下
类型零碎
因为不同数据源反对的数据类型不一样,为了解决不同数据源间的数据同步以及不同类型转换兼容的问题,咱们反对了 DTS 类型零碎,DTS 类型可细化为根底类型和复合类型,其中复合类型反对类型嵌套,具体转换流程如下:
- 在 Source 端,将源数据类型,对立转成零碎外部的 DTS 类型
- 在 Sink 端,将零碎外部的 DTS 类型转换成指标数据源类型
- 其中 DTS 类型零碎反对不同类型间的互相转换,比方 String 类型与 Date 类型的互相转换
Rolling Policy
Sink 端是并发写入,每个 Task 解决的流量不一样,为了防止生成太多的小文件或者生成的文件过大,须要反对自定义文件切分策略,以管制单个文件的大小。目前反对三种文件切分策略:文件大小、文件最长未更新工夫、Checkpoint。
■ 优化策略
Hive 反对 Parquet、Orc、Text 等多种存储格局,不同的存储格局数据写入过程不太一样,具体能够分为两大类:
- RowFormat:基于单条写入,反对依照 Offset 进行 HDFS Truncate 操作,例如 Text 格局
- BulkFormat:基于 Block 写入,不反对 HDFS Truncate 操作,例如 Parquet、ORC 格局
为了保障 Exactly Once 语义,并同时反对 Parquet、Orc、Text 等多种格局,在每次 Checkpoint 时,强制做文件切分,保障所有写入的文件都是残缺的,Checkpoint 复原时不必做 Truncate 操作。
容错解决
现实状况下流式工作会始终运行不须要重启,但理论不可避免会遇到以下几个场景:
- Flink 计算引擎降级,须要重启工作
- 上游数据减少,须要调整工作并发度
- Task Failover
■ 并发度调整
目前 Flink 原生反对 State Rescale。具体实现中,在 Task 做 Checkpoint Snapshot 时,将 MQ Offset 保留到 ListState 中;Job 重启后,Job Master 会依据 Operator 并发度,将 ListState 平均分配到各个 Task 上。
■ Task Failover
因为网络抖动、写入超时等内部因素的影响,Task 不可避免会呈现写入失败,如何疾速、精确的做 Task Failover 就显得比拟重要。目前 Flink 原生反对多种 Task Failover 策略,本文应用 Region Failover 策略,将失败 Task 所在 Region 的所有 Task 都重启。
异地容灾
■ 背景
大数据时代,数据的准确性和实时性显得尤为重要。本文提供多机房部署及异地容灾解决方案,当主机房因为断网、断电、地震、火灾等起因临时无奈对外提供服务时,能疾速将服务切换到备灾机房,并同时保障 Exactly Once 语义。
■ 容灾组件
整体解决方案须要多个容灾组件一起配合实现,容灾组件如下图所示,次要包含 MQ、YARN、HDFS,具体如下:
- MQ 须要反对多机房部署,当主机房故障时,能将 Leader 切换到备机房,以供上游生产
- Yarn 集群在主机房、备机房都有部署,以便 Flink Job 迁徙
- 上游 HDFS 须要反对多机房部署,当主机房故障时,能将 Master 切换到备机房
- Flink Job 运行在 Yarn 上,同时工作 State Backend 保留到 HDFS,通过 HDFS 的多机房反对保障 State Backend 的多机房
■ 容灾过程
整体容灾过程如下所示:
- 失常状况下,MQ Leader 以及 HDFS Master 部署在主机房,并将数据同步到备机房。同时 Flink Job 运行在主机房,并将工作 State 写入到 HDFS 中,留神 State 也是多机房部署模式
- 劫难状况下,MQ Leader 以及 HDFS Master 从主机房迁徙到备灾机房,同时 Flink Job 也迁徙到备灾机房,并通过 State 复原劫难前的 Offset 信息,以提供 Exactly Once 语义
事件工夫归档
■ 背景
在数仓建设中,解决工夫 (Process Time) 和事件工夫 (Event Time) 的解决逻辑不太一样,对于解决工夫会将数据写到以后零碎工夫所对应的工夫分区下;对于事件工夫,则是依据数据的生产工夫将数据写到对应工夫分区下,本文也简称为归档。在理论场景中,不可避免会遇到各种上下游故障,并在继续一段时间后复原,如果采纳 Process Time 的解决策略,则事变期间的数据会写入到复原后的工夫分区下,最终导致分区空洞或者数据漂移的问题;如果采纳归档的策略,会依照事件工夫写入,则没有此类问题。因为上游数据事件工夫会存在乱序,同时 Hive 分区生成后就不应该再持续写入,因而理论写入过程中不可能做到有限归档,只能在肯定工夫范畴内归档。归档的难点在于如何确定全局最小归档工夫以及如何容忍肯定的乱序。
■ 全局最小归档工夫
Source 端是并发读取,并且一个 Task 可能同时读取多个 MQ Partition 的数据,对于 MQ 的每一个 Parititon 会保留以后分区归档工夫,取分区中最小值作为 Task 的最小归档工夫,最终取 Task 中最小值,作为全局最小归档工夫。
■ 乱序解决
为了反对乱序的场景,会反对一个归档区间的设置,其中 Global Min Watermark 为全局最小归档工夫,Partition Watermark 为分区以后归档工夫,Partition Min Watermark 为分区最小归档工夫,只有当事件工夫满足以下条件时,才会进行归档:
- 事件工夫大于全局最小归档工夫
- 事件工夫大于分区最小归档工夫
Hive 分区生成
■ 原理
Hive 分区生成的难点在于如何确定分区的数据是否就绪以及如何增加分区。因为 Sink 端是并发写入,同时会有多个 Task 写同一个分区数据,因而只有当所有 Task 分区数据写入实现,能力认为分区数据是就绪,本文解决思路如下:
- 在 Sink 端,对于每个 Task 保留以后最小解决工夫,须要满足枯燥递增的个性
- 在 Checkpoint Complete 时,Task 上报最小解决工夫到 JM 端
- JM 拿到所有 Task 的最小解决工夫后,能够失去全局最小解决工夫,并以此作为 Hive 分区的最小就绪工夫
- 当最小就绪工夫更新时,可判断是否增加 Hive 分区
■ 动静分区
动静分区是依据上游输出数据的值,确定数据写到哪个分区目录,而不是写到固定分区目录,例如 date={date}/hour={hour}/app={app}的场景,依据分区工夫以及 app 字段的值确定最终的分区目录,以实现每个小时内,雷同的 app 数据在同一个分区下。在动态分区场景下,每个 Task 每次只会写入一个分区文件,但在动静分区场景下,每个 Task 可能同时写入多个分区文件。对于 Parque 格局的写入,会先将数据写到做本地缓存,而后批次写入到 Hive,当 Task 同时解决的文件句柄过多时,容易呈现 OOM。为了避免单 Task OOM,会周期性对文件句柄做探活检测,及时开释长时间没有写入的文件句柄。
Messenger
Messenger 模块用于采集 Job 运行状态信息,以便掂量 Job 衰弱度以及大盘指标建设。
■ 元信息采集
元信息采集的原理如下所示,在 Sink 端通过 Messenger 采集 Task 的外围指标,例如流量、QPS、脏数据、写入 Latency、事件工夫写入成果等,并通过 Messenger Collector 汇总。其中脏数据须要输入到内部存储中,工作运行指标输入到 Grafana,用于大盘指标展现。
■ 脏数据收集
数据集成场景下,不可避免会遇到脏数据,例如类型配置谬误、字段溢出、类型转换不兼容等场景。对于流式工作来说,因为工作会始终运行,因而须要可能实时统计脏数据流量,并且将脏数据保留到内部存储中以供排查,同时在运行日志中采样输入。
■ 大盘监控
大盘指标笼罩全局指标以及单个 Job 指标,包含写入胜利流量和 QPS、写入 Latency、写入失败流量和 QPS、归档成果统计等,具体如下图所示:
将来布局
基于 Flink 实时解决方案目前已在公司上线和推广,将来次要关注以下几个方面:
- 数据集成性能加强,反对更多数据源的接入,反对用户自定义数据转换逻辑等
- Data Lake 买通,反对 CDC 数据实时导入
- 流批架构对立,反对全量、增量场景数据集成
- 架构降级,反对更多部署环境,比方 K8S
- 服务化欠缺,升高用户接入老本
总结
随着字节跳动业务产品逐步多元化疾速倒退,字节跳动外部一站式大数据开发平台性能也越来越丰盛,并提供离线、实时、全量、增量场景下全域数据集成解决方案,从最后的几百个工作规模增长到数万级规模,日解决数据达到 PB 级,其中基于 Flink 实时解决方案目前已在公司外部大力推广和应用,并逐渐替换老的 MQ-Hive 链路。
参考文献:
1.Real-time Exactly-once ETL with Apache Flink
http://shzhangji.com/blog/201…
2.Implementing the Two-Phase Commit Operator in Flink
https://flink.apache.org/feat…
3.A Deep Dive into Rescalable State in Apache Flink
https://flink.apache.org/feat…
4.Data Streaming Fault Tolerance
https://ci.apache.org/project…