乐趣区

Flink作业问题分析和调优实践

摘要:本文次要分享 Flink 的 CheckPoint 机制、反压机制及 Flink 的内存模型。对这 3 局部内容的相熟是调优的前提,文章次要从以下几个局部分享:

  1. 原理分析
  2. 性能定位
  3. 经典场景调优
  4. 内存调优

Checkpoint 机制

1. 什么是 checkpoint

简略地说就是 Flink 为了达到容错和 exactly-once 语义的性能,定期把 state 长久化下来,而这一长久化的过程就叫做 checkpoint,它是 Flink Job 在某一时刻全局状态的快照。

当咱们要对分布式系统实现一个全局状态保留的性能时,传统计划会引入一个对立时钟,通过分布式系统中的 master 节点播送进来给每一个 slaves 节点,当节点接管到这个对立时钟时,它们就记录下本人以后的状态即可。

然而对立时钟的形式也存在肯定的问题,某一个 node 进行的 GC 工夫比拟长,或者 master 与 slaves 的网络在过后存在稳定而造成时钟的发送提早或者发送失败,都会造成此 slave 和其它的机器呈现数据不统一而最终导致脑裂的状况。如果咱们想要解决这个问题,就须要对 master 和 slaves 做一个 HA(High Availability)。然而,一个零碎越是简单,就越不稳固且保护老本越高。

Flink 是将 checkpoint 都放进了一个名为 Barrier 的流。

上图中就是一个 Barrier 的例子,从上游的第一个 Task 到上游的最初一个 Task,每次当 Task 通过图中蓝色的栅栏时,就会触发 save snapshot(快照)的性能。咱们用一个例子来简略阐明。

2. 实例剖析

这是一个简略的 ETL 过程,首先咱们把数据从 Kafka 中拿过去进行一个 trans 的转换操作,而后再发送到一个上游的 Kafka

此时这个例子中没有进行 chaining 的调优。所以此时采纳的是 forward strategy,也就是“一个 task 的输入只发送给一个 task 作为输出”,这样的形式,这样做也有一个益处就是如果两个 task 都在一个 JVM 中的话,那么就能够防止不必要的网络开销

设置 Parallism 为 2,此时的 DAG 图如下:

■ CK 的剖析过程

每一个 Flink 作业都会有一个 JobManager,JobManager 外面又会有一个 checkpoint coordinator 来治理整个 checkpoint 的过程,咱们能够设置一个工夫距离让 checkpoint coordinator 将一个 checkpoint 的事件发送给每一个 Container 中的 source task,也就是第一个工作(对应并行图中的 task1,task2)。

当某个 Source 算子收到一个 Barrier 时,它会暂停本身的数据处理,而后将本人的以后 state 制作成 snapshot(快照),并保留到指定的长久化存储中,最初向 CheckpointCoordinator 异步发送一个 ack(Acknowledge character — 确认字符),同时向本身所有上游算子播送该 Barrier 后复原本身的数据处理。

每个算子依照下面一直制作 snapshot 并向上游播送,直到最初 Barrier 传递到 sink 算子,此时快照便制作实现。这时候须要留神的是,上游算子可能是多个数据源,对应多个 Barrier 须要全副到齐才一次性触发 checkpoint,所以在遇到 checkpoint 工夫较长的状况时,有可能是因为数据对齐须要消耗的工夫比拟长所造成的。

■ Snapshot & Recover

如图,这是咱们的 Container 容器初始化的阶段,e1 和 e2 是刚从 Kafka 生产过去的数据,与此同时,CheckpointCoordinator 也往它发送了 Barrier。

此时 Task1 实现了它的 checkpoint 过程,成果就是记录下 offset 为 2(e1,e2),而后把 Barrier 往上游的算子播送,Task3 的输出为 Task1 的输入,当初假如我的这个程序的性能是统计数据的条数,此时 Task3 的 checkpoint 成果就是就记录数据数为 2(因为从 Task1 过去的数据就是 e1 和 e2 两条),之后再将 Barrier 往下播送,当此 Barrier 传递到 sink 算子,snapshot 就算是制作实现了。

此时 source 中还会源源不断的产生数据,并产生新的 checkpoint,然而此时如果 Container 宕机重启就须要进行数据的复原了。刚刚实现的 checkpoint 中 offset 为 2,count 为 2,那咱们就依照这个 state 进行复原。此时 Task1 会从 e3 开始生产,这就是 Recover 操作。

■ checkpoint 的注意事项

上面列举的 3 个留神要点都会影响到零碎的吞吐,在理论开发过程中须要留神:

