关于flink:FlinkRegular-Join-乱序问题分析与总结

0次阅读

共计 3988 个字符,预计需要花费 10 分钟才能阅读完成。

1. Join 算子数据处理流程

在流式解决数据的过程中,当本侧到来一条新的数据时,咱们无奈预测对侧是否在之后还会到来可能和该数据关联上的数据,且思考到时效性,咱们也无奈始终期待右侧所有数据到齐后再关联下发,因而 Flink 的解决形式是先将以后数据和对侧曾经到来过的所有数据(如果设置了 TTL,则是对应 TTL 时间段的数据)进行关联计算,并将关联后果下发,如果是 Outer Join,则还要思考关联不上须要下发一条对侧为 null 的数据。除此之外,咱们还要讲该数据记录在状态中,以不便后续对侧数据来做镜像的关联解决。

2. 状态视图

而 Regular Join 的状态构造依据是否蕴含惟一键以及是否是 Outer Join 共有六种:

Inner/Outer 状态 RocksDB 语义
JoinRecordStateViews.JoinKeyContainsUniqueKey Inner ValueState<RowData> get
JoinRecordStateViews.InputSideHasUniqueKey Inner MapState<RowData, RowData> seek
JoinRecordStateViews.InputSideHasNoUniqueKey Inner MapState<RowData, Integer> seek
OuterJoinRecordStateViews.JoinKeyContainsUniqueKey Outer ValueState<Tuple2<RowData, Integer>> get
OuterJoinRecordStateViews.InputSideHasUniqueKey Outer MapState<RowData, Tuple2<RowData, Integer>> seek
OuterJoinRecordStateViews.InputSideHasNoUniqueKey Outer MapState<RowData, Tuple2<Integer, Integer>> seek

Outer Join 的状态视图相比 Inner Join 会多记录一个要害属性:numOfAssociations,用来标识该条 record 关联上的对侧 record 的数量,这是因为 Outer Join 相比 Inner Join 有一个非凡的点,即没关联到数据也会下发,因而 Outer Join 须要通过 numOfAssociations 的值来确定以后该下发的关联数据是什么。以 Left Outer Join 为例,当 numOfAssociations 为 0 时,会下发 +I [left-record, null],前面 numOfAssociations 被更新为 1 时,则须要先下发 -D [left-record, null] 回撤之前的关联后果,再下发最新的关联后果 +I [left-record, right- record]。同理,当 numOfAssociations 被从 1 更新为 0 时(比方收到一条右流的回撤信息),则须要下发 -D [left-record, right-record] 回撤之前的关联后果,再下发最新后果 +I [left-record, null]。

状态乱序问题剖析

关联乱序的问题成因许多,比方屡次 shuffle,应用的 keyby 字段不统一,可能导致关联时雷同 join key 的数据 A、B 在 source 端按序被 Flink 框架生产解决,在 Join 算子前被散发到了不同的 subtask,B 先被解决完,反而先达到了 Join 算子等等,本文不会枚举所有的乱序场景及根因,而是聚焦 Regular Join 算子状态视图带来的乱序问题及剖析。

在状态层面,Outer Join 和 Inner Join 对状态应用区别不大,因而下文咱们间接对 Inner Join 进行探讨,且该探讨后果可间接利用到 Inner Join 上。

参考上述表格,咱们发现只有 JoinContainsUniqueKey 是 ValueState,每个 Join Key 对应一条记录,因而不会有乱序问题,但其隐含的业务语义是一对一关联,在理论生产环境中并不通用,即无奈将所有业务作业革新为 JoinContainsUniqueKey。

InputHasUniqueKey 和 InputHasNoUniqueKey 底层应用的都是 MapState,家喻户晓,MapState 并不提供工夫语义上保序的机制,即无奈确保先来的数据会被先读取到,因而从底层设计来看,在一对多、甚至多对多关联场景下是十分可能呈现先来的数据被后关联的状况的,但两者是否真正会造成业务视角层面的乱序则须要进一步剖析。

首先,在理论生产环境中,业务视角最常碰到的是 InputHasNoUniqueKey 而非 InputHashUniqueKey 导致的乱序,但单单看 MapState 咱们并不能。咱们先剖析一下 InputHasUniqueKey 和 InputHasNoUniqueKey 的区别。

