共计 6195 个字符,预计需要花费 16 分钟才能阅读完成。
摘要:本文整顿自 Apache Flink PMC&Committer 伍翀(云邪)在 9 月 24 日 Apache Flink Meetup 的演讲。次要内容包含:
- Hive SQL 迁徙的动机
- Hive SQL 迁徙的挑战
- Hive SQL 迁徙的实际
- Hive SQL 迁徙的演示
- 将来布局
点击查看直播回放 & 演讲 PDF
一、Hive SQL 迁徙的动机
Flink 曾经是流计算的事实标准,以后国内外做实时计算或流计算个别都会抉择 Flink 和 Flink SQL。另外,Flink 也是是妇孺皆知的流批一体大数据计算引擎。
然而,目前 Flink 也面临着挑战。比方尽管当初大规模利用都以流计算为主,但 Flink 批计算的利用并不宽泛,想要进一步推动真正意义上的流批一体落地,须要推动业界更多地落地 Flink 批计算,须要更踊跃地拥抱现有的离线生态。以后业界离线生态次要以 Hive 为主,因而咱们在过来版本中做了很多与 Hive 相干的集成,包含 Hive Catalog、Hive 语法兼容、Hive UDF 兼容、流式写入 Hive 等。在 Flink 1.16 版本中,咱们进一步晋升了 HiveSQL 的兼容度,还反对了 HiveServer2 的协定兼容。
所以,为什么 Flink 要去反对 Hive SQL 的迁徙?一方面,咱们心愿吸引更多的 Hive 离线数仓用户,通过用户来一直打磨批计算引擎,对齐支流批计算引擎。另一方面,通过兼容 Hive SQL,来升高现有离线用户应用 Flink 开发离线业务的门槛。除此之外,另外,生态是开源产品的最大门槛。Flink 曾经领有十分丰盛的实时生态工具,但离线生态仍然较为欠缺。通过兼容 Hive 生态能够疾速融入 Hive 离线生态工具和平台,升高用户接入的老本。最初,这也是实现流批一体的重要一环,咱们心愿推动业界尝试对立的流计算和批计算引擎,再对立流计算和批计算 SQL。
从用户角度来看,Hive SQL 为什么要迁徙到 Flink SQL 上?
对于平台方而言,对立流批计算引擎,只需保护一套 Flink 引擎,能够升高保护老本,晋升团队研发效率。另外,能够利用 Flink + Gateway+ HiveSQL 兼容,疾速建设一套 OLAP 零碎。Flink 的另一劣势是领有丰盛的 connector 生态,能够借助 Flink 丰盛的数据源实现弱小的联邦查问。比方不仅能够在 Hive 数仓里做 ad-hoc 查问,也能够将 Hive 表数据与 MySQL、HBase、Iceberg、Hudi 等数据源做联邦查问等。
对于离线数仓用户而言,能够用 Hive SQL 写流计算作业,极大升高实时化革新老本。应用的仍然是以前的 HiveSQL 语法,然而能够运行在 streaming 模式下。在此基础之上也能够进一步摸索流批一体 SQL 层以及流批一体数仓层的建设。
二、Hive SQL 迁徙的挑战
然而 Flink 反对 HiveSQL 的迁徙面临着很多挑战,次要有以下三个方面:
- 兼容:包含离线数仓作业和 Hive 平台工具的兼容。次要对应用户层的兼容和平台方的兼容。
- 稳定性:迁徙后的作业首先要保障生产的稳定性。咱们在 1.16 中也做了很多这方面的工作,包含 FLIP-168 预测执行和 Adaptive Hash Join。后续咱们会发表更多的文章来介绍这方面的工作。
- 性能:最初性能也是很重要的,在 1.16 中咱们也做了很多这方面的工作,包含 Dynamic Partition Pruning(DPP)、元数据拜访减速等,后续也会发表更多文章来介绍这方面的工作。
接下来咱们重点解说下 Hive 兼容相干的工作。
Hive 语法的兼容并没有齐全造出一套新的 SQL 引擎,而是复用了 Flink SQL 的很多外围流程和代码。咱们形象出了可插拔的 parser 层来反对和扩大不同的语法。Flink SQL 会通过 Flink Parser 转换成 Flink RelNode,再通过 Logical Plan 优化为 Physical Plan,最初转换为 Job Graph 提交执行。为了反对 Hive 语法兼容,咱们引入了 Hive Parser 组件,来将 Hive SQL 转化成 Flink RelNode。这个过程中,复用了大部分 Hive 现有的 SQL 解析逻辑,保障语法层的兼容(均基于 Calcite)。之后 RelNode 复用同样的流程和代码转化成 LogicalPlan、Physical Plan、JobGraph,最初提交执行。
从架构上看,Hive 语法兼容并不简单,但这是一个“魔鬼在细节”的工作。上图为局部 Flink1.16 版本里 Flink Hive 兼容相干的 issue,波及 query 兼容、类型零碎、语义、行为、DDL、DML、辅助查问命令等十分多语法性能。累计实现的 issue 数达近百个。
Flink1.16 版本将 Hive 兼容度从 85% 晋升至 94.1%。兼容度测试次要依附 Hive qtest 测试集,其中蕴含 12,000 多个测试 case,笼罩了 Hive 目前所有支流语法性能。没有兼容的一部分包含 ACID 性能(业界应用较少),如果除去 ACID 性能,兼容度已达 97% 以上。
SQLGateway 是 Flink SQL 的 server 层组件,是独自的过程,对标 HiveServer2 组件。从 Flink 整体架构上看,SQLGateway 处于两头地位。
向下,封装了用户 API 的 Flink SQL 和 Hive SQL。不论是 Flink SQL 还是 Hive SQL,都应用 Flink 流批一体的 Runtime 来执行,能够运行在批模式,也能够运行在流模式。Flink 的资源也能够部署运行在 YARN、K8S、Flink standalone 集群上。
向上,SQLGateway 提供了可插拔协定层 Endpoint,目前提供了 HiveServer2 和 REST 两种协定实现。通过 HiveServer2 Endpoint,用户能够将 Hive 生态的很多工具和组件(Zeppelin、Superset、Beeline、DBeaver 等)连贯到 SQL Gateway,提供流批对立的 SQL 服务并兼容 Hive SQL。通过 REST 协定能够应用 Postman、curl 命令或本人通过 Python、Java 编程来拜访,提供欠缺和灵便的流计算服务。未来,Endpoint 能力也会持续扩大,比方能够提供更高性能的 gRPC 协定或兼容 PG 协定。
三、Hive SQL 迁徙的实际
目前快手正在与 Flink 社区严密单干,推动流批一体的落地。目前快手迁徙 Hive SQL 作业到 Flink SQL 作业曾经获得了初步的停顿,已有上千个作业实现了迁徙。快手的迁徙次要策略为双跑平台,已有业务持续运行,双跑平台有智能路由组件,能够通过指定规定或 pattern 辨认出作业,投递到 MapReduce、Spark 或 Flink 上运行。初期的运行较为审慎,会通过白名单机制指定某些作业先运行在 Flink,察看其稳定性与性能,比照其后果一致性,后续逐渐通过规定来放量。更多的实践经验与细节能够关注 Flink Forward Asia 2022 上分享的《Hive SQL 迁徙到 Flink SQL 在快手的实际 》。
四、Hive SQL 迁徙的演示
Demo1:Hive SQL 如何迁徙到 Flink SQL
接下来演示一下 Hive SQL 如何迁徙到 Flink SQL。咱们曾经搭建好一个 YARN 集群,以及 Hive 相干组件,包含 HiveServer2 的服务。咱们应用 Zeppelin 做数据可视化和 SQL 查问。咱们将演示 Hive SQL 迁徙到 Flink SQL 只需改一行地址,Zeppelin 体验并无二致,SQL 也无需批改。残缺的 Demo 视频请观看残缺的演讲视频:https://www.bilibili.com/vide…
首先在 Zeppelin 中配置 Hive Interpreter,填入 HiveServer2 的 JDBC 地址和端口、用户名明码、Driver 等信息。
应用以后的 Hive Interpreter,咱们能够通过 Hive DDL 命令创立一张打宽的 store_sale_detail 表。应用 Hive SQL 语法关联 store_sales、date_dim、store 三张表,打成一张宽表写到 store_sale_detail。执行该 INSERT INTO 语句后,便能在 Hadoop 平台上看到运行起来的 MapReduce 工作。
store_sale_detail 明细宽表生产完后,咱们就能够进行查问剖析,比方查看星期天每个店铺的销量。运行完后可通过饼图等多种形式展现后果。
下面简略演示了应用 Hive 进行数据生产和数据分析,其中计算引擎应用的是 Hive 原生的 Hadoop MapReduce 作业,作业运行在 YARN 集群上。接下来咱们开始迁徙到 Flink SQL,作业依然运行在 YARN 集群上。
首先搭建 Flink SQL 集群以及启动 SQLGateway。咱们曾经下载并解压了 Flink 1.16 版本。其中 lib 文件夹下也曾经提前准备 Hive connector、JDBC connector 和 MySQL Driver。另外,还须要将 flink-table-planner-loader 与 opt/ 目录下的 flink-table-planner JAR 包做个替换,而后启动 YARN session 集群。Session 集群启动后,可在 yarn 上看到 Flink 的 session application。
在启动 SQLGateway 之前,须要先批改配置,次要配置 HiveServer2 Endpoint 相干的信息。
此处 SQLGateway 的 endpoint type 是 HiveServer2,以及须要额定设置三个配置:
HiveServer2 的 hive-conf-dir、thrift.host 以及 thrift.port。这里留神咱们启动的端口号是 20002。而后通过 sql-gateway.sh start 命令启动 SQL Gateway 服务。
启动后便能够进行迁徙。因为 HiveServer2 运行在同一台机器上,因而只需批改端口号即可。将此处 10000 端口号改为刚刚启动的 20002 端口号,即 Flink SQLGateway 的端口,无需进行任何其余改变。重启 interpreter,迁徙实现!
接着咱们能够在 Zeppelin 中从新执行一遍刚刚的 Hive SQL 语句,能够发现后果都是统一的。
如上图所示,是查问每个商店在周日的销售总额的后果,其饼图后果与应用 Hive 引擎查问的后果完全一致,不同的是这次的查问是运行在 Flink 引擎之上。
Hive SQL 迁徙到 Flink SQL 后不仅能取得更好的性能,还能取得 Flink SQL 提供的额定能力,包含更丰盛的联邦查问和流批一体能力。
咱们能够用 Flink DDL 创立新的 catalog,比方 MySQL 表里还有新的额定的维度信息,不在 Hive 中,心愿关联它做新数据的探查。只需应用 Flink 的 CREATE CATALOG 语句创立 MySQL catalog,即可实现联邦查问。同时,Flink 会将能下推的 projection、filter 等都下推到 MySQL 进行裁剪。
除此之外,也能够应用 Hive SQL 体验流计算的能力。应用 Flink 语法创立一张 datagen 表,该表会源源不断产生随机数据。再切回 Hive 语法创立一张 Hive 后果表 sink。将运行模式改为 streaming,执行 insert into 语句,便提交了一个流作业,该作业会源源不断地将 datagen 中生成的数据流式地写入 Hive。
为了验证 Hive 后果表始终在被流作业写入数据,咱们也能够用 Hive 语法查问写入的表。如上图所示,通过一直执行 count(*) 语句,能够看到该表始终在写入数据,因而查问后果会一直变动。
五、将来布局
将来,Flink 将在以下三个方面继续演进:
- 第一,继续在 batch 上做更多尝试和投入,晋升 batch 的稳定性和性能,指标是短期内可能追齐支流的批计算引擎。
- 第二,欠缺数据湖的剖析,比方更高效的批式数据湖读写、查问优化下推、列存上的读写优化,Iceberg、Hudi 以及 Flink Table Store 的反对等。另外,也会提供丰盛的湖上数据查问和治理性能,比方查问快照版本的能力、查问元数据、更丰盛的 DML 语法(UPDATE、DELETE、MERGE INTO)以及治理湖上数据 CALL 命令等。
- 第三,Flink Batch 生态建设,包含进一步欠缺 Remote Shuffle Service、血统治理等。
Q&A
Q:Hive 写通过 Flink 执行,如果 Hive 数据量特地大,是否会呈现内存不足、OOM 等报错?
A:目前 Flink 执行 batch 模式下,根本所有算子里都有内存管理机制。数据不是以 Java 对象的形式存在 Flink,而是在 Java 内存外面开拓了独自的内存空间供其应用。如果内存满,则会做落盘、spill,速度可能会稍微降落,但个别不会产生内存 OOM。
Q:Flink 是否反对 Hive 自定义 UDF 函数?迁徙老本如何?
A:反对,可间接迁徙。
Q:现有的离线数仓从 Hive 迁到 Flink 是否存在危险?平滑迁徙的留神点有哪些?
A:平滑迁徙目前大多应用双跑平台,通过机制挑选出局部作业先进行迁徙,迁徙的作业在两个平台同时运行,因而须要验证行为、后果是否统一,而后逐步将老平台的作业下线,变为单跑。整个过程须要循序渐进,通常须要半年至一年的工夫。
Q:Demo 中有一个 SQL 查问应用了 Hive on MR 引擎,迁徙之后是走 Flink SQLGateway 还是 Hive on MR 模式?
A:迁徙后,因为配置的端口是 Flink SQL Gateway 的端口,所以 SQL 申请走的是 Flink SQL Gateway,Gateway 会将 Hive SQL 编译成 Flink 作业提交到 YARN 集群上运行。
Q:Flink 运行批量工作时,TaskManager 个数是咱们指定还是主动生成?
A:对于 standalone 模式,包含运行在 k8s 上的 standalone 模式,TM 数量由用户指定。其余模式下,TM 数量都由 Flink 本人决定和拉起,包含 yarn/k8s application 模式,yarn session 模式, yarn per-job 模式,native k8s session 模式。拉起的 TM 数量和作业申请的 slot 数相干,taskmanager.numberOfTaskSlots 参数决定了 slot 与 TM 个数的映射关系,slot 数量则和被调度的作业节点的并发度相干。
Q:Flink 运行在 K8S 上时,如果启用了动静资源分配,shuffle 数据会始终保留在 POD 磁盘上吗?
A:能够抉择,能够 on TM 也能够 on RemoteShuffleService。
Q:离线作业迁徙后,是否还反对 with as 语法以及 partition by 语法?
A:WITH AS 语法仍然反对,CREATE TABLE 中的 PARTITIONED BY 语法也依然反对。
点击查看直播回放 & 演讲 PDF
Flink 1.16.0 已如期公布,欢送大家体验和应用 Hive SQL 迁徙的能力,也欢送退出【Flink Batch 交换群】交换和反馈相干的问题和想法。
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…