乐趣区

关于消息中间件:Flink-113面向流批一体的运行时与-DataStream-API-优化

简介: 在 1.13 中,针对流批一体的指标,Flink 优化了大规模作业调度以及批执行模式下网络 Shuffle 的性能,以及在 DataStream API 方面欠缺无限流作业的退出语义。

本文由社区志愿者苗文婷整顿,内容起源自阿里巴巴技术专家高赟 (云骞) 在 5 月 22 日北京站 Flink Meetup 分享的《面向流批一体的 Flink 运行时与 DataStream API 优化》。文章次要分为 4 个局部:

  1. 回顾 Flink 流批一体的设计
  2. 介绍针对运行时的优化点
  3. 介绍针对 DataStream API 的优化点
  4. 总结以及后续的一些布局。

GitHub 地址
https://github.com/apache/flink
欢送大家给 Flink 点赞送 star~

1. 流批一体的 Flink

1.1 架构介绍

首先看下 Flink 流批一体的整体逻辑。Flink 在晚期的时候,尽管是一个能够同时反对流解决和批处理的框架,然而它的流解决和批处理的实现,不论是在 API 层,还是在底下的 Shuffle、调度、算子层,都是独自的两套。这两套实现是齐全独立的,没有特地严密的关联。

在流批一体这一指标的疏导下,Flink 当初曾经对底层的算子、调度、Shuffle 进行了对立的形象,以对立的形式向上反对 DataStream API 和 Table API 两套接口。DataStream API 是一种比拟偏物理层的接口,Table API 是一种 Declearetive 的接口,这两套接口对流和批来说都是对立的。

1.2 长处

  • 代码复用

    基于 DataStream API 和 Table API,用户能够写同一套代码来同时解决历史的数据和实时的数据,例如数据回流的场景。

  • 易于开发

    对立的 Connector 和算子实现,缩小开发和保护的老本。

  • 易于学习

    缩小学习老本,防止学习两套类似接口。

  • 易于保护

    应用同一零碎反对流作业和批作业,缩小保护老本。

1.3 数据处理过程

上面简略介绍 Flink 是怎么形象流批一体的,Flink 把作业拆成了两种:

  • 第一种类型的作业是解决有限数据的有限流的作业

    这种作业就是咱们平时所认知的流作业,对于这种作业,Flink 采纳一个规范流的执行模式,须要思考记录的工夫,通过 Watermark 对齐的形式推动整个零碎的工夫以达到一些数据聚合和输入的目标,两头通过 State 来保护中间状态。

  • 第二种类型的作业是解决无限数据集的作业

    数据可能是保留在文件中,或者是以其余形式提前保留下来的一个无限数据集。此时能够把无限数据集看作是有限数据集的一个特例,所以它能够天然的跑在之前的流解决模式之上,无需通过代码批改,能够间接反对。

    但这里可能会疏忽掉无限数据集数据无限的特点,在接口上还须要解决更细粒度的工夫、Watermark 等语义,可能会引入额定的复杂性。另外,在性能方面,因为是按流的形式解决,在一开始就须要把所有的工作拉起来,可能须要占用更多的资源,如果采纳的是 RocksDB backend,相当于是一个大的 Hash 表,在 key 比拟多的状况下,可能会有随机 IO 拜访的问题。

    然而在批执行模式下,能够通过排序的形式,用一种 IO 更加敌对的形式来实现整个数据处理的流程。所以说,批处理模式在思考数据无限的前提下,在调度、Shuffle、算子的实现上都给咱们提供了更大的抉择空间。

    最初,针对无限数据流,不论是采纳哪种解决模式,咱们心愿最终的处理结果都是统一的。

1.4 近期演进

