关于后端:HiveSQL-迁移-FlinkSQL-在快手的实践

1次阅读

共计 13192 个字符,预计需要花费 33 分钟才能阅读完成。

摘要:本文整顿自快手数据架构工程师张芒,阿里云工程师刘大龙,在 Flink Forward Asia 2022 生产实践专场的分享。本篇内容次要分为四个局部:

  1. Flink 流批一体引擎
  2. Flink Batch 生产实践
  3. 外围优化解读
  4. 将来布局

点击查看原文视频 & 演讲 PPT

一、Flink 流批一体引擎

1.1 Lambda 架构

首先,介绍一下咱们抉择 Flink 作为流批一体引擎的思考。如上图所示,是当初生产利用最广的 Lambda 架构,置信大家曾经很相熟了,大概率也都在应用。Lambda 架构的劣势非常明显:

  • 灵便。实时链路和离线链路齐全独立,按理论需要开发,互不影响;
  • 容易落地。实时和离线链路都有成熟的解决方案;

当然毛病也很显著,实时计算和离线计算两条链路,存储不能复用,所以资源冗余重大。

而后,两种计算引擎,离线计算个别应用 Spark,实时计算应用 Flink,那么就要学习和保护两套代码,老本较高。

个别实时和离线又是两个团队开发和保护的,那么实现细节和口径难以对立,所以常常会有后果对不上的状况。因而,业务同学也十分心愿实现流批的对立。

1.2 引擎对立

咱们把流批一体分为两个方面,一个是引擎的对立,另一个是存储的对立。这里次要介绍,引擎的对立。

如果流批引擎对立了,那么用户只须要学习一种引擎,并且开发的代码也能够大量复用。这样极大的升高了开发运维的老本,因为计算逻辑雷同,数据品质也就失去了保障。除此之外,快手离线作业切换引擎是十分便捷的。因而咱们引擎对立的上线节奏和上线品质很容易把控。

那么应用哪个引擎来作为流批对立的引擎呢?咱们比照了支流的大数据引擎之后,抉择了 Flink 作为流批对立的引擎。

因为 Flink 作为流计算畛域的标杆,在架构设计上曾经思考到了流批交融,同时领有沉闷的社区。并且通过多个版本迭代之后,Batch 曾经具备肯定可用性了,咱们之前在生产上也有过一些业务落地。

二、Flink Batch 生产实践

接下来,重点介绍一下 Flink Batch 生产利用的状况。目前,咱们线上稳固运行了 3000+ 的 Flink Batch 作业,次要是平滑迁徙的 Batch SQL。

与此同时,咱们为用户提供多种入口抉择。其中,Batch SQL 入口,面向传统的离线生产开发应用 Hive 方言,也是我本次分享的重点。

调度平台的 Flink Batch 入口,次要不便相熟 Flink 的用户间接应用 Flink 方言或 API 开发 Batch 作业,并提供残缺的离线调度反对。其余入口是业务方依据本人须要,基于咱们平台搭建的业务零碎。

如何在生产环境应用好基于 Hive 方言的 Flink Batch 呢?须要解决这几个方面的问题。

  • 明确上线流程和规范。首先要筛选出适合的作业,而后验证数据品质、时效性、资源等各项指标,之后能力上线。
  • 解决和 Hive SQL 的语法兼容问题,接入离线生产的各个系统,比方权限核心,元数据中心等。
  • 保障生产环境的稳固运行,离线环境比实时环境简单很多,会遇到一些实时场景不存在的问题。
  • 解决和原离线引擎的性能差距;比方前面大龙老师会介绍的动静分区打消 Sort 算子优化。

这几个方面都解决之后,根本就能够推广应用了。

接下来,简略介绍一下快手的离线生产体系。在应用层,个别都是各种开发平台,或者是一些业务零碎。在服务层,快手是应用 HiveServer 作为 Batch SQL 的对立入口,对立应用 Hive 方言。

BeaconServer 能够做 SQL 的改写,引擎的路由策略,还有 HBO 优化等。上面引擎层能够自在的切换,所以咱们将 Flink 接入离线生产,只须要适配 HiveServer,而后在 BeaconServer 中,退出 Flink 引擎路由规定即可。

