共计 8013 个字符,预计需要花费 21 分钟才能阅读完成。
作者 | Robin
翻译 | 周凯波
Contentsquare 公司的 Robin 总结了他们将 Spark 工作迁徙到 Flink 遇到的 10 个『陷阱』。对于第一次将 Flink 用于生产环境的用户来说,这些教训十分有参考意义。
采纳新的框架总是会带来很多惊喜。当你花了几天工夫去排查为什么服务运行异样,后果发现只是因为某个性能的用法不对或者短少一些简略的配置。
在 Contentsquare[1],咱们须要一直降级数据处理工作,以满足越来越多的数据上的刻薄需要。这也是为什么咱们决定将用于会话 [2] 解决的小时级 Spark 工作迁徙到 Flink[3] 流服务。这样咱们就能够利用 Flink 更为强壮的解决能力,提供更实时的数据给用户,并能提供历史数据。不过这并不轻松,咱们的团队在下面工作了有一年工夫。同时,咱们也遇到了一些令人诧异的问题,本文将尝试帮忙你防止这些陷阱。
1. 并行度设置导致的负载歪斜
咱们从一个简略的问题开始:在 Flink UI 中考察某个作业的子工作时,对于每个子工作解决的数据量,你可能会遇到如下这种奇怪的状况。
每个子工作的工作负载并不平衡
这表明每个子工作的算子没有收到雷同数量的 Key Groups,它代表所有可能的 key 的一部分。如果一个算子收到了 1 个 Key Group,而另外一个算子收到了 2 个,则第二个子工作很可能须要实现两倍的工作。查看 Flink 的代码,咱们能够找到以下函数:
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}
其目标是将所有 Key Groups 分发给理论的算子。Key Groups 的总数由 maxParallelism 参数决定,而算子的数量和 parallelism 雷同。这里最大的问题是 maxParallelism 的默认值,它默认等于 operatorParallelism + (operatorParallelism / 2) [4]。如果咱们设置 parallelism 为 10,那么 maxParallelism 为 15(理论最大并发度值的上限是 128,下限是 32768,这里只是为了不便举例)。这样,依据下面的函数,咱们能够计算出哪些算子会调配给哪些 Key Group。
在默认配置下,局部算子调配了两个 Key Group,局部算子只调配了 1 个
解决这个问题非常容易:设置并发度的时候,还要为 maxParallelism 设置一个值,且该值为 parallelism 的倍数。这将让负载更加平衡,同时不便当前扩大。
2. 留神 mapWithState & TTL 的重要性
在解决蕴含有限多键的数据时,要思考到 keyed 状态保留策略(通过 TTL 定时器来在给定的工夫之后清理未应用的数据)是很重要的。术语『有限』在这里有点误导,因为如果你要解决的 key 以 128 位编码,则 key 的最大数量将会有个限度(等于 2 的 128 次方)。但这是一个微小的数字!你可能无奈在状态中存储那么多值,所以最好思考你的键空间是无界的,同时新键会随着工夫一直呈现。
如果你的 keyed 状态蕴含在某个 Flink 的默认窗口中,则将是平安的:即便未应用 TTL,在解决窗口的元素时也会注册一个革除计时器,该计时器将调用 clearAllState 函数,并删除与该窗口关联的状态及其元数据。
如果要应用 Keyed State Descriptor [5]来治理状态,能够很不便地增加 TTL 配置,以确保在状态中的键数量不会无限度地减少。
然而,你可能会想应用更简便的 mapWithState 办法,该办法可让你拜访 valueState 并暗藏操作的复杂性。尽管这对于测试和大量键的数据来说是很好的抉择,但如果在生产环境中遇到有限多键值时,会引发问题。因为状态是对你暗藏的,因而你无奈设置 TTL,并且默认状况下未配置任何 TTL。这就是为什么值得思考做一些额定工作的起因,如申明诸如 RichMapFunction 之类的货色,这将使你能更好的管制状态的生命周期。
3. 从检查点还原和从新分区
在应用大状态时,有必要应用增量检查点(incremental checkpointing)。在咱们的案例中,工作的残缺状态约为 8TB,咱们将检查点配置为每 15 分钟做一次。因为检查点是增量式的,因而咱们只能设法每 15 分钟将大概 100GB 的数据发送到对象存储,这是一种更快的形式并且网络占用较少。这对于容错成果很好,然而在更新工作时咱们也须要检索状态。罕用的办法是为正在运行的作业创立一个保留点(savepoint),以可移植的格局蕴含整个状态。
然而,在咱们的状况下,保留点可能须要几个小时能力实现,这使得每次公布版本都是一个漫长而麻烦的过程。相同,咱们决定应用保留检查点(Retained Checkpoints[6])。设置此参数后,咱们能够通过从上一个作业的检查点复原状态来放慢公布速度,而不用触发简短的保留点!
此外,只管保留点比检查点具备更高的可移植性,但您依然能够应用保留的检查点来更改作业的分区(它可能不适用于所有类型的作业,所以最好对其进行测试)。这与从保留点从新分区齐全一样,然而不须要经验 Flink 在 TaskManager 之间重新分配数据的漫长过程。当咱们尝试这样做时,大概花了 8 个小时才实现,这是不可继续的。侥幸的是,因为咱们应用的是 RocksDB 状态后端,因而咱们能够在这步中减少更多线程以进步其速度。这是通过将以下两个参数从 1 减少到 8 来实现的:
state.backend.rocksdb.checkpoint.transfer.thread.num:8
state.backend.rocksdb.thread.num:8
应用保留的检查点,并减少调配给 RocksDB 传输的线程数,能将公布和从新分区工夫缩小 10 倍!
4. 提前减少日志记录
这一点可能看起来很显著,但也很容易遗记。开发作业时,请记住它将运行很长时间,并且可能会解决意外的数据。产生这种状况时,你将须要尽可能多的信息来考察产生了什么,而不用通过再次回溯雷同的数据来重现问题。
咱们的工作是将事件汇总在一起,并依据特定规定进行合并。这些规定中的某些规定在大多数状况下性能还能够,然而当有数据歪斜时却要花费很长时间。当咱们发现工作卡住了 3 个小时,却不晓得它在做什么。仿佛只有一个 TaskManager 的 CPU 能够失常工作,因而咱们狐疑是特定数据导致咱们的算法性能降落。
最终解决完数据后,所有恢复正常,然而咱们不晓得从哪开始查看!这就是为什么咱们为这些状况增加了一些预防性日志的起因:在解决窗口时,咱们会测量破费的工夫。只有计算窗口所需的工夫超过 1 分钟,咱们就会记录下所有可能的数据。这对于精确理解导致性能降落的歪斜是十分有帮忙的,并且当再次发生这种状况时,咱们可能定位到合并过程解决慢的局部起因。如果收到的是反复的数据,则可能的确须要几个小时。当然,重要的是不要过多地记录信息,因为这会升高性能。因而,请尝试找到仅在异常情况下才显示信息的阈值。
5. 如何找出卡住的作业实际上在做什么
对上述问题的考察也使咱们意识到,咱们须要找到一种简略的办法,来定位作业疑似卡住时以后正在运行的代码段。侥幸的是,有一个简略的办法能够做到这一点!首先,您将须要配置 TaskManagers 的 JMX 以承受近程监督。在 Kubernetes 部署中,咱们能够通过三个步骤连贯到 JMX:
- 首先,将此属性增加到咱们的 flink-conf.yaml 中
env.java.opts: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099 -Djava.rmi.server.hostname=127.0.0.1"
- 而后,将本地端口 1099 转发到 TaskManager 的 pod 中的端口
kubectl port-forward flink-taskmanager-4 1099
- 最初,关上 jconsole
jconsole 127.0.0.1:1099
这使您能够轻松地在 JVM 上查看指标 TaskManager 的信息。对于卡住的作业,咱们以正在运行的惟一一个 TaskManager 为指标,剖析了正在运行的线程:
JConsole 向咱们展现了每个线程以后正在做什么
深入研究,咱们能够看到所有线程都在期待,除了其中一个(在下面的屏幕截图中已突出显示)。这使得咱们可能疾速发现作业是卡在哪个办法调用外面的,并轻松修复!
6. 将数据从一种状态迁徙到另一种状态的危险
依据你的理论状况,可能须要保留两个具备不同语义的不同状态描述符。例如,咱们通过 WindowContent 状态为进行中的会话累积事件,接着将解决后的会话挪动到称为 HistoricalSessions 的 ValueState 中。第二个状态为了避免前面须要用到会保留几天,直到 TTL 过期抛弃它为止。
咱们做的第一个测试运行良好:咱们能够发送额定的数据到已解决的会话,这将为雷同的键创立一个新窗口。在窗口的处理过程中,咱们会从 HistoricalSessions 状态中获取数据,以将新数据与旧会话合并,并且后果会话是历史会话的加强版本,这也正是咱们所冀望的。
在执行此操作时,咱们遇到过几次内存问题。通过几次测试后,咱们理解到 OOM 仅在将旧数据发送到 Flink 时才产生(即,发送数据的工夫戳早于其以后水印)。这使得咱们发现了以后解决形式中的一个大问题:当接管到旧数据时,Flink 将其与旧窗口合并,而旧窗口的数据仍在 WindowContent 状态内(这能够通过设置 AllowedLateness 实现)。而后后果窗口会与 HistoricalSessions 内容合并,该内容还蕴含旧的数据。最终咱们失去的是反复的事件,在同一会话中收到一些事件后,每个事件都将有数千条反复,从而导致了 OOM。
这个问题的解法非常简单:咱们心愿 WindowContent 在将其内容移至第二个状态之前主动革除。咱们应用了 Flink 的 PurgingTrigger 来达到这个目标,当窗口触发时,该音讯会发送一条革除状态内容的音讯。具体代码如下所示:
// Purging the window's content allows us to receive late events without merging them twice with the old session
val sessionWindows = keyedStream
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.allowedLateness(Time.days(7))
.trigger(PurgingTrigger.of(EventTimeTrigger.create()))
7. Reduce VS Process
如上所述,咱们对 Flink 的应用依赖于累积给定键的数据,并将所有这些数据合并在一起。这能够通过两种形式实现:
- 将数据存储在 ListState 容器中,期待会话完结,并在会话完结时将所有数据合并在一起
- 应用 ReducingState 在每个新事件达到时,将其与之前的事件合并
应用第一种还是第二种状态取决于你在 WindowedStream 上运行的性能:应用 ProcessWindowFunction 的 process 调用将应用 ListState,而应用 ReduceFunction 的 reduce 调用则将应用 ReducingState。
ReducingState 的长处非常明显:不存储窗口解决之前的所有数据,而是在单个记录中一直地对其进行聚合。这通常会导致状态更小,取决于在 reduce 操作期间会抛弃多少数据。对咱们来说,它在存储方面简直没有改善,因为与咱们为历史会话存储的 7 天数据相比,该状态的大小能够忽略不计。相同,咱们留神到通过应用 ListState 能够进步性能!
起因是:每次新事件到来时,间断的 reduce 操作都须要对数据进行反序列化和序列化。这能够在 RocksDBReducingState[7] 的 add 函数中看到,该函数会调用 getInternal[8],从而导致数据反序列化。
然而,当应用 RocksDB 更新 ListState 中的值,咱们能够看到没有序列化产生[9]。这要归功于 RocksDB 的合并操作能让 Flink 能够将数据进行追加而无需反序列化。
最初,咱们抉择了 ListState 办法,因为性能晋升有助于缩小提早,而存储的影响却很小。
8. 不要置信输出数据!
永远不要假如你的输出会像你冀望的那样。可能会呈现各种未知的状况,比方你的工作接管到了歪斜的数据、反复的数据、意外的峰值、有效的记录……总是往最坏的方面想,爱护你的作业免受这些影响。
让咱们疾速定义几个要害术语,供前面应用:
- “网页浏览(PV)事件”是咱们接管到的次要信息。当访问者在客户端加载 URL 以及 userId、sessionNumber 和 pageNumber 等信息时,就会触发它
- “会话”代表用户在不来到网站的状况下进行的所有互动的总和。它们是由 Flink 通过汇总 PV 事件和其余信息计算得出的
为了爱护咱们的工作,咱们已尽可能的减少前置过滤。咱们必须恪守的规定是,尽可能早地在流中过滤掉有效数据,以防止在中后期造成不必要的低廉操作。例如,咱们有一个规定,对于给定的会话,最多只能发送 300 个 PV 事件。每个 PV 事件都用一个递增的页码标记,以批示其在会话中的地位。当咱们在一个会话中接管到超过 300 个 PV 事件时,咱们能够通过以下办法来过滤它们:
- 计算一个给定窗口过期时的 PV 事件的数量
- 抛弃页码超过 300 的事件
第一个计划仿佛更牢靠,因为它不依赖于页码的值,然而咱们要在状态中累积 300 多个 PV 事件,而后能力排除它们。最终咱们抉择了第二个计划,该计划在谬误数据进入 Flink 时就进行了排除。
除了这些无状态过滤器之外,咱们还须要依据与每个键相干的指标排除数据。例如,每个会话的最大大小(以字节为单位)设置为 4MB。抉择此数字是出于业务起因,也是为了帮忙解决 Flink 中 RocksDB 状态的一个限度。事实上,如果 Flink 应用的 RocksDB API 的值超过 2 ^ 31 字节[10],那么它就会失败。因而,如果你像下面解释的那样应用一个 ListState,则须要确保你永远不要累积太多的数据。
当你只有对于新生产的事件的信息时,就不可能晓得会话的以后大小,这意味着咱们不能应用与解决页码雷同的技巧。咱们所做的只是将 RocksDB 中的每个键 (即每个会话) 的元数据存储在一个独自的 ValueState 中。此元数据在 keyBy 算子之后,但在开窗之前应用和更新。这意味着咱们能够爱护 RocksDB 防止在其 ListState 中积攒太多数据,因为基于此元数据,咱们晓得何时进行承受给定键的值!
9. 事件工夫的危险性
事件工夫解决在大多数状况下都很杰出,但你必须牢记:如果你解决晚到数据的办法很费时,可能会有一些蹩脚的结果。这个问题并不是间接与 Flink 无关,当某个内部组件往 Kafka topic 在写数据,而同时 Flink 正在生产这个 topic 的数据,如果这个内部组件呈现问题,就会产生数据晚到的景象。具体来说,当这个组件生产某些分区的速度比其余组件慢时。
这个组件(称为 Asimov)是一个简略的 Akka 流程序,该程序读取 Kafka topic,解析 JSON 数据,将其转换为 protobuf,而后将其推送到另一个 Kafka topic,这样 Flink 就能够解决这个 protobuf。Asimov 的输出在每个分区中应该是有序的,然而因为分区不是与输入 topic 一对一映射,因而当 Flink 最终解决音讯时,可能会呈现一些乱序。这样也没啥问题,因为 Flink 能通过提早水印来反对乱序。
问题是,当 Asimov 读取一个分区的速度比其余分区慢时:这意味着 Flink 的水印将随着最快的 Asimov 输出分区(而不是 Flink 的输出,因为所有分区都失常后退)后退,而慢的分区将收回具备更旧工夫戳的记录。这最终会导致 Flink 将这些记录视为迟来的记录! 这可能没问题,然而在咱们的作业中,咱们应用特定的逻辑来解决晚到的记录,须要从 RocksDB 获取数据并生成额定的音讯来执行上游的更新。这意味着,每当 Asimov 因为某种原因在几个分区上落后时,Flink 就须要做更多的工作。
在有 128 个分区的 topic 中,只有 8 个分区累积提早,从而导致 Flink 中的数据晚到
咱们发现了两种解决此问题的办法:
- 咱们能够依照与它的输入 topic 雷同的形式(通过 userId)对 Asimov 的输 入 topic 进行分区。这意味着,当 Asimov 滞后几个分区,Flink 输出中的相应分区也滞后,从而导致水印后退得更慢:
咱们决定不这样做,因为如果咱们在 Asimov 之前就有晚到的数据,这个问题依然会存在,这迫使咱们得以雷同的形式来给每个 topic 划分分区。但这在很多状况下是不能做的。
- 另一个解决方案依赖于攒批处理晚到的事件:如果咱们能够推延对晚到事件的解决,咱们能够确保每个会话最多产生一个更新,而不是每个事件产生一个更新。
咱们能够通过应用自定义触发器,以避免出现晚到事件达到时就触发窗口,从而实现第二种解决方案。正如你在默认的 EventTimeTrigger 实现中所看到的,晚到事件在特定状况下不会注册计时器。在咱们的计划中,无论如何咱们都会注册一个计时器,并且不会立刻触发窗口。因为咱们的业务需要容许以这种形式进行批量更新,所以咱们能够确保当上游呈现提早时,咱们不会生成数百个低廉的更新。
10. 防止将所有内容存储在 Flink 中
让咱们以一些广泛的观点来完结咱们的探讨:如果你的数据很大,并且不须要常常拜访,那么最好将其存储在 Flink 之外。在设计作业时,你心愿所有须要的数据都间接在 Flink 节点上(在 RocksDB 或内存中)可用。当然,这使得应用这种数据的形式更快,但当数据很大时,它会给你的作业减少很多老本。这是因为 Flink 的状态没有被复制,所以失落一个节点须要从检查点完全恢复。如果你常常须要向检查点存储写入数百 GB 数据,则检查点机制自身也很低廉。
如果对状态的拜访是性能需求中的要害局部,那么将它存储在 Flink 中相对是值得的。然而,如果你能够忍耐额定的提早,那么将它存储在具备复制性能和反对对给定记录快速访问的内部数据库中,这将为你节俭很多麻烦。对于咱们的用例,咱们抉择将 WindowContent 状态保留在 RocksDB 中,但咱们将 HistoricalSessions 数据移入了 Aerospike[11]中。因为状态较小,这使得咱们的 Flink 作业更快,更容易保护。咱们甚至还受害于这样一个事实:存储在 Flink 中的残余数据足够小,能够都放入内存,这让咱们无需应用 RocksDB 和本地 SSD。
结尾
总而言之,应用 Flink 是一次很棒的经验:只管有时咱们无奈了解框架的行为,但最终它总是有情理的。我强烈推荐订阅 Flink 用户邮件列表[12],从这个十分有用和敌对的社区取得额定的提醒!
更多参考链接,请移步作者原文:
https://engineering.contentsq…