前言

微信搜【Java3y】关注这个朴实无华的男人,点赞关注是对我最大的反对!

文本已收录至我的GitHub:https://github.com/ZhongFuCheng3y/3y,有300多篇原创文章,最近在连载面试和我的项目系列!

最近始终在迁徙Flink相干的工程,期间也踩了些坑,checkpoint反压是其中的一个。

敖丙太菜了,Flink都不会,只能我本人来了。看敖丙只能图一乐,学技术还是得看三歪

平时敖丙黑我都没啥程度,拿点简略的货色来就说我不会。我是敖丙的头等黑粉

明天来分享一下 Flinkcheckpoint机制和背压原理,我置信通过这篇文章,大家在玩Flink的时候能够更加粗浅地理解Checkpoint是怎么实现的,并且在设置相干参数以及应用的时候能够更加地得心应手。

上一篇曾经写过Flink的入门教程了,如果还不理解Flink的同学能够先去看看:《Flink入门教程》

前排揭示,本文基于Flink 1.7

浅入浅出学习Flink的背压常识》

开胃菜

在解说FlinkcheckPoint背压机制之前,咱们先来看下checkpoint背压的相干根底,有助于前面的了解。

作为用户,咱们写好Flink的程序,上治理平台提交,Flink就跑起来了(只有程序代码没有问题),细节对用户都是屏蔽的。

实际上大抵的流程是这样的:

  1. Flink会依据咱们所写代码,会生成一个StreamGraph的图进去,来代表咱们所写程序的拓扑构造。
  2. 而后在提交的之前会将StreamGraph这个图优化一把(能够合并的工作进行合并),变成JobGraph
  3. JobGraph提交给JobManager
  4. JobManager收到之后JobGraph之后会依据JobGraph生成ExecutionGraph ExecutionGraphJobGraph 的并行化版本)
  5. TaskManager接管到工作之后会将ExecutionGraph 生成为真正的物理执行图

能够看到物理执行图真正运行在TaskManagerTransformSink之间都会有ResultPartitionInputGate这俩个组件,ResultPartition用来发送数据,而InputGate用来接收数据。

屏蔽掉这些Graph,能够发现Flink的架构是:Client->JobManager->TaskManager

从名字就能够看出,JobManager是干「治理」,而TaskManager是真正干活的。回到咱们明天的主题,checkpoint就是由JobManager收回。

Flink自身就是有状态的,Flink能够让你抉择执行过程中的数据保留在哪里,目前有三个中央,在Flink的角度称作State Backends

  • MemoryStateBackend(内存)
  • FsStateBackend(文件系统,个别是HSFS)
  • RocksDBStateBackend(RocksDB数据库)

同样的,checkpoint信息也是保留在State Backends

耗子屎

最近在Storm迁徙Flink的时候遇到个问题,我来简略形容一下背景。

咱们从各个数据源从荡涤出数据,借助Flink荡涤,组装成一个宽模型,最初交由kylin做近实时数据统计和展现,供经营实时查看。

迁徙的过程中,发现订单的topic生产提早了良久,初步狐疑是因为订单上游的并发度不够所影响的,所以调整了两端的并行度从新公布一把。

公布的过程中,零碎起来当前,再去看topic 生产提早的监控,就懵逼了。什么?怎么这么久了啊?丝毫没有降下去的意思。

这时候只能找组内的大神去寻求帮忙了,他排查一番后示意:这checkpoint始终没做上,都堵住了,从新公布的时候只会在上一次checkpoint开始,因为checkpoint长时间没实现掉,所以从新公布数据量会很大。这没啥好方法了,只能在这个堵住的环节下扔掉吧,预计是业务逻辑出了问题。

画外音:接管到订单的数据,会去溯源点击,判断该订单从哪个业务来,通过了哪些的业务,最终是哪块业务以致该订单成交。

画外音:内部真正应用时,依赖「订单后果HBase」数据

咱们认为点击的数据有可能会比订单的数据处理要慢一会,所以找不到的数据会距离一段时间轮询,又因为Flink提供State「状态」 和checkpoint机制,咱们把找不到的数据放入ListState按肯定的工夫轮询就好了(即使零碎因为重启或其余起因挂了,也不会把数据丢了)。

实践上只有没问题,这套计划是可行的。但当初后果通知咱们:订单数据报来了当前,一小批量数据始终在「订单后果HBase」没找到数据,就搁置到ListState上,而后来一条数据就去遍历ListState。导致的结果就是:

  • 数据生产不过去,造成反压
  • checkpoint始终没胜利

过后解决的形式就是把ListState清空掉,临时丢掉这一部分的数据,让数据追上进度。

