作者:京东物流 康琪

本文综合Apache Flink原理与京东实时计算平台(JRC)的背景,具体讲述了大规模Flink流作业的调优办法。通过浏览本文,读者可理解Flink流作业的通用调优措施,并利用于生产环境。

写在后面

Apache Flink作为Google Dataflow Model的工业级实现,通过多年的倒退,现在曾经成为流式计算开源畛域的事实标准。它具备高吞吐、低时延、原生流批一体、高一致性、高可用性、高伸缩性的特色,同时提供丰盛的层级化API、工夫窗口、状态化计算等语义,不便用户疾速入门实时开发,构建实时计算体系。

古语有云,工欲善其事,必先利其器。要想让大规模、大流量的Flink作业高效运行,就必然要进行调优,并且了解其背地的原理。本文是笔者依据过往教训以及调优实际,联合京东实时计算平台(JRC)背景产出的面向业余人员的Flink流作业调优指南。次要蕴含以下四个方面:

  • TaskManager内存模型调优
  • 网络栈调优
  • RocksDB与状态调优
  • 其余调优项

本文基于Flink 1.12版本。浏览之前,倡议读者对Flink根底组件、编程模型和运行时有较深刻的理解。

01 *TaskManager内存模型调优

1.1 TaskManager内存模型与参数

目前的Flink TaskManager内存模型是1.10版本确定下来的,官网文档中给出的图示如下。在高版本Flink的Web UI中,也能够看到这张图。

图1 TaskManager内存模型

上面来看图谈话,分区域给出比官网文档具体一些的介绍。t.m.即为taskmanager. memory.前缀的缩写。

1.2 平台特定参数

除了TaskManager内存模型相干的参数之外,还有一些平台提供的其余参数,列举如下。

1.3 TM/平台参数与JVM的关系

上述参数与TaskManager JVM自身的参数有如下的对应关系:

  • -Xms | -Xmx → t. m. framework. heap. size + t. m. task. heap. size
  • -Xmn → -Xmx * apus. taskmanager. heap. newsize. ratio
  • -XX: Max Direct Memory Size → t. m. framework. off- heap. size + t. m. task. off- heap. size + $network
  • -XX: Max Metaspace Size → t. m. jvm- metaspace. size

另外,还能够通过env.java.opts.{jobmanager | taskmanager}配置项来别离设定JM和TM JVM的附加参数。

1.4 内存调配示例

上面以在生产环境某作业中运行的8C / 16G TaskManager为例,依据以上规定,手动计算各个内存分区的配额。留神有局部参数未采纳默认值。

t.m.process.size = 16384t.m.flink.size   = t.m.process.size * apus.memory.incontainer.available.ratio   = 16384 * 0.9 = 14745.6t.m.jvm-metaspace.size   = [t.m.process.size - t.m.flink.size] * apus.metaspace.incutoff.ratio   = [16384 - 14745.6] * 0.25 = 409.6$overhead   = MIN{t.m.process.size * t.m.jvm-overhead-fraction, t.m.jvm-overhead.max}   = MIN{16384 * 0.1, 1024} = 1024$network   = MIN{t.m.flink.size * t.m.network.fraction, t.m.network.max}   = MIN{14745.6 * 0.3, 5120} = 4423.68$managed   = t.m.flink.size * t.m.managed.fraction   = 14745.6 * 0.25 = 3686.4t.m.task.off-heap.size   = t.m.flink.size * apus.taskmanager.memory.task.off-heap.fraction   = 14745.6 * 0.01 = 147.4t.m.task.heap.size   = t.m.flink.size - $network - $managed - t.m.task.off-heap.size - t.m.framework.heap.size - t.m.framework.off-heap.size   = 14745.6 - 4423.68 - 3686.4 - 147.4 - 128 - 128 = 6232.12

与Web UI中展现的内存配额做比对,可发现齐全吻合。

图2 Web UI展现的内存分配情况

1.5 调优概览

了解TaskManager内存模型是发展调优的大前提,进行调优的主旨就是:正当调配,避免浪费,保障性能。上面先对比拟容易呈现问题的三块区域做简要的讲解。

