关于flink:Improvements-of-Job-Scheduler-and-Query-Execution-on-Flink-OLAP

10次阅读

共计 5984 个字符,预计需要花费 15 分钟才能阅读完成。

摘要:本文整顿自字节跳动基础架构工程师方勇在 Flink Forward Asia 2021 核心技术专场的演讲。次要内容包含:

  1. 背景
  2. 问题和剖析
  3. 调度执行优化
  4. 将来打算

点击查看直播回放 & 演讲 PDF

一、背景

字节跳动的很多的业务方都有混合计算的需要,心愿一套零碎既反对 TP 计算,也反对 AP 计算。

上图是咱们 HTAP 零碎的总体架构。TP 侧应用咱们外部自研的数据库作为 TP 计算引擎,AP 侧是用 Flink 作为 AP 的计算引擎。咱们对外通过 MySQL 协定作为对立的入口,如果一个查问是 AP 计算,就会被转发到 Flink 的 Gateway 生成执行打算,而后提交到 Flink 引擎去执行计算。AP 侧有一个列式存储,Flink 引擎通过 Catalog 和 Connector 的接口,别离与存储端的元信息和存储层进行交互。AP 计算实现后,Client 端会向 Flink Gateway 发动 Proxy 数据的申请,而后由 Gateway 向 Flink 集群 Proxy 后果数据。至此,整个 AP 计算的查问交互和计算执行就实现了。

家喻户晓,Flink 是一个流批一体的计算引擎,既能够反对流式计算,也能够反对批式计算。为什么当初有很多零碎抉择应用 Flink 来做 OLAP 计算?

咱们比照了 Flink 与 Presto 的差别,首先从架构上看,Flink 反对多种不同的部署模式,Flink 的 session 集群是一个十分典型的 MPP 架构,这是 Flink 能够反对 OLAP 计算的前提和根底。在 Flink 的计算执行上能够分为执行打算、作业 Runtime 治理、计算工作执行治理、集群部署和 Failover 治理 4 大部分。从 Presto 和 Flink OLAP 的架构以及功能模块图来看,两套零碎在反对这些计算性能的具体实现上有很大的差别,但他们提供的零碎能力和模块性能上是基本一致的。

所以 Flink 引擎在架构以及性能实现上,齐全能够反对残缺的 Flink OLAP 的计算需要。

在字节外部,Flink 最开始是被用作流式计算,起初因为 Flink 流批一体的计算能力,对于一些实时数仓场景咱们也应用了 Flink 作为批式计算引擎。最终选用 Flink 作为 AP 计算引擎,次要基于三个方面的思考:

  • 第一,对立引擎升高运维老本。咱们对 Flink 有十分丰盛的运维和优化开发教训,在流批一体的根底上,应用 Flink 作为 AP 计算引擎能够升高开发和运维老本;
  • 第二,生态反对。Flink 外部有很多存储系统,也有很多业务方应用 Flink SQL 来开发流式和批式作业,而咱们外部的存储系统开发和对接了很多其余零碎,所以用户应用 Flink 反对 OLAP 计算十分不便;
  • 最初一个是性能劣势。咱们外部做过 TCP-DS 相干的基准测试 Benchmark,Flink 计算引擎相比 Presto 和 Spark SQL,在计算性能上并不逊色,并且在某些查问方面甚至是占优的。

二、问题和剖析

首先介绍一下如何应用 Flink 做 OLAP 计算。

首先在接入层,咱们应用 Flink SQL Gateway 作为接入层,提供 rest 协定间接接管 SQL 语句查问;架构上,是在 K8s 上拉起 Flink 的 session 集成,这是一个十分典型的 MPP 架构;计算模式上,咱们应用 batch 模式加上计算全拉起的调度模式,缩小了计算节点之间的数据落盘且能晋升 OLAP 计算的性能。

在 Flink OLAP 计算过程中,次要存在以下几个问题:

  • 首先,Flink OLAP 计算相比流式和批式计算,最大的特点是 Flink OLAP 计算是一个面向秒级和毫秒级的小作业,作业在启动过程中会频繁申请内存、网络以及磁盘资源,导致 Flink 集群内产生大量的资源碎片。
  • 另一个 OLAP 最大的特点是查问作业对 latency 和 QPS 有要求的,须要保障作业在 latency 的前提下提供比拟高的并发调度和执行能力,这就对 Flink 引擎提出了一个新的要求。

为了测试 Flink 执行 OLAP 计算的能力,咱们比照了 Flink 作业调度的 Benchmark 相干测试。咱们设计了三组不同复杂度的作业,别离是单节点的作业、两个节点的 wordcount 作业以及 6 个节点的 join 作业。每组作业计算节点并发度都是 128。

咱们选取 5 台物理机启动一个 Flink session 集群,集群内有 1 万多个 slot,还实现了一个 Benchmarket Client 能够多线程并发提交作业,而后统计 10 分钟之内实现的作业的数量以及实现作业的均匀 latency。

后果如下图所示。

