本文首发于泊浮目标语雀:https://www.yuque.com/17sing
版本 | 日期 | 备注 |
---|---|---|
1.0 | 2022.1.26 | 文章首发 |
0.前言
前阵子组里的小伙伴问我“为什么Flink从咱们的代码到真正可执行的状态,要通过这么多个graph转换?这样做有什么益处嘛?”我晚期看到这里的设计时确实有过雷同的纳闷,过后因为手里还在看别的货色,查阅过一些材料后就翻页了。现在又碰到了这样的问题,无妨就在这篇文章中好好搞清楚。
本文的源码基于Flink
1.14.0
。
1. 分层设计
该图来自Jark大佬的博客:http://wuchong.me/blog/2016/0…
以上是Flink的Graph档次图,在接下来的内容咱们会逐个揭开它们的面纱,得悉它们存在的意义。
1.1 BatchAPI的OptimizedPlan
在这个大节中,咱们会看到DataSet从Plan转换到OptimizedPlan的过程中。为了不便读者有个概念,咱们在这里解释一下几个名词:
- DataSet:面向用户的批处理API。
- Plan:形容DataSource以及DataSink以及Operation如何互动的打算。
- OptimizedPlan:优化过的执行打算。
代码入口:
|--ClientFrontend#main
\-- parseAndRun
\-- runApplication
\-- getPackagedProgram
\-- buildProgram
\-- executeProgram
|-- ClientUtils#executeProgram
|-- PackagedProgram#invokeInteractiveModeForExecution
\-- callMainMethod //调用用户编写的程序入口
|-- ExecutionEnvironment#execute
\-- executeAsync // 创立Plan
|-- PipelineExecutorFactory#execute
|-- EmbeddedExecutor#execute
\-- submitAndGetJobClientFuture
|-- PipelineExecutorUtils#getJobGraph
|-- FlinkPipelineTranslationUtil#getJobGraph
|-- FlinkPipelineTranslator#translateToJobGraph //如果传入的是Plan,则会在外部实现中先转换出OptimizedPlan,再转换到JobGraph;如果是StreamGraph,则会间接转换出JobGraph
|-- PlanTranslator#translateToJobGraph
\-- compilePlan
咱们看一下这段代码:
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
OptimizedPlan optimizedPlan = optimizer.compile(plan);
JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}
十分的清晰。就是从OptimizedPlan
到JobGraph
。OptimizedPlan的转换过程咱们看Optimizer#compile
办法。先看办法签名上的正文:
/**
* Translates the given program to an OptimizedPlan. The optimized plan describes for each
* operator which strategy to use (such as hash join versus sort-merge join), what data exchange
* method to use (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined,
* batch), where to cache intermediate results, etc,
*
* <p>The optimization happens in multiple phases:
*
* <ol>
* <li>Create optimizer dag implementation of the program.
* <p><tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute
* size estimates.
* <li>Compute interesting properties and auxiliary structures.
* <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting
* property computation (as opposed to the Database approaches), because we support plans
* that are not trees.
* </ol>
*
* @param program The program to be translated.
* @param postPasser The function to be used for post passing the optimizer's plan and setting
* the data type specific serialization routines.
* @return The optimized plan.
* @throws CompilerException Thrown, if the plan is invalid or the optimizer encountered an
* inconsistent situation during the compilation process.
*/
private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser)
这里提到了会有好几步来做优化:
- 创立优化过的DAG,为其生成的OptimizerNode遵循PACT模型,并为其调配了并发度以及计算资源。
- 生成一些重要的属性以及辅助性数据结构。
- 枚举所有的代替计划。
在办法的实现中,会创立大量的Visitor来对程序做遍历优化。
1.1.1GraphCreatingVisitor
首先是创立GraphCreatingVisitor,对原始的Plan进行优化,将每个operator优化成OptimizerNode,OptimizerNode之间通过DagConnection相连,DagConnection相当于一个边模型,有source和target,能够示意OptimizerNode的输出和输入。在这个过程中会做这些事:
- 为每个算子创立一个OptimizerNode——更加靠近执行形容的Node(估算出数据的大小、data flow在哪里进行拆分和合并等)
- 用Channel将它们连接起来
- 依据倡议生成相应的策略:Operator用什么策略执行:比方Hash Join or Sort Merge Join;Operator间的数据交换策略,是Local Pipe Forward、Shuffle,还是Broadcast;Operator间的数据交换模式,是Pipelined还是Batch。
1.1.2 IdAndEstimatesVisitor
顾名思义,为每个算子生成id,并估算其数据量。估算的实现见OptimizerNode#computeOutputEstimates
——这是一个形象函数,咱们能够关注一下DataSourceNode里的实现,它会依据上游数据源的一系列属性(比方行数、大小)得出估算值。**但这段代码放在这里并不适合
,作者的原意仿佛是关注file类型的上游,正文这么说道:see, if we have a statistics object that can tell us a bit about the file
**。
1.1.3 UnionParallelismAndForwardEnforcer
这里会保障UnionNode的并发度与上游对其,防止数据分布有误而导致数据不精准(见https://github.com/apache/fli…)。
1.1.4 BranchesVisitor
计算不会闭合的下游子DAG图。见其定义:
/**
* Description of an unclosed branch. An unclosed branch is when the data flow branched (one
* operator's result is consumed by multiple targets), but these different branches (targets)
* have not been joined together.
*/
public static final class UnclosedBranchDescriptor {
1.1.5 InterestingPropertyVisitor
依据Node的属性估算老本。
估算算法见:node.computeInterestingPropertiesForInputs
- WorksetIterationNode
- TwoInputNode
- SingleInputNode
- BulkIterationNode
之后便会依据老本算出一系列的执行打算:
// the final step is now to generate the actual plan alternatives
List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
在这里,OptimizerNode优化成了PlanNode,PlanNode是最终的优化节点类型,它蕴含了节点的更多属性,节点之间通过Channel进行连贯,Channel也是一种边模型,同时确定了节点之间的数据交换形式ShipStrategyType和DataExchangeMode,ShipStrategyType示意的两个节点之间数据的传输策略,比方是否进行数据分区,进行hash分区,范畴分区等; DataExchangeMode示意的是两个节点间数据交换的模式,有PIPELINED和BATCH,和ExecutionMode是一样的,ExecutionMode决定了DataExchangeMode——间接发下去还是先落盘。
1.1.6 PlanFinalizer.createFinalPlan
PlanFinalizer.createFinalPlan()。其大抵的实现就是将节点增加到sources、sinks、allNodes中,还可能会为每个节点设置工作占用的内存等。
1.1.7 BinaryUnionReplacer
顾名思义,针对上游同样是Union的操作做去重替换,合并到一起。笔者认为,这在输入等价的状况下,缩小了Node的生成。
1.1.8 RangePartitionRewriter
在应用范畴分区这一个性时,须要尽可能保障各分区所解决的数据集均衡性以最大化利用计算资源并缩小作业的执行工夫。为此,优化器提供了范畴分区重写器(RangePartitionRewriter)来对范畴分区的分区策略进行优化,使其尽可能均匀地调配数据,防止数据歪斜。
如果要尽可能的平均分配数据,必定要对数据源进行估算。但显然是没法读取所有的数据进行估算的,这里Flink采纳了ReservoirSampling
算法的改良版——能够参考论文Optimal Random Sampling from Distributed Streams Revisited
,在代码中由org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement
和org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement
实现。
值得一提的是,无论是
Plan
还是OptimizerNode
都实现了Visitable
接口,这是典型的策略模式应用,这让代码变得非常灵活,正如正文所说的——遍历形式是能够自在编写的。
package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
/**
* This interface marks types as visitable during a traversal. The central method <i>accept(...)</i>
* contains the logic about how to invoke the supplied {@link Visitor} on the visitable object, and
* how to traverse further.
*
* <p>This concept makes it easy to implement for example a depth-first traversal of a tree or DAG
* with different types of logic during the traversal. The <i>accept(...)</i> method calls the
* visitor and then send the visitor to its children (or predecessors). Using different types of
* visitors, different operations can be performed during the traversal, while writing the actual
* traversal code only once.
*
* @see Visitor
*/
@Internal
public interface Visitable<T extends Visitable<T>> {
/**
* Contains the logic to invoke the visitor and continue the traversal. Typically invokes the
* pre-visit method of the visitor, then sends the visitor to the children (or predecessors) and
* then invokes the post-visit method.
*
* <p>A typical code example is the following:
*
* <pre>{@code
* public void accept(Visitor<Operator> visitor) {
* boolean descend = visitor.preVisit(this);
* if (descend) {
* if (this.input != null) {
* this.input.accept(visitor);
* }
* visitor.postVisit(this);
* }
* }
* }</pre>
*
* @param visitor The visitor to be called with this object as the parameter.
* @see Visitor#preVisit(Visitable)
* @see Visitor#postVisit(Visitable)
*/
void accept(Visitor<T> visitor);
}
1.2 StreamAPI的StreamGraph
结构StreamGraph的入口函数是 StreamGraphGenerator.generate()
。该函数会由触发程序执行的办法StreamExecutionEnvironment.execute()
调用到。就像OptimizedPla,StreamGraph 也是在 Client 端结构的。
在这个过程中,流水线首先被转换为Transformation流水线,而后被映射为StreamGraph,该图与具体的执行无关,外围是表白计算过程的逻辑。
对于Transformation
的引入,能够看社区的issue:https://issues.apache.org/jir…。实质是为了防止DataStream这一层对StreamGraph的耦合,因而引入这一层做解耦。
Transformation
关注的属性偏差框架外部,如:name(算子名)、uid(job重启时调配之前雷同的状态,长久保留状态)、bufferTimeout、parallelism、outputType、soltSharingGroup等。另外,Transformation分为物理Transformation和虚构Transformation,这于下一层的StreamGraph实现是有关联的。
StreamGraph的外围对象有两个:
- StreamNode:它能够有多个输入,也能够有多个输出。由Transformation转换而来——实体的StreamNode会最终变成物算子,虚构的StreamNode会附着在StreamEdge上。
- StreamEdge:StreamGraph的边,用于连贯两个StreamNode。就像下面说的——一个StreamNode能够有多个出边、入边。StreamEdge中蕴含了旁路输入、分区器、字段筛选输入(与SQL Select中抉择字段的逻辑一样)等的信息。
具体的转换代码在org.apache.flink.streaming.api.graph.StreamGraphGenerator
中,每个Transformation都有对应的转换逻辑:
static {
@SuppressWarnings("rawtypes")
Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
tmp = new HashMap<>();
tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
tmp.put(
TimestampsAndWatermarksTransformation.class,
new TimestampsAndWatermarksTransformationTranslator<>());
tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
tmp.put(
KeyedBroadcastStateTransformation.class,
new KeyedBroadcastStateTransformationTranslator<>());
translatorMap = Collections.unmodifiableMap(tmp);
}
1.3 流批一体的JobGraph
代码入口和1.1大节简直一摸一样,DataSet的入口类是ExecutionEnvironment
,而DataStream的入口是StreamExecutionEnvironment
。PlanTranslator
变成了StreamGraphTranslator
。所以,StreamGraph到JobGraph的转化也是在Client端进行的,次要工作做优化。其中十分重要的一个优化就是Operator Chain,它会将条件容许的算子合并到一起,防止跨线程、跨网络的传递。
是否开启OperationChain能够在程序中显示的调整。
接下来,咱们来看下JobGraph到底是什么。先看正文:
/**
* The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
* All programs from higher level APIs are transformed into JobGraphs.
*
* <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
* form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
* but inside certain special vertices that establish the feedback channel amongst themselves.
*
* <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate
* result define the characteristics of the concrete operation and intermediate data.
*/
public class JobGraph implements Serializable {
它是一张图,由vertices
和intermediate
组成。并且它是一个低等级的API,为JobMaster而生——所有高等级的API都会被转换成JobGraph。接下来咱们须要关注的对象别离是JobVertex
、JobEdge
、IntermediateDataSet
。其中,JobVertex的输出是JobEdge,输入是IntermediateDataSet。
1.3.1 JoBVertex
通过符合条件的多个StreamNode通过优化后的可能会交融在一起生成一个JobVertex,即一个JobVertex蕴含一个或多个算子(有趣味的同学能够看StreamingJobGraphGenerator#buildChainedInputsAndGetHeadInputs
或者浏览相干的Issue:https://issues.apache.org/jir…)。
1.3.2 JobEdge
JobEdge是连贯IntermediateDatSet和JobVertex的边,代表着JobGraph中的一个数据流转通道,其上游是IntermediateDataSet,上游是JobVertex——数据通过JobEdge由IntermediateDataSet传递给指标JobVertex。
在这里,咱们要关注它的一个成员变量:
/**
* A distribution pattern determines, which sub tasks of a producing task are connected to which
* consuming sub tasks.
*
* <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected
* in {@link EdgeManagerBuildUtil}
*/
public enum DistributionPattern {
/** Each producing sub task is connected to each sub task of the consuming task. */
ALL_TO_ALL,
/** Each producing sub task is connected to one or more subtask(s) of the consuming task. */
POINTWISE
}
该散发模式会间接影响执行时Task之间的数据连贯关系:点对点连贯or全连贯(或者叫播送)。
1.3.3 IntermediateDataSet
两头数据集IntermediateDataSet是一种逻辑构造,用来示意JobVertex的输入,即该JobVertex中蕴含的算子会产生的数据集。在这里咱们须要关注ResultPartitionType:
- Blocking:顾名思义。都上游解决完数据后,再交给上游解决。这个数据分区能够被生产屡次,也能够并发生产。这个分区并不会被主动销毁,而是交给调度器判断。
- BlokingPersistent:相似于Blocking,然而其生命周期由用户端指定。调用JobMaster或者ResourceManager的API来销毁,而不是由调度器管制。
- Pipelined:流替换模式。能够用于有界和无界流。这种分区类型的数据只能被每个消费者生产一次。且这种分区能够保留任意数据。
- PipelinedBounded:与Pipelined有些不同,这种分区保留的数据是无限的,这不会使数据和检查点提早太久。因而实用于流计算场景(需注意,批处理模式下没有CheckpointBarrier)。
- Pipelined_Approximate:1.12引入的策略,用于针对单个task做fast failover的分区策略。有趣味的同学能够浏览相干issue:https://issues.apache.org/jir…。
不同的执行模式下,其对应的后果分区类型不同,决定了在执行时刻数据交换的模式。
IntermediateDataSet的个数与该JobVertext对应的StreamNode的出边数量雷同,能够是一个或者多个。
1.4 ExecutionGraph
JobManager接管到Client端提交的JobGraph及其依赖Jar之后就要开始调度运行该工作了,但JobGraph还是一个逻辑上的图,须要再进一步转化为并行化、可调度的执行图。这个动作是JobMaster做的——通过SchedulerBase触发,理论动作交给DefaultExecutionGraphBuilder#buildGraph
来做。在这些动作中,会生成与JobVertex对应的ExecutionJobVertex(逻辑概念)和ExecutionVertex,与IntermediateDataSet对应的IntermediateResult(逻辑概念)和IntermediateResultPartition等,所谓的并行度也将通过上述类实现。
接下来要聊聊ExecutionGraph的一些细节,会波及一些逻辑概念,因而在这里笔者画了一张图,便于参考。
1.4.1 ExecutionJobVertex与ExecutionVertex
ExecutionJobVertex和JobGraph中的JobVertex一一对应。该对象还蕴含一组ExecutionVertex,数量与该JobVertex中所蕴含的StreamNode的并行度统一,如上图所示,如果并行度为N,那么就会有N个ExecutionVertex。所以每一个并行执行的实例就是ExecutionVertex。同时也会构建ExecutionVertex的输入IntermediateResult。
因而ExecutionJobVertex更像是一个逻辑概念。
1.4.2 IntermediaResult与IntermediaResuktParitition
IntermediateResult示意ExecutionJobVertex的输入,和JobGraph中的IntermediateDataSet一一对应,该对象也是一个逻辑概念。同理,一个ExecutionJobVertex能够有多个两头后果,取决于以后JobVertex有几个出边(JobEdge)。
一个两头后果集蕴含多个两头后果分区IntermediateResultPartition,其个数等于该Job Vertext的并发度,或者叫作算子的并行度。每个IntermediateResultPartition示意1个ExecutionVertex输入后果。
1.4.3 Execution
ExecutionVertex在Runtime对应了一个Task。在真正执行的时会将ExecutionVerterx包装为一个Execution。
对于JobGraph如何提交到JobMaster不是本文的重点,有趣味的同学能够自行查看
org.apache.flink.runtime.dispatcher.DispatcherGateway#submitJob
的相干调用栈。
1.4.5 从JobGraph到ExecutionGraph
下面介绍了几个重要概念。接下来看一下ExecutionGraph的构建过程。次要参考办法为org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#attachJobGraph
。
首先是构建ExecutionJobVertex(参考其构造方法),设置其并行度、共享Solt、CoLocationGroup,并构建IntermediaResult与IntermediaResuktParitition,依据并发度创立ExecutionVertex,并查看IntermediateResults是否有反复援用。最初,会对可切分的数据源进行切分。
其次便是构建Edge(参考 org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil#connectVertexToResult)。依据DistributionPattern来创立EdgeManager,并将ExecutionVertex和IntermediateResult关联起来,为运行时建设Task之间的数据交换就是以此为根底建设数据的物理传输通道的。
1.4.6 开胃菜:从ExecutionGraph到真正的执行
当JobMaster生成ExecutionGraph后,便进入了作业调度阶段。这外面波及到了不同的调度策略、资源申请、工作散发以及Failover的治理。波及的内容极多,因而会在另外的文章中探讨。对此好奇的同学,能够先看DefaultExecutionGraphDeploymentTest#setupScheduler
,外面的代码较为简单,能够察看ExecutionGraph到Scheduling的过程。
private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2)
throws Exception {
v1.setParallelism(dop1);
v2.setParallelism(dop2);
v1.setInvokableClass(BatchTask.class);
v2.setInvokableClass(BatchTask.class);
DirectScheduledExecutorService executorService = new DirectScheduledExecutorService();
// execution graph that executes actions synchronously
final SchedulerBase scheduler =
SchedulerTestingUtils.newSchedulerBuilder(
JobGraphTestUtils.streamingJobGraph(v1, v2),
ComponentMainThreadExecutorServiceAdapter.forMainThread())
.setExecutionSlotAllocatorFactory(
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
.setFutureExecutor(executorService)
.setBlobWriter(blobWriter)
.build();
final ExecutionGraph eg = scheduler.getExecutionGraph();
checkJobOffloaded((DefaultExecutionGraph) eg);
// schedule, this triggers mock deployment
scheduler.startScheduling();
Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
assertEquals(dop1 + dop2, executions.size());
return scheduler;
}
2.小结
通过本文,咱们理解各层图存在的意义:
- StreamGraph与OptimizedPlan:从内部API转向外部API,生成Graph的根本属性。如果是批处理,则会进行一系列的优化。
- JobGraph:流批对立的Graph。在这里做一些通用的优化,比方OperatorChain。
- ExecutionGraph:可执行级别的图,构建时关注大量的执行细节:如并发、Checkpoint配置有效性、监控打点设置、反复援用查看、可切分的数据源进行切分等等。
通过图的分层,Flink将不同的优化项、查看项放到了适合它们的档次,这也是繁多职责准则的体现。
发表回复