1.对于工作堆外内存

平台方的解释是有些用户的作业须要这部分内存,但从Flink Runtime的角度讲,次要是批作业(如Sort-Merge Shuffle过程)会踊跃地应用它。绝对地,流作业很少波及这一部分,除非用户代码或用户援用的第三方库间接操作了DirectByteBuffer或Unsafe之类。所以个别能够优先保障堆内存,即尝试将
apus.t.m.task.off-heap.fraction再调小一些(如0.05),再察看作业运行是否失常。

2.对于托管内存

如果应用RocksDB状态后端,且状态数据量较大或读写较频繁,倡议适当减少t.m.managed.fraction,如0.2~0.5,可配合RocksDB监控决定。如果不应用RocksDB状态后端,可设为0,因为其余状态后端下的本地状态会存在TaskManager堆内存中。后文会具体解说RocksDB相干的调优项。

3.对于网络缓存

须要特地留神的是,网络缓存的占用量与并行度和作业拓扑无关,而与理论网络流量关系不大,所以不能简略地以作业的数据量来设置这一区域。粗略地讲,对简略拓扑,倡议以默认值启动作业,再察看该区域的利用状况并进行调整;对简单拓扑,倡议先适当调大t.m.network.fraction和max,保障不呈现IOException: Insufficient number of network buffers异样,而后再做调整。另外,请肯定不要把t.m.network.min和max设成相等的值,这样会间接疏忽fraction,而这种间接的设定往往并不迷信。下一节就来具体解说Flink网络栈的调优。

02 网络栈调优

2.1 网络栈和网络缓存

图3 Flink网络栈

Flink的网络栈构建在Netty的根底之上。如上图所示,每个TaskManager既能够是Server(发送端)也能够是Client(接收端),并且它们之间的TCP连贯会被复用,以缩小资源耗费。

图中的小色块就是网络缓存(NetworkBuffer),它是数据传输的最根本单位,以间接内存的模式调配,承载序列化的StreamRecord数据,且一个Buffer的大小就等于一个MemorySegment的大小(t.m.segment-size,默认32KB)。TM中的每个Sub-task都会创立网络缓存池(NetworkBufferPool),用于调配和回收Buffer。上面解说一下网络缓存的调配规定。

2.2 网络缓存调配规定

Flink流作业的执行打算用三层DAG来示意,即:StreamGraph(逻辑打算)→ JobGraph(优化的逻辑打算)→ ExecutionGraph(物理打算)。当ExecutionGraph真正被调度到TaskManager下面执行时,造成的是如下图所示的构造。

图4 Flink物理执行图构造

每个Sub-task都有一套用于数据交换的组件,输入侧称为ResultPartition(RP),输出侧称为InputGate(IG)。另外,它们还会依据并行度和上下游的DistributionPattern(POINTWISE或ALL_TO_ALL)划分为子块,别离称为ResultSubpartition(RS)和InputChannel(IC)。留神上下游RS和IC的比例是严格1:1的。网络缓存就是在ResultPartition和InputGate级别调配的,具体的调配规定是:

  • Buffer-RP = #RS + 1 && #Buffer-RS <= t.network.m.max-buffers-per-channel (10)

  • Buffer-IG = #IC * t.network.m.buffers-per-channel (2, exclusive) + t.network.m.floating-buffers-per-gate (8, floating)

<!---->

翻译一下:

  • 发送端RP调配的Buffer总数为RS的数量+1,且为了避免歪斜,每个RS可取得的Buffer数不能多于taskmanager.network.memory.max-buffers-per-channel(默认值10);
  • 接收端每个IC独享的Buffer数为taskmanager. network. memory. buffers- per- channel(默认值2),IG可额定提供的浮动Buffer数为taskmanager. network. memory. floating- buffers- per- gate(默认值8)。

多说一句,上图这套机制也是Flink实现Credit-based流控(反压)的根底,想想诊断反压时会看的**PoolUsage参数就明确了。反压是比拟根底的话题,这里就不再开展。