咱们目前应用 SQL-Client 的形式接入 HiveServer,将来可能会扩大 SQL Gateway 的反对。

解决完如何接入离线体系的问题之后,咱们须要明确作业上线的流程。

  • 第一步,筛选出符合要求的 Batch SQL,比方,刚开始咱们抉择低优先级的简略数据处理作业。
  • 第二步,应用 Flink 对 SQL 进行解析和校验,确定 Flink 是否反对。
  • 第三步,对 Flink 能够运行的 SQL,进行改写,把插入表改成测试库中的表,而后提交运行。
  • 第四步,比照影子作业和线上作业的后果是否统一,以及资源应用状况。
  • 第五步,把前四步都胜利的作业切换到 Flink 引擎上来,并且还要持续察看数据品质。

利用第三步提到的双跑能力,只是影子作业应用原来的离线引擎,线上作业应用 Flink,而后比照后果,确保没有意料之外的问题产生。这一步十分重要,可能帮咱们及时发现没思考到的 case。因为线上环境非常复杂,后期上线须要多察看。

目前,这个流程曾经做到了自动化。咱们的人力次要集中在解决发现的异样 case;

上面会介绍,这个流程的几个关键点,给大家一个参考。

刚开始应用 Flink 校验 SQL 的过程中,发现很多罕用的语法都不反对,感觉不太失常。剖析后发现,是因为打开方式不对,导致没有真正的用上 Flink HiveParser,通过查看这块的代码逻辑,发现问题所在。在 Flink 里,要真正应用 HiveParser,须要满足两个条件。

  • 应用 Hive 方言。
  • 以后 Catalog 必须是 HiveCatalog,否则会回滚到 FlinkParser。

除此之外,须要确保 HiveModule 是最高优先级。这样 Flink 和 Hive 同名的 Function 才会用 Hive 的实现。

如上图所示,依照左边这种形式实现之后,SQL 校验通过率进步了很多。但仍有很多 Batch 语法不反对,比方 Add Remote JAR 和 insert 目录等等。

在 SQL 改写方面,个别有两种状况。

  • 用户作业里没有指标表的建表语句。咱们会应用 CREATE TABLE LIKE 语句,先创立出测试库的指标表。而后,把原始 SQL 批改为写入测试库。
  • 用户作业里带有指标表的建表语句。咱们会间接个建表语句,改成创立到测试库。而后,把原始 SQL 批改为写入测试表。

在执行影子作业时,能够应用一个小权限的账号。这个账号只有写测试库的权限,防止 SQL 改写失败把数据写入到线上库。

在品质校验方面,咱们的策略如下。首先,依据 HiveServer 记录的作业输出信息,比对输出的数据量和分区数据是否统一。而后,依据作业的统计信息,比对写出的数据量是否统一。最初,比对写出的数据后果是否统一。

咱们比对的形式是,把后果数据按列求和,如何所有列的后果都统一,则证实数据品质没问题。在按列求和时,如果是 Number 类型的列,间接求和;如果是非 Number 类型列,先取 Hashcode,而后再求和。

当后果比照统一后,咱们会比照资源开销。这里对立应用 YARN 的统计口径,依照每个 Container 应用的资源 *Container 运行的工夫,最初加和算出资源总量。

上线作业的规范是,数据品质没有问题,并且资源应用增长量,不超过原引擎的 10%,执行时长不超过原引擎 20 分钟。介绍完 Flink Batch 作业上线流程之后,咱们看一下接入离线生产还须要做哪些工作?

如上图所示,列出了咱们做的一些批改。原来 Flink 的配置由 Flink、Hadoop、Hive 三局部组成,配置管理起来比较复杂也不够清晰。

因为咱们通过 HiveServer 接入,HiveServer 在启动 Flink 时,会把 Hive Session 里的配置,都传给 Flink。这里包含用户手动 set 的配置和 Hadoop 相干配置。所以,咱们把 Flink 的配置改为两局部,一部分是 Flink 本人的配置,一部分是 Hadoop 和 Hive 的配置。

SQL-Client 会默认会开启单词补全性能,即输出单词的一部分,而后应用 Tab 键来补全单词,这个性能在交互模式下是没问题的。但在应用文件传入 SQL 时,如果 SQL 内容中,刚好有这种状况,就会导致 SQL 发生变化,呈现字段找不到的异样。所以从文件输出 SQL 时,须要敞开补全性能。