两者最大的区别在于,InputHasUniqueKey 对于雷同主键的数据只会记录一条数据,它的 addRecord 解决语义本质上等价于 update,即起初的更新数据会笼罩前一条数据,而 InputHasNoUniqueKey 则没有这个限度,那么在有主键的数据更新场景(而在流式解决 changelog 的思路下这种场景十分常见)下,InputHasUniqueKey 只会记录一条最新值,而 InputHasNoUniqueKey 的状态视图里则会保留两条数据。如果咱们冀望的是一对一官网,当对侧流到来数据进行关联时候,针对 InputHasUniqueKey 咱们仍旧能够确保这个语义,但在 InputHasNoUniqueKey 状况下,这个关联就会被转变成一对多,即便在关联后咱们进行排序去重,也可能会看到乱序的后果因为 MapState 不提供保序,所以先到的数据可能被后发了。

针对这种乱序状况,最常见的解决形式有两种:一种是在源表申明 Unique Key,但这种形式并不能提供强有力的保障,因为 Unique Key 的推导不是时常正确的,比方开启 Mini Batch 时,Flink SQL 会失落所有源表的 Unique Key 信息;另一种是关联前进行去重,这种形式绝对前者是一种更好的形式,因为它既提供了 Unique Key 的保障(partition by 的字段会被辨认为主键),又确保了不会下发过期数据,目前也是最常见驳回的解决 Unique Key 失落导致的乱序问题。

另一种会在业务视角观测到乱序的场景则与数据处理的粒度无关,这类场景比拟少见。假如咱们有左右流,以订单表关联订单商品详情表(关联键为订单 ID),其中左表的主键是订单 ID,右表主键是订单 ID + 商品 ID,左右的状态视图为 JoinKeyContainsUniqueKey,右流为 InputSideHasUniqueKey,一个订单对应多个商品,因而这是一个一对多关联场景。而关联后如果上游的数据处理粒度为订单 + 商品,因为雷同主键只会有一条数据的限度,因而不会观测到乱序,但如果关联后上游以订单为粒度解决数据,比方取该订单的最初一个商品,那就乱序了,两者的区别在于 InputSideHasUniqueKey 的状态视图能够保障主键内不乱序(反正也只有一条),而因为 MapState 的存在无奈保障主键间不乱序。而这种状况还是有解法的,仍旧是引入排序去重算子,让上游依据事件工夫或具备业务语义的字段进行一次排序。如果遇到右流在推导过程中失落了 Unique Key 信息,最坏状况下要在关联前后各引入一次排序去重算子。

最初咱们思考更简单的多对多场景,依据下面的剖析,咱们曾经得悉 InputHasNoUniqueKey 能够通过排序去重算子将其转为 InputSideHasUniqueKey,因而为了简化解决,咱们间接假如左右流的状态视图均为 InputSideHasUniqueKey,看起来咱们仍旧能采取下面提到的思路,但理论执行的解决老本更高,只能按左右流主键的组合粒度来排序或解决能力防止乱序问题。

3. 解决思路

下面的解决思路其实都是通过引入一次排序去重算子(rownumber)来解决乱序的问题,这种解法在业务上的确能防止,但笔者认为这并非解决 Regular Join 乱序的最好办法,有以下起因:

  • row_number 语法较为简单,对用户的开发成本高;
  • row_number 的引入并非是基于用户自身的业务需要,而是基于对 Flink 框架设计不欠缺的一种补救,须要用户对 Flink 的状态有较深的了解,认知老本过高;
  • row_number 是一种绕过伎俩,并不能根治 Flink MapState 不保序这个问题,更多是基于 case 积攒的一种解决伎俩,是一种业务侧的解决形式。

而从 Flink 框架开发者的角度来看,更好的思路还是从根本上解决 MapState 的乱序问题,一层从算子层做:比方存储数据时带上数据到来的工夫戳(即解决工夫),在读取数据时读完后按工夫戳进行一次排序,该形式的长处是:革新简略,只需在算子层改下状态描述符、存状态时减少一个工夫字段以及一段排序逻辑,毛病也很显著,每次关联都须要做一次排序,这是一种非必要开销。

另一种则是从状态层做,即确保 MapState 自身就提供了乱序的机制,因为笔者目前对状态和 RocksDB 不太理解,暂不对此的工作思路开展剖析,业界有许多相近的思路,读者可自行浏览理解。

4. 总结

在本文,笔者首先简略介绍了 Join 算子的解决流程以及 Join 算子的状态视图,着重剖析了状态问题导致的乱序问题,以及在什么场景下会造成理论业务侧视角的乱序,并提供了目前业务侧的解决形式以及从框架底层的通用解决思路。限于篇幅起因,本文一些内容可能有误,或形容不够精确之处,欢送斧正,或对该问题有任何不同见解也欢送探讨,可分割 mukingdo@gmail.com。

正文完
 0