起初排查后发现是上游在消息报字段上做了「手脚」,解析失败导致点击失落,造成这一连锁的结果。

排查问题的要害是了解Flink反压checkpoint的原理是什么样的,上面我来讲述一下。

反压

反压backpressure是流式计算中很常见的问题。它意味着数据管道中某个节点成为瓶颈,解决速率跟不上「上游」发送数据的速率,上游须要进行限速

下面的图代表了是反压极简的状态,说白了就是:上游解决不过去了,上游得慢点,要堵了!

最令人好奇的是:“上游是怎么告诉上游要发慢点的呢?

在后面Flink的基础知识解说,咱们能够看到ResultPartition用来发送数据,InputGate用来接收数据。

Flink在一个TaskManager外部读写数据的时候,会有一个BufferPool(缓冲池)供该TaskManager读写应用(一个TaskManager共用一个BufferPool),每个读写ResultPartition/InputGate都会去申请本人的LocalBuffer

以上图为例,假如上游解决不过去,那InputGateLocalBuffer是不是被填满了?填满了当前,ResultPartition是不是没方法往InputGate发了?而ResultPartition没法发的话,它本人自身的LocalBuffer 也迟早被填满,那是不是按照这个逻辑,始终到Source就不会拉数据了...

这个过程就犹如InputGate/ResultPartition都开了本人的有界阻塞队列,反正“我”就只能解决这么多,往我这里发,我满了就堵住呗,造成连锁反应始终堵到源头上...

下面是只有一个TaskManager的状况下的反压,那多个TaskManager呢?(毕竟咱们很多时候都是有多个TaskManager在为咱们工作的)

咱们再看回Flink通信的总体数据流向架构图:

从图上能够荡涤地发现:近程通信用的Netty,底层是TCP Socket来实现的。

所以,从宏观的角度看,多个TaskManager只不过多了两个Buffer(缓冲区)。

依照下面的思路,只有InputGateLocalBuffer被打满,Netty Buffer也迟早被打满,而Socket Buffer同样迟早也会被打满(TCP 自身就带有流量管制),再反馈到ResultPartition上,数据又又又发不进来了...导致整条数据链路都存在反压的景象。

当初问题又来了,一个TaskManagertask可是有很多的,它们都共用一个TCP Buffer/Buffer Pool ,那只有其中一个task的链路存在问题,那不导致整个TaskManager跟着遭殃?

Flink 1.5版本之前,的确会有这个问题。而在Flink 1.5版本之后则引入了credit机制。

从下面咱们看到的Flink所实现的反压,宏观上就是间接依赖各个Buffer是否满了,如果满了则无奈写入/读取导致连锁反应,直至Source端。

credit机制,实际上能够简略了解为以「更细粒度」去做流量管制:每次InputGate会通知ResultPartition本人还有多少的闲暇量能够接管,让ResultPartition看着发。如果InputGate通知ResultPartition曾经没有闲暇量了,那ResultPartition就不发了。

那实际上是怎么实现的呢?撸源码!

在撸源码之前,咱们再来看看上面物理执行图:实际上InPutGate下是InputChannelResultPartition下是ResultSubpartition(这些在源码中都有体现)。

InputGate(接收端解决反压)

咱们先从接收端看起吧。Flink接收数据的办法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput

随后定位到解决反压的逻辑:

final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

进去getNextNonBlocked()办法看(抉择的是BarrierBuffer实现):

咱们就间接看null的状况,看下从初始化阶段开始是怎么搞的,进去getNextBufferOrEvent()

进去办法外面看到两个比拟重要的调用:

requestPartitions();result = currentChannel.getNextBuffer();

先从requestPartitions()看起吧,发现里边套了一层(从InputChannel下获取到subPartition):

于是再进requestSubpartition()(看RemoteInputChannel的实现吧)

在这里看起来就是创立Client端,而后接管上游发送过去的数据:

先看看client端的创立姿态吧,进createPartitionRequestClient()办法看看(咱们看Netty的实现)。

点了两层,咱们会进到createPartitionRequestClient()办法,看源码正文就能够清晰发现,这会创立TCP连贯并且创立出Client供咱们应用

咱们还是看null的状况,于是定位到这里:

进去connect()办法看看:

咱们就看看具体生成逻辑的实现吧,所以进到getClientChannelHandlers

意外发现源码还有个通信简要流程图给咱们看(哈哈哈):

好了,来看看getClientChannelHandlers办法吧,这个办法不长,次要判断了下要生成的client是否开启creditBased机制:

public ChannelHandler[] getClientChannelHandlers() {        NetworkClientHandler networkClientHandler =            creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :                new PartitionRequestClientHandler();        return new ChannelHandler[] {            messageEncoder,            new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),            networkClientHandler};    }

于是咱们的networkClientHandler实例是CreditBasedPartitionRequestClientHandler

到这里,咱们暂且就认为Client端曾经生成完了,再退回去getNextBufferOrEvent()这个办法,requestPartitions()办法是生成接收数据的Client端,具体的实例是CreditBasedPartitionRequestClientHandler

上面咱们进getNextBuffer()看看接收数据具体是怎么解决的:

拿到数据后,就会开始执行咱们用户的代码了调用process办法了(这里咱们先不看了)。还是回到反压的逻辑上,咱们如同还没看到反压的逻辑在哪里。重点就是receivedBuffers这里,是谁塞进去的呢?

于是咱们回看到Client具体的实例CreditBasedPartitionRequestClientHandler,关上办法列表一看,感觉就是ChannelRead()没错了:

    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        try {            decodeMsg(msg);        } catch (Throwable t) {            notifyAllChannelsOfErrorAndClose(t);        }    }

跟着decodeMsg持续往下走吧:

持续下到decodeBufferOrEvent()

持续下到onBuffer

所以咱们往onSenderBacklog上看看:

最初调用notifyCreditAvailableCredit往上游发送:

public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));    }

最初再画张图来了解一把(要害链路):

ResultPartition(发送端解决反压)

发送端咱们从org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager开始看起

于是咱们进去看fromConfiguration()

进去start()去看,随后进入connectionManager.start()(还是看Netty的实例):

进去看service.init()办法做了什么(又看到相熟的身影):

好了,咱们再进去getServerChannelHandlers()看看吧:

有了下面教训的咱们,间接进去看看它的办法,没错,又是channnelRead,只是这次是channelRead0

ok,咱们进去addCredit()看看:

reader.addCredit(credit)只是更新了下数量

public void addCredit(int creditDeltas) {        numCreditsAvailable += creditDeltas;    }

重点咱们看下enqueueAvailableReader() 办法,而enqueueAvailableReader()的重点就是判断Credit是否足够发送

isAvailable的实现也很简略,就是判断Credit是否大于0且有实在数据可发

writeAndFlushNextMessageIfPossible实际上就是往上游发送数据:

拿数据的时候会判断Credit是否足够,不足够抛异样:

再画张图来简略了解一下:

背压总结

「上游」的处理速度跟不上「上游」的发送速度,从而升高了处理速度,看似是很美妙的(毕竟看起来就是帮忙咱们限流了)。

但在Flink里,背压再加上Checkponit机制,很有可能导致State状态始终变大,拖慢实现checkpoint速度甚至超时失败。

checkpoint处理速度提早时,会加剧背压的状况(很可能大多数工夫都在解决checkpoint了)。

checkpoint做不上时,意味着重启Flink利用就会从上一次实现checkpoint从新执行(...

举个我实在遇到的例子:

我有一个Flink工作,我只给了它一台TaskManager去执行工作,在更新DB的时候发现会有并发的问题。

只有一台TaskManager定位问题很简略,略微定位了下判断:我更新DB的Sink 并行度调高了。

如果Sink的并行度设置为1,那必定没有并发的问题,但这样解决起来太慢了。

于是我就在Sink之前依据userId进行keyBy(雷同的userId都由同一个Thread解决,那这样就没并发的问题了)

看似很美妙,但userId存在热点数据的问题,导致上游数据处理造成反压。本来一次checkpoint执行只须要30~40ms反压后一次checkpoint须要2min+

checkpoint执行距离绝对频繁(6s/次),执行工夫2min+,最终导致数据始终解决不过去,整条链路的生产速度从原来的3000qps到背压后的300qps,始终堵住(程序没问题,就是处理速度大大降落,影响到数据的最终产出)。

最初

原本想着这篇文章把反压和Checkpoint都一起写了,但写着写着发现有点长了,那checkpoint开下一篇吧。

置信我,只有你用到Flink,迟早会遇到这种问题的,当初可能有的同学还没看懂,没关系,先点个赞????,珍藏起来,前面就用得上了。

参考资料:

  • https://www.cnblogs.com/ljygz/tag/flink/
  • https://ci.apache.org/projects/flink/flink-docs-release-1.11/

三歪把【大厂面试知识点】、【简历模板】、【原创文章】全副整顿成电子书,共有1263页!点击下方链接间接取就好了

  • GitHub
  • Gitee拜访更快

PDF文档的内容均为手打,有任何的不懂都能够间接来问我