作业进度汇报,是对用户体验十分重要的性能。不然作业提交后,用户无奈像 Hive/Spark 那样看到进度信息,HiveServer 也不晓得作业运行是否失常,可能会呈现作业始终卡主的状况。所以咱们做了进度汇报性能,如果长时间没有汇报进度,HiveServer 就会被动杀掉作业。

最初,监控看板真很有必要。在剖析问题时,能够辅助定位,不然只能盲猜。另外,在接入离线生产方面,还有一些和平台产品适配的工作。

比方,空分区公布和 SUCCESS FILE 性能,快手的离线调度平台目前反对 3 种依赖形式。

  • 工作依赖。当上游工作胜利之后,才会拉起上游工作。
  • 分区依赖。探测分区元数据是否生成,生成之后就拉起上游工作。
  • SUCCESS FILE 依赖。依据文件是否存在,决定是否拉起上游工作。

Flink 依据 Sink 写出文件目录,来判断须要公布哪些分区,动静分区状况下没有问题。如果是动态分区写入的工作,同时没有数据生成,Flink 就不会公布分区,这样就可能会导致上游不被拉起。除此之外,如果没有写出 SUCCESS FILE 的话,也会有相似的问题。

在收集统计信息方面,Flink Batch 原来没有统计信息收集,当生成分区后,元数据中心显示数据为 0。用户看到之后,认为作业没有执行胜利,就会重跑作业。如果用户配置了数据品质校验,没有统计信息,也会导致校验不通过。

介绍完接入相干内容,咱们来看一下在线上运行之后遇到的问题。

咱们晓得,离线生产个别是 T+1。因为 0 点之后开始解决前一天的数据,所以 0 点之后,会调度起少量的作业,离线资源就会很缓和。这个时候启动的基线作业,可能就拿不到资源。为了保障基线作业按时实现,YARN 会 Kill 掉一些低优作业的 Container,把资源分给基线工作。

Flink 个别会在一段时间内,当 Task 失败总次数达到阈值,作业失败。离线引擎个别是同一个 Task 失败几次,作业才会失败,并且离线引擎不会将平台起因导致的失败计算在内。

Flink Batch 上线之初,就遇到了资源抢占问题。作业会呈现运行一段时间就失败,触发调度平台的失败重试,重试几次才会胜利。有的作业可能不会失败,但因为 Task 被删除,须要重算数据,所以执行工夫被拉长。

要想解决这个问题,又不能简略的把 Task 失败阈值上调。如果遇到业务逻辑导致的 Task 失败,调大失败阈值,会导致异样没有被及时发现,重大的会造成事变。

因而,咱们参考离线引擎的做法,在 Task Fail 时拿到具体的失败起因。如果是资源抢占或者机器下线之类的平台起因,不计入失败次数。这样就解决了 Flink 作业频繁失败重试的问题。如果用户感觉运行工夫过长,就须要思考调整作业优先级。

解决了资源抢占问题后,离线集群慢节点问题是另一个稳定性隐患。CPU 利用率过高和 IO 忙碌在离线集群十分常见,个别 Task 长尾会导致整个作业执行工夫超长。

解决这个问题的办法很简略,参考离线计算的揣测执行即可。在发现 Task 执行工夫,超过同 Task 的均匀执行工夫一段时间后。调度器在其余节点拉起一个镜像 Task,而后哪个 Task 先执行完,就用哪个 Task 的数据。

这个个性是快手和社区共建实现的,这里要特地留神的是,Flink 里数据分片是动态分配的,和 Hive、Spark 的动态机制不同。所以 Source 的揣测执行实现复杂度会高很多,并且还要思考到资源抢占等异样 case。

随着聚合类作业的上线,咱们发现一些简略的聚合计算工作执行工夫十分不稳固,有时很快,有时异样的慢。仔细分析之后发现,Flink 默认应用 TaskManager 来做 Shuffle,如果 Shuffle 数据没有被上游齐全生产,那么 TaskManager 就不能开释。这样就会带来两个问题:

  • 资源节约。闲暇的 TaskManager 不能开释。
  • 如果这时候遇到资源抢占,或者机器下线,TaskManager 被 Kill 了,那么 Shuffle 数据就没了,须要重算这部分数据,这样就导致作业执行工夫被拉长。

