关于flink:Hive-SQL-on-Flink-构建流批一体引擎

53次阅读

共计 6258 个字符,预计需要花费 16 分钟才能阅读完成。

摘要:本文整顿自阿里巴巴开发工程师罗宇侠 & 方盛凯,在 Flink Forward Asia 2022 流批一体专场的分享。本篇内容次要分为五个局部:

  1. 构建流批一体引擎的挑战
  2. Hive SQL on Flink
  3. 流批一体引擎的收益
  4. Demo
  5. 将来瞻望

点击查看原文视频 & 演讲 PPT

一、构建流批一体引擎的挑战

目前,流和批依然是绝对割裂的。尽管咱们在应用层对立了,但从接入层开始,不同的引擎仍旧有不同的接入层、API 层、执行层。咱们认为,对立的流批一体引擎应该是从接入层开始应用 SQL Gateway 作为接入层。在 API 层应用 Flink SQL 作为编写作业的次要语言,在执行层替换成对立的 Runtime。

为了达成对立的流体引擎,咱们认为有以下两个难点:

  • 应用层的对接。在流批割裂的环境下,应用层依然是有不同的提交平台,如何保障原来的应用层能无损且间接地对接到新的 SQL Gateway 上,是一个微小的难点。
  • 用户作业迁徙的老本。用户原来的 Batch 作业是用 Hive SQL 进行撰写的,当初则须要替换成 Flink SQL。为了保障用户的作业能无损迁上来,咱们须要解决语言上的兼容和用户所用的 UDF 的兼容。

为此咱们围绕以下两点在 Flink 1.16 上做了大量改良,保障了 Hive SQL on Flink 构建流批一体引擎是可行的。

  • Flink 对 Hive SQL 的兼容,咱们在 1.16 中大大晋升了对 Hive SQL 自身的兼容性。
  • 咱们在 Flink 社区引入了 SQL Gateway,从而兼容 Hive 的生态。

二、Hive SQL on Flink

接下来我来讲一下 Flink 社区具体做的一些工作来使得基于 Hive SQL on Flink 构建流批一体引擎成为可能。

在这一方面,Flink 社区通过多个版本的打磨,做了大量的工作使得基于 Hive SQL on Flink 构建流批一体引擎可能在生产中可用。

2.1 Hive SQL on Flink 的具体工作

第一,集成 Hive MetaStore。家喻户晓,在大数据畛域,Hive MetaStore 曾经是事实的元数据管理规范了,所以 Flink 在很早的版本就曾经开始集成 Hive MetaStore。次要分为以下三方面的反对:

  • 反对 Hive MetaStore 作为 Flink 的 Catalog,Hive 已有的表可主动注册进 Flink 中,用户无需再定义各种 DDL 来映射底层的 Hive 表。
  • 反对 Hive MetaStore 存储 Flink 定义的 Hive 表 / 非 Hive 表。
  • 反对从 Hive MetaStore 取得表的统计信息,从而优化查问的执行打算,晋升端到端 SQL 的性能。

第二,集成 Hive 的 UDF。次要反对以下两方面:

  • Hive 提供了十分丰盛的 UDF,在 Flink 中咱们能够间接调用 Hive 中内置的 UDF。换句话说,用户应用 Flink 就能享受到 Hive 那套内置 UDF 所带来的不便及易用性。
  • 反对调用自定义的 Hive UDF。对于相熟 Hive 的人,他们会基于 Hive UDF 的接口去定义本人的 UDF。但如果他们想用 Flink,又不想废除那些 UDF,更不想重写。要怎么办呢?其实 Flink 反对调用用户自定义的 Hive UDF,所以用户不须要对 UDF 做任何重写的工作,这极大的不便了用户的操作。