再反复上一节的那句话:网络缓存的占用量与并行度和作业拓扑无关,而与理论网络流量关系不大。特地地,因为ALL_TO_ALL散布(如Hash、Rebalance)会产生O(N^2)级别的RS和IC,所以对Buffer的需求量也就更大。当然,咱们根本不可能通过用肉眼看简单的拓扑图来计算Buffer数,所以最好的办法是疾速试错,来看一个例子。

2.3 网络缓存调优示例

本节以测试环境中的某作业(下称“示例作业”)为例。

该作业有54个8C / 16G规格的TM,并行度400,运行4330个Sub-tasks,且蕴含大量的keyBy操作。初始设定t.m.network.fraction = 0.2 & t.m.network.max = 3GB,报IOException: Insufficient network buffers异样;再次设定t.m.network.fraction = 0.3 & t.m.network.max = 5GB,作业失常启动,理论调配4.32GB,占用率73%~78%之间浮动(参见之前的Web UI图)。这个分配情况绝对于原作业的fraction = 0.5 & min = max = 8GB显然是更优的。

有的同学可能会问:闲暇的Network区域内存不能挪作他用吗?答案是否定的。在作业启动时,Network区域的全副内存都会初始化成Buffer,并按上一节所述的配额调配到RP和IG,Web UI中Netty Shuffle Buffers → Available一栏的Buffer根本能够认为被节约了。所以,当作业遇到瓶颈时,自觉增大网络缓存对吞吐量有害无益。

2.4 容易疏忽的缓存超时

网络缓存在发送端被Flush到上游有三种机会:Buffer写满、超时工夫到、遇到非凡标记(如Checkpoint Barrier)。之所以要设计缓存超时,是为了防止Buffer总是无奈写满导致上游解决提早。能够通过`
StreamExecutionEnvironment#setBufferTimeout`办法或者execution.buffer-timeout参数来设置缓存超时,默认100ms,个别无需更改。

图5 缓存的填充与发送

然而,思考大并行度、大量ALL_TO_ALL替换的作业,数据绝对扩散,每个ResultSubpartition的Buffer并不会很快填满,大量的Flush操作反而会无谓地占用CPU。此时能够思考适当增大缓存超时,升高Flush频率,可能无效升高CPU Usage。以前述作业为例,将缓存超时设为500ms,其余参数不变,稳固生产阶段TM的均匀CPU Usage升高了40%,成果拔群。当然这仍是以上游提早作为trade-off的,故时效性极敏感的作业不适用于此优化。

2.5 网络容错

平台采纳Flink on Kubernetes的部署形式,然而Kubernetes网络虚拟化(Calico、Flannel等)会损失网络性能,故对于大流量或简单作业,务必进步网络容错性。以下是三个相干的参数。

1.taskmanager.network.request-backoff.max

默认值10000(社区版)/ 60000(平台),示意上游InputChannel申请上游ResultSubpartition的指数退却最大时长,单位为毫秒。如果申请失败,会抛出
PartitionNotFoundException: Partition xx@host not found,应适当调大,如240000。留神此报错与Kafka Partition无关,切勿混同。

2.akka.ask.timeout

默认值10s(社区版)/ 60s(平台),示意Akka Actor的Ask RPC期待返回后果的超时。如果网络拥塞或者拓扑过于简单,就会呈现AskTimeoutException: Ask timed out on Actor akka://xx after xx ms的信息,应调大此值,如120s。留神长时间GC也可能导致此问题,留心排查。

3.heartbeat.timeout

默认值50000,示意JobManager和TaskManager之间心跳信号的发送/接管超时,单位为毫秒。与akka.ask.timeout同理,若呈现TimeoutException: Heartbeat of TaskManager with id xx timed out,倡议适当调大。

03 RocksDB与状态调优

3.1 Flink中的FRocksDB

图6 FRocksDB读写流程

Flink RocksDB状态后端采纳的是名为FRocksDB的分支版本,由Ververica保护。它的读写流程与原版基本相同,如上图所示,MemTable和BlockCache别离就是读写缓存和读缓存。特地地,因为Flink在每个Checkpoint周期都会将RocksDB的数据快照长久化到文件系统,所以不须要写预写日志(WAL)。