先剖析 QPS 的后果:

  • 单节点作业,client 单线程的时候 QPS 是 7.81;线程数是 4 的时候,曾经达到了 QPS 极限 17 左右;
  • Wordcount 两节点作业,client 单线程的时候,QPS 是 1.38;线程数是 32 的时候,QPS 是 7.53;
  • Join 作业的体现是最差的,client 单线程的时候,QPS 只有 0.44;线程数减少到 32 时,QPS 也只有 2.17。

再看一下 latency 的体现:

client 线程数减少时,单作业的 latency 从 100 多毫秒提交减少到 2 秒;Wordcount 作业的从 700 多毫秒减少到了 4 秒;Join 的作业从 2 秒减少到了 15 秒多,有数倍的增长。

这样的作业调度性能在线上应用过程中是不可承受的。

针对 Flink 并发作业调度的性能问题,咱们也尝试针对一些性能的瓶颈点进行简略的优化,但成果并不现实。所以咱们针对 Flink 作业的调度执行全链路进行剖析,将 Flink 作业的执行分为作业管理、资源申请、计算工作三个次要的阶段,而后对每个阶段进行相应的性能优化和改良。

三、调度执行优化

3.1 作业管理优化

首先是作业管理优化。Flink 通过 Dispatcher 模块接管和治理作业,整个作业的执行过程能够分为 4 个步骤:初始化、作业执行筹备、启动作业执行、完结作业执行。

Dispatcher 外部有 3 个线程池负责执行作业的 4 个步骤,别离是 Netty/Rest、Dispatcher Actor 以及 Akka 线程池。依据测试和剖析:

  • Netty/Rest 线程池默认大小太小;
  • Dispatcher Actor 单点解决且执行了一些十分重量级的作业操作;
  • 此外,Akka 线程池太忙碌,不仅要负责 dispatcher 内的作业管理,还负责了很多 jobmaster 类作业的具体执行以及 resource manager 中的资源管理。

针对上述问题,咱们别离进行了相应的优化。加大了 Netty/Rest 线程池的大小,对作业进行拆解并创立了两个独立的线程池:IO 线程池和 Store 线程池,别离负责执行作业管理过程中比拟重量级的操作,加重 Dispatcher Actor 和 Akka 线程池的工作压力。

一个作业具体执行过程中会有很多定时工作,包含作业模块间的超时查看 \ 心跳查看,作业资源申请过程中的超时查看等。在 Flink 目前的实现里,这些超时工作会被放到 Akka 线程池里,由 Akka 线程池调度和执行。即便一个作业曾经完结,也没方法间接回收和开释,这会使 Akka 线程池里缓存的定时工作十分多,导致 JobManager 节点产生大量的 fullGC,JobManager 过程有 90% 左右的内存被这些定时工作占用。

针对这个问题咱们也进行了相干优化,在每一个作业启动的时候都为它创立一个作业级别的本地线程池,作业相干的定时工作会先提交到本地线程池,当这些工作须要被真正执行的时候,本地线程池会将他们发送到 Akka 线程池间接执行。作业完结后会间接被开释,疾速进行定时工作的回收。

3.2 资源申请优化

字节跳动公司外部目前应用的是 Flink1.11 版本,Flink 资源申请次要是基于 slot 维度,咱们应用全拉起的作业调度模式,所以作业会期待 slot 资源全副申请实现之后才会进行计算任务调度。比方 resource manager 有 4 个 slot,当初有两个作业并发申请资源,每个作业都须要三个 slot,如果它们都只申请到两个 slot,就会导致两个作业互相期待 slot 资源而产生死锁。

针对这个问题,咱们抉择将 slot 粒度的资源申请优化为作业 batch 粒度的申请。这里的 batch 资源申请次要有两个难点:

  • 一个是跟原先 slot 粒度的资源申请怎么做兼容,因为有很多机制是基于 slot 粒度的,比方资源申请的超时,这两块机制须要进行无缝交融;
  • 第二是 batch 资源申请的事务性,咱们须要保障一个 batch 内的资源是同时申请到或者同时开释的,如果有异常情况,这些资源申请就须要同时勾销。

3.3 工作执行优化

3.3.1 作业间连接复用

首先是连贯复用的问题,Flink 上下游计算工作通过 channel 传输数据,在一个 Flink 的作业外部,雷同计算节点的网络连接是能够复用的,然而不同作业间的网络连接无奈复用。一个作业所有的计算工作完结之后,它在 manager 之间的网络连接会被敞开并且开释。而另外一个作业执行计算的时候,TaskManager 须要创立新的网络连接,会呈现作业工作执行过程中频繁创立和敞开连贯,最终影响计算工作和查问作业的 latency 以及 QPS,同时在过程也会导致资源应用的不稳固,减少 CPU 的使用率以及 CPU 应用过程中的波峰波谷。

多作业复用在 TaskManager 的网络连接里次要存在以下几个难点:

  • 第一,稳定性的问题,channel 不仅用来做数据传输,而且还跟计算工作的反压相干,所以复用连贯可能会导致计算工作饿死以及死锁等问题;
  • 第二,脏数据的问题,不同的作业复用计算连贯有可能引起计算工作在执行过程中会产生脏数据;
  • 第三,网络连接的收缩和回收问题,对于不再应用的网络连接,咱们须要及时探测并且敞开、开释资源。