第三,Hive 表的读写。次要反对以下几方面:

  • 反对流读 / 批读 / 流写 / 批写 Hive 表。
  • 批读 Hive 表反对动态分区裁剪和动静分区裁剪。能够大幅削减读取数据的规模,从而晋升读的性能和效率。
  • 批读 Hive 表反对并发推断。在批场景下,并发设置是一个比拟难的问题,但如果在批读 Hive 场景下,咱们能够通过 Hive 表的文件信息推断出正当的并发,从而晋升端到端链路的性能。
  • 批写 / 流写 Hive 反对自定义分区提交策略。在批调度链路里,咱们可能会把先提交分区,而后触发一些其余上游的操作或调度,这时咱们无需引入其余额定的组件,间接在 Flink 里自定义这些分区提交的策略即可。比方指定分区提交后,触发定时工作或者在音讯队列插一条数据等等。
  • 流写 Hive 表反对小文件主动合并。在流的场景下,会生成很多小文件,但在流写 Hive 表时,咱们反对小文件的主动合并,通过将小文件合并成更大的文件,缩小了小文件的数量,从而缓解 HDFS 集群的压力。
  • 批写 Hive 表反对主动收集统计信息,这一部分齐全兼容了 Hive 的行为。在应用 Hive 写 Hive 表的时候,它会收集统计信息并提交到 MetaStore。咱们用 Flink 写 Hive 表的时候,也能反对将统计信息提交到 MetaStore,包含文件的大小、数据的条数等等。

2.2 Flink 兼容 Hive SQL 的架构

用户的 Hive SQL 如何在 Flink 中顺滑地运行?上图是 Flink 兼容 Hive SQL 的架构,能够看到,它被分成了两个不同的分支,Flink SQL 和 Hive SQL。而后它们会由不同的 Parser 去做解析,Flink SQL 通过 Flink Parser 做解析,Hive SQL 通过 Hive Parser 做解析,Hive Parser 的行为和 Hive 的行为保持一致。

接下来它们都会生成 Flink 里的 Logical Plan,Logical Plan 进行优化,生成 Physical Plan,Physical Plan 再进行翻译,生成具体的 Job Graph,最初交由 Flink Runtime 执行。

基于这套架构,咱们能够很不便地扩大 Flink 来提供对其余语法的反对。另外通过这套架构,咱们实践上还能达到对 Hive 语法的百分之百兼容。

2.3 Flink 对 Hive SQL 的兼容

接下来讲一下咱们最初达到了怎么的成果。

第一,反对生产上罕用的 Hive 语法。即生产上的作业可能很好地迁徙到咱们的 Flink 中执行。次要反对以下语法:

  • 反对 distribute by/sort by/ cluster by。
  • 反对 multi insert。一个 scan 能够插入到多个不同数据的 sink 端,极大的进步了数据 ETL 链路的效率。
  • 反对 insert directory。
  • 反对 load data。
  • 反对 create function using jar。
  • ……

那么咱们到底对 Hive SQL 的兼容度能达到多少呢?答案是 94% 了。这个数字又是怎么得进去的呢?

  • 基于 Hive 2.3 的 qtest 测试集,12000 条 DQL/DML 都扔到 Flink 去执行,这些 SQL 都可能被失常执行。
  • 12000 条 DQL/DML 也蕴含了很多对 ACID 表的查问。Hive 的 ACID 表在生产中用的较少,如果咱们除去针对 ACID 表的 DQL/DML,兼容度可达 97%。

2.4 Flink 对 Hive 生态的兼容

如上图所示,之前的内容讲的是 API 层、执行层曾经对立了。那么咱们如何在接入层也把它对立掉呢?就引出咱们接下来要分享的 Flink SQL Gateway 了。

2.5 引入 Flink SQL Gateway 的起因

咱们为什么引入 Flink SQL Gateway 呢?起因有以下三个:

  • 目前 Flink 社区官网提供了 SQL Client 供用户提交 SQL 作业。但因为 SQL Client 自身没有服务化,用户往往须要基于 SQL Client 做一层封装,增加一个服务化的前端。通过该服务化的前端,用户的 SQL 作业最终会被提交给 SQL Client 去执行。以上的过程比拟繁琐而且开发成本较大,因而,咱们在社区提供了一个默认的服务化的实现,升高用户的应用老本。
  • 以上的计划是基于 SQL Client 来做的作业提交,但这套 API 并不稳固。而引入的 SQL Gateway 则提供了稳固的 API。
  • 相比于 SQL Client, SQL Gateway 是 C/S 架构,更容易对接诸多生态,e.g. HiveServer2。

