导读:Flink 从 1.9.0 开始提供与 Hive 集成的性能,随着几个版本的迭代,在最新的 Flink 1.11 中,与 Hive 集成的性能进一步深入,并且开始尝试将流计算场景与 Hive 进行整合。
本文次要分享在 Flink 1.11 中对接 Hive 的新个性,以及如何利用 Flink 对 Hive 数仓进行实时化革新,从而实现批流一体的指标。次要内容包含:
· Flink 与 Hive 集成的背景介绍
· Flink 1.11 中的新个性
· 打造 Hive 批流一体数仓
一、Flink 与 Hive 集成背景
为什么要做 Flink 和 Hive 集成的性能呢?最早的初衷是咱们心愿开掘 Flink 在批处理方面的能力。家喻户晓,Flink 在流计算方面曾经是胜利的引擎了,应用的用户也十分多。在 Flink 的设计理念当中,批计算是流解决中的一个特例。也就意味着,如果 Flink 在流计算方面做好,其实它的架构也能很好的反对批计算的场景。在批计算的场景中,SQL 是一个很重要的切入点。因为做数据分析的同学,他们更习惯应用 SQL 进行开发,而不是去写 DataStream 或者 DataSet 这样的程序。
Hadoop 生态圈的 SQL 引擎,Hive 是一个事实上的规范。大部分的用户环境中都会应用到了 Hive 的一些性能,来搭建数仓。一些比拟新的 SQL 的引擎,例如 Spark SQL、Impala,它们其实都提供了与 Hive 集成的能力。为了不便的可能对接上目前用户已有的应用场景,所以咱们认为对 Flink 而言,对接 Hive 也是不可短少的性能。
因而,咱们在 Flink 1.9 当中,就开始提供了与 Hive 集成的性能。当然在 1.9 版本外面,这个性能是作为试用版公布的。到了 Flink 1.10 版本, 与 Hive 集成的性能就达到了生产可用。同时在 Flink 1.10 公布的时候,咱们用 10TB 的 TPC-DS 测试集,对 Flink 和 Hive on MapReduce 进行了比照,比照后果如下:
蓝色的方框示意 Flink 用的工夫,桔红色的方框示意 Hive on MapReduce 用的工夫。最终的后果是 Flink 对于 Hive on MapReduce 大略晋升了 7 倍左右的性能。所以验证了 Flink SQL 能够很好的反对批计算的场景。
接下来介绍下 Flink 对接 Hive 的设计架构。对接 Hive 的时候须要几个层面,别离是:
· 可能拜访 Hive 的元数据;
· 读写 Hive 表数据;
· Production Ready;
1. 拜访 Hive 元数据
应用过 Hive 的同学应该都晓得,Hive 的元数据是通过 Hive Metastore 来治理的。所以意味着 Flink 须要买通与 Hive Metastore 的通信。为了更好的拜访 Hive 元数据,在 Flink 这边是提出了一套全新设计的 Catalog API。
这个全新的接口是一个通用化的设计。它并不只是为了对接 Hive 元数据,实践上是它能够对接不同内部零碎的元数据。
而且在一个 Flink Session 当中,是能够创立多个 Catalog,每一个 Catalog 对应于一个内部零碎。用户能够在 Flink Table API 或者如果应用的是 SQL Client 的话,能够在 Yaml 文件里指定定义哪些 Catalog。而后在 SQL Client 创立 TableEnvironment 的时候,就会把这些 Catalog 加载起来。TableEnvironment 通过 CatalogManager 来治理这些不同的 Catalog 的实例。这样 SQL Client 在后续的提交 SQL 语句的过程中,就能够应用这些 Catalog 去拜访内部零碎的元数据了。
下面这张图里列出了 2 个 Catalog 的实现。一个是 GenericlnMemoryCatalog,把所有的元数据都保留在 Flink Client 端的内存里。它的行为是相似于 Catalog 接口呈现之前 Flink 的行为。也就是所有的元数据的生命周期跟 SQL Client 的 Session 周期是一样的。当 Session 完结,在 Session 外面创立的元数据也就主动的失落了。
另一个是对接 Hive 着重介绍的 HiveCatalog。HiveCatalog 背地对接的是 Hive Metastore 的实例,要与 Hive Metastore 进行通信来做元数据的读写。为了反对多个版本的 Hive,不同版本的 Hive Metastore 的 API 可能存在不兼容。所以在 HiveCatalog 和 Hive Metastore 之间又加了一个 HiveShim,通过 HiveShim 能够反对不同版本的 Hive。
这里的 HiveCatalog 一方面能够让 Flink 去拜访 Hive 本身有的元数据,另一方面它也为 Flink 提供了长久化元数据的能力。也就是 HiveCatalog 既能够用来存储 Hive 的元数据,也能够存 Flink 应用的元数据。例如,在 Flink 中创立一张 Kafka 的表,那么这张表也是能够存到 HiveCatalog 里的。这样也就是为 Flink 提供了长久化元数据的能力。在没有 HiveCatalog 之前,是没有长久化能力的。
2. 读写 Hive 表数据
有了拜访 Hive 元数据的能力后,另一个重要的方面是读写 Hive 表数据。Hive 的表是存在 Hadoop 的 file system 里的,这个 file system 是一个 HDFS,也可能是其余文件系统。只有是实现了 Hadoop 的 file system 接口的,实践上都能够存储 Hive 的表。
在 Flink 当中:
· 读数据时实现了 HiveTableSource
· 写数据时实现了 HiveTableSink
而且设计的一个准则是:心愿尽可能去复用 Hive 原有的 Input/Output Format、SerDe 等,来读写 Hive 的数据。这样做的益处次要是 2 点,一个是复用能够缩小开发的工作量。另一个是复用益处是尽可能与 Hive 保障写入数据的兼容性。指标是 Flink 写入的数据,Hive 必须能够失常的读取。反之,Hive 写入的数据,Flink 也能够失常读取。
3. Production Ready
在 Flink 1.10 中,对接 Hive 的性能曾经实现了 Production Ready。实现 Production Ready 次要是认为在性能上曾经齐备了。具体实现的性能如下:
二、Flink 1.11 中的新个性
上面将介绍下,在 Flink 1.11 版本中,对接 Hive 的一些新个性。
1. 简化的依赖治理
首先做的是简化应用 Hive connector 的依赖治理。Hive connector 的一个痛点是须要增加若干个 jar 包的依赖,而且应用的 Hive 版本的不同,所需增加的 jar 包就不同。例如下图:
第一张图是应用的 Hive 1.0.0 版本须要增加的 jar 包。第二张图是用 Hive 2.2.0 版本须要增加的 jar 包。能够看出,不论是从 jar 包的个数、版本等,不同 Hive 版本增加的 jar 包是不一样的。所以如果不认真去读文档的话,就很容易导致用户增加的依赖谬误。一旦增加谬误,例如增加少了或者版本不对,那么会报进去一些比拟奇怪、难了解的谬误。这也是用户在应用 Hive connector 时裸露最多的问题之一。
所以咱们心愿能简化依赖治理,给用户提供更好的体验。具体的做法是,在 Flink 1.11 版本中开始,会提供一些事后打好的 Hive 依赖包:
用户能够依据本人的 Hive 版本,抉择对应的依赖包就能够了。
如果用户应用的 Hive 并不是开源版本的 Hive,用户还是能够应用 1.10 那种形式,去本人增加单个 jar 包。
2. Hive Dialect 的加强
在 Flink 1.10 就引入了 Hive Dialect,然而很少有人应用,因为这个版本的 Hive Dialect 性能比拟弱。仅仅的一个性能是:是否容许创立分区表的开关。就是如果设置了 Hive Dialect,那就能够在 Flink SQL 中创立分区表。如果没设置,则不容许创立。
另一个要害的是它不提供 Hive 语法的兼容。如果设置了 Hive Dialect 并能够创立分区表,然而创立分区表的 DDL 并不是 Hive 的语法。
在 Flink 1.11 中着重对 Hive Dialect 的性能进行了加强。加强的指标是:心愿用户在应用 Flink SQL Client 的时候,可能取得与应用 Hive CLI 或 Beeline 近似的应用体验。就是在应用 Flink SQL Client 中,能够去写一些 Hive 特定的一些语法。或者说用户在迁徙至 Flink 的时候,Hive 的脚本能够齐全不必批改。
为了实现上述指标,在 Flink 1.11 中做了如下改良:
· 给 Dialect 做了参数化,目前参数反对 default 和 hive 两种值。default 是 Flink 本身的 Dialect,hive 是 Hive 的 Dialect。
· SQL Client 和 API 均能够应用。
· 能够灵便的做动静切换,切换是语句级别的。例如 Session 创立后,第一个语句想用 Flink 的 Dialect 来写,就设置成 default。在执行了几行语句后,想用 Hive 的 Dialect 来写,就能够设置成 hive。在切换时,就不须要重启 Session。
· 兼容 Hive 罕用 DDL 以及根底的 DML。
· 提供与 Hive CLI 或 Beeline 近似的应用体验。
3. 开启 Hive Dialect
上图是在 SQL Client 中开启 Hive Dialect 的办法。在 SQL Client 中能够设置初始的 Dialect。能够在 Yaml 文件里设置,也能够在 SQL Client 起来后,进行动静的切换。
还能够通过 Flink Table API 的形式开启 Hive Dialect:
能够看到通过 TableEnvironment 去获取 Config 而后设置开启。
4. Hive Dialect 反对的语法
Hive Dialect 的语法次要是在 DDL 方面进行了加强。因为在 1.10 中通过 Flink SQL 写 DDL 去操作 Hive 的元数据不是非常可用,所以要解决这个痛点,将次要精力集中在 DDL 方向了。
目前所反对的 DDL 如下:
5. 流式数据写入 Hive
在 Flink 1.11 中还做了流式数据场景,以及跟 Hive 相结合的性能,通过 Flink 与 Hive 的联合,来帮忙 Hive 数仓进行实时化的革新。
流式数据写入 Hive 是借助 Streaming File Sink 实现的,它是齐全 SQL 化的, 不须要用户进行代码开发。流式数据写入 Hive 也反对分区和非分区表。Hive 数仓个别都是离线数据,用户对数据一致性要求比拟高,所以反对 Exactly-Once 语义。流式数据写 Hive 大略有 5-10 分钟级别的提早。如果心愿提早尽可能的低,那么产生的一个后果就是会生成更多的小文件。小文件对 HDFS 来说是不敌对的,小文件多了当前,会影响 HDFS 的性能。这种状况下能够做一些小文的合并操作。
流式数据写入 Hive 须要有几个配置的中央:
对于分区表来说,要设置 Partition Commit Delay 的参数。这个参数的意义就是管制每个分区蕴含多长时间的数据,例如可设置成天、小时等。
Partition Commit Trigger 示意 Partition Commit 什么时候触发,在 1.11 版本中反对 Process-time 和 Partition-time 触发机制。
Partition Commit Policy 示意用什么形式提交分区。对于 Hive 来说,是须要将分区提交到 metastore, 这样分区才是可见的。metastore 策略只反对 Hive 表。还有一个是 success-file 形式,success-file 是通知上游的作业分区的数据曾经筹备好了。用户也能够自定义,本人去实现一个提交形式。另外 Policy 能够指定多个的,例如能够同时指定 metastore 和 success-file。
上面看下流式数据写入 Hive 的实现原理:
次要是两个局部,一个是 StreamingFileWriter,借助它实现数据的写入,它会辨别 Bucket,这里的 Buck 相似 Hive 的分区概念,每个 Subtask 都会往不同的 Bucket 去写数据。每个 Subtask 写的 Bucket 同一个工夫可能会维持 3 种文件,In-progress Files 示意正在写的文件,Pending Files 示意文件曾经写完了然而还没有提交,Finished Files 示意文件曾经写完并且也曾经提交了。
另一个是 StreamingFileCommitter,在 StreamingFileWriter 后执行。它是用来提交分区的,所以对于非分区表就不须要它了。当 StreamingFileWriter 的一个分区数据筹备好后,StreamingFileWriter 会向 StreamingFileCommitter 发一个 Commit Message,Commit Message 通知 StreamingFileCommitter 那些数据曾经筹备好了的。而后进行提交的触发 Commit Trigger,以及提交形式 Commit Policy。
上面是一个具体的例子:
例子中创立了一个叫 hive_table 的分区表,它有两个分区 dt 和 hour。dt 代表的是日期的字符串,hour 代表小时的字符串。Commit trigger 设置的是 partition-time,Commit delay 设置的是 1 小时,Commit Policy 设置的是 metastore 和 success-file。
6. 流式生产 Hive
在 Flink 1.10 中读 Hive 数据的形式是批的形式去读的,从 1.11 版本中,提供了流式的去读 Hive 数据。
通过一直的监控 Hive 数据表有没有新数据,有的话就进行增量数据的生产。
如果要针对某一张 Hive 表开启流式生产,能够在 table property 中开启,或者也能够应用在 1.11 中新加的 dynamic options 性能,能够查问的时候动静的指定 Hive 表是否须要关上流式读取。
流式生产 Hive 反对分区表和非分区表。对于非分区表会监控表目录下新文件增加,并增量读取。对于分区表通过监控分区目录和 Metastore 的形式确认是否有新分区增加,如果有新增分区,就会把新增分区数据读取进去。这里须要留神,读新增分区数据是一次性的。也就是新减少分区后,会把这个分区数据一次性都读出来,在这之后就不再监控这个分区的数据了。所以如果须要用 Flink 流式生产 Hive 的分区表,那应该保障分区在增加的时候它的数据是残缺的。
流式生产 Hive 数据也须要额定的指定一些参数。首先要指定生产程序,因为数据是增量读取,所以须要指定要用什么程序生产数据,目前反对两种生产程序 create-time 和 partition-time。
用户还能够指定生产终点,相似于生产 kafka 指定 offset 这样的性能,心愿从哪个工夫点的数据开始生产。Flink 去生产数据的时候,就会查看并只会读取这个工夫点之后的数据。
最初还能够指定监控的距离。因为目前监控新数据的增加都是要扫描文件系统的,可能你心愿监控的不要太频繁,太频繁会给文件系统造成比拟大的压力。所以能够管制一个距离。
最初看下流式生产的原理。先看流式生产非分区表:
图中 ContinuoousFileMonitoringFunction 会一直监控非分区表目录上面的文件,会一直的跟文件系统进行交互。一旦发现有新的文件增加了,就会对这些文件生成 Splits,并将 Splits 传到 ContinuoousFileReaderOperator,FileReaderOperator 拿到 Splits 后就会到文件系统中理论的生产这些数据,而后把读出来的数据再传往上游解决。
对于流式生产分区表和非分区表区别不是很大,其中 HiveContinuousMonitoringFunction 也会去一直的扫描文件系统,然而它扫描的是新增分区的目录。当它发现有新增的分区目录后,会进一步到 metstore 中做核查,查看是否这个分区曾经提交到 metstore 中。如果曾经提交,那就能够生产分区中的数据了。而后会把分区中的数据生成 Splits 传给 ContinuousFileReaderOperator,而后就能够对数据进行生产了。
7. 关联 Hive 维表
对于 Hive 跟流式数据联合的另一个场景就是:关联 Hive 维表。例如在生产流式数据时,与一张线下的 Hive 维表进行 join。
关联 Hive 维表采纳了 Flink 的 Temporal Table 的语法,就是把 Hive 的维表作为 Temporal Table,而后与流式的表进行 join。想理解更多对于 Temporal Table 的内容,可查看 Flink 的官网。
关联 Hive 维表的实现是每个 sub-task 将 Hive 表缓存在内存中,是缓存整张的 Hive 表。如果 Hive 维表大小超过 sub-task 的可用内存,那么作业会失败。
Hive 维表在关联的时候,Hive 维表可能会产生更新,所以会容许用户设置 hive 表缓存的超时工夫。超过这个工夫后,sub-task 会从新加载 Hive 维表。须要留神,这种场景不适用于 Hive 维表频繁更新的状况,这样会对 HDFS 文件系统造成很大的压力。所以实用于 Hive 维表迟缓更新的状况。缓存超时工夫个别设置的比拟长,个别是小时级别的。
这张图示意的是关联 Hive 维表的原理。Streaming Data 代表流式数据,LookupJoinRunner 示意 Join 算子,它会拿到流式数据的 join key,并把 join key 传给 FileSystemLookupFunction。
FileSystemLookupFunction 是 一个 Table function,它会去跟底层的文件系统交互并加载 Hive 表,而后在 Hive 表中查问 join key,判断哪些行数据是能够 join 的。
上面是关联 Hive 维表的例子:
这是 Flink 官网的一个例子,流式表是 Orders,LatestTates 是 Hive 的维表。
三、Hive 批流一体数仓
通过下面的介绍能够看出,在 Flink 1.11 中,在 Hive 数仓和批流一体的性能是进行了着重的开发。因为 Flink 是一个流解决的引擎,心愿帮用户更好的将批和流联合,让 Hive 数仓实现实时化的革新,让用户更不便的开掘数据的价值。
在 Flink 1.11 之前,Flink 对接 Hive 会做些批处理的计算,并且只反对离线的场景。离线的场景一个问题是提早比拟大,批作业的调度个别都会通过一些调度的框架去调度。这样其实提早会有累加的作用。例如第一个 job 跑完,能力去跑第二个 job… 这样顺次执行。所以端对端的提早就是所有 job 的叠加。
到了 1.11 之后,反对了 Hive 的流式解决的能力,就能够对 Hive 数仓进行一个实时化的革新。
例如 Online 的一些数据,用 Flink 做 ETL,去实时的写入 Hive。当数据写入 Hive 之后,能够进一步接一个新的 Flink job,来做实时的查问或者近似实时的查问,能够很快的返回后果。同时,其余的 Flink job 还能够利用写入 Hive 数仓的数据作为维表,来跟其它线上的数据进行关联整合,来失去剖析的后果。
作者介绍:
李锐,阿里花名 ” 天离 ”,阿里巴巴技术专家,Apache Hive PMC 成员,退出阿里巴巴之前曾就任于 Intel、IBM 等公司,次要参加 Hive、HDFS、Spark 等开源我的项目。