TM中的每个Slot都领有一个RocksDB实例,且传统形式下每个列族(CF)都对应一套MemTable、BlockCache和SST。而在Flink作业中申请的一个StateHandle——即Runtime Context# get... State (State Descriptor) ——就对应一个取StateDescriptor名称的列族。显然,同一作业内StateDescriptor的名称不能反复。

3.2 RocksDB托管内存机制

上述传统形式有个显著的毛病,即RocksDB的内存简直不受控(因为Flink并不限度用户能申请多少个StateHandle)。因而,Flink在1.10版本借助RocksDB 5.6+提出的WriteBufferManager和LRUCache协同机制,实现了全托管的RocksDB内存治理,如下图所示。

图7 全托管RocksDB内存治理

托管内存机制默认启用(state. backend. rocksdb. memory. managed = true),此时TM会将整块Managed Memory区域作为所有RocksDB实例共用的BlockCache,并通过WriteBufferManager将MemTable的内存耗费向BlockCache记账(即写入只有size信息的dummy块),从而BlockCache可能感知到全副的内存应用并施加限度,防止OOM产生。SST索引和Bloom Filter块则会进入BlockCache的高优先级区。须要留神,因为历史起因以及Iterator-pinned Blocks的存在,BlockCache在多数状况下不能严格限度内存,故有必要配置一些JVM Overhead作为兜底。

托管内存默认在各个Slot之间平均分配,用户也能够通过
s.b.r.memory.fixed-per-slot参数来为每个Slot手动设定托管内存配额,但个别不举荐。除此之外,可调整的两个参数如下。

  • s.b.r.memory.write-buffer-ratio:MemTable内存占托管内存的比例,默认值0.5;

<!---->

  • s.b.r.memory.high-prio-pool-ratio:高优先级区内存占托管内存的比例,默认值0.1。

残余的局部(默认0.4)就是留给数据BlockCache的配额。用户个别不须要更改它们,若作业状态特地重读或重写,可适当调整,但必须先保障托管内存短缺。

3.3 其余RocksDB参数

**
1.s.b.r.checkpoint.transfer.thread.num(默认1)**

每个有状态算子在Checkpoint时传输数据的线程数,增大此值会对网络和磁盘吞吐量有更高要求。个别倡议4~8,1.13版本中默认已改为4。

**
2.s.b.r.timer-service.factory(社区版默认ROCKSDB,平台默认HEAP)**

Timer相干状态存储的地位,蕴含用户注册的Timer和框架外部注册的Timer(如Window、Trigger)。若存储在堆中,则Timer状态做CP时无奈异步Snapshot,所以Timer很多的状况下存在RocksDB内更好。但美中不足的是,设置为ROCKSDB会有一个极偶发的序列化bug,导致无奈从Savepoint复原状态,若不能承受,倡议HEAP。

**
3.s.b.r.predefined-options(默认DEFAULT)**

社区提供的预设RocksDB调优参数集,有4种:DEFAULT、SPINNING_DISK_OPTIMIZED、
SPINNING_DISK_OPTIMIZED_HIGH_MEM、FLASH_SSD_OPTIMIZED(名称都很self-explanatory)。该参数容易疏忽,但强烈建议设置,比起默认值均有不错的性能收益。若单个Slot的状态量达到GB级别,且托管内存富余,设为SPINNING_DISK_OPTIMIZED_HIGH_MEM最佳。其余状况设为SPINNING_DISK_OPTIMIZED即可。

除了上述参数之外,原则上倡议遵循RocksDB Wiki的忠告("No need to tune it unless you see an obvious performance problem"),不再手动调整RocksDB高级参数(如s.b.r.{block | writebuffer | compaction}.*),除非呈现了托管内存机制无奈解决的问题。笔者也将局部高级参数列出如下,供参考。

图8 RocksDB高级参数

留神划线的项会被托管内存机制笼罩掉。如果通过谨慎思考,必须fine tune RocksDB,则须要将s.b.r.memory.managed设为false,同时用户要承当可能的OOM危险。