Flink 在最近的几个版本中,在 API 和实现层都朝着流批一体的指标做了很多的致力。

  • 在 Flink 1.11 及之前:

    Flink 对立了 Table/SQL API,并引入了对立的 blink planner,blink planner 对流和批都会翻译到 DataStream 算子之上。此外,对流和批还引入了对立的 shuffle 架构。

  • 在 Flink 1.12 中:

    针对批的 shuffle 引入了一种新的基于 Sort-Merge 的 shuffle 模式,绝对于之前 Flink 内置的 Hash shuffle,性能会有很大晋升。在调度方面,Flink 引入了一种基于 Pipeline Region 的流批一体的调度器。

  • 在 Flink 1.13 中:

    欠缺了 Sort-Merge Shuffle,并对 Pipeline Region scheduler 在大规模作业下进行了性能优化。另外,后面提到过,对于无限流的两种执行模式,咱们预期它的执行后果应该是统一的。然而当初 Flink 在作业执行完结的时候还有一些问题,导致它并不能齐全达到统一。

    所以在 1.13 中,还有一部分的工作是针对无限数据集作业,怎么在流批,尤其是在流的模式下,使它的后果和预期的后果保持一致。

  • 将来的 Flink 1.14:

    须要持续实现无限作业一致性保障、批流切换 Source、逐渐废除 DataSet API 等工作。

2. 运行时优化

2.1 大规模作业调度优化

2.1.1 边的工夫复杂度问题

Flink 提交作业时会生成一个作业的 DAG 图,由多个顶点组成,顶点对应着咱们理论的解决节点,如 Map。每个解决节点都会有并发度,此前 Flink 的实现里,当咱们把作业提交到 JM 之后,JM 会对作业开展,生成一个 Execution Graph。

如下图,作业有两个节点,并发度别离为 2 和 3。在 JM 中理论保护的数据结构里,会别离保护 2 个 task 和 3 个 task,并由 6 条执行边组成,Flink 基于此数据结构来保护整个作业的拓扑信息。在这个拓扑信息的根底上,Flink 能够独自保护每个 task 的状态,当工作挂了之后以辨认须要拉起的 task。

如果以这种 all-to-all 的通信,也就是每两个上下游 task 之间都有边的状况下,上游并发 上游并发,将呈现 O(N^2) 的数据结构。这种状况下,内存的占用是十分惊人的,如果是 10k 10k 的边,JM 的内存占用将达到 4.18G。此外,作业很多的计算复杂度都是和边的数量相干的,此时的空间复杂度为 O(N^2) 或 O(N^3),如果是 10k * 10k 的边,作业首次调度工夫将达到 62s。

能够看出,除了初始调度之外,对于批作业来说,有可能是上游执行完之后继续执行上游,两头的调度复杂度都是 O(N^2) 或 O(N^3),这样就会导致很大的性能开销。另外,内存占用很大的话,GC 的性能也不会特地好。

2.1.2 Execution Graph 的对称性

针对 Flink 在大规模作业下内存和性能方面存在的一些问题,通过一些深入分析,能够看出上述例子中上下游节点之间是有肯定对称性的。

Flink 中“边”的类型能够分为两种:

  • 一种是 Pointwise 型 ,上游和上游是一一对应的,或者上游一个对应上游几个,不是全副相连的,这种状况下,边的数量根本是线性的 O(N), 和算子数在同一个量级。
  • 另一种是 All-to-all 型 ,上游每一个 task 都要和上游的每一个 task 相连,在这种状况下能够看出,每一个上游的 task 产生的数据集都要被上游所有的 task 生产,实际上是一个对称的关系。只有记住上游的数据集会被上游的所有 task 来生产,就不必再独自存两头的边了。

所以,Flink 在 1.13 中对上游的数据集和上游的节点别离引入了 ResultPartitionGroup 和 VertexGroup 的概念。尤其是对于 All-to-all 的边,因为上下游之间是对称的,能够把所有上游产生的数据集放到一个 Group 里,把上游所有的节点也放到一个 Group 里,在理论保护时不须要存两头的边的关系,只须要晓得上游的哪个数据集是被上游的哪个 Group 生产,或上游的哪个顶点是生产上游哪个 Group 的数据集。
通过这种形式,缩小了内存的占用。

另外,在理论做一些调度相干计算的时候,比方在批处理里,如果所有的边都是 blocking 边的状况下,每个节点都属于一个独自的 region。之前计算 region 之间的上下游关系,对上游的每个顶点,都须要遍历其上游的所有顶点,所以是一个 O(N^2) 的操作。
而引入 ConsumerGroup 之后,就会变成一个 O(N) 的线性操作。