3. 背压的产生及 Flink 的反压解决

在分布式系统中常常会呈现多个 Task 多个 JVM 之间可能须要做数据的替换,咱们应用生产者和消费者来阐明这个事件。

假如我当初的 Producer 是应用了无界 buffer 来进行存储,当咱们的生产者生产速度远大于消费者生产的速度时,生产端的数据会因为生产端的生产能力低下而导致数据积压,最终导致 OOM 的产生。

而就算应用了有界 buffer,同样消费者端的生产能力低下,当 buffer 被积满时生产者就会进行生产,这样还不能齐全地解决咱们的问题,所以就须要依据不同的状况进行调整。

Flink 也是通过有界 buffer 来进行不同 TaskManager 的数据交换。而且做法分为了动态控流和动静控流两种形式。

简略来说就是当生产者比消费者的 TPS 多时,咱们采纳溢写的形式,应用 batch 来封装好咱们的数据,而后分批发送进来,每次发送实现后再 sleep 一段时间,这个工夫的计算形式是 left(残余的数据)/ tps,然而这个做法是很难去预估零碎的状况的。

Flink 1.5 之前的流控是基于 TCP 的滑动窗口实现的,在之前的课程中曾经有提到过了。而 Flink 在 1.5 之后曾经弃用了该机制,所以这里不开展阐明。在此网络模型中,数据生成节点只能通过查看以后的 channel 是否可写来决定本人是否要向生产端发送数据,它对上游数据生产端的实在容量状况一概不知。这就导致,当生成节点发现 channel 曾经不可写的时候,有可能上游生产节点曾经积压了很多数据。

Credit-Based 咱们用上面的数据交换的例子阐明:

Flink 的数据交换大抵分为三种,一种是同一个 Task 的数据交换,另一种是 不同 Task 同 JVM 下的数据交换。第三种就是不同 Task 且不同 JVM 之间的替换。

同一个 Task 的数据交换就是咱们刚刚提到的 forward strategy 形式,次要就是防止了序列化和网络的开销。

第二种数据交换的形式就是数据会先通过一个 record Writer,数据在外面进行序列化之后再传递给 Result Partition,之后数据会通过 local channel 传递给另外一个 Task 的 Input Gate 外面,再进行反序列化,推送给 Record Reader 之后进行操作。

因为第三种数据交换波及到了不同的 JVM,所以会有肯定的网络开销,和第二种的区别就在于它先推给了 Netty,通过 netty 把数据推送到近程端的 Task 上。

■ Credit-Based

此时咱们能够看到 event1 曾经连带一个 backlog = 1 推送给了 TaskB,backlog 的作用其实只是为了让生产端感知到咱们生产端的状况

此时 event1 被 TaskB 接管后,TaskB 会返回一个 ack 给 TaskA,同时返回一个 credit = 3,这个是告知 TaskA 它还能接管多少条数据,Flink 就是通过这种相互告知的形式,来让生产者和消费者都能感知到对方的状态。

此时通过一段时间之后,TaskB 中的有界 buffer 曾经满了,此时 TaskB 回复 credit = 0 给 TaskA,此时 channel 通道将会进行工作,TaskA 不再将数据发往 TaskB。


此时再通过一段时间,TaskA 中的有界 Buffer 也曾经呈现了数据积压,所以咱们平时遇到的吞吐降落,解决提早的问题,就是因为此时整个零碎相当于一个停滞的状态,如图二示,所有的过程都被打上“X”,示意这些过程都曾经进行工作。

JVM 是一个非常复杂的零碎,当其内存不足时会造成 OOM,导致系统的解体。Flink 在拿到咱们调配的内存之后会先调配一个 cutoff 预留内存,保证系统的安全性。Netword buffers 其实就是对应咱们刚刚始终提到的有界 buffer,momery manager 是一个内存池,这部分的内存能够设置为堆内或者堆外的内存,当然在流式作业中咱们个别设置其为堆外内存,而 Free 局部就是提供给用户应用的内存块。

当初咱们假如调配给此 TaskManager 的内存是 8g。

  1. 首先是要砍掉 cutoff 的局部,默认是 0.25,所以咱们的可用内存就是 8gx0.75
  2. network buffers 占用可用内存的 0.1,所以是 6144×0.1
  3. 堆内 / 堆外内存为可用内存减去 network buffers 的局部,再乘以 0.8
  4. 给到用户应用的内存就是堆内存剩下的 0.2 那局部

其实真实情况是 Flink 是先晓得了 heap 内存的大小而后逆推出其它内存的大小。