3.4 RocksDB监控 & 调优示例

在大状态作业正式上线之前,应关上一部分必要的RocksDB监控,察看是否有性能瓶颈。开启监控对状态读写性能有肯定影响,个别倡议如下6项:

  • s.b.r.metrics.{block-cache-capacity | block-cache-usage | cur-size-all-mem-tables | mem-table-flush-pending | num-running-flushes | num-running-compactions} = true

察看结束并解决问题后,请务必敞开它们。

图9 示例作业RocksDB监控

上图是示例作业的局部RocksDB Metrics图表,比拟失常。如果在稳固生产阶段,Flush和Compaction等重量级操作特地频繁,以至于图中的点连成线,个别就提醒RocksDB遇到了瓶颈。然而托管内存(即BlockCache)占用100%是失常景象,根本不用放心。

作为参考,该作业的增量Checkpoint大小在15G左右,每日摄入数十亿条状态数据,设置参数为:t. m. managed. fraction = 0.25(理论调配托管内存3.6G),s. b. r. predefined- options = SPINNING_ DISK_ OPTIMIZED,s. b. r. checkpoint. transfer. thread. num = 8。体现良好。而调优前作业的t. m. managed. fraction是默认的0.1,并且还对RocksDB高级参数做了一些无谓的批改,性能体现不佳。

3.5 状态TTL

RocksDB的状态TTL须要借助CompactionFilter实现,如下图所示。

图10 状态TTL原理

用户调用State Ttl Config# cleanupIn Rocksdb Compact Filter (N)办法,就能够设定在拜访状态N次后,更新CompactionFilter记录的工夫戳。当SST执行Compaction操作时,会依据该工夫戳查看状态键值对是否过期并删除掉。留神若拜访状态十分频繁,N值应适当调大(默认仅为1000),避免影响Compaction性能。

3.6 状态缩放与最大并行度

当作业的并行度扭转并从CP / SP复原时,就会波及状态缩放的问题。Flink内Keyed State数据以KeyGroup为单位组织,每个key通过两重Murmur Hash计算出它应该落在哪个KeyGroup中,同时每个Sub-task会调配到一个或多个KeyGroup。如下图所示,并行度变动只会影响KeyGroup的调配,能够将状态复原的过程近似化为程序读,提高效率。

图11 Keyed State的缩放

KeyGroup的数量与最大并行度雷同,而最大并行度扭转会导致作业无奈从CP / SP复原,所以要审慎设定。如果用户没有显式设置,就会依据以下规定来推算:

128 <= round Up To Power Of Two (operator Parallelism * 1.5) <= 32768

显然这并不平安。假如一个作业的并行度是200,推算的最大并行度是512;若将其并行度晋升至400,推算的最大并行度就会变成1024。所以总是举荐显式设置正当的最大并行度。

3.7 状态本地复原

状态本地复原默认敞开,能够通过设置
state.backend.local-recovery = true启用,但它只能作用于Aligned Checkpoint和Keyed State。启用后,每次CP产生两份快照:Primary(远端DFS)和Secondary(本地磁盘),且Secondary CP失败不会影响整个CP流程。作业复原时,首先尝试从无效的Secondary快照复原状态,能显著进步复原速度。如果Secondary快照不可用或不残缺,再fallback到Primary复原。如下图所示。

图12 状态本地复原

状态本地复原会引入额定的磁盘耗费:非增量CP会导致磁盘占用量翻倍;增量CP因为原生存在援用计数机制,不会多耗费空间,但因为数据比拟扩散,IOPS会相应减少。

04 其余调优项

4.1 Checkpoint相干

读者应该很相熟Checkpoint相干的配置项了,这里只提两点:一是checkpointTimeout依据作业个性设置,但不要过长,避免CP卡死覆盖作业自身的问题(如数据歪斜);二是肯定要设置
minPauseBetweenCheckpoints,防止算子始终处在CP过程中导致性能降落。示例作业的设置是:checkpointInterval = 3min / checkpointTimeout = 15min / minPauseBetweenCheckpoints = 1min。

