关于java:Flink-反压-浅入浅出

5次阅读

共计 7178 个字符,预计需要花费 18 分钟才能阅读完成。

前言

微信搜【Java3y】关注这个朴实无华的男人,点赞关注是对我最大的反对!

文本已收录至我的 GitHub:https://github.com/ZhongFuCheng3y/3y,有 300 多篇原创文章,最近在连载 面试和我的项目 系列!

最近始终在迁徙 Flink 相干的工程,期间也踩了些坑,checkpoint 反压 是其中的一个。

敖丙太菜了,Flink都不会,只能我本人来了。看敖丙只能图一乐,学技术还是得看三歪

平时敖丙黑我都没啥程度,拿点简略的货色来就说我不会。我是敖丙的头等黑粉

明天来分享一下 Flinkcheckpoint 机制和 背压 原理,我置信通过这篇文章,大家在玩 Flink 的时候能够更加粗浅地理解 Checkpoint 是怎么实现的,并且在设置相干参数以及应用的时候能够更加地得心应手。

上一篇曾经写过 Flink 的入门教程了,如果还不理解 Flink 的同学能够先去看看:《Flink 入门教程》

前排揭示,本文基于 Flink 1.7

浅入浅出 学习 Flink 的背压常识》

开胃菜

在解说 FlinkcheckPoint 背压 机制之前,咱们先来看下 checkpoint背压 的相干根底,有助于前面的了解。

作为用户,咱们写好 Flink 的程序,上治理平台提交,Flink就跑起来了(只有程序代码没有问题),细节对用户都是屏蔽的。

实际上大抵的流程是这样的:

  1. Flink会依据咱们所写代码,会生成一个 StreamGraph 的图进去,来代表咱们所写程序的拓扑构造。
  2. 而后在提交的之前会将 StreamGraph 这个图 优化 一把(能够合并的工作进行合并),变成JobGraph
  3. JobGraph 提交给JobManager
  4. JobManager收到之后 JobGraph 之后会依据 JobGraph 生成ExecutionGraph ExecutionGraphJobGraph 的并行化版本)
  5. TaskManager接管到工作之后会将 ExecutionGraph 生成为真正的 物理执行图

能够看到 物理执行图 真正运行在 TaskManagerTransformSink 之间都会有 ResultPartitionInputGate这俩个组件,ResultPartition用来发送数据,而 InputGate 用来接收数据。

屏蔽掉这些 Graph,能够发现Flink 的架构是:Client->JobManager->TaskManager

从名字就能够看出,JobManager是干「治理」,而 TaskManager 是真正干活的。回到咱们明天的主题,checkpoint就是由 JobManager 收回。

Flink 自身就是 有状态 的,Flink能够让你抉择 执行过程中的数据 保留在哪里,目前有三个中央,在 Flink 的角度称作State Backends

  • MemoryStateBackend(内存)
  • FsStateBackend(文件系统,个别是 HSFS)
  • RocksDBStateBackend(RocksDB 数据库)

同样的,checkpoint信息也是保留在 State Backends

耗子屎

最近在 Storm 迁徙 Flink 的时候遇到个问题,我来简略形容一下背景。

咱们从各个数据源从荡涤出数据,借助 Flink 荡涤,组装成一个宽模型,最初交由 kylin 做近实时数据统计和展现,供经营实时查看。

迁徙的过程中,发现订单的 topic 生产提早了良久,初步狐疑是因为订单上游的 并发度 不够所影响的,所以调整了两端的并行度从新公布一把。

公布的过程中,零碎起来当前,再去看topic 生产提早的监控,就懵逼了。什么?怎么这么久了啊?丝毫没有降下去的意思。

这时候只能找组内的大神去寻求帮忙了,他排查一番后示意:这 checkpoint 始终没做上,都堵住了,从新公布的时候只会在上一次 checkpoint 开始,因为 checkpoint 长时间没实现掉,所以从新公布数据量会很大。这没啥好方法了,只能在这个堵住的环节下扔掉吧,预计是业务逻辑出了问题。

画外音:接管到订单的数据,会去 溯源 点击,判断该订单从哪个业务来,通过了哪些的业务,最终是哪块业务以致该订单成交。

画外音:内部真正应用时,依赖「订单后果 HBase」数据

咱们认为点击的数据有可能会比订单的数据处理要慢一会,所以找不到的数据会距离一段时间轮询,又因为 Flink 提供 State「状态」和checkpoint 机制,咱们把找不到的数据放入 ListState 按肯定的工夫轮询就好了(即使零碎因为重启或其余起因挂了,也不会把数据丢了)。

实践上只有没问题,这套计划是可行的。但当初后果通知咱们:订单数据报来了当前,一小批量数据 始终在「订单后果 HBase」没找到数据,就搁置到 ListState 上,而后来一条数据就去遍历ListState。导致的结果就是:

  • 数据生产不过去,造成 反压
  • checkpoint始终没胜利

