基于 Hive 的离线数仓往往是企业大数据生产零碎中不可短少的一环。Hive 数仓有很高的成熟度和稳定性,但因为它是离线的,延时很大。在一些对延时要求比拟高的场景,须要另外搭建基于 Flink 的实时数仓,将链路延时升高到秒级。然而一套离线数仓加一套实时数仓的架构会带来超过两倍的资源耗费,甚至导致反复开发。
想要搭建流式链路就必须得摈弃现有的 Hive 数仓吗?并不是,借助 Flink 能够实现已有的 Hive 离线数仓准实时化。本文整顿自 Apache Flink Committer、阿里巴巴技术专家李劲松的分享,文章将剖析以后离线数仓实时化的难点,详解 Flink 如何解决 Hive 流批一体准实时数仓的难题,实现更高效、正当的资源配置。文章纲要如下:
- 离线数仓实时化的难点
- Flink 在流批一体的摸索
- 构建流批一体准实时数仓利用实际
离线数仓实时化的难点
离线数仓
上图是一个典型的离线数仓,假如当初公司有一个需要,目前公司的数据量很大,须要每天出一个报表且输入到业务数据库中。首先是刚入库的业务数据,大抵分为两种,一种是 MySQL 的 binlog,另外一种是业务零碎中的业务打点,这个日志打点信息能够通过 Flume 等工具去采集,再离线入库到数仓中。而后随着业务越来越多,业务中的各个表能够做一些形象,形象的益处是更好的治理和更高效的数据复用和计算复用。所以数仓就分成了多层 (明细层、中间层、服务层等等),每一层存的是数据表,数据表之间通过 HiveSQL 的计算来实现 ETL 转换。
不止是 HiveSQL,Hive 只是动态的批计算,而业务每天都要出报表,这意味着每天都要进行计算,这种状况下会依赖于调度工具和血统治理:
- 调度工具:依照某个策略把批计算调度起来。
- 血统治理:一个工作是由许多个作业组合而成,可能有非常复杂的表构造档次,整个计算是一个非常复杂的拓扑,作业间的依赖关系非常复杂 (缩小冗余存储和计算,也能够有较好的容错),只有当一级完结后能力进行下一级的计算。
当工作非常宏大的时候,咱们得出后果往往须要很长的一段时间,也就是咱们常说的 T+1,H+1,这就是离线数仓的问题。
第三方工具
下面说过,离线数仓不仅仅是简略的 Hive 计算,它还依赖了其它的第三方工具,比方:
- 应用 Flume 来入库,但存在肯定的问题,首先,它的容错可能无奈保障 Exactly-Once 成果,须要上游再次进行去重操作。其次,自定义逻辑须要通过一些伎俩,比方脚本来管制。第三,离线数仓并不具备良好的扩大能力,当数据剧增时,减少本来的并发数就比拟艰难了。
- 基于调度工具的作业调度会带来级联的计算提早,比方凌晨 1 点开始计算昨天的数据,可能须要到早上 6、7 点能力做完,并且无奈保障在设置的调度工夫内数据能够齐全 ready。此外,级联的计算还会带来简单的血统治理问题,大工作的 Batch 计算可能会忽然打满集群的资源,所以也要求咱们对于负载治理进行考量,这些都会给业务减少累赘。
无论是离线数仓还是第三方工具,其实次要的问题还是“慢”,如何解决慢的问题,此时就该实时数仓出场了。
实时数仓
实时数仓其实是从 Hive+HDFS 的组合换成了 Kafka,ETL 的性能通过 Flink 的流式解决解决。此时就不存在调度和血统治理的问题了,通过实时一直的增量更新,最终输入到业务的 DB 中。
尽管延时升高了,但此时咱们会面临另外一些问题:
- 历史数据失落,因为 Kafka 只是长期的存储介质,数据会有一个超时的工夫 (比方只保留 7 天的数据),这会导致咱们的历史数据失落。
- 老本绝对较高,实时计算的老本要大于离线计算。
Lambda 架构
所以此时很多人就会抉择一套实时一套离线的做法,互不烦扰,依据工作是否须要走实时的需要来对需要进行拆散。
这套架构看似解决了所有问题,但理论带来的问题也是十分多。首先,Lambda 架构造成了离线和实时的割裂问题,它们解决的业务问题都是一样的,然而两套计划让同样的数据源产生了不同的计算结果。不同层级的表构造可能不统一,并且当数据产生不统一的问题时,还须要去进行比对排查。
随着这套 Lambda 架构越走越远,开发团队、表构造表依赖、计算模型等都可能会被割裂开,越到前面越会发现,老本越来越高,而对立的代价越来越大。
那么问题来了,实时数仓会消耗如此大的资源,且还不能保留历史数据,Lambda 架构存在如此多的问题,有什么计划能够解决呢?
数据湖
数据湖领有不少的长处,原子性能够让咱们做到准实时的批流一体,并且反对已有数据的批改操作。然而毕竟数据湖是新一代数仓存储架构,各方面都还不是很完满,目前已有的数据湖都强依赖于 Spark(当然 Flink 也正在拥抱数据湖),将数据迁徙到数据湖须要团队对迁徙老本和人员学习老本进行考量。
如果没有这么大的信心迁徙数据湖,那有没有一个略微弛缓一些的计划减速已有的离线数仓呢?
Flink 在批流一体上的摸索
对立元数据
Flink 始终继续致力于离线和实时的对立,首先是对立元数据。简略来说就是把 Kafka 表的元数据信息存储到 HiveMetaStore 中,做到离线和实时的表 Meta 的对立。(目前开源的实时计算并没有一个较为欠缺的长久化 MetaStore,Hive MetaStore 不仅能保留离线表,也能够承当实时计算的 MetaStore 能力)。
对立计算引擎
同样的元数据之后,实时和离线的表构造和档次能够设计成一样,接下来就是能够共用:
- 同一套 SQL,Flink 本身提供批流一体的 ANSI-SQL 语法,能够大大减小用户 SQL 开发者和运维者的累赘,让用户专一于业务逻辑。
- 同一个引擎,Flink 的流和批复用一套优化和 Runtime 框架,现阶段的大数据引擎还远远达不到齐全稳固的状况,所以依然有很多时候须要咱们去深刻的剖析和优化,一套引擎能够让开发者专一单个技术栈,防止须要接触多个技术栈,而只有技术广度,没有技术深度。
对立数据
剖析了元数据和计算引擎的对立,更进一步,是否能对立实时和离线的数据,防止数据的不统一,防止数据的反复存储和反复计算。ETL 计算是否能对立呢?既然实时表设计上能够和离线表截然不同,是否能够罗唆只有实时表的 ETL 计算,离线表从实时表里获取数据?
并且,通过实时链路能够减速离线链路的数据筹备,批计算能够把调度换成流输出。
Flink Hive/File Streaming Sink 即为解决这个问题,实时 Kafka 表能够实时的同步到对于的离线表中:
- 离线表作为实时的历史数据,填补了实时数仓不存在历史数据的空缺。
- 数据批量准实时摄入为 Ad-hoc 查问离线表提供了准实时输出。
此时离线的批计算也能够交由实时调度,在实时工作解决中某个契机 (Partition Commit 见后续) 自行调度离线那块的工作进行数据同步操作。
此时实时和离线的表曾经根本对立,那么问题来了,Kafka 中的表和 Hive 中的表是否就共用一张表呢?我的想法是之后可能会呈现以下状况,在数仓中定义一张表,别离对应着 Kafka 和 Hive+HDFS 两种物理存储:
- 用户在进行 insert 操作时,就天然插入到了 Kafka 的实时 table 当中,同时生成另外一条链路,主动同步到 Hive Table 当中。这样这一张表就十分的残缺,不仅满足实时的需要,而且领有历史的数据。
- 一个 SQL 读取这样的一个 Hybrid Source,依据你的查问语句前面的 where 条件,主动路由到 Hive 的历史数据,或者是 Kafka 的实时数据。依据肯定的规定先读 Hive 历史数据,再读 Kafka 实时数据,当然这里有一个问题,它们之间通过什么标识来切换呢?一个想法是数据中或者 Kafka 的 Timestamp。
Hive Streaming Sink 的实现
Flink 1.11 前曾经有了 StreamingFileSink,在 1.11 中岂但把它集成到 SQL 中,让这个 Hive Streaming Sink 能够像离线的 Hive SQL 那样,所有的业务逻辑都由 SQL 去解决,而且带来了进一步的增量。
接下来介绍下 Hive/File Streaming Sink,分为两个组件,FileWriter 和 PartitionCommitter:
- FileWriter 组件能够做到分区感知,通过 checkpoint 机制能够保障 Exactly-Once(分布式场景是不牢靠的,须要通过两阶段提交 + 文件 Rename 的幂等性),FileWriter 也提供了 Rolling 相干的参数,这个 Rolling 指的是咱们的流式处理过程,它能够通过两个参数来管制执行频率,file-size 就是每个数据流的大小,rollover-interval 就是时长距离。然而须要留神,checkpoint 不宜设置太频繁,免得产生过多的小文件。
- Partition Committer,通过一系列的业务逻辑解决后失去的 Finished Flies 就间接可用了吗?因为咱们典型的 Hive 表都是分区表,当一个分区就绪后,还须要告诉上游,Partition 曾经解决实现,能够同步到 Hive metastore 中了。咱们须要在适合的机会来无效的 trigger 特定的 Partition commit。Partition committer 总的来说,就是实现了 Hive 分区表的数据及元数据的写入,甚至能够实现告诉调度零碎开始执行之后的 Batch 作业。
因为流式作业是不间断的在运行的,如何设置分区提交的工夫,某个分区什么时候提交它呢?
- 第一种是默认策略 Process time,也就是咱们所说的事件被解决时的以后零碎工夫,然而毛病也比拟显著,可能呈现各种各样的数据不残缺。
- 举荐策略就是 partition-time,这种策略能够做到提交时的语义明确且数据残缺,partition 字段就是由 event time,也就是事件产生的工夫所失去的。
如果以后工夫 Current time > 分区产生的工夫 + commitDelay 延时,即是能够开始进行分区提交的工夫。一个简略的例子是小时分区,比方以后曾经 12 点过 1 分了,曾经过了 11 点的分区 + 一个小时,所以咱们能够说不会再有 11 点分区的数据过去了,就能够提交 11 点的分区。(要是有 LateEvent 怎么办?所以也要求分区的提交是幂等的。)
接下来介绍分区的提交具体作用,最间接的就是写 SuccessFile 和 Add partition 到 Hive metastore。
Flink 内置反对了 Hive-MetaStore 和 SuccessFile,只有配置 ”sink.partition-commit.policy.kind” 为 “metastore,success-file”,即可做到在 commit 分区的时候主动 add 分区到 Hive 中,而且写 SuccessFile,当 add 操作实现的时候,这个 partition 才真正的对 Hive 可见。
Custom 机制容许自定义一个 Partition Commit Policy 的类,实现这个类能够做到在这个分区的工作解决实现后:比方触发上游的调度、Statistic Analysis、又或者触发 Hive 的小文件合并。(当然触发 Hive 的小文件合并岂但须要启动另一个作业,而且做不到一致性保障,后续 Flink 也会有进一步的摸索,在 Flink 作业中,被动实现小文件的合并)。
实时生产
不止是准实时的数据摄入,Flink 也带来了维表关联 Hive 表和流实时生产 Hive 表。
咱们晓得 Flink 是反对维表关联查问 MySQL 和 HBase 的,在计算中保护一个 LRU 的缓存,未命中查问 MySQL 或 HBase。然而没有 Lookup 的能力怎么办呢?数据个别是放在离线数仓中的,所以业务上咱们个别采纳 Hive Table 定期同步到 HBase 或者 MySQL。Flink 也能够容许间接维表关联 Hive 表,目前的实现很简略,须要在每个并发中全量 Load Hive 表的所有数据,只能针对小表的关联。
传统的 Hive Table 只反对依照批的形式进行读取计算,然而咱们当初能够应用流的形式来监控 Hive 外面的分区 / 文件生成,也就是每一条数据过去,都能够实时的进行生产计算,它也是齐全复用 Flink Streaming SQL 的形式,能够和 HBase、MySQL、Hive Table 进行 Join 操作,最初再通过 FileWriter 实时写入到 Hive Table 中。
构建流批一体准实时数仓利用实际
案例如下:通过 Flume 采集日志打点 Logs,计算各年龄层的 PV,此时咱们存在两条链路:
- 一条是实时链路,通过输出拜访日志,关联 Hive 的 User 表来计算出所须要的后果到业务 DB 中。
- 而另一条则是离线链路,咱们须要 Hive 提供小时分区表,来实现对历史数据的 Ad-hoc 查问。
这里就是咱们刚刚提到的,尽管是对应两个 database:realtime_db 和 offline_db,然而它们共用一份元数据。
对于 Hive 表咱们能够通过 Flink SQL 提供的 Hive dialect 语法,而后通过 Hive 的 DDL 语法来在 Flink 中创立 Hive 表,这里设置 PARTITION BY 天和小时,是与实时链路的不同之处,因为实时链路是没有分区概念的。
如何在表构造里防止分区引起的 Schema 差别?一个能够解决的计划是思考引入 Hidden Partition 的定义,Partition 的字段能够是某个字段的 Computed Column,这也能够与理论常见的状况做比照,如天或小时是由工夫字段计算出的,之后是上面的三个参数:
- sink.partition-commit.trigger,指定什么时候进行 partition 的 commit,这里设置了 partition-time,用于保障 exactly-once;
- partition.time-extractor.timestamp-pattern,怎么从 partition 中提取工夫,相当于设置了一个提取格局;
- sink.partition-commit.policy.kind,既 partition commit 所要进行的操作,也就是刚刚提到的 metastore,success-file。
之后设置回默认的 Flink dialect,创立 Kafka 的实时表,通过 insert into 将 Kafka 中的数据同步到 Hive 之中。
这部分是对于 Kafka 中的表如何通过 Dim join 的形式,拿到 User 表的年龄字段。图中须要关怀的是 lookup.join.cache.ttl 这个参数,咱们会将 user 这张表用相似于 broadcast 的形式,播送到每一个 task 中,然而这个过程中可能呈现 Hive 中的 table 存在更新操作,这里的 1h 就阐明,数据有效期仅为 1 小时。创立 view 的目标是将 Dim join 所须要的 process time 加上(Dim Join 须要定义 Process time 是个不太天然的过程,后续也在思考如何在不毁坏 SQL 语义的同时,简化 DimJoin 的语法。)
通过实时 Pipeline 的伎俩生产 Hive Table,而不是通过调度或者以往手动触发的 batch 作业,第一个参数 streaming-source.enable,关上流解决机制,而后应用 start-offset 参数指定从哪个分区 / 文件开始生产。此时,整个流批一体准实时数仓利用根本算是实现啦。
将来布局
Hive 作为分区级别治理的 Table Format 在一些不便有比拟大的限度,如果是新型的 Table Format 比方 Iceberg 会有更好的反对,将来 Flink 会在上面几个方面增强:
- Flink Hive/File Streaming Sink 的 Auto Compaction(Merging) 能力,小文件是实时的最大妨碍之一。
- Flink 拥抱 Iceberg,目前在社区中曾经开发结束 Iceberg Sink,Iceberg Source 正在推动中,能够看见在不远的未来,能够间接将 Iceberg 当做一个音讯队列,且,它保留了所有的历史数据,达到真正的流批对立。
- 加强 Flink Batch 的 Shuffle,目前齐全的 Hash Shuffle 带来了很多问题,比方小文件、随机 IO、Buffer 治理带来的 OOM,后续开源 Flink (1.12) 会加强力量引入 SortedShuffle 以及 ShuffleService。
- Flink Batch BoundedStream 反对,旧的 Dataset API 曾经不能满足流批对立的架构,社区 (1.12) 会在 DataStream 上提供 Batch 计算的能力。
作者介绍:
李劲松,花名之信,阿里巴巴技术专家,Apache Flink Committer。2014 年起专一于阿里外部 Galaxy 流计算框架;2017 年起开始 Flink 研发,次要专一于 Batch 计算、数据结构与类型。