基于以上的考量,Flink 社区引入了 Flink SQL Gateway。它有以下特点:

  • 开箱即用,用户能够间接应用 SQL Gateway 搭建一个生产可用的提交工具。
  • 生态对接,提供了稳固的 API,不便 Flink 对接其它生态工具。
  • 兼容 HiveServer2 协定,提供了 HiveServer2 Endpoint 以兼容 Hive 生态。

2.6 Flink SQL Gateway 架构

上图是 Flink SQL Gateway 的架构图,能够分成前端和后端。

后端提供了多租户能力,能够对接不同的集群,包含 Flink Standalone,Flink On Yarn 等。另外,它反对用户自定义的 Catalog,能够用默认的 Catalog,也能够用 MySQL Catalog、Hive Catalog。

SQL Gateway 目前提供了两个 Endpoint,别离是 REST Endpoint 和 HiveServer2 Endpoint。

  • REST Endpoint:用户能够通过 REST 工具提交作业。
  • HiveServer2 Endpoint:通过它咱们就能提供对接 Hive 支流生态的能力。

从上图左侧能够看到目前一些 Hive 的生态工具,包含 Beeline、DBeaver、DolphinScheduler、Superset、Apache Zeppelin 等,都能很好的对接到 Flink SQL Gateway 上。

2.7 HiveServer2 Endpoint

下面提到 HiveServer2 Endpoint 在兼容 Hive 生态的重要性,接下来让咱们一起来看一下它的具体架构。从上图能够看到次要分为两层,Client 端和 Server 端。HiveServer2 实际上是定义了 Client 端和 Server 端的一套通信协议,如果要兼容 HiveServer2,咱们只有实现 HiveServer2 定义的这套协定即可。通过兼容 HiveServer2 协定,咱们能够在不批改 Client 的状况下,将申请调用都转发到 Flink SQL Gateway,并在 Flink 集群执行。

上图出现的是 HiveServer2 和 HiveServer2 Endpoint 的对应关系。

HiveServer2 提供了直连 MetaStore 的能力,能够应用 Hive SQL,底层是批处理引擎,包含 MapReduce 或者 Spark 等。

HiveServer2 Endpoint 内置了 Hive Catalog,其实就是 Hive MetaStore。同时它也应用 Hive 语法,底层也是批处理引擎,即 Flink Batch 引擎。

讲了这么多对于 Hive 兼容的内容,最初咱们能达到什么样的成果呢?

下面的图咱们从上往下看。通常,用户的 SQL 脚本通过 Apache Zeppelin、Beeline 等客户端提交作业,而后通过 Hive 的 JDBC 提交到 HiveServer2 中,再交由底下具体的引擎来执行。

基于上述介绍的 Flink 对兼容 Hive 所做的工作,咱们只须要将引擎层改成 Flink 将能够作业间接迁徙到 Flink 上,从而达到了一个十分平滑且无缝迁徙的过程。

三、流批一体引擎的收益

3.1 Hive SQL on Flink 构建流批一体引擎

基于 Hive SQL on Flink 构建了流批一体引擎,咱们取得了以下收益:

  • 第一,对立流批引擎。升高保护老本,晋升研发的效率。因为咱们当初就一套引擎了,所以保护老本会非常低。
  • 第二,流批一体数仓。咱们通过流批一体引擎构建出了流批一体 SQL 层。借此,咱们能够把流批一体的存储思考进来,构建残缺的流批一体数仓架构。
  • 第三,Hive SQL 实时化。目前 Hive SQL 次要还是跑在批引擎上,每天做一次调度,产生后果。如果把 Hive SQL 迁徙到 Flink 中,咱们就能够很不便的将它实时化革新。只有把引擎模式设置成流模式,就能够将其实时化,数仓实时化革新的老本非常低。
  • 第四,OLAP & 联邦查问。咱们能够基于 Flink + Hive SQL 搭建 OLAP 零碎。借助 Flink 对各种数据源的反对,以及对 Hive SQL 略微进行扩大就能够实现联邦查问。