咱们实现 Flink 作业间的连贯复用,次要计划是在 TaskManager 外面减少一个 net 连接池,作业须要创立载体连贯时会向连接池发动申请,连接池会依据须要创立或复用曾经存在的连贯,实现计算后,计算工作会向连接池开释连贯。

为了保证系统的稳定性,Flink 现有的作业内的连贯应用机制是放弃不变的,每个 net 连贯有三个状态,别离是 idle、busy 以及 invalid。连接池会治理计算网络连接的三个状态,还反对依据须要来创立网络连接,而后发送减少校验,同时会回收网络连接,后盾的定时工作会 check 连贯状态。

3.3.2 PartitionRequest 优化

第二块是 partition request 优化,次要分为两个方面:batch 优化和告诉机制的优化。

一个作业内上下游计算工作创立连贯后,上游的计算工作会向上游发送一个 partition request 音讯,通知上游工作须要接管哪些 partition 数据的信息。Partition request 的音讯最大的问题是音讯量太大,是上下游计算节点并发度的平方量级。

batch 优化的次要目标是将雷同 TaskManager 内上下游计算工作间的 partition request 的音讯数量进行打包解决,升高 partition request 的量级。比方在一个计算节点 100 并发的状况下,两个 TaskManager partition request 数量能够从原先的 10000 升高到当初的 4,由并发的平方降为 TaskManager 数量的平方,改善非常明显。

因为上下游的计算工作是并行部署的,所以会存在上游计算工作部署实现之后,上游的计算工作还没有开始部署的状况。当上游的计算工作向上游发送一个 partition request 的时候,上游的 TaskManager 会返回一个 partition not found 的异样,上游的计算工作依据这个异样会一直重试和轮询,直到申请实现。

这个过程存在两个问题,一个是 partition request 数量过多,另外一个是上游的计算工作在轮询重试的过程中有时间差,导致计算工作的 latency 加大。所以咱们为上下游计算工作交互实现了一个 listen+notify 机制。上游的 TaskManager 承受到上游计算工作发送的 partition request 时,如果上游的计算工作还未部署,则会将 partition request 放入到一个 listen 列表外面,计算工作部署实现再从计算队列外面获取 partition request,并且回调执行实现整个交互。

3.3.3 网络内存池优化

最初一块是网络内存优化。TaskManager 启动后,会事后调配一块内存作为网络内存池,计算工作在 TaskManager 部署时会从网络内存池里调配一个本地内存池,并退出到网络内存池列表。计算工作创立本地内存池后,申请内存分片以及开释本地内存池等所有操作时,网络内存池都会遍历本地内存池列表,TaskManager 外面并行执行的计算工作很多时,这个遍历的次数会十分大,是 slot 的数量乘以上游并发度的数量,甚至会达到千万量级。

遍历的次要目标是提前开释其余本地内存池外面闲暇的内存分片,晋升内存的使用率。咱们的次要优化是将这块遍历操作删去,尽管这会造成一部分的内存节约,但可能极大地晋升计算工作的执行性。

此外,咱们还做了很多其余的优化和革新。包含计算调度方面,咱们反对实现全拉起和 block 联合的调度模式;在执行打算方面,咱们优化和实现了很多计算下推,将计算下推到存储去执行;在工作执行方面,咱们针对工作的拉起和初始化都做了很多相干优化和实现。

3.4 Benchmark

上图是优化的成果 Benchmark。

QPS 方面,针对单节点计算 QPS 从原先的 17 晋升到当初 33,Wordcount 两节点从最高的 7.5 晋升到 20 左右,join 计算从原先最高的 2 左右晋升到当初 11 左右,成果非常明显。

latency 方面晋升成果也十分不错,32 个线程下,单节点作业的 latency 从原先的 1.8 秒升高到 200 毫秒左右,Wordcount 两节点作业从 4 秒升高为 2 秒不到,最显著的是 join 作业,从原先的 15 秒降为 2.5 秒左右,晋升的幅度十分大。

四、将来打算

当初 Flink OLAP 尽管曾经投入理论业务场景中应用,但我感觉还只是从 0 走到了 1。将来,咱们心愿做得更欠缺,从 1 走到 100。Flink OLAP 零碎的指标次要能够分为三块:稳定性、性能和性能。

稳定性方面,首先要晋升单点的稳定性问题,包含资源管理单点以及作业管理单点。其次,晋升运行时的资源应用以及计算线程等治理,还有 OLAP 计算结果的优化治理。此外还有很多其余稳定性相干的优化。

性能方面,包含打算优化、细粒度计算工作执行和治理优化,还有面向行合列的计算优化等,都能够极大晋升 Flink OLAP 计算的性能。

性能方面,咱们心愿继续欠缺产品化建设,包含 history server 的继续欠缺和建设。另一方面咱们会更加欠缺 web 剖析工具,帮忙业务方更好地定位在查问过程中发现的问题。


点击查看直播回放 & 演讲 PDF

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0