共计 7178 个字符,预计需要花费 18 分钟才能阅读完成。
前言
微信搜【Java3y】关注这个朴实无华的男人,点赞关注是对我最大的反对!
文本已收录至我的 GitHub:https://github.com/ZhongFuCheng3y/3y,有 300 多篇原创文章,最近在连载 面试和我的项目 系列!
最近始终在迁徙 Flink
相干的工程,期间也踩了些坑,checkpoint
和 反压
是其中的一个。
敖丙太菜了,Flink
都不会,只能我本人来了。看敖丙只能图一乐,学技术还是得看三歪
平时敖丙黑我都没啥程度,拿点简略的货色来就说我不会。我是敖丙的头等黑粉
明天来分享一下 Flink
的 checkpoint
机制和 背压
原理,我置信通过这篇文章,大家在玩 Flink
的时候能够更加粗浅地理解 Checkpoint
是怎么实现的,并且在设置相干参数以及应用的时候能够更加地得心应手。
上一篇曾经写过 Flink
的入门教程了,如果还不理解 Flink
的同学能够先去看看:《Flink 入门教程》
前排揭示,本文基于 Flink 1.7
《浅入浅出 学习 Flink 的背压常识》
开胃菜
在解说 Flink
的checkPoint
和 背压
机制之前,咱们先来看下 checkpoint
和背压
的相干根底,有助于前面的了解。
作为用户,咱们写好 Flink
的程序,上治理平台提交,Flink
就跑起来了(只有程序代码没有问题),细节对用户都是屏蔽的。
实际上大抵的流程是这样的:
Flink
会依据咱们所写代码,会生成一个StreamGraph
的图进去,来代表咱们所写程序的拓扑构造。- 而后在提交的之前会将
StreamGraph
这个图 优化 一把(能够合并的工作进行合并),变成JobGraph
- 将
JobGraph
提交给JobManager
JobManager
收到之后JobGraph
之后会依据JobGraph
生成ExecutionGraph
(ExecutionGraph
是JobGraph
的并行化版本)TaskManager
接管到工作之后会将ExecutionGraph
生成为真正的物理执行图
能够看到 物理执行图
真正运行在 TaskManager
上Transform
和 Sink
之间都会有 ResultPartition
和InputGate
这俩个组件,ResultPartition
用来发送数据,而 InputGate
用来接收数据。
屏蔽掉这些 Graph
,能够发现Flink
的架构是:Client
->JobManager
->TaskManager
从名字就能够看出,JobManager
是干「治理」,而 TaskManager
是真正干活的。回到咱们明天的主题,checkpoint
就是由 JobManager
收回。
而 Flink
自身就是 有状态 的,Flink
能够让你抉择 执行过程中的数据 保留在哪里,目前有三个中央,在 Flink
的角度称作State Backends
:
- MemoryStateBackend(内存)
- FsStateBackend(文件系统,个别是 HSFS)
- RocksDBStateBackend(RocksDB 数据库)
同样的,checkpoint
信息也是保留在 State Backends
上
耗子屎
最近在 Storm
迁徙 Flink
的时候遇到个问题,我来简略形容一下背景。
咱们从各个数据源从荡涤出数据,借助 Flink
荡涤,组装成一个宽模型,最初交由 kylin
做近实时数据统计和展现,供经营实时查看。
迁徙的过程中,发现订单的 topic
生产提早了良久,初步狐疑是因为订单上游的 并发度
不够所影响的,所以调整了两端的并行度从新公布一把。
公布的过程中,零碎起来当前,再去看topic
生产提早的监控,就懵逼了。什么?怎么这么久了啊?丝毫没有降下去的意思。
这时候只能找组内的大神去寻求帮忙了,他排查一番后示意:这 checkpoint
始终没做上,都堵住了,从新公布的时候只会在上一次 checkpoint
开始,因为 checkpoint
长时间没实现掉,所以从新公布数据量会很大。这没啥好方法了,只能在这个堵住的环节下扔掉吧,预计是业务逻辑出了问题。
画外音:接管到订单的数据,会去 溯源 点击,判断该订单从哪个业务来,通过了哪些的业务,最终是哪块业务以致该订单成交。
画外音:内部真正应用时,依赖「订单后果 HBase」数据
咱们认为点击的数据有可能会比订单的数据处理要慢一会,所以找不到的数据会距离一段时间轮询,又因为 Flink
提供 State
「状态」和checkpoint
机制,咱们把找不到的数据放入 ListState
按肯定的工夫轮询就好了(即使零碎因为重启或其余起因挂了,也不会把数据丢了)。
实践上只有没问题,这套计划是可行的。但当初后果通知咱们:订单数据报来了当前,一小批量数据 始终在「订单后果 HBase」没找到数据,就搁置到 ListState
上,而后来一条数据就去遍历ListState
。导致的结果就是:
- 数据生产不过去,造成 反压
checkpoint
始终没胜利
过后解决的形式就是把 ListState 清空掉,临时丢掉这一部分的数据,让数据追上进度。
起初排查后发现是上游在消息报字段上做了「手脚」,解析失败导致点击失落,造成这一连锁的结果。
排查问题的要害是了解 Flink
的反压
和checkpoint
的原理是什么样的,上面我来讲述一下。
反压
反压 backpressure
是流式计算中很常见的问题。它意味着数据管道中某个节点成为瓶颈,解决速率跟不上「上游」发送数据的速率,上游须要进行限速
下面的图代表了是反压极简的状态,说白了就是:上游解决不过去了,上游得慢点,要堵了!
最令人好奇的是:“上游是怎么告诉上游要发慢点的呢?”
在后面 Flink
的基础知识解说,咱们能够看到 ResultPartition
用来发送数据,InputGate
用来接收数据。
而 Flink
在一个 TaskManager
外部读写数据的时候,会有一个 BufferPool
(缓冲池)供该TaskManager
读写应用(一个 TaskManager
共用一个 BufferPool
),每个读写ResultPartition/InputGate
都会去申请本人的LocalBuffer
以上图为例,假如上游解决不过去,那 InputGate
的LocalBuffer
是不是被填满了?填满了当前,ResultPartition
是不是没方法往 InputGate
发了?而 ResultPartition
没法发的话,它本人自身的 LocalBuffer
也迟早被填满,那是不是按照这个逻辑,始终到Source
就不会拉数据了 …
这个过程就犹如 InputGate/ResultPartition
都开了本人的 有界阻塞队列,反正“我”就只能解决这么多,往我这里发,我满了就堵住呗,造成连锁反应始终堵到源头上 …
下面是只有一个 TaskManager
的状况下的反压,那多个 TaskManager
呢?(毕竟咱们很多时候都是有多个 TaskManager
在为咱们工作的)
咱们再看回 Flink
通信的总体数据流向架构图:
从图上能够荡涤地发现:近程通信用的 Netty
,底层是TCP Socket
来实现的。
所以,从宏观的角度看,多个 TaskManager
只不过多了两个Buffer
(缓冲区)。
依照下面的思路,只有 InputGate
的LocalBuffer
被打满,Netty Buffer
也迟早被打满,而 Socket Buffer
同样迟早也会被打满(TCP 自身就带有流量管制),再反馈到 ResultPartition
上,数据又又又发不进来了 … 导致整条数据链路都存在反压的景象。
当初问题又来了,一个 TaskManager
的task
可是有很多的,它们都 共用 一个 TCP Buffer/Buffer Pool
,那只有其中一个task
的链路存在问题,那不导致整个 TaskManager
跟着遭殃?
在 Flink 1.5 版本
之前,的确会有这个问题。而在 Flink 1.5 版本
之后则引入了 credit
机制。
从下面咱们看到的 Flink
所实现的反压,宏观上就是间接依赖各个 Buffer
是否满了,如果满了则无奈写入 / 读取导致连锁反应,直至 Source
端。
而 credit
机制,实际上能够简略了解为以「更细粒度」去做流量管制:每次 InputGate
会通知 ResultPartition
本人还有多少的闲暇量能够接管,让 ResultPartition
看着发。如果 InputGate
通知 ResultPartition
曾经没有闲暇量了,那 ResultPartition
就不发了。
那实际上是怎么实现的呢?撸源码!
在撸源码之前,咱们再来看看上面 物理执行图 :实际上InPutGate
下是 InputChannel
,ResultPartition
下是ResultSubpartition
(这些在源码中都有体现)。
InputGate(接收端解决反压)
咱们先从接收端看起吧。Flink
接收数据的办法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput
随后定位到解决反压的逻辑:
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
进去 getNextNonBlocked()
办法看(抉择的是 BarrierBuffer
实现):
咱们就间接看 null
的状况,看下从 初始化阶段 开始是怎么搞的,进去getNextBufferOrEvent()
进去办法外面看到两个比拟重要的调用:
requestPartitions();
result = currentChannel.getNextBuffer();
先从 requestPartitions()
看起吧,发现里边套了一层(从 InputChannel
下获取到subPartition
):
于是再进 requestSubpartition()
(看RemoteInputChannel
的实现吧)
在这里看起来就是创立 Client
端,而后接管上游发送过去的数据:
先看看 client
端的创立姿态吧,进 createPartitionRequestClient()
办法看看(咱们看 Netty
的实现)。
点了两层,咱们会进到 createPartitionRequestClient()
办法,看源码正文就能够清晰发现,这会创立 TCP
连贯并且创立出 Client
供咱们应用
咱们还是看 null
的状况,于是定位到这里:
进去 connect()
办法看看:
咱们就看看具体生成逻辑的实现吧,所以进到 getClientChannelHandlers
上
意外发现源码还有个通信简要流程图给咱们看(哈哈哈):
好了,来看看 getClientChannelHandlers
办法吧,这个办法不长,次要判断了下要生成的 client
是否开启 creditBased
机制:
public ChannelHandler[] getClientChannelHandlers() {
NetworkClientHandler networkClientHandler =
creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :
new PartitionRequestClientHandler();
return new ChannelHandler[] {
messageEncoder,
new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
networkClientHandler};
}
于是咱们的 networkClientHandler
实例是CreditBasedPartitionRequestClientHandler
到这里,咱们暂且就认为 Client
端曾经生成完了,再 退回去 getNextBufferOrEvent()
这个办法,requestPartitions()
办法是生成接收数据的 Client
端,具体的实例是CreditBasedPartitionRequestClientHandler
上面咱们进 getNextBuffer()
看看接收数据具体是怎么解决的:
拿到数据后,就会开始执行咱们用户的代码了调用 process
办法了(这里咱们先不看了)。还是回到 反压 的逻辑上,咱们如同还没看到反压的逻辑在哪里。重点就是 receivedBuffers
这里,是谁塞进去的呢?
于是咱们回看到 Client
具体的实例 CreditBasedPartitionRequestClientHandler
,关上办法列表一看,感觉就是ChannelRead()
没错了:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {decodeMsg(msg);
} catch (Throwable t) {notifyAllChannelsOfErrorAndClose(t);
}
}
跟着 decodeMsg
持续往下走吧:
持续下到decodeBufferOrEvent()
持续下到onBuffer
:
所以咱们往 onSenderBacklog
上看看:
最初调用 notifyCreditAvailable
将Credit
往上游发送:
public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
}
最初再画张图来了解一把(要害链路):
ResultPartition(发送端解决反压)
发送端咱们从 org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager
开始看起
于是咱们进去看fromConfiguration()
进去 start()
去看,随后进入 connectionManager.start()
(还是看Netty
的实例):
进去看 service.init()
办法做了什么(又看到相熟的身影):
好了,咱们再进去 getServerChannelHandlers()
看看吧:
有了下面教训的咱们,间接进去看看它的办法,没错,又是channnelRead
,只是这次是channelRead0
。
ok,咱们进去 addCredit()
看看:
reader.addCredit(credit)
只是更新了下数量
public void addCredit(int creditDeltas) {numCreditsAvailable += creditDeltas;}
重点咱们看下 enqueueAvailableReader()
办法,而enqueueAvailableReader()
的重点就是判断 Credit
是否足够发送
isAvailable
的实现也很简略,就是判断 Credit
是否大于 0 且有实在数据可发
而 writeAndFlushNextMessageIfPossible
实际上就是往上游发送数据:
拿数据的时候会判断 Credit
是否足够,不足够抛异样:
再画张图来简略了解一下:
背压总结
「上游」的处理速度跟不上「上游」的发送速度,从而升高了处理速度,看似是很美妙的(毕竟看起来就是帮忙咱们限流了)。
但在 Flink
里,背压再加上 Checkponit
机制,很有可能导致 State
状态始终变大,拖慢实现 checkpoint
速度甚至超时失败。
当 checkpoint
处理速度提早时,会加剧背压的状况(很可能大多数工夫都在解决 checkpoint
了)。
当 checkpoint
做不上时,意味着重启 Flink
利用就会从上一次实现 checkpoint
从新执行(…
举个我实在遇到的例子:
我有一个
Flink
工作,我只给了它一台TaskManager
去执行工作,在更新 DB 的时候发现会有并发的问题。只有一台
TaskManager
定位问题很简略,略微定位了下判断:我更新 DB 的 Sink 并行度调高了。如果 Sink 的并行度设置为 1,那必定没有并发的问题,但这样解决起来太慢了。
于是我就在 Sink 之前依据
userId
进行keyBy
(雷同的 userId 都由同一个 Thread 解决,那这样就没并发的问题了)
看似很美妙,但 userId
存在热点数据的问题,导致上游数据处理造成 反压
。本来一次checkpoint
执行只须要 30~40ms
, 反压
后一次 checkpoint
须要2min+
。
checkpoint
执行距离绝对频繁(6s/ 次
),执行工夫2min+
,最终导致数据始终解决不过去,整条链路的生产速度从原来的3000qps
到背压后的300qps
,始终堵住(程序没问题,就是处理速度大大降落,影响到数据的最终产出)。
最初
原本想着这篇文章把反压和 Checkpoint
都一起写了,但写着写着发现有点长了,那 checkpoint
开下一篇吧。
置信我,只有你用到Flink
,迟早会遇到这种问题的,当初可能有的同学还没看懂,没关系,先点个赞????,珍藏起来,前面就用得上了。
参考资料:
- https://www.cnblogs.com/ljygz/tag/flink/
- https://ci.apache.org/projects/flink/flink-docs-release-1.11/
三歪把【大厂面试知识点】、【简历模板】、【原创文章】全副整顿成电子书,共有 1263 页!点击下方 链接 间接取就好了
- GitHub
- Gitee 拜访更快
PDF 文档的内容 均为手打 ,有任何的不懂都能够间接 来问我