3.2 基于 Hive 语法进行联邦查问

联邦查问是指,查不同数据源的数据,再写到不同的数据存储中。Hive 自身尽管通过 storage handler 提供了查问不同数据源的数据,比方 MySQL、Hbase 等,但相对来说还是比较复杂和不太欠缺。所以 Flink 就对 Hive 语法进行了扩大,使其它能够很不便的进行联邦查问。

首先咱们看一下上图两头这条十分典型的 Hive SQL,它将几个表 join 一下,distribute by 再写到上游。留神看一下红色字,就是须要咱们额定革新的内容,革新的老本非常低,只有在 Table 后面加上 Catalog 的那么就能读到不同 Catalog 的数据。比如说咱们注册一个 PG Catalog,间接把 PG Catalog 的名字加到这个表的后面,咱们就能读到 PG Catalog 的数据。

基于这样一层革新和扩大,咱们就能应用 Hive 语法查到不同数据源的数据,再写到不同的数据存储。

四、Demo

在传统的 Lambda 架构中,咱们往往会有两条不同的 pipeline:

  • 实时的 pipeline,咱们往往通过 Flink 将 Kafka 的数据进行打宽聚合写入上游,并通过 Flink 写入 HDFS 的最终表。
  • 离线的 pipeline,咱们则能够通过周期性地调度 Flink 作业将数据写入到 HDFS 中。为了保持数据的正确性,在 Lambda 架构之中往往通过将批的后果回刷到 HDFS 中,保证数据的正确性。

当批作业回刷完结后,用户能够通过应用层剖析最终表的后果,进行实时大屏地展现,做相干的数据利用以及剖析数据之中潜在的趋势。

明天,咱们则聚焦在数据回刷这一层,演示如何通过 Hive on Flink 构建流批一体引擎。

咱们在 Kafka 中提前灌注了一些实时订单表,在 HDFS 中灌注了之前曾经有的一些历史订单表。

实时链路中,咱们通过 Window 语义,按天级别将统计信息间接灌入 HDFS 中,实时地获取以后的销售量。而离线链路中,咱们则通过 agg 语法能够在第二天凌晨汇总当天的订单信息。通过数据回刷,咱们就能够失去对立的每日销售额统计。

上面用 Zeppelin 演示 Hive SQL on Flink 构建流批一体数仓的 Demo。

demo 演示:https://www.bilibili.com/video/BV1GM411N7bK/

五、将来瞻望

以下是咱们将来的一些布局:

在流批一体方面,尽管咱们在这个版本曾经做了极大的致力,但存储层依然是不对立。比方在流上咱们仍旧应用 Kafka 作为两头后果的寄存,在批上咱们更偏向于应用 HDFS,因而存储层对立也是至关重要的。另外,Batch 的用户当初更偏向应用 Hive SQL 写作业,但咱们更心愿他们能将 Batch 作业全副迁徙至 Flink Batch SQL 中来。所以,将来咱们将一直晋升 Batch SQL 的功能性。

在 Hive 的集成方面,次要分为以下 3 点:

  • 优化读各种格局的文件,包含对读 Parquet 文件的嵌套列 PushDown、FilterPushDown 的优化等,从而晋升性能。
  • 晋升写 Hive 端到端的生产可用性。比方,批模式下解决小文件多的问题。
  • 依据用户的反馈不断加强 Hive 的语法反对。

在 Flink SQL Gateway 方面,它仍旧处于起步的状态。咱们将从以下三个方面来欠缺它:

  • SQL Client 反对向 SQL Gateway 提交 SQL,保障性能完整性。
  • 补全认证性能,保障 SQL Gateway 根本生产可用。
  • 基于 SQL Gateway 对接更多生态工具,加强 SQL Gateway 的利用范畴。

点击查看原文视频 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

正文完
 0