摘要:本篇内容整顿自美团数据平台工程师冯斐、王不凡在 Flink Forward Asia 2021 的演讲。次要内容包含:
- 相干背景
- 大作业部署优化
- Checkpoint 跨机房正本
- 状态稳定性相干优化
- 将来布局
FFA 2021 直播回放 & 演讲 PDF 下载
一、相干背景
美团 Flink 的利用场景笼罩了社区定义的三种场景:
- 利用比拟多的是数据管道场景,比方数仓 ODS 层数据的实时接入,或跨数据源的实时数据同步;
- 比拟典型的利用场景是数据分析,比方实时数仓的建设和利用,业务会出一些实时报表和大盘辅助业务做决策,或者计算一些实时特色服务于业务生产;
- 事件驱动场景,目前次要利用于平安风控、系统监控告警。
随着业务的倒退和实时计算的迭代,业务对实时计算的利用越来越宽泛,也越来越深刻。以后咱们有将近 5 万个作业部署在超过 15,000 台机器上,高峰期解决的流量达到了 5.4 亿条 / 秒,这几个指标相比今年都有了很大的增长。除了整体的规模增长,往年咱们也遇到了单作业规模大幅增长的状况,目前咱们的大作业并发度达到 5000,状态达到了 10 TB。单作业规模的增长也给咱们带来了新的问题和挑战。
首先是作业启动的问题。以往在小规模作业上不那么显著的问题,在大作业部署启动流程中裸露了进去,比方 Task 启动慢部署慢、散布不均,大作业对 HDFS 的影响等。另外在状态方面,状态很大的时候同样给作业带来了不可疏忽的影响,次要是 Savepoint 的制作开销和复原效率,以及状态的容灾等问题。
二、大作业部署优化
咱们以后的大作业算子并发达到了 5000,拓扑复杂度上算子数量达到了 8000,有两层的数据 shuffle 替换,资源上作业须要超过 1000 个 TaskManager,在这样的规模下,咱们遇到了一些之前没有遇到过的问题。
- 首先,部署大量 Task 的时候会遇到部署工夫长或因为 RPC 超时而部署失败的问题;
- 此外,Task 散布不够正当,局部 TaskManager 中 Network Buffer 的数量有余,会导致作业启动失败;
- 另外,大作业做 Checkpoint 期间,会给 HDFS 带来刹时压力,也会影响其余作业应用 HDFS。
针对上述问题咱们进行了以下优化:
咱们首先剖析了 JobManager 视角的作业部署流程,心愿能搞清楚各个环节有哪些因素影响部署,以及他们是如何影响部署的,而后隔靴搔痒去解决问题。
能够看到,从收到 JobGraph 到启动所有 Task,次要环节有构建执行图、申请资源、部署 Task、启动 Task 这几个步骤。
- 构建执行图环节次要受作业规模和拓扑复杂度的影响。以后构建执行图的工夫复杂度比拟高,构建大作业执行图的时候,耗时会大幅减少,不过咱们以后的规模临时还没有遇到这个问题,而且社区在 1.13 版本曾经对这一问题做了一系列优化,能够参考借鉴。
- 资源申请环节次要受资源的需求量、资源衰弱度、调度性能、调度策略的影响,通常可能会遇到资源有余、资源碎片、调度到坏节点等问题,导致作业无奈失常启动,但这些问题目前还不是很重大。
- 部署和启动 Task 环节次要受 Task 数量、TaskManager 数量、拓扑复杂度、user jar 大小的影响。作业规模很大的时候,JobManager 作为一个 master 节点,可能会遇到一些解决瓶颈,就有可能呈现 Task 部署慢或部署失败的状况。
当咱们重点关注部署和启动 Task 步骤的时候,咱们发现了以下几个景象:首先 JobManager 所在的机器的网卡在部署期间会被打满;从 TaskManager 日志中也能够看到,下载 userjar 操作花的工夫比拟长;通过比照测试,咱们发现 userjar 放大体积之后就没有 RPC 申请超时的景象了,部署耗时也有所缩小。所以咱们钻研了 userjar 的下载流程,每个 Task 线程在启动的时候须要从用户的 jar 包中加载用户代码的类加载器,这一步须要从 JobManager 的 BlobServer 中下载 userjar。
Flink 以后的实现中,每个 TaskManager 都须要下载一次 userjar,TaskManager 的多个 Task 会解析同一个 jar 包。当作业开启 ha 的时候,TaskManager 会从 HDFS 中读取 userjar,从而加重 JobManager 的散发压力。当 userjar 比拟大,数量又比拟多,且没有开启 ha 的状况下,JobManager 散发 userjar 的压力就会很大,会导致阻塞网络甚至打满网卡。
明确了起因,咱们就能够对 userjar 的散发进行优化。咱们以后部署是 yarn session 模式,为了提交作业时能复用 session cluster 达到疾速部署的目标,也为了同一个 session cluster 中提交多个 job 的时候都能有优化成果,咱们没有利用 yarn 的机制来散发 userjar,而是在 Flink 层面做了优化。
咱们让同一个节点上的 TaskManager 只下载一次 userjar,该节点上所有 TaskManager 共享这次下载后果,因为 userjar 的下载次数从 TaskManager 粒度降落到了机器粒度,升高了一个数量级,大幅减小了 JobManager 的散发压力。
优化了 userjar 的散发问题,咱们还发现,在有 shuffle 而且并发度比拟大的作业上,部署过程中依然有存在一些 RPC 超时的状况,而且 JobManager 上有大量的 requestPartitionState 申请。这是因为上游 Task 启动的时候会查看上游 Task 的 partition 是否就绪,如果还没有就绪,上游会申请 JobMaster 去询问上游 Task 的状态,判断是否须要持续申请上游 Task 的 partition。当作业规模很大的时候,很容易呈现不同 Task 启动速度不统一的状况,导致 JobMaster 上呈现大量的 requestPartitionState 申请。
对于这个问题咱们做了一个简略的优化。上游 Task 申请 partition 失败的时候,先本人尝试重试几次,而不是立刻申请 JobMaster。通过这个调整,大幅缩小了 JobMaster 上 requestPartitionState 的 RPC 申请量,使得 JobMaster 能够有更多工夫去解决其余的 RPC 申请。
通过以上两步优化,JobManager 的散发压力大幅减小。上图是散发 userjar 的优化效果图,能够看到作业规模越大,优化成果越显著。此外在以后规模下,咱们也打消了 RPC 超时的异样,使得大作业能够胜利部署。
咱们再来看下 Task 散布不均的问题。
咱们发现,在大作业部署过程中,Task 没有均匀分布在所有 TaskManager 中,这会导致局部 TaskManager 呈现 Network Buffer 有余的状况,导致作业启动失败。咱们尽管可能通过减少总内存、调整不同内存的占比来长期解决这个问题,然而这种办法并不能解决实质的问题,反而会加剧资源的节约。因为不同 TaskManager 中的 Task 数量和类型不同,理论所须要的资源量也不同的,然而 TaskManager 都是对立依照最大的资源量来申请的,这会导致很多 TaskManager 申请了比理论须要更多的资源。另一方面 Task 比拟集中的 TaskManager 计算压力更大,也更容易成为整个作业的计算瓶颈。
咱们总结了两类 Task 散布不均的问题:
- 一类是 Task 数量散布不均,也就是不同算子的 Task 集中在同一个 TaskManager 中,例如左图中多种 source 算子集中在第一个 Task 外面;
- 另一类是 Task 类型散布不均,也就是雷同算子的不同 Task 集中在一个 TaskManager 中,例如右图中 source 的两个 Task,sink 的两个 Task 都集中在第一个 TaskManager 中。
Task 的数量散布不均,次要是因为 SlotSharing 机制容许不同算子的不同 Task 应用同一个 Slot,然而在 Task 抉择 Slot 的时候,并没有思考 Slot 里 Task 数量的散布状况,导致多个 Task 集中在一个 Slot 里,进而导致 Task 集中在同一个 TaskManager 中。呈现这种状况通常是业务有一些多 source 多 sink 的作业来做一些流量散发、汇聚的操作。
针对数量散布不均的问题,咱们对 Task 抉择 Slot 的策略做了一些优化,新的抉择策略如下:对于无上游的 Task,尽量把它调配到新的 Slot 上,直到 Slot 数达到下限;对于有上游的 Task,优先选择把它放在与上游 Task 雷同的 Slot 里,缩小不必要的数据散发;有多个可选 Slot 的时候,优先选择 Task 数量少的 Slot。通过这样的优化,Task 在 Slot 间的散布就比拟平均了。上图能够看到之前集中在 1 个 Slot 外面的 4 个 source 算子,当初曾经比拟平均地散布在不同的 Slot 外面了。
再来看下 Task 类型散布不均的问题。咱们晓得,一个 Slot 内不可能有多个雷同类型的 Task,只有蕴含雷同类型 Task 的多个 Slot 集中在某个 TaskManager 中,才会导致 Task 类型集中,而 Slot 抉择 TaskManager 受申请程序的影响。以后,Slot 申请程序是随机的,并没有思考 Task 类型的散布状况,这就会导致雷同类型的 Task 集中在同一个 TaskManager 中。这个问题也比拟广泛,只有作业不同算子的并发度不统一,就有可能呈现这个问题。
针对类型散布不均的状况,咱们对以后 Slot 的申请程序做了优化。依照 Slot 中 Task 类型组合的状况,对 Slot 申请程序进行调整,蕴含雷同 Task 类型组合的 Slot,尽量散布在不同的 TaskManager 中,比方蕴含 Source、Process、Sink 的这三个算子的 Slot,被咱们平均的扩散到了不同的 TaskManager 中。
上述两个优化是独立的,组合应用能够失去更好的优化成果,如上图。
以咱们线上的大作业来验证,能够看到在 Task 的数量散布方面,优化后每个 TaskManager 对立都有 6 个 Task,散布很平均;资源和负载方面,因为 Task 数量和类型的平衡,TaskManager 中 Network Buffer、数量和 CPU 使用率,从之前的有高有低变得比拟统一了。
最初咱们再来看下 HDFS 的压力问题。导致 HDFS 压力的起因有两个:
- 一个是随着业务的失常增长,HDFS 的负载也在逐渐增大;
- 另一方面大作业的部署上线也给 HDFS 带来了更大的刹时压力,大作业在制作 Checkpoint 期间会给 HDFS 带来大量的 RPC 申请,造成 RPC CallQuque 打满,影响其余作业读写 HDFS。
能够看出,HDFS 的压力次要来自于 NameNode。
咱们应答 NameNode 压力的办法也很间接。首先在底层部署了多组 HDFS NameNode,这样能够在底层资源上做到程度扩大。在引擎层,咱们提供了多组工作的平衡策略,来决定作业应用哪一组 NameNode。之后,再通过动静指定相干的门路,使作业真正应用不同的 NameNode。最终使得 HDFS 的服务能力可程度扩大,大作业的部署运行也不再影响其余作业。
除了下面的优化,咱们还做了一些其余方面的优化项来帮忙大作业更好地部署运行。咱们向用户凋谢了 Flink 的运行参数,用户能够针对本人作业的运行特点做个性化的调优、咱们限度了 Checkpoint 最小制作距离来防止不合理的高频 Checkpoint 制作影响集群上的其余作业。
三、Checkpoint 跨机房正本
在美团,咱们的 Flink 计算资源会有多机房交互,同一个项目组可能在不同的机房都有计算资源,因而作业有换机房启动的场景。基于过往的教训,咱们更偏向于应用 Retained Checkpoint 而不是 Savepoint 去重启作业。因为咱们次要应用 RocksDBStateBackend 的增量 Checkpoint,绝对于 Savepoint,它的制作和复原效率会更高,距离配置更短,从状态复原时须要回溯的数据也更少。此外,一些业务的重要作业要求具备更高的状态容灾能力,整个机房故障时也要可能切换到其余机房运行。最初,尽管咱们在 2020 年反对的 Savepoint 跨机房正本可能解决局部下面提到的问题,但因为业务越来越偏向于应用 Retained Checkpoint 来复原作业,这个性能也就无奈再满足需要。
从上述背景中咱们能够提炼出两个指标:
- 第一是所有作业都须要反对换机房从 Checkpoint 启动,这就须要在作业换机房启动前,将原机房的 Checkpoint 复制到指标机房;
- 第二是要害作业的 Checkpoint 须要反对跨机房容灾,这就意味着须要随着 Checkpoint 的一直实现,实时地将新产生的 Checkpoint 复制到备份机房,以避免原机房忽然故障。
通过剖析,咱们须要分两步来实现上述指标。
- 首先须要革新 Flink 引擎,使得 Checkpoint 满足 self-contained & relocatable 的条件,具体的概念和原理会在前面进行具体的介绍,当初能够先简略了解成只有满足这个条件,Checkpoint 的正本才是可用的,否则 Checkpoint 复制到其余中央也无奈从正本 Checkpoint 上复原作业;
- 其次须要实现 Checkpoint 正本制作的能力,即 Checkpoint Replicate Service。
Checkpoint self contained & relocatable 个性次要是为了让 Checkpoint 可能被挪动和复制。其实 Flink 在 1.11 版本曾经在 Savepoint 上反对了这个个性,但 Checkpoint 的状况比较复杂所以还没有反对。为了讲清楚这个问题,须要先理解 Checkpoint 目录构造。
首先是 checkpoints/{job-id} 目录,也称为 exclusive 目录,每个 Checkpoint 的 id 会对应一个 exclusive 目录,用于寄存每个 checkpoint 的独有的文件。其次是 shared 目录,用于寄存各个 Checkpoint 之间会共享的文件。最初是 taskowned 的目录,用于寄存永远不能由 JM 删除的文件。
每个 Checkpoint 都领有一个 metadata 文件,外面保留了 Checkpoint 元数据。
另外就是独有的状态文件和 Checkpoint 间共享的文件,如上图橘红色的线条示意,Checkpoint 文件中蕴含了对 exclusive 文件和 shared 文件的援用,通过 metadata 文件,就能找到一个 Checkpoint 所须要的所有文件。
至此,就能够阐明 Checkpoint 不是 self-contained 的含意了。如上图,在一些状况下一个 Checkpoint 的 metadata 文件援用了其余作业实例的 Checkpoint 的 shared 文件,这里不同的作业实例可能是同一个作业的代码屡次部署,每个 Flink job 的 id 对应一个作业实例。
这种状况常产生在作业从增量的 Checkpoint 复原时,如图 job1、job2、job3 是同一个作业代码屡次启动,job3 启动时从 job2 留下的 Checkpoint 复原,job2 启动时又是从 job1 留下的 Checkpoint 复原,后果就是造成一个长长的援用链。理论生产利用中这种状况十分常见,咱们常常会在调整作业参数、批改代码等操作之后,再从 retained Checkpoint 重启作业。
上述情况会带来两个问题:
- 第一,会导致 Retained Checkpoint 难以被清理。清理作业的 Retained Checkpoint 时要确保其中的文件不会再被其余 Checkpoint 援用,因而作业管理平台就须要保护 Checkpoint 中文件的援用计数,这无疑减少了平台治理的复杂度;
- 第二,会导致跨存储系统的 Checkpoint 正本不可用。比方咱们将 Checkpoint 从 HDFS1 复制到 HDFS2 上之后,因为跨 job 实例援用的文件在 HDFS2 下面并不存在,会导致复制过来的 Checkpoint 不可用。当然咱们也能够通过将所有被间接和间接援用的文件都复制到 HDFS2 上来防止这个问题,但这会极大减少正本制作的复杂度。
那么如何让 Checkpoint 实现 self-contained?
首先须要阐明的是,上述探讨尽管没有局限于某一个具体的 StateBackend,但实际上这种问题次要是 RocksDBStateBackend 的增量 Checkpoint 导致的,这是咱们在生产环境中默认应用的 Backend 和 Checkpoint 形式。所以咱们先来看看 RocksDBStateBackend 的增量 Checkpoint。
RocksDB 是一个基于 LSM Tree 的 KV 存储引擎,它会将长久化数据写到磁盘文件中。上图是一个 RocksDB 实例的文件目录构造,能够分为两类:
- 第一类是各种元数据比方 DB 的配置、version changelog 等,这类文件可能会在运行过程中一直被更新;
- 另一类是 SST 文件,这是 RocksDB 的数据文件,外面蕴含数据内容、索引等。这类文件一旦产生就不会再批改,只会随着数据的一直写入和 compassion 而一直地产生和删除。
开始制作 Checkpoint 时,RocksDBStateBackend 会先将数据刷盘,而后将 DB 实例中所有文件上传到指定的 Checkpoint storage 中,在咱们的场景里就是上传到 HDFS。如左图,如果在制作 Checkpoint3 时,DB 中有 123 这三个 SST 文件,这些 SST 文件因为不会被批改,就有可能会被后续的增量 Checkpoint 间接应用,因此会被放到 shared 目录下。而所有的 meta 文件都会被放到 exclusive 目录下。
如果过了一段时间,咱们开始基于 Checkpoint3 去制作前面的 Checkpoint5。这里会存在一个疑难,Checkpoint3 之后为什么是 Checkpoint5,而不是 Checkpoint4?这是因为这里可能会插入一个 Savepoint,而 Savepoint 要占用 Checkpoint 序号。
制作 Checkpoint5 时,DB 实例下的文件状况如上图左边所示,新增了 04.sst 缩小 01.sst。因为是增量 Checkpoint,这时候只须要将 metadata 文件和 04.sst 进行上传,而 02.sst 和 03.sst 只须要在 metadata 中记录文件援用,不须要反复上传。
那么制作 Checkpoint5 的时候是如何晓得 02.sst 和 03.sst 曾经上传过了?其实是通过一个 previous-sst-list 来记录的,外面记录了上次胜利的 Checkpoint 中所有 sst 文件信息,这样就能够一直基于 previous-sst-list 来进行增量的 Checkpoint 制作。
当作业基于增量 Checkpoint 复原时,如上图所示首先会依据 Checkpoint 信息去复原 previous-sst-list,而后去结构 RocksDB 实例,就是将 meta 文件和 sst 文件下载到对应的地位。这样即便是启动后的第一个 Checkpoint,也能够基于 restored Checkpoint 进行增量制作。
但这也就意味着新启动作业的 Checkpoint 可能会援用它所 restore 的 Checkpoint 中的文件,这正是后面提到的 Checkpoint 跨作业实例文件援用的根本原因。
分明了原理当前,改变形式也就变得清晰了,只须要在复原 previous-sst-list 之前,判断 restore Checkpoint 所属的作业是不是以后作业,如果是,就复原 previous-sst-list;如果不是,就阐明是新作业从 retained Checkpoint 启动,不复原 previous-sst-list。不复原的话,作业启动之后的第一个 Checkpoint 就会上传所有文件,再之后的 Checkpoint 才会基于后面的 Checkpoint 进行增量制作,这也就不会存在跨作业文件援用的问题了。
新的问题是如何晓得 restore 的 Checkpoint 所属的作业 ID 呢?上图形容了 Checkpoint metadata 构造,从中咱们无奈获取到作业 ID,那就要想方法把作业 ID 放进去。要害就在于红框中的 keyed state handle,它有多种不同的实现,每种不同的实现代表一种 Checkpoint 或者 Savepoint 的形式。
如果是 IncrementalRemoteKeyedStateHandle,阐明这是一个 RocksDBStatebackend 的增量 Checkpoint,所以只有给 IncrementalRemoteKeyedStatehandle 减少一个 jobID 字段,在制作 Checkpoint 时把 ID 字段也序列化到 meta 文件中,这样在 restore 的时候就能够晓得 Checkpoint 所属的 job ID 了。
再来看一下 relocatable 的问题,如上图,metdata 中会记录 exclusive 文件和 shared 文件的援用,其实就是记录了一个文件的绝对路径。而当 Checkpoint 被整个复制到其余目录时,这些援用就生效了。解决的办法也很简略,就是将绝对路径换成相对路径,这样就能依据 Checkpoint 的 exclusive 目录和文件的相对路径计算出文件的具体位置,这样 Checkpoint 就不怕被移来移去了。到这里,Checkpoint relocatable 的问题就解决了。
有了 self-contaiend & relocatable,咱们的 Checkpoint 就反对把正本制作到任何中央了,这样咱们也能力开始 Checkpoint 跨机房正本制作能力的反对。
最后在评估如何实现正本跨机房制作能力的时候,有几个备选计划:
- 第一就是像反对 Savepoint 正本制作一样,通过 distcp 对整个 Checkpoint 目录进行跨机房复制,这种形式在复制 Savepoint 时工作良好。但因为 distcp 的每个复制工作都会启动一个很重的 mapreduce 作业,而 Checkpoint 又比 Savepoint 频繁得多,而且 distcp 过程中作业还在运行,可能会一直有文件在复制过程中被删除,尽管能够配置为疏忽,但也会导致一些其余问题,因而不太适合;
- 第二就是编写一个 Checkpoint Replicate Service,连贯多个 HDFS 集群,专门用于 Checkpoint 的正本制作,这也是咱们最初抉择的形式;
- 第三是通过革新 Flink 引擎,在制作 Checkpoint 时间接将数据双写到两个 HDFS 集群上,然而这种形式无疑会给引擎减少不稳固的因素,不能为了应答小概率的机房故障而放弃作业运行的稳定性和效率;
- 最初就是革新 Flink 的 Checkpoint coordinator,使其在制作 Checkpoint 实现后触发一次 distcp,在 distcp 实现前不触发后续的 Checkpoint 制作。这种形式能够防止计划 1 中提到的 distcp 复制过程中文件变动的问题,但也是因为 distcp 效率起因而被放弃。
Checkpoint Replicate Service 的实现形式如下:每一个节点会持有多个 HDFS client,上图中以橘色和紫色来辨别两个 HDFS 集群和 HDFS client,别离是运行的集群和正本要制作的集群。在进行正本制作时,通过原集群的 HDFS client 读取文件,传给指标集群的 HDFS client,将文件写入指标集群。
如图咱们要对 Checkpoint5 进行正本制作时,首先读取 Checkpoint5 的 metadata 文件,解析出援用的所有文件失去 referencedFiles,再加上 metadata 文件,就是咱们要复制到指标集群的所有文件。通过 replicate service 将这些文件复制到指标集群的对应地位上,再加上后面介绍的 Checkpoint self-contained&relocatable 个性,咱们就在指标集群上失去了一个可用的 Checkpoint 正本。
这里有一个疑难,运行过程中 Checkpoint 一直实现,后续的正本制作是否也可能像 Checkpoint 一样进行增量制作,答案是必定的。
如图咱们假如制作 Checkpoint5 的正本时,指标集群对应地位上曾经存在了 Checkpoint3,这时咱们就能够依据 Checkpoint3 的副原本进行增量正本制作。先读取 metadata 文件,解析出援用文件列表 referenceFiles3,而后对这两个文件列表进行汇合运算,就晓得如何进行增量的正本制作了。
- 第一局部,只存在于 Checkpoint5 中的文件是新增的文件,须要复制到指标集群中去;
- 第二局部,只存在于 Checkpoint3 中的文件是在新 Checkpoint 过程中被删除的,因为正本集群只须要保留最新的 Checkpoint3,这部分文件会被间接删除;
- 最初是相交的局部,这些文件尽管被 Checkpoint5 所须要,但曾经被上传过了,因而能够疏忽。通过这种形式,咱们就能像 Checkpoint 增量制作一样去进行增量的正本制作。
咱们在理论工程实际上也取得了不少的教训:
- 第一点是须要革新 Flink 引擎的 metadata 解析过程。以后的实现会在解析过程中去拜访 metadata 文件所在的 HDFS,因为应用的不是咱们指定的 HDFS client,可能就会因 metadata 文件所在集群不是正本服务默认连贯的集群而导致解析失败。但其实这个拜访不是必须的,因而咱们在解析服务中将这个拜访间接移除。
- 第二点是思考缓存 metadata 的解析后果。生产上的大状态作业,一个 metadata 可能有几十 M (甚至几个 G),援用文件会达到几十万个,解析工夫可能须要分钟级别,而增量制作正本时会有屡次解析同一个 metadata,因而能够思考把解析后果缓存起来。
- 第三点是援用文件的复制和删除能够拆分成多个批次发送到多个节点上并行执行。这是因为大状态的作业一个 Checkpoint 复制的文件量可能就达到了 10TB+,很容易达到一台机器的网络瓶颈。
最初还有两个小倡议:
- 首先是运行中的作业正本制作失败时不须要进行重试,次要是思考到运行中的作业会一直有更新的 Checkpoint 产生,新 Checkpoint 复制胜利的意义要大于旧 Checkpoint 的复制;
- 此外,files-to-delete 的执行能够异步进行,即便失败了也只是多一些无用的文件残留,不影响正本的可用;只有保障最终有兜底策略进行清理就行。
四、状态稳定性相干其余优化
状态稳定性方面,咱们还进行了另外三个方面的优化:
第一,修复了一个 RocksDBStateBackend 的内存泄露问题,这个问题触发的条件是作业产生了 restart,并且 restart 之后会复用没有退出的 TM。同时 TM 的 heap 内存又很短缺,full gc 很不频繁。上图能够看到咱们定位的一个 TM 的内存变动,图中两次 restart 之后,该 TM 的内存都增长了 4G 左右,后续如果再产生 restart,就会导致 TM 内存超用。
导致这个问题得起因是 RocksDBStateBackend 清理过程中存在 bug,有一处 RocksObject 没有被清理,进而导致 restart 前 RocksDB 实例的 native 内存开释不了。
第二,Savepoint 之后的第一个增量 Checkpoint 会进化成全量 Checkpoint,会上传所有 RocksDB 文件。上图能够看到红框中是一个 Savepoint,而黄框中是紧跟其后的一个 Checkpoint,这个 Checkpoint 上传了将近 800G 文件,显著大于之后失常的 Checkpoint。
导致这个问题的起因是 Savepoint 制作实现后谬误清理了 previous-sst-list,咱们已将修复提交给社区,须要的同学能够降级到对应的版本。
最初,咱们反对了在触发 Checkpoint 时指定独自的超时工夫,做这个优化是因为大状态作业的 Savepoint 的制作工夫个别会远超增量 Checkpoint。上图能够看到,Savepoint 制作破费将近 7 分钟,增量的 Checkpoint 只须要一两秒,然而 Savepoint 却间接采纳 Checkpoint 的超工夫配置,导致咱们须要给 Checkpoint 配置一个可能笼罩 Savepoint 的超时工夫,这很不利于及早地裸露作业问题。
五、将来布局
将来,咱们会在以下三个方面持续做改良和稳定性建设:
- 稳定性方面,咱们会持续优化作业的断流工夫,晋升作业稳定性,摸索 k8s 来获取更好的资源隔离和资源扩缩容能力。
- 运行性能方面,咱们会对状态后端做优化来反对大状态作业更好地运行,并对反压做优化让作业在顶峰和恢复期运行得更好。
- 最初在资源效率方面,咱们会对作业的资源利用率进行评估和优化,来节俭资源和人力老本。
FFA 2021 直播回放 & 演讲 PDF 下载
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~