2.1.3 优化后果

通过以上数据结构的优化,在 10k * 10k 边的状况下,能够将 JM 内存占用从 4.18G 放大到 12.08M, 首次调度工夫长从 62s 缩减到 12s。这个优化其实是十分显著的,对用户来说,只有降级到 Flink 1.13 就能够取得收益,不须要做任何额定的配置。

2.2 Sort-Merge Shuffle

另外一个优化,是针对批的作业在数据 shuffle 方面做的优化。个别状况下,批的作业是在上游跑完之后,会先把后果写到一个两头文件里,而后上游再从两头文件里拉取数据进行解决。

这种形式的益处就是能够节俭资源,不须要上游和上游同时起来,在失败的状况下,也不须要从头执行。这是批处理的罕用执行形式。

2.2.1 Hash Shuffle

那么,shuffle 过程中,两头后果是如何保留到两头文件,上游再拉取的?

之前 Flink 引入的是 Hash shuffle,再以 All-to-all 的边举例,上游 task 产生的数据集,会给上游的每个 task 写一个独自的文件,这样零碎可能会产生大量的小文件。并且不论是应用文件 IO 还是 mmap 的形式,写每个文件都至多应用一块缓冲区,会造成内存节约。上游 task 随机读取的上游数据文件,也会产生大量随机 IO。

所以,之前 Flink 的 Hash shuffle 利用在批处理中,只能在规模比拟小或者在用 SSD 的时候,在生产上能力比拟 work。在规模比拟大或者 SATA 盘上是有较大的问题的。

2.2.2 Sort Shuffle

所以,在 Flink 1.12 和 Flink 1.13 中,通过两个版本,引入了一种新的基于 Sort Merge 的 shuffle。这个 Sort 并不是指对数据进行 Sort,而是对上游所写的 task 指标进行 Sort。

大抵的原理是,上游在输入数据时,会应用一个固定大小的缓冲区,防止缓冲区的大小随着规模的增大而增大,所有的数据都写到缓冲区里,当缓冲区满时,会做一次排序并写到一个独自文件里,前面的数据还是基于此缓存区持续写,续写的一段会拼到原来的文件前面。最初单个的上游工作会产生一个两头文件,由很多段组成,每个段都是有序的构造。

和其余的批处理的框架不太一样,这边并不是基于一般的外排序。个别的外排序是指会把这些段再做一次独自的 merge,造成一个整体有序的文件,这样上游来读的时候会有更好的 IO 连续性,避免每一段每一个 task 要读取的数据段都很小。然而,这种 merge 自身也是要耗费大量的 IO 资源的,有可能 merge 的工夫带来的开销会远超过上游程序读带来的收益。

所以,这里采纳了另外一种形式:在上游来申请数据的时候,比方下图中的 3 个上游都要来读上游的两头文件,会有一个调度器对上游申请要读取的文件地位做一个排序,通过在下层减少 IO 调度的形式,来实现整个文件 IO 读取的连续性,避免在 SATA 盘上产生大量的随机 IO。

在 SATA 盘上,绝对于 Hash shuffle,Sort shuffle 的 IO 性能能够进步 2~8 倍。通过 Sort shuffle,使得 Flink 批处理根本达到了生产可用的状态,在 SATA 盘上 IO 性能能够把磁盘打到 100 多 M,而 SATA 盘最高也就能达到 200M 的读写速度。

为了放弃兼容性,Sort shuffle 并不是默认启用的,用户能够管制上游并发达到多少来启用 Sort Merge Shuffle。并且能够通过启用压缩来进一步提高批处理的性能。Sort Merge shuffle 并没有额定占用内存,当初占用的上游读写的缓存区,是从 framework.off-heap 中抽出的一块。

3. DataStream API 优化

3.1 2PC & 端到端一致性

为了保障端到端的一致性,对于 Flink 流作业来说,是通过两阶段提交的机制来实现的,联合了 Flink 的 checkpoint、failover 机制和内部零碎的一些个性。

大略的逻辑是,当我想做端到端的一致性,比方读取 Kafka 再写到 Kafka,在失常解决时会把数据先写到一个 Kafka 的事务里,当做 checkpoint 时进行 preCommit,这样数据就不会再丢了。