为了解决这个问题,须要把 Shuffle Service 独立出 TaskManager。有两种实现思路。

  • 相似 Hive 或 Spark,应用基于 Yarn NodeManager 的 Shuffle Service。但 Flink 还没有相干实现,须要咱们自行开发。
  • 应用 Remote Shuffle Service。Flink 有开源实现,快手也有自研的 Remote Shuffle Service。

通过调研之后,咱们抉择了快手自研的 Remote Shuffle Service。因为快手的 Remote Shuffle Service 反对 Push-Based Shuffle。Shuffle Service 会将雷同 Shuffle Partition 的数据合并,Task 只需从一个中央就能够读取到全副的 Shuffle 数据,社区的 Remote Shuffle Service 将来也会反对这个性能。

其次,快手的 Remote Shuffle Service 具备端到端数据一致性的校验,对数据品质有很好的保障。

随着迁徙作业量的增长,面临一个很辣手的问题,设置的默认并发度对大部分作业来说都不是最优的。

在实时计算场景,作业并发度都是用户本人设置的。但对离线计算来说,用户不须要设置并发度,引擎会依据数据量主动计算出对应的并发度。对咱们来说,手动设置并发度是不事实的。因为数据量每天都在变动,不能每天都用同样的并发。

如果须要手动设置并发度,就无奈实现平滑迁徙 Hive/Spark 作业的指标。这个问题是咱们和社区共建解决的,Adaptive Scheduler 依据数据量主动预估适合的并发度,这样咱们就无需批改用户作业,实现平滑迁徙。

除此之外,合并小文件的并发度 Adaptive Scheduler 临时不能精确预估,咱们通过 Hack 的形式长期解决,后续社区也会针对这种非凡 case 扩大 API 反对。

目前,咱们正在逐渐上量聚合类的 Batch 作业,遇到了两个比较复杂的问题,正在和社区一起解决。

  • Hive UDAF 的反对。目前,Flink 只反对 Partial1 和 Final 模式的 Hive UDAF,像 Rank 类函数临时不能反对。
  • Hash Agg 的反对。目前,应用 Hive UDAF 的作业都会应用 Sort Agg,相较 Hash Agg 性能差别还是很显著的。

为了不便平滑迁徙聚合类作业,Hash Agg 和残缺 Hive UDAF 的反对都十分必要。

三、外围优化解读

Flink Batch 在快手落地上线的过程中遇到了诸多问题,包含语法兼容、Hive Connector、稳定性等多个方面。针对这些问题,快手和社区一起单干,独特解决这些问题,胜利推动了 Flink Batch 的上线。接下来,给大家介绍一下社区从能用、好用、稳固可用等多个方面做的优化改良工作。

因为 Flink 是规范的 ANSI SQL,Hive SQL 与 ANSI SQL 语法差别较多。为了让 Hive SQL 平迁到 Flink SQL 引擎上,快手抉择了应用 Hive Dialect。这样的话,绝大部分的作业都能够迁徙,不须要用户批改 SQL。尽管在 Flink 1.16 版本之前,社区在 Hive Dialect 兼容上,曾经做了很多工作。但离齐全兼容 Hive SQL,仍有差距。快手选定了一批筹备迁徙的作业后,通过解析验证,发现诸多不反对的语法。

在快手给出 input 后,社区第一优先级做出了反对。如上图所示,咱们列出了比拟重要且很罕用的一些语法,比方 CTAS、ADD JAR、USING JAR、宏命令、Transform 等。

UDF 在 Hive SQL 会常常应用的,用户个别会先在作业中 Add 一个近程的 UDF JAR,而后注册并应用。在 Flink 中,以后不反对 Add JAR,导致很多作业都无奈迁徙。除此之外,算法同学不喜爱写 Java UDF,他们个别用 python 写脚本,而后通过 transform 来解决数据。通过补全 Hive Dialect 语法,解决了迁徙过程中的第一个 block。胜利保障了现有的 Hive SQL,能跑在 Flink 引擎上。