过后解决的形式就是把 ListState 清空掉,临时丢掉这一部分的数据,让数据追上进度。

起初排查后发现是上游在消息报字段上做了「手脚」,解析失败导致点击失落,造成这一连锁的结果。

排查问题的要害是了解 Flink反压 checkpoint的原理是什么样的,上面我来讲述一下。

反压

反压 backpressure 是流式计算中很常见的问题。它意味着数据管道中某个节点成为瓶颈,解决速率跟不上「上游」发送数据的速率,上游须要进行限速

下面的图代表了是反压极简的状态,说白了就是:上游解决不过去了,上游得慢点,要堵了!

最令人好奇的是:“上游是怎么告诉上游要发慢点的呢?

在后面 Flink 的基础知识解说,咱们能够看到 ResultPartition 用来发送数据,InputGate用来接收数据。

Flink 在一个 TaskManager 外部读写数据的时候,会有一个 BufferPool(缓冲池)供该TaskManager 读写应用(一个 TaskManager 共用一个 BufferPool),每个读写ResultPartition/InputGate 都会去申请本人的LocalBuffer

以上图为例,假如上游解决不过去,那 InputGateLocalBuffer是不是被填满了?填满了当前,ResultPartition是不是没方法往 InputGate 发了?而 ResultPartition 没法发的话,它本人自身的 LocalBuffer 也迟早被填满,那是不是按照这个逻辑,始终到Source 就不会拉数据了 …

这个过程就犹如 InputGate/ResultPartition 都开了本人的 有界阻塞队列,反正“我”就只能解决这么多,往我这里发,我满了就堵住呗,造成连锁反应始终堵到源头上 …

下面是只有一个 TaskManager 的状况下的反压,那多个 TaskManager 呢?(毕竟咱们很多时候都是有多个 TaskManager 在为咱们工作的)

咱们再看回 Flink 通信的总体数据流向架构图:

从图上能够荡涤地发现:近程通信用的 Netty,底层是TCP Socket 来实现的。

所以,从宏观的角度看,多个 TaskManager 只不过多了两个Buffer(缓冲区)。

依照下面的思路,只有 InputGateLocalBuffer被打满,Netty Buffer也迟早被打满,而 Socket Buffer 同样迟早也会被打满(TCP 自身就带有流量管制),再反馈到 ResultPartition 上,数据又又又发不进来了 … 导致整条数据链路都存在反压的景象。

当初问题又来了,一个 TaskManagertask可是有很多的,它们都 共用 一个 TCP Buffer/Buffer Pool ,那只有其中一个task 的链路存在问题,那不导致整个 TaskManager 跟着遭殃?

Flink 1.5 版本 之前,的确会有这个问题。而在 Flink 1.5 版本 之后则引入了 credit 机制。

从下面咱们看到的 Flink 所实现的反压,宏观上就是间接依赖各个 Buffer 是否满了,如果满了则无奈写入 / 读取导致连锁反应,直至 Source 端。

credit 机制,实际上能够简略了解为以「更细粒度」去做流量管制:每次 InputGate 会通知 ResultPartition 本人还有多少的闲暇量能够接管,让 ResultPartition 看着发。如果 InputGate 通知 ResultPartition 曾经没有闲暇量了,那 ResultPartition 就不发了。

那实际上是怎么实现的呢?撸源码!

在撸源码之前,咱们再来看看上面 物理执行图 :实际上InPutGate 下是 InputChannelResultPartition 下是ResultSubpartition(这些在源码中都有体现)。

InputGate(接收端解决反压)

咱们先从接收端看起吧。Flink接收数据的办法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput

随后定位到解决反压的逻辑:

final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

进去 getNextNonBlocked() 办法看(抉择的是 BarrierBuffer 实现):

咱们就间接看 null 的状况,看下从 初始化阶段 开始是怎么搞的,进去getNextBufferOrEvent()

进去办法外面看到两个比拟重要的调用:

requestPartitions();

result = currentChannel.getNextBuffer();

先从 requestPartitions() 看起吧,发现里边套了一层(从 InputChannel 下获取到subPartition):

于是再进 requestSubpartition()(看RemoteInputChannel 的实现吧)

在这里看起来就是创立 Client 端,而后接管上游发送过去的数据:

先看看 client 端的创立姿态吧,进 createPartitionRequestClient() 办法看看(咱们看 Netty 的实现)。

点了两层,咱们会进到 createPartitionRequestClient() 办法,看源码正文就能够清晰发现,这会创立 TCP 连贯并且创立出 Client 供咱们应用

咱们还是看 null 的状况,于是定位到这里:

进去 connect() 办法看看:

咱们就看看具体生成逻辑的实现吧,所以进到 getClientChannelHandlers

意外发现源码还有个通信简要流程图给咱们看(哈哈哈):

好了,来看看 getClientChannelHandlers 办法吧,这个办法不长,次要判断了下要生成的 client 是否开启 creditBased 机制:

