共计 14685 个字符,预计需要花费 37 分钟才能阅读完成。
本文首发于泊浮目标语雀:https://www.yuque.com/17sing
版本 | 日期 | 备注 |
---|---|---|
1.0 | 2022.1.26 | 文章首发 |
0. 前言
前阵子组里的小伙伴问我“为什么 Flink 从咱们的代码到真正可执行的状态,要通过这么多个 graph 转换?这样做有什么益处嘛?”我晚期看到这里的设计时确实有过雷同的纳闷,过后因为手里还在看别的货色,查阅过一些材料后就翻页了。现在又碰到了这样的问题,无妨就在这篇文章中好好搞清楚。
本文的源码基于 Flink
1.14.0
。
1. 分层设计
该图来自 Jark 大佬的博客:http://wuchong.me/blog/2016/0…
以上是 Flink 的 Graph 档次图,在接下来的内容咱们会逐个揭开它们的面纱,得悉它们存在的意义。
1.1 BatchAPI 的 OptimizedPlan
在这个大节中,咱们会看到 DataSet 从 Plan 转换到 OptimizedPlan 的过程中。为了不便读者有个概念,咱们在这里解释一下几个名词:
- DataSet:面向用户的批处理 API。
- Plan:形容 DataSource 以及 DataSink 以及 Operation 如何互动的打算。
- OptimizedPlan:优化过的执行打算。
代码入口:
|--ClientFrontend#main
\-- parseAndRun
\-- runApplication
\-- getPackagedProgram
\-- buildProgram
\-- executeProgram
|-- ClientUtils#executeProgram
|-- PackagedProgram#invokeInteractiveModeForExecution
\-- callMainMethod // 调用用户编写的程序入口
|-- ExecutionEnvironment#execute
\-- executeAsync // 创立 Plan
|-- PipelineExecutorFactory#execute
|-- EmbeddedExecutor#execute
\-- submitAndGetJobClientFuture
|-- PipelineExecutorUtils#getJobGraph
|-- FlinkPipelineTranslationUtil#getJobGraph
|-- FlinkPipelineTranslator#translateToJobGraph // 如果传入的是 Plan,则会在外部实现中先转换出 OptimizedPlan,再转换到 JobGraph;如果是 StreamGraph,则会间接转换出 JobGraph
|-- PlanTranslator#translateToJobGraph
\-- compilePlan
咱们看一下这段代码:
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
OptimizedPlan optimizedPlan = optimizer.compile(plan);
JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}
十分的清晰。就是从 OptimizedPlan
到JobGraph
。OptimizedPlan 的转换过程咱们看 Optimizer#compile
办法。先看办法签名上的正文:
/**
* Translates the given program to an OptimizedPlan. The optimized plan describes for each
* operator which strategy to use (such as hash join versus sort-merge join), what data exchange
* method to use (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined,
* batch), where to cache intermediate results, etc,
*
* <p>The optimization happens in multiple phases:
*
* <ol>
* <li>Create optimizer dag implementation of the program.
* <p><tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute
* size estimates.
* <li>Compute interesting properties and auxiliary structures.
* <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting
* property computation (as opposed to the Database approaches), because we support plans
* that are not trees.
* </ol>
*
* @param program The program to be translated.
* @param postPasser The function to be used for post passing the optimizer's plan and setting
* the data type specific serialization routines.
* @return The optimized plan.
* @throws CompilerException Thrown, if the plan is invalid or the optimizer encountered an
* inconsistent situation during the compilation process.
*/
private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser)
这里提到了会有好几步来做优化:
- 创立优化过的 DAG,为其生成的 OptimizerNode 遵循 PACT 模型,并为其调配了并发度以及计算资源。
- 生成一些重要的属性以及辅助性数据结构。
- 枚举所有的代替计划。
在办法的实现中,会创立大量的 Visitor 来对程序做遍历优化。
1.1.1GraphCreatingVisitor
首先是创立 GraphCreatingVisitor,对原始的 Plan 进行优化,将每个 operator 优化成 OptimizerNode,OptimizerNode 之间通过 DagConnection 相连,DagConnection 相当于一个边模型,有 source 和 target,能够示意 OptimizerNode 的输出和输入。在这个过程中会做这些事:
- 为每个算子创立一个 OptimizerNode——更加靠近执行形容的 Node(估算出数据的大小、data flow 在哪里进行拆分和合并等)
- 用 Channel 将它们连接起来
- 依据倡议生成相应的策略:Operator 用什么策略执行:比方 Hash Join or Sort Merge Join;Operator 间的数据交换策略,是 Local Pipe Forward、Shuffle,还是 Broadcast;Operator 间的数据交换模式,是 Pipelined 还是 Batch。
1.1.2 IdAndEstimatesVisitor
顾名思义,为每个算子生成 id,并估算其数据量。估算的实现见OptimizerNode#computeOutputEstimates
——这是一个形象函数,咱们能够关注一下 DataSourceNode 里的实现,它会依据上游数据源的一系列属性(比方行数、大小)得出估算值。** 但这段代码放在这里并不适合
,作者的原意仿佛是关注 file 类型的上游,正文这么说道:see, if we have a statistics object that can tell us a bit about the file
**。
1.1.3 UnionParallelismAndForwardEnforcer
这里会保障 UnionNode 的并发度与上游对其,防止数据分布有误而导致数据不精准(见 https://github.com/apache/fli…)。
1.1.4 BranchesVisitor
计算不会闭合的下游子 DAG 图。见其定义:
/**
* Description of an unclosed branch. An unclosed branch is when the data flow branched (one
* operator's result is consumed by multiple targets), but these different branches (targets)
* have not been joined together.
*/
public static final class UnclosedBranchDescriptor {
1.1.5 InterestingPropertyVisitor
依据 Node 的属性估算老本。
估算算法见:node.computeInterestingPropertiesForInputs
- WorksetIterationNode
- TwoInputNode
- SingleInputNode
- BulkIterationNode
之后便会依据老本算出一系列的执行打算:
// the final step is now to generate the actual plan alternatives
List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
在这里,OptimizerNode 优化成了 PlanNode,PlanNode 是最终的优化节点类型,它蕴含了节点的更多属性,节点之间通过 Channel 进行连贯,Channel 也是一种边模型,同时确定了节点之间的数据交换形式 ShipStrategyType 和 DataExchangeMode,ShipStrategyType 示意的两个节点之间数据的传输策略,比方是否进行数据分区,进行 hash 分区,范畴分区等; DataExchangeMode 示意的是两个节点间数据交换的模式,有 PIPELINED 和 BATCH,和 ExecutionMode 是一样的,ExecutionMode 决定了 DataExchangeMode——间接发下去还是先落盘。
1.1.6 PlanFinalizer.createFinalPlan
PlanFinalizer.createFinalPlan()。其大抵的实现就是将节点增加到 sources、sinks、allNodes 中,还可能会为每个节点设置工作占用的内存等。
1.1.7 BinaryUnionReplacer
顾名思义,针对上游同样是 Union 的操作做去重替换,合并到一起。笔者认为,这在输入等价的状况下,缩小了 Node 的生成。
1.1.8 RangePartitionRewriter
在应用范畴分区这一个性时,须要尽可能保障各分区所解决的数据集均衡性以最大化利用计算资源并缩小作业的执行工夫。为此,优化器提供了范畴分区重写器(RangePartitionRewriter)来对范畴分区的分区策略进行优化,使其尽可能均匀地调配数据,防止数据歪斜。
如果要尽可能的平均分配数据,必定要对数据源进行估算。但显然是没法读取所有的数据进行估算的,这里 Flink 采纳了 ReservoirSampling
算法的改良版——能够参考论文 Optimal Random Sampling from Distributed Streams Revisited
,在代码中由org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement
和org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement
实现。
值得一提的是,无论是
Plan
还是OptimizerNode
都实现了Visitable
接口,这是典型的 策略模式 应用,这让代码变得非常灵活,正如正文所说的——遍历形式是能够自在编写的。
package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
/**
* This interface marks types as visitable during a traversal. The central method <i>accept(...)</i>
* contains the logic about how to invoke the supplied {@link Visitor} on the visitable object, and
* how to traverse further.
*
* <p>This concept makes it easy to implement for example a depth-first traversal of a tree or DAG
* with different types of logic during the traversal. The <i>accept(...)</i> method calls the
* visitor and then send the visitor to its children (or predecessors). Using different types of
* visitors, different operations can be performed during the traversal, while writing the actual
* traversal code only once.
*
* @see Visitor
*/
@Internal
public interface Visitable<T extends Visitable<T>> {
/**
* Contains the logic to invoke the visitor and continue the traversal. Typically invokes the
* pre-visit method of the visitor, then sends the visitor to the children (or predecessors) and
* then invokes the post-visit method.
*
* <p>A typical code example is the following:
*
* <pre>{@code
* public void accept(Visitor<Operator> visitor) {* boolean descend = visitor.preVisit(this);
* if (descend) {* if (this.input != null) {* this.input.accept(visitor);
* }
* visitor.postVisit(this);
* }
* }
* }</pre>
*
* @param visitor The visitor to be called with this object as the parameter.
* @see Visitor#preVisit(Visitable)
* @see Visitor#postVisit(Visitable)
*/
void accept(Visitor<T> visitor);
}
1.2 StreamAPI 的 StreamGraph
结构 StreamGraph 的入口函数是 StreamGraphGenerator.generate()
。该函数会由触发程序执行的办法 StreamExecutionEnvironment.execute()
调用到。就像 OptimizedPla,StreamGraph 也是在 Client 端结构的。
在这个过程中,流水线首先被转换为 Transformation 流水线,而后被映射为 StreamGraph,该图与具体的执行无关,外围是表白计算过程的逻辑。
对于 Transformation
的引入,能够看社区的 issue:https://issues.apache.org/jir…。实质是为了防止 DataStream 这一层对 StreamGraph 的耦合,因而引入这一层做解耦。
Transformation
关注的属性偏差框架外部,如:name(算子名)、uid(job 重启时调配之前雷同的状态,长久保留状态)、bufferTimeout、parallelism、outputType、soltSharingGroup 等。另外,Transformation 分为物理 Transformation 和虚构 Transformation,这于下一层的 StreamGraph 实现是有关联的。
StreamGraph 的外围对象有两个:
- StreamNode:它能够有多个输入,也能够有多个输出。由 Transformation 转换而来——实体的 StreamNode 会最终变成物算子,虚构的 StreamNode 会附着在 StreamEdge 上。
- StreamEdge:StreamGraph 的边,用于连贯两个 StreamNode。就像下面说的——一个 StreamNode 能够有多个出边、入边。StreamEdge 中蕴含了旁路输入、分区器、字段筛选输入(与 SQL Select 中抉择字段的逻辑一样)等的信息。
具体的转换代码在 org.apache.flink.streaming.api.graph.StreamGraphGenerator
中,每个 Transformation 都有对应的转换逻辑:
static {@SuppressWarnings("rawtypes")
Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
tmp = new HashMap<>();
tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
tmp.put(
TimestampsAndWatermarksTransformation.class,
new TimestampsAndWatermarksTransformationTranslator<>());
tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
tmp.put(
KeyedBroadcastStateTransformation.class,
new KeyedBroadcastStateTransformationTranslator<>());
translatorMap = Collections.unmodifiableMap(tmp);
}
1.3 流批一体的 JobGraph
代码入口和 1.1 大节简直一摸一样,DataSet 的入口类是 ExecutionEnvironment
,而 DataStream 的入口是StreamExecutionEnvironment
。PlanTranslator
变成了StreamGraphTranslator
。所以,StreamGraph 到 JobGraph 的转化也是在 Client 端进行的,次要工作做优化。其中十分重要的一个优化就是Operator Chain,它会将条件容许的算子合并到一起,防止跨线程、跨网络的传递。
是否开启 OperationChain 能够在程序中显示的调整。
接下来,咱们来看下 JobGraph 到底是什么。先看正文:
/**
* The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
* All programs from higher level APIs are transformed into JobGraphs.
*
* <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
* form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
* but inside certain special vertices that establish the feedback channel amongst themselves.
*
* <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate
* result define the characteristics of the concrete operation and intermediate data.
*/
public class JobGraph implements Serializable {
它是一张图,由 vertices
和intermediate
组成。并且它是一个低等级的 API,为 JobMaster 而生——所有高等级的 API 都会被转换成 JobGraph。接下来咱们须要关注的对象别离是JobVertex
、JobEdge
、IntermediateDataSet
。其中,JobVertex 的输出是 JobEdge,输入是 IntermediateDataSet。
1.3.1 JoBVertex
通过符合条件的多个 StreamNode 通过优化后的可能会交融在一起生成一个 JobVertex,即一个 JobVertex 蕴含一个或多个算子(有趣味的同学能够看 StreamingJobGraphGenerator#buildChainedInputsAndGetHeadInputs
或者浏览相干的 Issue:https://issues.apache.org/jir…)。
1.3.2 JobEdge
JobEdge 是连贯 IntermediateDatSet 和 JobVertex 的边,代表着 JobGraph 中的一个数据流转通道,其上游是 IntermediateDataSet,上游是 JobVertex——数据通过 JobEdge 由 IntermediateDataSet 传递给指标 JobVertex。
在这里,咱们要关注它的一个成员变量:
/**
* A distribution pattern determines, which sub tasks of a producing task are connected to which
* consuming sub tasks.
*
* <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected
* in {@link EdgeManagerBuildUtil}
*/
public enum DistributionPattern {
/** Each producing sub task is connected to each sub task of the consuming task. */
ALL_TO_ALL,
/** Each producing sub task is connected to one or more subtask(s) of the consuming task. */
POINTWISE
}
该散发模式会间接影响执行时 Task 之间的数据连贯关系:点对点连贯 or 全连贯(或者叫播送)。
1.3.3 IntermediateDataSet
两头数据集 IntermediateDataSet 是一种逻辑构造,用来示意 JobVertex 的输入,即该 JobVertex 中蕴含的算子会产生的数据集。在这里咱们须要关注 ResultPartitionType:
- Blocking:顾名思义。都上游解决完数据后,再交给上游解决。这个数据分区能够被生产屡次,也能够并发生产。这个分区并不会被主动销毁,而是交给调度器判断。
- BlokingPersistent:相似于 Blocking,然而其生命周期由用户端指定。调用 JobMaster 或者 ResourceManager 的 API 来销毁,而不是由调度器管制。
- Pipelined:流替换模式。能够用于有界和无界流。这种分区类型的数据只能被每个消费者生产一次。且这种分区能够保留任意数据。
- PipelinedBounded:与 Pipelined 有些不同,这种分区保留的数据是无限的,这不会使数据和检查点提早太久。因而实用于流计算场景(需注意,批处理模式下没有 CheckpointBarrier)。
- Pipelined_Approximate:1.12 引入的策略,用于针对单个 task 做 fast failover 的分区策略。有趣味的同学能够浏览相干 issue:https://issues.apache.org/jir…。
不同的执行模式下,其对应的后果分区类型不同,决定了在执行时刻数据交换的模式。
IntermediateDataSet 的个数与该 JobVertext 对应的 StreamNode 的出边数量雷同,能够是一个或者多个。
1.4 ExecutionGraph
JobManager 接管到 Client 端提交的 JobGraph 及其依赖 Jar 之后就要开始调度运行该工作了,但 JobGraph 还是一个逻辑上的图,须要再进一步转化为并行化、可调度的执行图。这个动作是 JobMaster 做的——通过 SchedulerBase 触发,理论动作交给 DefaultExecutionGraphBuilder#buildGraph
来做。在这些动作中,会生成与 JobVertex 对应的 ExecutionJobVertex(逻辑概念)和 ExecutionVertex,与 IntermediateDataSet 对应的 IntermediateResult(逻辑概念)和 IntermediateResultPartition 等,所谓的并行度也将通过上述类实现。
接下来要聊聊 ExecutionGraph 的一些细节,会波及一些逻辑概念,因而在这里笔者画了一张图,便于参考。
1.4.1 ExecutionJobVertex 与 ExecutionVertex
ExecutionJobVertex 和 JobGraph 中的 JobVertex 一一对应。该对象还蕴含一组 ExecutionVertex,数量与该 JobVertex 中所蕴含的 StreamNode 的并行度统一,如上图所示,如果并行度为 N,那么就会有 N 个 ExecutionVertex。所以每一个并行执行的实例就是 ExecutionVertex。同时也会构建 ExecutionVertex 的输入 IntermediateResult。
因而 ExecutionJobVertex 更像是一个逻辑概念。
1.4.2 IntermediaResult 与 IntermediaResuktParitition
IntermediateResult 示意 ExecutionJobVertex 的输入,和 JobGraph 中的 IntermediateDataSet 一一对应,该对象也是一个逻辑概念。同理,一个 ExecutionJobVertex 能够有多个两头后果,取决于以后 JobVertex 有几个出边(JobEdge)。
一个两头后果集蕴含多个两头后果分区 IntermediateResultPartition,其个数等于该 Job Vertext 的并发度,或者叫作算子的并行度。每个 IntermediateResultPartition 示意 1 个 ExecutionVertex 输入后果。
1.4.3 Execution
ExecutionVertex 在 Runtime 对应了一个 Task。在真正执行的时会将 ExecutionVerterx 包装为一个 Execution。
对于 JobGraph 如何提交到 JobMaster 不是本文的重点,有趣味的同学能够自行查看
org.apache.flink.runtime.dispatcher.DispatcherGateway#submitJob
的相干调用栈。
1.4.5 从 JobGraph 到 ExecutionGraph
下面介绍了几个重要概念。接下来看一下 ExecutionGraph 的构建过程。次要参考办法为org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#attachJobGraph
。
首先是构建 ExecutionJobVertex(参考其构造方法),设置其并行度、共享 Solt、CoLocationGroup,并构建 IntermediaResult 与 IntermediaResuktParitition,依据并发度创立 ExecutionVertex,并查看 IntermediateResults 是否有反复援用。最初,会对可切分的数据源进行切分。
其次便是构建 Edge(参考 org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil#connectVertexToResult)。依据 DistributionPattern 来创立 EdgeManager,并将 ExecutionVertex 和 IntermediateResult 关联起来,为运行时建设 Task 之间的数据交换就是以此为根底建设数据的物理传输通道的。
1.4.6 开胃菜:从 ExecutionGraph 到真正的执行
当 JobMaster 生成 ExecutionGraph 后,便进入了作业调度阶段。这外面波及到了不同的调度策略、资源申请、工作散发以及 Failover 的治理。波及的内容极多,因而会在另外的文章中探讨。对此好奇的同学,能够先看DefaultExecutionGraphDeploymentTest#setupScheduler
,外面的代码较为简单,能够察看 ExecutionGraph 到 Scheduling 的过程。
private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2)
throws Exception {v1.setParallelism(dop1);
v2.setParallelism(dop2);
v1.setInvokableClass(BatchTask.class);
v2.setInvokableClass(BatchTask.class);
DirectScheduledExecutorService executorService = new DirectScheduledExecutorService();
// execution graph that executes actions synchronously
final SchedulerBase scheduler =
SchedulerTestingUtils.newSchedulerBuilder(JobGraphTestUtils.streamingJobGraph(v1, v2),
ComponentMainThreadExecutorServiceAdapter.forMainThread())
.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
.setFutureExecutor(executorService)
.setBlobWriter(blobWriter)
.build();
final ExecutionGraph eg = scheduler.getExecutionGraph();
checkJobOffloaded((DefaultExecutionGraph) eg);
// schedule, this triggers mock deployment
scheduler.startScheduling();
Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
assertEquals(dop1 + dop2, executions.size());
return scheduler;
}
2. 小结
通过本文,咱们理解各层图存在的意义:
- StreamGraph 与 OptimizedPlan:从内部 API 转向外部 API,生成 Graph 的根本属性。如果是批处理,则会进行一系列的优化。
- JobGraph:流批对立的 Graph。在这里做一些通用的优化,比方 OperatorChain。
- ExecutionGraph:可执行级别的图,构建时关注大量的执行细节:如并发、Checkpoint 配置有效性、监控打点设置、反复援用查看、可切分的数据源进行切分等等。
通过图的分层,Flink 将不同的优化项、查看项放到了适合它们的档次,这也是繁多职责准则的体现。