社区在 Flink 1.16 版本做了大量工作, 补全 Hive 语法。目前,通过 qtest 测试下来,整体兼容度能达到 95%,根本能保障用户现有的 Query 都能迁到 Flink 上来。Flink-25592&Flink-26360,这两个 umbrella issue 在追踪 Flink Batch 相干的工作。因为 CTAS&USING JAR 这两个性能,波及到 PUBLIC API 的改变,在社区有对应的 FLIP 设计文档,因而接下来我会具体介绍一下这块的设计。

如上图所示,先介绍一下 FLIP-214 Create Function using JAR 性能。因为这个性能波及到 SQL 模块的 ClassLoader 的改变。因而,有必要给大家介绍一下设计思路,防止大家踩一些 ClassLoader 的坑。

写 SQL 的人都晓得,因为业务逻辑形形色色,计算引擎内置的函数往往不能满足需要。在此种状况下,须要用户手写 UDF 满足需要,尤其是 Java 技术栈的大数据引擎。咱们会把 UDF 打到 JAR 包里,而后上传到某个近程的 HDFS 地址上,在应用的时候先 Add JAR 或者间接基于 JAR 包创立 UDF。

思考到该场景以及快手的业务需要,社区在 1.16 反对了 USING JAR 性能。整体的语法局部如 PPT 中红色局部标出的字体,相比于之前,多了 USING JAR 的关键字,并且容许指定 JAR 包的地址,该地址能够是近程的,也能够是 Local 的。目前咱们仅 Java&Scala 语言反对该语法。

接下来,我来具体介绍一下如何应用 USING JAR 这个性能,以及其执行机制。首先,注册 UDF,在注册 UDF 的过程中,咱们会解析 UDF 的 DDL,先判断函数是否是 temporary。如果不是,则间接注册到 Catalog 中,不做任何其余额定的工作。如果是 temporary,咱们会接着判断 JAR 包的地址,是本地文件,还是近程的 HDFS、OSS 地址。

如果是 Local JAR,则会校验 JAR 包是否非法,如果 JAR 包非法,会把 JAR 包地址增加到 ResourceManager 中,同时也增加到 MutableURLClassLoader 中。

这里须要额定的阐明一下,为了解决 Flink Table 模块中经常出现的 Connector&Catalog 相干的 ClassLoader 问题。1.16 版本社区在 Table 模块引入了一个 MutableURLClassLoader,每个 TableEnvironment 持有一个该 ClassLoader,容许动静的增加 JAR 包到 ClassLoader 中,这就解决了动静加载 JAR 包的问题。

接下来会把该 JAR 包注册到 FunctionCatalog 中治理。如果 JAR 包是近程的地址,会多一步下载的动作,这个动作由 ResourceManager 来实现,把 JAR 包下载到本地的长期目录,同时加载到 MutableURLClassLoader 中。

第二步是应用 UDF,如果在作业的 Query 中应用了 UDF,在 Query 解析优化的过程中,会先判断该 Function 是 temporary 的还是长久化的。如果是后者,会从 Catalog 中拿出其 JAR 地址信息,先把 JAR 包下载到本地,并加载到 ClassLoader 中,接着进行 Query 的优化,并生成 JobGraph。

生成 JobGraph 之后,第三步则是要把作业部署到集群上运行。咱们在 Query 优化的时候须要 JAR 包,同时在集群上运行的时候也须要这些 JAR 包,否则作业运行时就会呈现 ClassNotFoundException。那咱们是怎么做的呢?

这里咱们利用了 Flink 的 BlobServer,在往集群上提交作业时,咱们会先把 ResourceManager 中保护的所有的本地 JAR 包,上传到 Flink JobManager 的 BlobServer 中,也就是图中黄色虚线标出的局部;在作业执行时,由 TM 负责从 BlobServer 中拉取这些 JAR 包。

接下来,咱们介绍另一个比拟罕用的性能 CTAS。这个语法在所有大数据计算引擎中都反对,相比 CREATE TABLE 语法,其不同的中央在于文本中红色标出的字体。

