Flink 并行度:
优先级:算子层面 > 环境层面 > 客户端层面 > 零碎层面
Operator Level(操作算子层面)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
复制代码
- operators、data sources、data sinks 都能够调用 setParallelism()办法来设置 parallelism
Execution Environment Level(执行环境层面)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
复制代码
- 在 ExecutionEnvironment 外头能够通过 setParallelism 来给 operators、data sources、data sinks 设置默认的 parallelism;如果 operators、data sources、data sinks 本人有设置 parallelism 则会笼罩 ExecutionEnvironment 设置的 parallelism
Client Level(客户端层面)
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
复制代码
或者
try {PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();
Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
// set the parallelism to 10 here
client.run(program, 10, true);
} catch (ProgramInvocationException e) {e.printStackTrace();
}
复制代码
- 应用 CLI client,能够在命令行调用是用 - p 来指定,或者 Java/Scala 调用时在 Client.run 的参数中指定 parallelism
System Level(零碎层面)
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
复制代码
- 能够在 flink-conf.yaml 中通过 parallelism.default 配置项给所有 execution environments 指定零碎级的默认 parallelism
Flink 数据流图简介
1.1 Flink 作业的逻辑视图
在大数据畛域,词频统计(WordCount)程序就像是一个编程语言的 HelloWorld 程序,它展现了一个大数据引擎的根本标准。麻雀虽小,五脏俱全,从这个样例中,咱们能够一窥 Flink 设计和运行原理。
如图 1 所示,程序分为三大部分,第一局部读取数据源(Source),第二局部对数据做转换操作(Transformation),最初将转换后果输入到一个目的地(Sink)。代码中的办法被称为算子(Operator),是 Flink 提供给程序员的接口,程序员须要通过这些算子对数据进行操作。Source 算子读取数据源中的数据,数据源能够是数据流、也能够存储在文件系统中的文件。Transformation 算子对数据进行必要的计算解决。Sink 算子将处理结果输入,数据个别被输入到数据库、文件系统或下一个数据流程序。
咱们能够把算子了解为 1 + 2 运算中的加号,加号(+)是这个算子的一个符号示意,它示意对数字 1 和数字 2 做加法运算。同样,在 Flink 或 Spark 这样的大数据引擎中,算子对数据进行某种操作,程序员能够依据本人的需要调用适合的算子,实现所需计算工作。罕用的算子有 map
、flatMap
、keyBy
、timeWindow
等,它们别离对数据流执行不同类型的操作。
在程序理论运行前,Flink 会将用户编写的代码做一个简略解决,生成一个如图 2 所示的逻辑视图。图 2 展现了 WordCount 程序中,数据从不同算子间流动的状况。图中,圆圈代表算子,圆圈间的箭头代表数据流,数据流在 Flink 程序中通过不同算子的计算,最终生成为指标数据。其中,keyBy
、timeWindow
和 sum
独特组成了一个工夫窗口上的聚合操作,被归结为一个算子。咱们能够在 Flink 的 Web UI 中,点击一个作业,查看这个作业的逻辑视图。
对于词频统计这个案例,逻辑上来讲无非是对数据流中的单词做提取,而后应用一个 Key-Value 构造对单词做词频计数,最初输入后果即可,这样的逻辑本能够用几行代码实现,改成应用算子模式,反而让新人看着一头雾水,为什么肯定要用算子的模式来写程序呢?实际上,算子进化成以后这个状态,就像人类从石块计数,到手指计数,到算盘计数,再到计算机计数这样的进化过程一样,只管更低级的形式能够实现肯定的计算工作,然而随着计算规模的增长,古老的计数形式存在着低效的弊病,无奈实现更高级别和更大规模的计算需要。试想,如果咱们不应用大数据引擎提供的算子,而是本人实现一套上述的计算逻辑,只管咱们能够疾速实现以后的词频统计的工作,然而当面临一个新计算工作时,咱们须要从新编写程序,实现一整套计算工作。咱们本人编写代码的横向扩展性可能很低,当输出数据暴增时,咱们须要做很大改变,以部署在更多机器上。
大数据引擎的算子对计算做了一些形象,对于新人来说有肯定学习老本,而一旦把握这门技术,人们所能解决的数据规模将成倍增加。大数据引擎的算子呈现,正是针对数据分布在多个节点的大数据场景下,须要一种对立的计算描述语言来对数据做计算而进化出的新计算状态。基于 Flink 的算子,咱们能够定义一个数据流的逻辑视图,以此实现对大数据的计算。剩下那些数据交换、横向扩大、故障复原等问题全交由大数据引擎来解决。
1.2 从逻辑视图到物理执行
在绝大多数的大数据处理场景下,一台机器节点无奈解决所有数据,数据被切分到多台节点上。在大数据畛域,当数据量大到超过单台机器解决能力时,须要将一份数据切分到多个分区(Partition)上,每个分区散布在一台虚拟机或物理机上。
前一大节曾经提到,大数据引擎的算子提供了编程接口,咱们能够应用算子构建数据流的逻辑视图。思考到数据分布在多个节点的状况,逻辑视图只是一种形象,须要将逻辑视图转化为物理执行图,能力在分布式环境下执行。
图 3 为 WordCount 程序的物理执行图,这里数据流散布在 2 个分区上。箭头局部示意数据流分区,圆圈局部示意算子在分区上的算子子工作(Operator Subtask)。从逻辑视图变为物理执行图后,FlatMap 算子在每个分区都有一个算子子工作,以解决该分区上的数据:FlatMap[1/2]算子子工作解决第一个数据流分区上的数据,以此类推。
算子子工作又被称为算子实例,一个算子在并行执行时,会有多个算子实例。即便输出数据增多,咱们也能够通过部署更多的算子实例来进行横向扩大。从图 3 中能够看到,除去 Sink 外的算子都被分成了 2 个算子实例,他们的并行度(Parallelism)为 2,Sink 算子的并行度为 1。并行度是能够被设置的,当设置某个算子的并行度为 2 时,也就意味着有这个算子有 2 个算子子工作(或者说 2 个算子实例)并行执行。理论利用中个别依据输出数据量的大小,计算资源的多少等多方面的因素来设置并行度。
留神,在本例中,为了演示,咱们把所有算子的并行度设置为了 2:env.setParallelism(2);
,把最初输入的并行度设置成了 1:wordCount.print().setParallelism(1);
。如果不独自设置 print
的并行度的话,它的并行度也是 2。
算子子工作是 Flink 物理执行的根本单元,算子子工作之间是互相独立的,某个算子子工作有本人的线程,不同算子子工作可能散布在不同的节点上。后文在 Flink 的资源分配局部咱们还会重点介绍算子子工作。
再谈逻辑视图到物理执行图
理解了 Flink 的分布式架构和外围组件,这里咱们从更细粒度上来介绍从逻辑视图转化为物理执行图过程,该过程能够分成四层:StreamGraph
-> JobGraph
-> ExecutionGraph
-> 物理执行图。
StreamGraph
:是依据用户编写的代码生成的最后的图,用来示意一个 Flink 作业的拓扑构造。在StreamGraph
中,节点StreamNode
就是算子。JobGraph
:JobGraph
是提交给 JobManager 的数据结构。StreamGraph
通过优化后生成了JobGraph
,次要的优化为,将多个符合条件的节点链接在一起作为一个JobVertex
节点,这样能够缩小数据交换所须要的传输开销。这个链接的过程叫做算子链(Operator Chain),会在下一大节持续介绍。JobVertex
通过算子链后,会蕴含一到多个算子,它输入是IntermediateDataSet
,是通过算子解决产生的数据集。ExecutionGraph
:JobManager 将JobGraph
转化为ExecutionGraph
。ExecutionGraph
是JobGraph
的并行化版本:如果某个JobVertex
的并行度是 2,那么它将被划分为 2 个ExecutionVertex
,ExecutionVertex
示意一个算子子工作,它监控着单个子工作的执行状况。每个ExecutionVertex
会输入一个IntermediateResultPartition
,这是单个子工作的输入,再通过ExecutionEdge
输入到上游节点。ExecutionJobVertex
是这些并行子工作的合集,它监控着整个算子的运行状况。ExecutionGraph
是调度层十分外围的数据结构。- 物理执行图:JobManager 依据
ExecutionGraph
对作业进行调度后,在各个 TaskManager 上部署具体的工作,物理执行图并不是一个具体的数据结构。
能够看到,Flink 在数据流图上堪称殚精竭虑,仅各类图就有四种之多。对于新人来说,能够不必太关怀这些十分细节的底层实现,只须要理解以下几个外围概念:
- Flink 采纳主从架构,Master 起着治理协调作用,TaskManager 负责物理执行,在执行过程中会产生一些数据交换、生命周期治理等事件。
- 用户调用 Flink API,结构逻辑视图,Flink 会对逻辑视图优化,并转化为并行化的物理执行图,最初被执行的是物理执行图。
工作、算子子工作与算子链
在结构物理执行图的过程中,Flink 会将一些算子子工作链接在一起,组成算子链。链接后以工作 (Task) 的模式被 TaskManager 调度执行。应用算子链是一个十分无效的优化,它能够无效升高算子子工作之间的传输开销。链接之后造成的 Task 是 TaskManager 中的一个线程。
例如,数据从 Source 前向流传到 FlatMap,这两头没有产生跨分区的数据交换,因而,咱们齐全能够将 Source、FlatMap 这两个子工作组合在一起,造成一个 Task。数据通过 keyBy
产生了数据交换,数据会逾越分区,因而无奈将 keyBy
以及其前面的窗口聚合链接到一起。因为 WindowAggregation 的并行度是 2,Sink 的并行度为 1,数据再次发生了替换,咱们不能把 WindowAggregation 和 Sink 两局部链接到一起。1.2 节中提到,Sink 的并行度是人为设置为 1,如果咱们把 Sink 的并行度也设置为 2,那么是能够让这两个算子链接到一起的。
默认状况下,Flink 会尽量将更多的子工作链接在一起,这样能缩小一些不必要的数据传输开销。但一个子工作有超过一个输出或产生数据交换时,链接就无奈建设。两个算子可能链接到一起是有一些规定的,感兴趣的读者能够浏览 Flink 源码中 org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
中的 isChainable
办法。StreamingJobGraphGenerator
类的作用是将 StreamGraph
转换为JobGraph
。
只管将算子链接到一起会升高一些传输开销,然而也有一些状况并不需要太多链接。比方,有时候咱们须要将一个十分长的算子链拆开,这样咱们就能够将原来集中在一个线程中的计算拆分到多个线程中来并行计算。Flink 容许开发者手动配置是否启用算子链,或者对哪些算子应用算子链。
工作槽位与计算资源
工作槽位
依据前文的介绍,咱们曾经理解到 TaskManager 负责具体的工作执行。TaskManager 是一个 JVM 过程,在 TaskManager 中能够并行运行多个 Task。在程序执行之前,通过优化,局部子工作被链接在一起,组成一个 Task。每个 Task 是一个线程,须要 TaskManager 为其调配相应的资源,TaskManager 应用工作槽位给 Task 分配资源。
在解释 Flink 工作槽位的概念前,咱们先回顾一下过程与线程的概念。在操作系统层面,过程(Process)是进行资源分配和调度的一个独立单位,线程(Thread)是 CPU 调度的根本单位。比方,咱们罕用的 Office Word 软件,在启动后就占用操作系统的一个过程。Windows 上能够应用工作管理器来查看以后沉闷的过程,Linux 上能够应用 top
命令来查看。线程是过程的一个子集,一个线程个别专一于解决一些特定工作,不独立领有系统资源,只领有一些运行中必要的资源,如程序计数器。一个过程至多有一个线程,也能够有多个线程。多线程场景下,每个线程都解决一小个工作,多个线程以高并发的形式同时解决多个小工作,能够进步解决能力。
回到 Flink 的槽位分配机制上,一个 TaskManager 是一个过程,TaskManager 能够治理一至多个 Task,每个 Task 是一个线程,占用一个槽位。每个槽位的资源是整个 TaskManager 资源的子集,比方这里的 TaskManager 下有 3 个槽位,每个槽位占用 TaskManager 所治理的 1 / 3 的内存,第一个槽位中的 Task 不会与第二个槽位中的 Task 相互争抢内存资源。留神,在分配资源时,Flink 并没有将 CPU 资源明确调配给各个槽位。
假如咱们给 WordCount 程序调配两个 TaskManager,每个 TaskManager 又调配 3 个槽位,所以总共是 6 个槽位。联合图 7 中对这个作业的并行度设置,整个作业被划分为 5 个 Task,应用 5 个线程,这 5 个线程能够依照图 8 所示的形式调配到 6 个槽位中。
Flink 容许用户设置 TaskManager 中槽位的数目,这样用户就能够确定以怎么的粒度将工作做互相隔离。如果每个 TaskManager 只蕴含一个槽位,那么运行在该槽位内的工作将独享 JVM。如果 TaskManager 蕴含多个槽位,那么多个槽位内的工作能够共享 JVM 资源,比方共享 TCP 连贯、心跳信息、局部数据结构等。官网倡议将槽位数目设置为 TaskManager 下可用的 CPU 外围数,那么均匀下来,每个槽位都能均匀取得 1 个 CPU 外围。