背景
在字节跳动的实时计算场景中,咱们有很多工作(数量 2k+)会间接服务于线上,其输入时延和稳定性会间接影响线上产品的用户体验,这类工作通常具备如下特点:
- 流量大,并发高(最大的工作并行度超过 1w)
- 拓扑相似于多流 Join,将各个数据源做整合输入给上游,不依赖 Checkpoint
- 没有应用 Checkpoint 并且对短时间内的小局部数据失落不敏感(如 0.5%),但对数据输入的持续性要求极高
在 Flink 现有的架构设计中,多流 Join 拓扑下单个 Task 失败会导致所有 Task 重新部署,耗时可能会继续几分钟,导致作业的输入断流,这对于线上业务来说是不可承受的。针对这一痛点,咱们提出单点复原的计划,通过对 network 层的加强,使得在机器下线或者 Task 失败的状况下,以短时间内故障 Task 的局部数据失落为代价,达成以下指标:
- 作业不产生全局重启,只有故障 Task 产生 Failover
- 非故障 Task 不受影响,失常为线上提供服务
解决思路
当初遇到这些问题的时候,咱们提出的想法是说能不能在机器故障下线的时候,只让在这台机器上的 Tasks 进行 Failover,而这些 Tasks 的上下游 Tasks 能恰好感知到这些失败的 Tasks,并作出对应的措施:
- 上游:将本来输入到 Failed Tasks 的数据间接抛弃,期待 Failover 实现后再开始发送数据。
- 上游:清空 Failed Tasks 产生的不残缺数据,期待 Failover 实现后再从新建设连贯并承受数据
依据这些想法咱们思考得出几个比拟关键点在于:
- 如何让上下游感知 Task Failed ?
- 如何清空上游不残缺的数据 ?
- Failover 实现后如何与上下游从新建设连贯 ?
基于以上思考咱们决定基于已有的 Network 层线程模型,批改上下游对于 Task Failed 后的解决逻辑,让非故障的 Tasks 短时间内实现对失败 Task 的感知操作,从而使得作业继续稳固地输入。
以后架构
注:咱们的实现基于 Flink-1.9,1.11 后的网络模型退出了 Unaligned Checkpoint 的个性,可能会有所变动。咱们先将 Flink 的上下游 Task 通信模型简略形象一下:
上下游 Task 感知彼此状态的逻辑,分三种状况思考:
- Task 因为逻辑谬误或 OOM 等起因 Fail,Task 本身会被动开释 network resources,给上游发送 channel close 信息,给上游发送 Exception。
- TaskManager 过程被 Yarn Kill,TCP 连贯会被操作系统失常敞开,上游 Netty Server 和上游 Netty Client 能够感知到连贯状态变动。
- 机器断电宕机,这个状况下操作系统不会正确敞开 TCP 连贯,所以 Netty 的 Server 和 Client 可能相互感知不到,这个时候咱们在 deploy 新的 Task 后须要做一些强制更新的解决。
能够看到,在大部分状况下,Task 是能够间接感知到上下游 Task 的状态变动。理解了根底的通信模型之后,咱们能够依照之前的解决思路持续深刻一下,别离在上游发送端和上游接收端能够做什么样改良来实现单点复原。
优化计划
依据咱们的解决思路,咱们来绘制一下单个 Task 挂了之后,整个 Job 的通信流程:
Map(1) 失败之后:
- 将 Map(1) 失败的信息告诉 Source(1)、Sink(1) 和 JobManager。
- JobManager 开始申请新的资源筹备 Failover,同时上游 Source(1) 和上游 Sink(1) 切断和 Map(1) 的数据通道,然而 Source(1) 和 Sink(1) 和其余 Task 的数据传输仍失常进行。
- Map(1)’ 被胜利调度,和上游建设连贯,JobManager 告诉 Sink(1) 和 Map(1)’ 建设连贯,数据传输通道被复原。
从这个流程,咱们能够将优化分为三个模块,别离为上游发送端、上游接收端和 JobManager。
上游发送端的优化
咱们再细化一下上游发送端的相干细节,
(1) Netty Server 收到 Client 发送的 Partition Request 后,在对应的 Subpartition 注册读取数据的 SubpartitionView 和 Reader。
(2) RecordWriter 发送数据到不同的 Subpartitions,每个 Subpartition 外部保护一个 buffer 队列,并将读取数据的 Reader 放入到 Readers Queue 中。(Task 线程)
(3) Netty 线程读取 Readers Queue,取出对应的 Reader 并读取对应 Subpartition 中的 buffer 数据,发送给上游。(Netty 线程)
咱们的冀望是上游发送端在感知到上游 Task 失败之后,间接将发送到对应 Task 的数据抛弃。那么咱们的改变逻辑,在这个示意图中,就是 Subpartition 通过 Netty Server 收到上游 Task Fail 的音讯后,将本人设置为 Unavailable,而后 RecordWriter 在发送数据到指定 Subpartition 时,判断是否可用,如果不可用则间接将数据抛弃。而当 Task Failover 实现后从新与上游建设连贯后,再将该 Subpartition 置为 Available,则数据能够从新被生产。
发送端的改变比较简单,得益于 Flink 外部对 Subpartition 的逻辑做了很好的形象,并且能够很容易的通过参数来切换 Subpartition 初始化的类型,咱们在这里参考 PipelinedSubpartition 的实现,根据上述的逻辑,实现了咱们本人的 Subpartition 和对应的 View。
上游接收端的优化
同样,咱们来细化一下上游接收端的细节:
认真来看,其实和上游的线程模型颇有类似之处:
(1) InputGate 初始化所有的 Channel 并通过 Netty Client 和上游 Server 建设连贯。
(2) InputChannel 接管到数据后,缓存到 buffer 队列中并将本人的援用放入到 Channels Queue 里。(Netty 线程)
(3) InputGate 通过 InputProcessor 的调用,从 Queue 里拉取 Channel 并读取 Channel 中缓存的 buffer 数据,如果 buffer 不残缺(比方只有半条 record),那么则会将不残缺的 buffer 暂存到 InputProcessor 中。(Task 线程)
这里咱们冀望上游接收端感知到上游 Task 失败之后,能将对应 InputChannel 的接管到的不残缺的 buffer 间接革除。不残缺的 buffer 存储在 InputProcessor 中,那么咱们如何让 InputProcessor 晓得哪个 Channel 呈现了问题?
简略的计划是说,咱们在 InputChannel 中间接调用 InputGate 或者 InputProcessor,做 buffer 清空的操作,然而这样引入一个问题,因为 InputChannel 收到 Error 是在 Netty 线程,而 InputProcessor 的操作是在 Task 线程,这样跨线程的调用突破了已有的线程模型,必然会引入锁和调用工夫的不确定性,减少架构设计的复杂度,并且因为 InputProcessor 会对每一条 record 都有调用,稍有不慎就会带来性能的降落。
咱们沿用已有的线程模型,Client 感知到上游 Task 失败的音讯之后告知对应的 Channel,Channel 向本人保护的 receivedBuffers 的开端插入一个 UnavailableEvent,并期待 InputProcessor 拉取并清空对应 Channel 的 buffer 数据。示意图如下所示,红色的模块是咱们新增的局部:
JobManager 重启策略的优化
JobManager 重启策略能够参考社区已有的 RestartIndividualStrategy,比拟重要的区别是,在从新 deploy 这个失败的 Task 后,咱们须要通过 ExecutionGraph 中的拓扑信息,找到该 Task 的上游 Tasks,并通过 Rpc 调用让上游 Tasks 和这个新的上游 Tasks 从新建设连贯。
这里实现有一个难点是如果 JobManager 去 update 上游的 Channel 信息时,旧的 Channel 对应的 buffer 数据还没有被革除怎么办?咱们这里通过新增 CachedChannelProvider 来解决这一逻辑:
如图所示,以 Channel – 1 为例,如果 JobManager 更新 Channel 的 Rpc 申请到来时 Channel 处于不可用状态,那么咱们间接利用 Rpc 申请中携带的 Channel 信息来从新初始化 Channel。以 Channel – 3 为例,如果 Rpc 申请到来时 Channel 依然可用,那么咱们将 Channel 信息临时缓存起来,等 Channel – 3 中所有数据生产结束后,告诉 CachedChannelProvider,而后再通过 CachedChannelProvider 去更新 Channel。
这里还须要特地提到一点,在字节跳动外部咱们实现了预留 TaskManager 的性能,当 Task 呈现 Failover 时,可能间接应用 TaskManager 的资源,大大节约了 Failover 过程数据失落的损耗。
实现中的关键点
整个解决的思路其实是比拟清晰的,置信大家也比拟容易了解,然而在实现中依然有很多须要留神的中央,举例如下:
- 下面提到 JobManager 发送的 Rpc 申请如果过早,那么会临时缓存下来期待 Channel 数据生产实现。而此时作业的状态是未知的,可能始终处于僵死的状态(比方卡在了网络 IO 或者 磁盘 IO 上),那么 Channel 中的 Unavailable Event 就无奈被 InputProcessor 生产。这个时候咱们通过设置一个定时器来做兜底策略,如果没有在定时器设置的工夫内实现 Channel 的从新初始化,那么该 Task 就会主动下线,走单点复原的 Failover 流程。
- 网络层作为 Flink 内线程模型最简单的一个模块,咱们为了缩小改变的复杂度和改变的危险,在设计上没有新增或批改 Netty 线程和 Task 线程之间通信的模型,而是借助于已有的线程模型来实现单点复原的性能。但在实现过程中因为给 Subpartition 和 Channel 减少了相似 isAvailable 的状态位,所以在这些状态的批改上须要特地留神线程可见性的解决,防止多线程读取状态不统一的状况产生。
收益
目前在字节跳动外部,单点复原性能曾经上线了 1000+ 作业,在机器下线、网络抖动的状况下,上游在上游作业做 Failover 的过程简直没有感知。
接下来咱们以上面这个作业拓扑为例,在作业失常运行时咱们手动 Kill 一个 Container,来看看不同并行度作业开启单点复原的成果:
咱们在 1000 和 4000 并行度的作业上进行测试,每个 slot 中有 2 个 Source 和 1 个 Joiner 共 3 个 Task,手动 Kill 一个 Container 后,从故障复原工夫和断流影响两个维度进行收益计算:
论断:能够看到,在 4000 个 Slot 的作业里,如果不开启单点复原,作业整体的 Failover 工夫为 81s,同时 对于上游服务 来说,上游服务断流 81s,这在实时服务线上的场景中显著是不可承受的。而开启了单点复原和预留资源后,Kill 1 个 Container 只会影响 4 个 Slot,且 Failover 的工夫只有 5s,同时对于上游服务来说,上游服务产生的数据缩小 4/4000= 千分之一,继续 5s,成果是十分不言而喻的。