该语法的作用是,由引擎基于 SELECT Query 主动推断出指标表的 Schema,并由 Catalog 负责创立;其等效于先创立指标表,再写一个 insert into…select query,其最大的益处是在 Query 比较复杂的时候,防止了用户手写指标表的 DDL,简化了用户的工作量,这个性能在生产环境中是十分有用的。

接下来,介绍一下 CTAS 整体的执行流程。首先,用户写了一个 CTAS Query,在客户端编译优化的过程中,咱们会先基于 Query 推导出指标表的 Schema。而后,把对应的 Catalog 序列化。序列化的目标是,为了在 JobManager 上能反序列化,执行建表的动作。同时,咱们生成一个钩子对象,由这个钩子在 JobManager 负责调用 Catalog 创立指标表。

第二步就是作业的执行。在作业开始调度前,咱们首先在 JobManager 上把钩子对象及 Catalog 对象反序列化回来。接着,由钩子调用 Catalog 先创立指标表。而后,调度作业。

假如作业最终胜利执行,则没有额定的动作。如果作业执行失败,或者被手动勾销,出于原则性的思考,咱们会通过钩子调用 Catalog 把创立的指标表给 Drop 掉,保障最终没有对外部零碎产生副作用。

思考到 Flink 是一个流批一体的计算引擎,CTAS 语法在流批两种场景都能应用。但个别在流场景下,作业失败时,咱们不会手动删除表,而是靠内部零碎的更新能力,保证数据的最终一致性。

因而,咱们引入了一个原子性相干的 option 由用户来决定是否须要保证数据的原子性。社区在 Flink 1.16 版本只实现了 CTAS 的一个基本功能,还没有反对原子性,这个会在 1.17 实现,更多细节大家能够去看 FLIP-218 的设计文档。

在快手 Flink Batch 实际过程中,咱们发现 Hive Connector 诸多方面的问题。比方 Split 计算减速,统计信息收集、小文件合并等等。如上图所示,列出了在应用过程中,绝对比拟重要的一些性能。

通过这些优化,咱们丰盛了 Hive Connector 的能力,使其在 Batch 场景下更加好用。接下来,我会具体介绍动静分区写入优化和小文件合并。

不同于动态分区的写入,总是须要用户指定分区列的值。动静分区容许用户在写入数据的时候,不指定分区列的值。

比方,有这样一个分区表:用户能够应用如下的 SQL 语句向该分区表写入数据。

在该 SQL 语句中,用户没有指定分区列的值,这就是一个典型的动静分区写入的例子。

在 Flink 中,对应生成的 plan 是什么呢?如左边执行打算图所示,这里会有四个节点。其中,值得注意的是灰色的 Sort 节点。Flink 在动静分区写入时,会把数据依照动静分区列先做一个排序,而后再一个一个分区的写入数据。

这样带来了一些益处,但也导致作业的执行工夫变的更长。因而,针对该现状和快手的业务场景,咱们引入了一个选项,在写入动静分区时,容许用户手动敞开 Sort 节点,防止额定的排序,放慢上游数据的产出速度。

小文件问题在生产环境中也是一个很常见的问题。在写入 Hive 表的时候,为了保障写入的速度,作业的并发设置较大。尽管放慢了写入速度,但也引入了小文件问题。

小文件会减少 HDFS NameNode 压力和 RPC 压力,对上游的读取工作不敌对。除此之外,在动静分区写入时,某个并发可能会同时写很多动静分区,导致大量的小文件。基于上述问题,咱们在 Hive Batch 写入,反对了自适应合并小文件。

上图是 Batch 模式下,Hive Sink 反对小文件合并的拓扑。咱们看到图中有四个节点,别离是 Writer、CompactorCoordinator、Rewriter 和 PartitionCommitter。这里的外围是 CompactorCoordinator 和 Rewriter。

CompactorCoordinator 是单并发节点,上游的 Writer 写完文件后,把文件门路信息通知 CompactorCoordinator。CompactorCoordinator 拿到上游的所有文件后,判断哪些文件是小文件,须要合并成的指标文件大小,从而决定把哪些小文件合并成一个指标大文件。

而后把这些信息通知给 Rewriter,由 Rewriter 来实现合并的工作,最初由 PartitionCommitter 提交分区信息。自适应合并小文件带来的收益是缩小文件数量,升高 HDFS 的压力;进步用户作业的数据读取效率;放慢执行速度。