Flink 作业的问题定位

1. 问题定位口诀

一压二查三指标,提早吞吐是外围
时刻关注资源量 , 排查首先看 GC。”

一压是指背压,遇到问题先看背压的状况,二查就是指 checkpoint,对齐数据的工夫是否很长,state 是否很大,这些都是和零碎吞吐密切相关的,三指标就是指 Flink UI 那块的一些展现,咱们的次要关注点其实就是提早和吞吐,系统资源,还有就是 GC logs。

  • 看反压:通常最初一个被压高的 subTask 的上游就是 job 的瓶颈之一。
  • 看 Checkpoint 时长:Checkpoint 时长能在肯定水平影响 job 的整体吞吐。
  • 看外围指标:指标是对一个工作性能精准判断的根据,提早指标和吞吐则是其中最为要害的指标。
  • 资源的使用率:进步资源的利用率是最终的目标。

■ 常见的性能问题

简略解释一下:

  1. 在关注背压的时候大家往往疏忽了数据的序列化和反序列化过程所造成的性能问题。
  2. 一些数据结构,比方 HashMap 和 HashSet 这种 key 须要通过 hash 计算的数据结构,在数据量大的时候应用 keyby 进行操作,造成的性能影响是十分大的。
  3. 数据歪斜是咱们的经典问题,前面再进行开展。
  4. 如果咱们的上游是 MySQL,HBase 这种,咱们都会进行一个批处理的操作,就是让数据存储到一个 buffer 外面,在达到某些条件的时候再进行发送,这样做的目标就是缩小和内部零碎的交互,升高网络开销的老本。
  5. 频繁 GC,无论是 CMS 也好,G1 也好,在进行 GC 的时候,都会进行整个作业的运行,GC 工夫较长还会导致 JobManager 和 TaskManager 没有方法准时发送心跳,此时 JobManager 就会认为此 TaskManager 失联,它就会另外开启一个新的 TaskManager
  6. 窗口是一种能够把有限数据切割为无限数据块的伎俩。比方咱们晓得,应用滑动窗口的时候数据的重叠问题,size = 5min 尽管不属于大窗口的领域,可是 step = 1s 代表 1 秒就要进行一次数据的解决,这样就会造成数据的重叠很高,数据量很大的问题。

2.Flink 作业调优


咱们能够通过一些数据结构,比方 Set 或者 Map 来联合 Flink state 进行去重。然而这些去重计划会随着数据量一直增大,从而导致性能的急剧下降,比方刚刚咱们剖析过的 hash 抵触带来的写入性能问题,内存过大导致的 GC 问题,TaskManger 的失联问题。




计划二和计划三也都是通过一些数据结构的伎俩去进行去重,有趣味的同学能够自行上来理解,在这里不再开展。

■ 数据歪斜

数据歪斜是大家都会遇到的高频问题,解决的计划也不少。

第一种场景是当咱们的并发度设置的比分区数要低时,就会造成下面所说的生产不平均的状况。

第二种提到的就是 key 散布不平均的状况,能够通过增加随机前缀打散它们的散布,使得数据不会集中在几个 Task 中。

在每个节点本地对雷同的 key 进行一次聚合操作,相似于 MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条雷同的 key,因为多条雷同的 key 都被聚合起来了。其余节点在拉取所有节点上的雷同 key 时,就会大大减少须要拉取的数据数量,从而也就缩小了磁盘 IO 以及网络传输开销。

■ 内存调优

Flink 的内存构造刚刚咱们曾经提及到了,所以咱们分明,调优的方面次要是针对 非堆内存 Network buffer,manager pool 和堆内存的调优,这些根本都是通过参数来进行管制的。

这些参数咱们都须要联合本身的状况去进行调整,这里只给出一些倡议。而且对于 ManagerBuffer 来说,Flink 的流式作业当初并没有过多应用到这部分的内存,所以咱们都会设置得比拟小,不超过 0.3。

堆内存的调优是对于 JVM 方面的,次要就是将默认应用的垃圾回收器改为 G1,因为默认应用的 Parallel Scavenge 对于老年代的 GC 存在一个串行化的问题,它的 Full GC 耗时较长,上面是对于 G1 的一些介绍,网上材料也十分多,这里就不开展阐明了。



总 结

本文带大家理解了 Flink 的 CheckPoint 机制,反压机制及 Flink 的内存模型和基于内存模型剖析了一些调优的策略。心愿能对大家有所帮忙,原文分享的视频回顾可移步下方链接:

https://ververica.cn/developers/flink-training-course-operation/

退出移动版