public ChannelHandler[] getClientChannelHandlers() {
        NetworkClientHandler networkClientHandler =
            creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :
                new PartitionRequestClientHandler();
        return new ChannelHandler[] {
            messageEncoder,
            new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
            networkClientHandler};
    }

于是咱们的 networkClientHandler 实例是CreditBasedPartitionRequestClientHandler

到这里,咱们暂且就认为 Client 端曾经生成完了,再 退回去 getNextBufferOrEvent() 这个办法,requestPartitions()办法是生成接收数据的 Client 端,具体的实例是CreditBasedPartitionRequestClientHandler

上面咱们进 getNextBuffer() 看看接收数据具体是怎么解决的:

拿到数据后,就会开始执行咱们用户的代码了调用 process 办法了(这里咱们先不看了)。还是回到 反压 的逻辑上,咱们如同还没看到反压的逻辑在哪里。重点就是 receivedBuffers 这里,是谁塞进去的呢?

于是咱们回看到 Client 具体的实例 CreditBasedPartitionRequestClientHandler,关上办法列表一看,感觉就是ChannelRead() 没错了:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {decodeMsg(msg);
        } catch (Throwable t) {notifyAllChannelsOfErrorAndClose(t);
        }
    }

跟着 decodeMsg 持续往下走吧:

持续下到decodeBufferOrEvent()

持续下到onBuffer

所以咱们往 onSenderBacklog 上看看:

最初调用 notifyCreditAvailableCredit往上游发送:

public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
    }

最初再画张图来了解一把(要害链路):

ResultPartition(发送端解决反压)

发送端咱们从 org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager 开始看起

于是咱们进去看fromConfiguration()

进去 start() 去看,随后进入 connectionManager.start()(还是看Netty 的实例):

进去看 service.init() 办法做了什么(又看到相熟的身影):

好了,咱们再进去 getServerChannelHandlers() 看看吧:

有了下面教训的咱们,间接进去看看它的办法,没错,又是channnelRead,只是这次是channelRead0

ok,咱们进去 addCredit() 看看:

reader.addCredit(credit)只是更新了下数量

public void addCredit(int creditDeltas) {numCreditsAvailable += creditDeltas;}

重点咱们看下 enqueueAvailableReader() 办法,而enqueueAvailableReader() 的重点就是判断 Credit 是否足够发送

isAvailable的实现也很简略,就是判断 Credit 是否大于 0 且有实在数据可发

writeAndFlushNextMessageIfPossible 实际上就是往上游发送数据:

拿数据的时候会判断 Credit 是否足够,不足够抛异样:

再画张图来简略了解一下:

背压总结

「上游」的处理速度跟不上「上游」的发送速度,从而升高了处理速度,看似是很美妙的(毕竟看起来就是帮忙咱们限流了)。

但在 Flink 里,背压再加上 Checkponit 机制,很有可能导致 State 状态始终变大,拖慢实现 checkpoint 速度甚至超时失败。

checkpoint 处理速度提早时,会加剧背压的状况(很可能大多数工夫都在解决 checkpoint 了)。

checkpoint 做不上时,意味着重启 Flink 利用就会从上一次实现 checkpoint 从新执行(…

举个我实在遇到的例子:

我有一个 Flink 工作,我只给了它一台 TaskManager 去执行工作,在更新 DB 的时候发现会有并发的问题。

只有一台 TaskManager 定位问题很简略,略微定位了下判断:我更新 DB 的 Sink 并行度调高了。

如果 Sink 的并行度设置为 1,那必定没有并发的问题,但这样解决起来太慢了。

于是我就在 Sink 之前依据 userId 进行keyBy(雷同的 userId 都由同一个 Thread 解决,那这样就没并发的问题了)

看似很美妙,但 userId 存在热点数据的问题,导致上游数据处理造成 反压 。本来一次checkpoint 执行只须要 30~40ms 反压 后一次 checkpoint 须要2min+

checkpoint执行距离绝对频繁(6s/ 次 ),执行工夫2min+,最终导致数据始终解决不过去,整条链路的生产速度从原来的3000qps 到背压后的300qps,始终堵住(程序没问题,就是处理速度大大降落,影响到数据的最终产出)。

最初

原本想着这篇文章把反压和 Checkpoint 都一起写了,但写着写着发现有点长了,那 checkpoint 开下一篇吧。

置信我,只有你用到Flink,迟早会遇到这种问题的,当初可能有的同学还没看懂,没关系,先点个赞????,珍藏起来,前面就用得上了。

参考资料:

  • https://www.cnblogs.com/ljygz/tag/flink/
  • https://ci.apache.org/projects/flink/flink-docs-release-1.11/

三歪把【大厂面试知识点】、【简历模板】、【原创文章】全副整顿成电子书,共有 1263 页!点击下方 链接 间接取就好了

  • GitHub
  • Gitee 拜访更快

PDF 文档的内容 均为手打 ,有任何的不懂都能够间接 来问我

正文完
 0