接下来,讲一讲在应用 UDAF 过程中遇到的性能方面的问题。首先,我先来介绍一下 Sort-Agg 和 Hash-Agg 这两个概念。个别在聚合计算场景,有两种策略,别离是 Sort-Agg 和 Hash-Agg。

Sort-Agg 是在聚合计算之前,依据 group by key,对数据进行全局排序。排序之后,遍历所有数据,遇到雷同 key 的数据,就做累加操作。如果遇到不同 key 的数据,意味着上一个 group 的所有数据曾经计算完,能够间接往上游发送后果。而后,接着计算新 key 对应的 group 的聚合值。

Hash-Agg 则是指,在内存中构建一个 Hash 表,key 是 group by 的 key,value 是每个 group 的聚合值,始终往上累加。当所有数据遍历完,则最终后果才可输入。一般来说 Hash-Agg 在内存中实现,比拟高效,而 Sort-Agg 须要一步内部排序,因而性能相对而言会差。

以后在 Flink 中存在两种聚合计算函数接口,别离是 ImperativeAggregateFunction 和 DeclarativeAggregateFunction。上图右边列举了这两种接口的对应的 UDAF 实现的优缺点。

Hive UDAF 以后只能走 Sort-Agg 策略,整体性能比拟差。针对这个问题,通过调研之后,咱们决定基于 DeclarativeAggregateFunction 接口,在 Flink 里从新实现 Hive 的一些罕用的 UDAF。这里的难点是要做到与 Hive 的行为保持一致。从新实现之后,绝大部分的 Query 都能够应用 Hash-Agg,整体上达到了与内置函数一样的性能。

接下来,讲一下另一个比拟重要的性能自适应调度器。写过 Flink 流作业的用户都晓得,Flink 作业在上线前都须要设置并发度。对流作业而言,这是一个大家默认承受的事件。但对于批作业而言,状况就简单很多。

首先,批作业数量很多,动辄成千盈百,乃至数万,用户不可能 case by case 的调并发,费时费力。

其次,数据量每日都有可能变动,难以预估。因而,同一个并发度设置对同一个作业,不肯定始终实用。无奈保障作业的运行工夫始终在一个稳固的工夫基线范畴内,对生产的影响会比拟大。

最初,SQL 作业,除了 Source 和 Sink 外,只能配置全局对立的并行度,没法进行细粒度并行度设置,也会遇到资源节约和额定开销的问题。

为了解决这些问题,社区为 Flink 引入了自适应批处理调度器。通过它框架会依据计算节点须要解决的数据量,主动推导节点的并行度。

这样的并行度配置比拟通用,能够实用于大部分作业,无需为每个作业独自配置。主动设置的并行度,可能适配每天不同的数据量。同时,因为在运行时能够采集,各个节点理论须要解决的数据量,所以可能进行细粒度的并行度设置。它的流程大抵如下:

  • 当上游逻辑节点的所有执行节点都完结时,咱们会采集其产出的数据量大小。
  • 当上游逻辑节点生产的数据量确定后,咱们能够通过并行度推导策略组件,为节点计算出适合的并行度。
  • 在逻辑节点并行度确定后,咱们会把它的执行节点,退出执行拓扑中,并尝试进行调度和部署。

和传统 Flink 作业执行不一样的中央在于,以往的作业执行拓扑是,在作业提交时就曾经构建,是动态的。而自适应批处理调度的作业,执行拓扑是动静生成的。在动静执行拓扑下,一个上游节点能够生产多个 sub-partition,使得上游节点的执行过程,和上游节点的并行度解耦。

自适应批处理调度加上 Hive Source 的并发推导能力,解决了并发度设置的问题。在快手侧拿到的成果次要体现在两个方面:

  • 有了这个性能,无需用户为每个作业独自配置并行度,使得 Flink Batch 更易用,反对细粒度的并行度设置,防止了资源节约。
  • 依据数据量,主动调整算子的并发,保障作业了在生产环境中稳固运行,保障了产出基线,作业能够平滑的迁徙过去并上线。