如果 checkpoint 胜利的话,会进行一次正式的 commit。这样就保障了内部零碎的事务和 Flink 外部的 failover 是统一的,比方 Flink 产生了 failover 须要回滚到上一个 checkpoint , 内部零碎中跟这一部分对应的事务也会被 abort 掉,如果 checkpoint 胜利了,内部事务的 commit 也会胜利。

Flink 端到端的一致性依赖于 checkpoint 机制。然而,在遇到无限流时,就会有一些问题:

  • 具备无限流的作业,task 完结之后,Flink 是不反对做 checkpoint 的,比方流批混合的作业,其中有一部分会完结,之后 Flink 就没方法再做 checkpoint,数据也就不会再提交了。
  • 在无限流数据完结时,因为 checkpoint 是定时执行的,不能保障最初一个 checkpoint 肯定能在解决完所有数据后执行,可能导致最初一部分数据无奈提交。

以上就会导致在流模式下,无限流作业流 / 批执行模式后果不统一。

3.2 反对局部 Task 完结后的 Checkpoint (进行中)

从 Flink 1.13 开始,反对在一部分 task 完结之后,也能做 checkpoint。checkpoint 实际上是保护了每个算子的所有 task 的状态列表。

在有一部分 task 完结之后,如下图的虚线局部。Flink 会把完结的 task 分为两种:

  • 如果一个算子的所有 subtask 都曾经完结了,就会为这个算子存一个 finished 标记。
  • 如果一个算子只有局部 task 完结,就只存储未完结的 task 状态。

    基于这个 checkpoint,当 failover 之后还是会拉起所有算子,如果辨认到算子的上一次执行曾经完结,即 finsihed = true,就会跳过这个算子的执行。尤其是针对 Source 算子来说,如果曾经完结,前面就不会再从新执行发送数据了。通过上述形式就能够保障整个状态的一致性,即便有一部分 task 完结,还是照样走 checkpoint。

Flink 也重新整理了完结语义。当初 Flink 作业完结有几种可能:

  • 作业完结:数据是无限的,无限流作业失常完结;
  • stop-with-savepoint,采一个 savepoint 完结;
  • stop-with-savepoint –drain,采一个 savepoint 完结,并会将 watermark 推动到正无穷大。

之前这边是两种不同的实现逻辑,并且都有最初一部分数据无奈提交的问题。

  • 对作业完结和 stop-with-savepoint –drain 两种语义,预期作业是不会再重启的,都会对算子调 endOfInput() , 告诉算子通过一套对立的形式做 checkpoint。
  • 对 stop-with-savepoint 语义,预期作业是会持续 savepoint 重启的,此时就不会对算子调 endOfInput()。后续会再做一个 checkpoint , 这样对于肯定会完结并不再重启的作业,能够保障最初一部分数据肯定能够被提交到内部零碎中。

4. 总结

在 Flink 的整个指标里,其中有一点是冀望做一个对无限数据集和有限数据集高效解决的对立平台。目前基本上曾经有了一个初步的雏形,不论是在 API 方面,还是在 runtime 方面。上面来举个例子阐明流批一体的益处。

针对用户的回流作业,平时是解决有限流的作业,如果某一天想改个逻辑,用 stop-with-savepoint 形式把流停掉,然而这个变更逻辑还须要追回到前两个月之内的数据来保障后果的一致性。此时,就能够启一个批的作业:作业不加批改,跑到提前缓存下来的输出数据上,用批的模式能够尽快地勘误前两个月的数据。另外,基于新的逻辑,应用后面保留的 savepoint,能够重启一个新的流作业。

能够看出,在上述整个流程中,如果是之前流批离开的状况,是须要独自开发作业进行数据勘误的。但在流批一体的状况下,能够基于流的作业天然的进行数据勘误,不须要用户再做额定的开发。

在 Flink 后续的版本中,还会进一步思考更多流批联合的场景,比方用户先做一个批的解决,对状态进行初始化之后,再切到有限流上的场景。当然,在流和批独自的性能上,也会做进一步的优化和欠缺,使得 Flink 在流批方面都是具备竞争力的计算框架。

版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

退出移动版