摘要:本文整顿自快手技术专家、Apache Flink & Apache Calcite Committer 张静,在 FFA 流批一体专场的分享。本篇内容次要分为四个局部:
- Flink 在快手的倒退
- 流批一体在快手的布局
- 第一阶段(增强批能力)的停顿
- 第二阶段(业务视角的流批一体)的挑战
点击查看直播回放 & 演讲 PPT
一、Flink 在快手的倒退
在快手外部,Flink 的体量无论从作业规模还是集群规模上,绝对于去年都有大幅的晋升,上图列了几个要害数据。峰值的 TPS 达到了每秒 13 亿,作业数量上流作业有 6000 多个,批作业也到了 3000 个,物理资源上曾经有 70 万 Core。在业务场景的笼罩上,Flink 给公司的各个业务方,包含社科、电商、商业化、音视频、实时数仓都提供了实时计算能力。
2017 年快手外部为了满足直播、短视频实时品质监控的需要,初步引入了 Flink 引擎,前面几年陆续建设了周边体系,进步稳定性和性能,把 Flink 推广到公司的各个业务线。
2020 年开始鼎力推动 Flink SQL 的落地,对于这些工作,去年 FFA 上也做了分享 []()。2021 年底,有一些用 Flink 解决实时需要的业务方,心愿也能用 Flink Batch 解决局部离线场景的需要,所以咱们开始推流批一体这个方向,到去年也获得了一些阶段性的成绩。
明天咱们分享的重点有两个。
- 第一个是回顾咱们过来在流批一体方向踩过的坑,解决的过程,心愿可能给同行的敌人带来参考意义。
- 第二个重点的内容是咱们遇到哪些新的挑战以及有哪些新的思考。
提到流批一体不免就要提到 Lambda 架构,每个公司都有本人的离线计算计划,这个方向曾经倒退了很多年成熟牢靠。毛病是不能满足越来越多的实时需要,所以又引入另外一条实时链路。在实时计算摸索初期,后果的准确性不能失去保障,要离线计算结果来加以校对和修改,由业务来负责合并两个后果,这就是常说的 Lambda 架构。
它的毛病也很显著,首先须要提供两套计算引擎,一个提供 Streaming 能力,一个提供 Batch 能力。不同的引擎提供的 API 不一样,所以业务须要开发两套业务逻辑。而且因为不同引擎的行为可能会有差别,尤其是在规范没有明确规定的 case 里,所以两条计算链路后果的一致性也很难失去保障。
过来几年,始终都在尝试解决 Lambda 架构带来的问题,上图列举了其中三个比拟典型的我的项目。
- 第一个是 Bean,它引入对立的 API,不须要底层的引擎也对立。这样之前提到的不同引擎造成后果不统一的问题,也很难去解。而且对一些简单的流批一体的业务场景,比方有状态作业的流批混跑,批模式执行完当前生成中间状态,让流模式启动的时候加载这些快照数据。如果流和批用两条独自的执行引擎,很难通用的解决方才说的这种场景。
- 第二个是 Spark,它是一个曾经失去宽泛应用的批处理引擎。不过它基于微批和频繁调度的思路,也能够提供准实时的能力。但它因为是基于批来做流,很难满足极致的实时要求。
- 第三个是 Flink,通过这些年的倒退,Flink 曾经成为流计算畛域的实时规范,同时它也提供批处理的能力。但它也哟一些毛病,就是它的批能力和传统的离线计算引擎相比,还有待进一步加强。
咱们外部最开始收到流批一体需要的业务方,都对业务的实时性有较强的要求,心愿在不升高实时性的前提下,用 Flink Batch 进步开发效率。
另外咱们也看到流批一体也是社区重点的投入方向。Flink 在过来几个版本里,简直每个版本都做了流批一体的能力建设。上图是引自去年阿里云分享的一篇文章,往年社区也做了很多在批上的工作,比方揣测执行、Remote Shuffle Service、Hybrid Shuffle 等等。有一些较新的工作成绩没有反映在这个图上,不过足以看出 Flink 曾经在 API 层、Connector 层、调度层、底层的 Shuffle 上都做了十分多的流批一体对立的工作。所以咱们抉择用 Flink 来做流批一体,确立了和社区单干共建的形式。
在咱们外部落地的过程中,社区给了咱们十分多的反对和启发。明天我分享里提到的工作也是和社区通力合作的后果。而且很多性能在咱们外部失去验证当前,曾经推出社区版本,心愿可能普惠到更多用户。
二、流批一体在快手的布局
咱们最早收到的流批一体的诉求来源于两个外部业务,机器学习和数据集成。机器学习场景始终是咱们外部 Flink Streaming 的重点业务方,他们用 Flink 做实时特色计算和实时样本拼接。他们心愿可能用 Flink Batch 复用一套业务逻辑,来满足回溯的需要,做数据修改和冷启动生成历史数据。而且用一个引擎也能够防止后果不统一的问题。
另一个需要来自于数据同步团队,数据同步产品在异构的数据源之间做数据同步,分为离线同步和实时同步。以前老的架构,离线同步基于 MR 和 DataX,实时是基于 MR 和自研框架。这个架构的毛病是计算能力弱,扩展性不强。所以数据同步团队正在基于 Flink 来打造新的版本,心愿用 Flink 计算能力和可扩展性来加强数据同步这个产品。
咱们认为流批一体的终极目标有以下几点。
- 对立的用户体验。用户只须要一套业务代码,并且在对立的平台上开发运维和治理作业。
- 对立的引擎,包含对立的计算引擎和存储引擎。
- 更智能的引擎,进步用户体验。不须要用户理解流模式和批模式是什么,也不须要用户操心应该抉择哪种执行模式、Shuffle 形式、调度器等底层细节。
- 满足更简单的业务,比方流批交融需要。指标是要能反对更简单的业务需要,比方有状态作业的流批混跑。
要想实现这些指标,须要对框架层做大量的改良,咱们采纳分阶段建设的思路来逐渐实现这些指标。
第一个阶段的指标是反对好目前对流批一体有需要的两个业务方,提供对立的用户体验和计算引擎,让用户先能用起来。布局的重点是增强 Flink 的批能力,买通产品入口以及给业务进行贴身的反对。
第二阶段的指标是让产品更好用,包含智能的引擎、极致的批处理性能,对立的存储引擎,且能反对更宽泛、更简单的业务需要。
三、第一阶段(增强批能力)的停顿
咱们第一期的用户来源于机器学习场景和数据同步场景。用户对批能力的需要,总结起来,稳定性是生产可用的必要条件,另外也心愿在易用性上有所晋升,对性能和性能的要求在这个阶段绝对弱化一些,心愿大部分 pattern 下没有显著性能回退就能够。所以在第一阶段增强能力建设上,咱们外部的重心集中在稳定性和易用性上,这也是我明天重点分享的内容,补充一点,其实在性能上咱们也在 Hive 的 Source/Sink 上也做了优化,感兴趣的听众能够关注 生产实践专场《Hive SQL 迁徙 Flink SQL 在快手的实际》这个分享。
以下是 Flink Batch 稳定性的外围问题:
- 慢节点问题。在一个分布式系统里,个别的机器故障、资源缓和或者是网络问题都可能导致单个并发的性能降落,这些慢的节点可能成为整个作业的瓶颈。
- TaskManager Shuffle 不稳固。之前采纳的 Native TaskManager Shuffle 形式,Shuffle 服务不稳固。
- 离线工作稳定性差。次要起因有两个,第一个是离线集群开启资源抢占,中低优工作的资源频繁被抢占。第二个是离线集群资源缓和,导致并发间 splits 调配不平均,failover 开销大。
第一个,慢节点问题。在分布式环境下,很可能呈现个别节点比其余节点慢的状况。大略有两类起因,一个是个别并发干的活儿比其余的并发多,第二个是大家干的活儿差不多的状况下,有一些并发干的比较慢。
这两种解决思路齐全不同,咱们明天次要分享的是第二类问题,也就是如何解决批模式下,机器起因或者网络原引发的慢节点问题。如上图所示,聚合有三个并发,每一个并发解决的数据量基本一致,但因为第三个并发所在机器过于忙碌,导致聚合算子解决数据速度远远慢于并发 1 和并发 2。
所以 Flink 引入揣测执行来解决慢节点的问题,和传统 MapReduce、Spark 的思路相似。检测到长尾工作后,在非热的机器上部署长尾工作的镜像实例。如上图所示,第三个聚合的 worker 在另外一个机器上拉起了一个镜像实例,即 Agg3’。这两个哪个先执行完就用哪个后果,并把其余的勾销掉。
这里假如是起初启动的实例先执行完,也就是只有它产出的数据对上游可见,并且会把原来的 Agg3 勾销掉。
上图反映了框架层面反对揣测执行的实现细节。在整体架构里须要如下几个组件:
- JobMaster 里的 Task Detector; 它用来定期检查是否呈现了长尾工作。
- Scheduler 为长尾工作长期的创立部署新的镜像实例。
- 黑名单机制,它用来把镜像任务分配到和原来不一样的机器上。
除了框架层面,还须要 connector 反对揣测执行。咱们对 Source 反对揣测执行的指标是通过框架层的改变,不须要每个 Source 有额定的开发。在 Flink 里 Source Task 和数据分片的分配关系,不是编译期间就固定好的,而是在运行的时候,每个 Task 解决完一个分片后再去申请下一个分片。
引入揣测执行当前,为了保证数据的正确性,新启动的镜像实例必须和之前的实例解决雷同的分片汇合,所以咱们在缓存里记录了 sub-task 和曾经调配到的 splits 汇合的映射关系。
如上图所示,sub-task1 曾经解决了 split1 和 split2,正在解决 Split3。但因为解决的比较慢,被断定为慢节点,在另外一个机器上启动了一个镜像实例,即 Source1’。在申请 split 的时候,JobMaster 会把缓存记录里 sub-task 解决过的 splits 给 Source1’,只有当曾经走完原来的 Source1 走过的脚印后,JobMaster 才会给 sub-task1 调配新的 split,并会把新的 split 记录到缓存里。
和 Source 不一样,大部分 Sink Connector 须要额定的开发来防止写入抵触或者提交抵触。目前咱们外部在 File Sink 和 Hive Sink 这两个 Sink 上反对了揣测执行。这两个 Sink 底层都用到了 FileSystemOutputFormat,同时咱们还引入了一个新的接口叫 SpeculativeFinalizeOnMaster,这个接口和原来的 FinalizeOnMaster 的外围区别是 JobMaster 在回调的时候,会额定传入 sub-task 和最快完结实例之间的映射关系。
另外,咱们还批改了 FileSystemOutputFormat 对临时文件的组织形式,在文件组织目录里加了实例信息。在所有的并发实现后,每个 subtask 里最快执行完的实例,所产生的临时文件才会被挪到正式目录下,其余慢实例产生的文件会被删除。因为工夫关系,Sink 反对揣测执行没来得及进入 1.16 版本,会在 1.17 版本公布。到时不仅会反对 OutputFormat Sink,也会反对 FLIP-143 引入的 New Sink。
第二个,TaskManager Shuffle 不稳固。Flink Batch 作业有两种数据交换形式,一种是不落盘的 Pipeline Shuffle,一种是 Blocking shuffle。对于 Blocking Shuffle,上游 task 须要把 Shuffle 数据写到离线文件中,等上游 task 启动当前,再来生产 Shuffle 的数据。Shuffle 数据能够落在本地的 TaskManager 外面也能够落在近程的服务里。
Flink 社区版本提供了两种 Blocking Shuffle 的实现。第一个是 TaskManager Shuffle,是把上游计算节点数据写到本地盘,上游节点连贯到上游 TaskManager 上读取 Shuffle 文件。所以 TaskManager 计算工作实现当前,不能立即退出,要等上游生产完 Shuffle 文件后能力开释掉。这样不仅造成了资源节约,而且容错代价大。
第二种是 Remote Shuffle Service,它通过独自的集群提供数据的 Shuffle 服务,能够防止 TaskManager Shuffle 的资源利用率低和容错开销大的问题,而且它是一种拥抱云原生实现存算拆散的计划。
快手外部曾经有了相似的 Remote Shuffle Service 实现,所以为了复用之前的技术降低成本,咱们最终采纳了外部的 Remote Shuffle Service 实现,这个架构次要分为五个角色。
- 第一个角色是 Shuffle Master,它负责全局的 Shuffle 资源调度,治理 Shuffle Worker,让其吸引的 Worker 负载平衡。
- 第二个角色是 APP Shuffle Manager,它和计算引擎的调度器交互来负责单个作业 Shuffle 的申请和开释。
- 第三个角色是 Shuffle Worker,作为整个集群的 slave 节点,负责将数据依照 Partition 维度聚合,排序,spill 到 dfs 上。
- 第四个角色是 Shuffle Writer,他负责把 Shuffle 数据依照 Partition 维度发到对应的 Shuffle Worker 上。
- 第五个角色是 Shuffle Reader,它负责从分布式文件系统上把 Shuffle 文件读回来。
能够看到外部的实现和社区的 Remote shuffle service 基本一致,只有一个外围区别。外部用的是 Reduce Partition,社区公布的版本目前是 Map Partition。
Map Partition 里的数据由一个上游工作产生,可能会被多个上游工作生产。而 Reduce Partition 的数据由多个上游计算工作输入,只会被一个上游并发生产。
Reduce Partition 的益处是上游生产是程序读,防止随机小 I/O,同时缩小磁盘压力。然而为了防止数据不可用的状况下从新拉起所有上游 map,所以个别会做多正本,但这就会减少存储的开销。不过因为长期的 Shuffle 数据存储周期并不长,所以多正本的开销也能承受。
第三个,离线工作稳定性差。次要有两个起因。
第一个是中低优工作频繁失败重启。离线集群开启资源抢占,中低优工作的资源频繁被抢占,导致离线工作屡次重启, 一旦超过阈值,这个作业就会认为是失败;由产品侧再从新拉起。如果抢占发生地过于频繁,超过产品侧拉起次数的下限,这个工作就须要用户手工染指解决。咱们也不能通过用户把 failover 的阈值设置的很大来躲避这个问题。因为一旦产生业务问题引发的失败,引擎在一直的重试,导致用户可能要很久能力感知到故障。
第二个是失败复原开销大。离线集群资源缓和,工作可能只能申请 到局部资源,曾经运行的工作解决太多 Splits,一旦产生异样,复原代价大。
对于第一个问题,引擎须要辨别资源抢占类导致的失败和其余业务异样导致的失败。对资源抢占类的异样由框架主动重试,不计入失败重启次数。上图反映了外部的实现细节须要做两个改变,第一个是 YarnResourceManager 把 container 退出码通知 AM,第二个是 ExecutionFailureHandler 辨认每次失败的起因,被动跳过一些指定类型的异样,不计入失败重启次数。
对于第二个失败复原开销大的问题,上图左侧的反映了当初的解决形式。假如一个作业须要 4 个 Task,但因为资源缓和,只申请到了 2 个 TaskManager 资源,这个时候就会先启动两个 task 来解决,这两个 task 会把所有的活都干完后,把资源让给 task3 和 task4,前面两个 task 一看没什么要干的,就退出了。这种 splits 调配歪斜问题,会导致 failover 的代价大。
为了不让 failover 的代价过大,咱们对单个 task 最多能解决的 splits 做了限度,一旦一个 task 解决 splits 的数目达到阈值,这个 task 就被动的退出,将剩下 splits 交给前面 task 来解决。
这里的实现上有一些细节须要留神。比方工作失败了,会把本人以前解决的 splits 还回去,这个时候也要把记录的 splits counter 数扣掉相应的个数,免得呈现 splits 泄露的问题。
咱们在易用性上的工作能够提炼成以下三点。
- 第一个是在开发阶段,基于社区的于 Adaptive Batch Scheduler 主动推导并发度,不须要用户再手工的设置并发度。
- 第二个是在运行阶段,被动定期上报作业的进度信息和异样信息,让用户晓得作业当初运行的状态。
- 第三个是在预先定位阶段,批作业和流作业不同,因为很多用户都是在作业失败或者后果不符的时候,才会关注这个作业,但这个时候作业很可能曾经执行完了,所以咱们欠缺了 history server,在用户完结当前,也能够让用户在 UI 上查看 JM 和 TM 的日志。
咱们在反对流批一体业务的同时,也在思考如何大规模落地 Flink 批。很多业务其实也有流批一体的需要,但还在张望,放心 Flink Batch 的普适性是否适宜其余 pattern,是否适宜其余的业务场景,而咱们本人也须要一个办法来论证和当时来评估。所以咱们外部把 Flink Batch 接入离线的生产引擎,让 Flink Batch 也能够承接线上的离线作业。
上图展现的是快手的离线生产引擎架构,咱们所有的离线 SQL 生产作业,都采纳 Hive,且以 HiveServer 作为对立的入口。从图中能够看到有一个 BeaconServer 组件,它用来承载智能路由的性能。依据肯定的规定,把 SQL 作业路由到底层的引擎上。底层目前曾经对接 spark、mr 和 presto 的引擎。
咱们把 Flink 也接到了这套零碎。这个零碎自身很灵便,所以很不便咱们定制各种路由策略。比方白名单机制能够限度只路由固定 pattern 的 SQL 作业到 Flink 上;黑名单机制,能够禁止某些 pattern 的 SQL 作业路由到 Flink 上,比方 Flink 目前暂不反对的 SQL 语法;优先级机制,初期只路由低优的作业到 Flink 上;灰度机制能够限度路由肯定比例的大量作业到 Flink 引擎上等等。这些策略让咱们能够逐渐扩充 Flink 在新增和存量离线作业中的占比。
在 Flink 接入离线生产引擎里,咱们有三项要害工作。
- 第一是加强 Flink 引擎的能力,包含进步 Hive 语法兼容性和 Hive connector 的能力。
- 第二是产品接入,除了方才提到的接入公司的 HiveServer2 和 Beacon Server,还包含接入公司的鉴权体系等等。
- 第三个是周边建设,包含接入双跑平台来验证后果的正确性、资源开销等。另外,还建设自动化监控和报警流程,缩小运维老本,及时发现问题解决问题。
通过第一个阶段的致力,咱们的离线作业数目曾经达到了 3000 个,笼罩了机器学习、数据集成、离线生产等多个业务场景。
四、第二阶段(业务视角的流批一体)的挑战
第一个挑战是手动指定运行模式。比方指定作业运行模式、交互方式、是否开启揣测执行等等。这些都须要用户晓得很多底层的细节,能力判断本人须要哪些参数,以及怎么正当的来设置这些参数。
但咱们认为对用户更敌对的应用形式是不须要用户了解甚至感知这么多底层的细节的,所以咱们的解决方案有两个方向。一个改良方向是尽量减少开箱须要调整的参数,另外一个方向是引入智能化交互方式。用户只须要提供业务逻辑和对后果的生产形式,其余都交给引擎来推导。
第二个挑战是批处理的性能。而 Flink 批要想有更广大的业务场景,要在性能上有更大的冲破。Flink 社区在刚公布的 1.16 版本也曾经开启了好几个 FLIP,心愿把 Flink Batch 的性能在往前走一步。
第一个方向是在调度层的优化。比方动静的自适应执行,社区曾经初步具备了一些自适应能力。下一步社区会布局更灵便的自适应能力,比方在运行的时候能够依据 join 两边输出的理论数据量,动静调整 join 的策略。回顾之前的 Adaptive Batch Scheduler,它只是动静调整拓扑中边的关系,而调整 join 策略就波及到了动静调整拓扑的组织形式。从模块上来说,这个改变甚至要突破优化器和调度器的边界。
第二个点是 Optimizer 上的改变,这里举了一个 Runtime Filter Pruning 的例子,基于运行时的信息进一步进行条件过滤,社区在 1.16 版本曾经在这个点上做了尝试,Dynamic Partition Pruning,并且在 TPC-DS benchmark 测试中,这个优化曾经带来显著收益。不过以后的 Dynamic Partition Pruning 只能在 Source 上做运行时的分区裁剪。但事实上,Source 其余的列上也能够做相似的运行时过滤,而且两头任何一级 join 都能够做相似的优化来实现性能的晋升。
第三个优化点是 Shuffle 上的优化,之前 Flink 的批作业要么抉择 Pipeline Shuffle,要么抉择 Blocking Shuffle,Pipeline Shuffle 的益处是上下游之间的数据不落盘,性能好。毛病是资源开销大、容错代价大。而 Blocking Shuffle 的长处和毛病刚好和 Pipeline Shuffle 反过来。为了联合这两个的劣势,1.16 版本推出了 Hybrid Shuffle。
第四个优化点是在 Vectorized Native Engine 层面的优化,一些比拟新的查问引擎都会应用向量化执行和以数据为核心的代码生成技术。而在 Flink 里引入这些技术须要对框架层面做十分大的革新,所以须要在正式投入前做充沛的 POC 验证,评估计划的复杂性,以及给一些可量化的性能收益。
上面分享流批混跑的业务场景下遇到的两个挑战。
第一个是有状态作业的流批混跑。比方用批模式刷完历史数据后流模式接着增量生产,目前只反对对一些无状态的 SQL 作业的流批混跑,有状态的 SQL 作业目前还不反对。因为批上的算子不和状态交互,批作业执行当前也不会生成状态数据。所以等批作业完结后,流作业启动时无奈晓得如何初始化各个算子的状态。
要实现这个指标,不论流模式还是批模式,算子都要和状态交互。思考到一些性能问题,批模式的算子能够在执行实现后,把数据 dump 到状态,防止频繁和状态做交互。
在流批混跑场景下,另外一个困扰是如果批作业刷进去的状态很大,流作业启动的时候如何疾速从快照中复原。这个问题在当初纯流的业务场景也有相似的困扰,比方状态很大,一旦作业 failover 或者重启,从状态复原要花很长的工夫。因为 Flink 是 local state,复原的时候须要把所有的状态都提前拉到本地,而后 restore 到算子里。这也是一个有挑战的问题。
流批混跑第二个挑战是存储的割裂。尽管业务代码能够用对立的计算引擎来表白,但存储还是不一样。流模式个别用 Kafka 音讯队列来做 Source 和 Sink,批作业个别用 Hive 或者 DFS 文件做 Source 和 Sink。存储的割裂造成的后果就是把复杂性留给了用户和平台。用户须要形象出逻辑表,指定逻辑表和底层物理表映射关系,平台层依据执行模式把逻辑表路由到底层的物理表上,引入对立的流批一体存储曾经成为必不可少的一个环节。Flink 社区在去年提出了 Flink Table Store,它是一种流批一体的存储设计,既能反对流写流读,也能反对批写批读,在存储上屏蔽了流和批的差别。
更多 Flink Batch 相干技术问题,可扫码退出社区钉钉交换群~
点击查看直播回放 & 演讲 PPT
更多内容
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…