另外,在大状态作业中碰到一种常见的景象,即Checkpoint全副ack之后卡在IN_PROGRESS,通过1~3分钟左右才会变成COMPLETED,如下图所示。

图13 Checkpoint卡在IN_PROGRESS状态的景象

这是因为TaskManager和HDFS之间通信不畅,或者是HDFS自身的压力导致数据块写入失败。而Flink必须保障Checkpoint的完整性,即重试到所有快照数据都胜利写入能力标记为COMPLETED。读者可在TM日志中发现形如Exception in createBlockOutputStream: Connect timed out的异样信息。

4.2 对象重用

对象重用在Flink配置中不是很起眼,但却相当有用。Flink在生成JobGraph时会将合乎肯定条件的算子组合成算子链(OperatorChain),所有chain在一起的Sub-task都会在同一个TM Slot中执行。而对象重用的实质就是在算子链内的上游算子中间接应用上游算子发射对象的浅拷贝。

图14 算子链示意

如图所示,若不启用对象重用,算子链中的虚线默认是CopyingChainingOutput(深拷贝)。通过ExecutionConfig#enableObjectReuse()或者pipeline.object-reuse = true启用对象重用,CopyingChainingOutput就会被替换为ChainingOutput(浅拷贝)。下图示出了两者之间的差别。

图15 是否重用对象的区别

DataStream API作业个别不倡议开启对象重用,除非非常确定不存在上游算子间接批改上游算子发射的对象的状况。并且DataStream API作业开启对象重用的收益不高,仅当其中有简单数据类型定义时,才会有20%左右的性能晋升。

然而SQL作业强烈建议开启,因为Flink SQL的类型零碎与DataStream API有差别,StringData、MapData等的深拷贝老本很大,并且Flink SQL的代码生成器可能保障可变对象的安全性。测试结果表明,对象重用的SQL作业均匀可取得翻倍的性能晋升。

4.3 别忘了JobManager

绝对于TaskManager,JobManager的配置往往比拟省心,仿佛轻易给个2C / 4G的配置就能够居安思危了。实际上JobManager外部保护的组件很多,如:作业DAG即{Job | Execution}Graph、SlotPool & Scheduler、<TaskManagerLocation, TaskExecutorGateway>的映射关系、CheckpointCoordinator、HeartbeatManager、ShuffleMaster、PartitionTracker等。

所以,如果作业Slot / Sub-task多,Checkpoint比拟大,或者是重Shuffle的批作业,肯定要适当减少JobManager的资源。最近作者部门有两个作业频繁呈现ResourceManager leader changed to new address null的异样信息,就是因为JM压力过大、GC工夫太长,导致ZooKeeper Session生效了。以示例作业的JM(4C / 8G)为例,其内存调配如下。

图16 示例作业JobManager内存调配

4.4 其余小Tips

  • 从Flink 1.12开始,默认的工夫语义变成了事件工夫。如果作业是解决工夫语义,能够禁用水印发射,即:Execution Config# set Auto WatermarkInterval (0)
  • 设置metrics.latency.interval(单位毫秒)能够周期性插入LatencyMarker,用于测量各算子及全链路的提早。解决LatencyMarker会占用资源,因而不须要特地频繁,60000左右比拟适合。
  • 用户注册的Timer会依照<key, timestamp>去重,并在外部以最小堆存储。所以要尽量避免onTimer风暴,即大量key的Timer在同一个工夫戳触发,造成性能抖动。
  • 如果须要替换Flink原生没有Serializer反对的数据类型(如HyperLogLog、RoaringBitmap),应在代码中注册自定义的Serializer,防止fallback到Kryo导致性能降落。
  • POJO类型反对状态Schema变动,增删字段不会影响复原(新增的字段会以默认值初始化)。然而切记不能批改字段的数据类型以及POJO的类名。

05 References

  • Flink官网文档:
  • https://nightlies.apache.org/flink/flink-docs-release-1.12/
  • Flink源码:
  • https://github.com/apache/flink
  • FRocksDB源码:
  • https://github.com/ververica/frocksdb