接下来,介绍一下社区和快手单干在生产稳定性方面,做的一个比拟重要的性能揣测执行。在生产环境中,热点机器个别都是无奈防止的,集群混部、密集回刷数据,都可能导致一台机器的负载变高、IO 忙碌,使得下面运行的 Flink 作业异样迟缓。一些偶发的机器异样,也会导致同样的问题。

这些迟缓的工作会影响整个作业的执行工夫,使得作业的产出基线无奈失去保障。而揣测执行,是一种曾经失去广泛的认可、用来解决这类问题的办法。因而社区在 Flink 1.16 版本中引入了这套机制。

开启揣测执行后,当框架发现批作业中呈现某个 SubTask,显著比其余 SubTask 执行迟缓时,会为其拉起新的执行实例。咱们把它叫做影子 Task,部署在失常的机器节点上,而本来的慢工作实例会被保留继续执行。

这些影子工作和对应的原始工作,具备雷同的输出和产出。其中,最先实现的工作会被认可,产出的数据能够用来被上游节点生产;其余对应的实例会被勾销,产出的数据会被革除。

揣测执行的具体流程如图所示,当 SlowTaskDetector 发现存在慢工作时,会告诉给揣测执行调度器。调度器会把慢工作所在机器辨认为热点机器,将其退出黑名单中。而后,如果慢工作的运行中的执行实例数尚未达到下限,调度器就会为其创立新的执行实例,并进行部署。当任意执行实例胜利完结后,调度器会勾销该实例对应执行节点的其余所有执行实例。

后面咱们介绍了框架层面实现的通用的揣测执行流程,然而对于 Source 和 Sink,揣测执行会有一些非凡之处。对于 Source 节点来说,咱们要保障同一个 Source 并发的不同执行实例,总是要读取雷同的数据。这样能力保障后果的正确性。这里有几个非凡状况须要思考。

  • 对于 FLIP-27 新 Source,Source 端 Split 是动态分配的,咱们须要保障影子 Task 和原来的慢 Task 解决的是雷同的 Split。
  • 原来的慢 Task 曾经解决的一部分 Split,影子工作解决的速度比拟快,能把后面的 Split 追上来。这时影子工作会申请调配更多的 Split,这个过程也须要原来慢工作解决的也是雷同的 Split,这里可能是个互相赛马的过程;最终谁先执行实现,用谁的数据。
  • 因为资源抢占、机器异样等起因,可能会呈现影子工作或者慢工作挂掉的状况。如果只挂了一个 Task 能够先不必管;如果两个 Task 都挂了,揣测执行调度器判断辨认进去后,须要把曾经解决的 Split 信息还回来,接着调度新的工作,来解决这些 Split。

大体上来说,就是在框架层退出了一个缓存,来记录各个 Source 并发曾经获取到的数据分片,以及其下的所有执行实例曾经解决到的分片信息。

对于 Sink 端,因为只是负责写数据,状况则简略很多,只须要保障影子工作和慢工作,最终那个先执行完,就提交那个的数据,同时清理掉另外一个有效 Sink 的数据,防止数据反复。

通过揣测执行性能,保障了 Batch 工作执行的稳固,产出工夫绝对稳固可控,保障了 Flink Batch 在快手生产应用过程中的整体稳定性,为进一步的 Batch 落地打下了良好基础。

以上就是从能用、好用、生产稳固可用等三个方面,介绍了社区和快手单干在 Batch 方面做的一些外围优化改良工作。这些工作保障了 Flink Batch 在快手的上线,并在生产环境中稳固运行。随着快手在 Batch 的进一步推动,将来还会有诸多方面的工作要做。

四、将来布局

如上图所示,咱们会在 Flink Batch 方向继续投入。监控指标展现以及 History Server 的可用性须要尽快补全,不便问题的定位和剖析,用户就能够自助解决一些简略问题。除此之外,当聚合场景下的相干问题解决后,咱们就能够大量迁徙聚合类的作业。解决 Join 场景的问题后,将开始迁徙简单 Join 场景作业;

在流批一体存储的摸索方面,待引擎能力建设后,开始建设对立的存储服务,给流作业和批作业提供对立的读写 API,解决冗余存储带来的老本问题。

点击查看原文视频 & 演讲 PPT


更多内容

流动举荐阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://click.aliyun.com/m/1000372333/

正文完
 0