关于后端:最佳实践|如何写出简单高效的-Flink-SQL

2次阅读

共计 9908 个字符,预计需要花费 25 分钟才能阅读完成。

摘要:本文整顿自阿里巴巴高级技术专家、Apache Flink PMC 贺小令,在 Flink Forward Asia 2022 生产实践专场的分享。本篇内容次要分为三个局部:

  1. Flink SQL Insight
  2. Best Practices
  3. Future Works

点击查看原文视频 & 演讲 PPT

一、Flink SQL Insight

Flink 作为流批一体计算引擎,给大家提供了对立的 API,对立的算子形容,以及对立的调度。但 Flink 算子的底层仍有一些轻微的差异。

对于一个批算子而言,它的输出是一个无限数据集。批算子会基于残缺数据集进行计算,计算过程中如果内存装不下,数据会 Spill 到磁盘。

对于流算子而言,它的输出是一个有限数据集。与批算子不同,流算子不能在收集到所有输出数据之后才开始解决,也不可能将这些数据存到磁盘上。所以流算子的解决是一条一条的(也可能会按一小批进行计算)。

当流算子接管到上游的一条数据后,对于 Stateful 算子会从 State 里读取之前的计算结果,而后联合以后数据进行计算,并将计算结果存储到 State 里。因而 State 优化是流计算里十分重要的一部分。

批处理里仅有 Append 音讯,而流解决中,不仅有 Append 音讯,还有 UpdateBefore、UpdateAfter 和 Delete 音讯,这些音讯都是历史数据的勘误。在算子的计算逻辑里,如果不能很好的解决勘误音讯,会导致上游数据收缩,引发新的问题。

如何使一个作业运行更快?能够总结为 6 点:缩小反复计算、缩小有效数据计算、解决数据申请问题、进步计算吞吐、缩小 State 拜访、缩小 State 的大小。

除了最初两个是针对流解决的,其余都是各个通用计算引擎都须要面对的。

接下来,通过分析一个 Flink SQL 如何变成一个的 Flink 作业的过程,来介绍作业变得更优的伎俩。

Flink 引擎接管到一个 SQL 文本后,通过 SqlParser 将其解析成 SqlNode。通过查问 Catalog 中的元数据信息,对 SqlNode 中的表、字段、类型、udf 等进行校验,校验通过后会转换为 LogicalPlan,通过 Optimizer 优化后转换为 ExecPlan。ExecPlan 是 SQL 层的最终执行打算,通过 CodeGen 技术将执行打算翻译成可执行算子,用 Transformation 来形容,最初转换为 JobGraph 并提交到 Flink 集群上执行。

如何让 SQL 作业执行更快?Optimizer 在此施展了重要作用。它将 SQL 语义等价的 LogicalPlan 转换为可高效执行的 ExecPlan,而 ExecPlan 到 JobGraph 转换过程都是确定的。

下图展现了优化器外部的细节。一个 Flink 作业通常蕴含多个 Insert 语句,当多个 Insert 语句翻译成 Logical Plan,会造成一个 DAG,咱们称该优化器为 DAG 优化器。

Optimizer 的输出包含 5 局部:LogicalPlan、Flink 配置、束缚信息(例如 PK 信息),统计信息、用户在 SQL 中提供的 Hints。

具体的优化过程包含:首先须要将 DAG 基于 View 进行合成,该做法是为了尽可能保障执行打算可复用。合成完后会失去一个个独立的 Tree,从叶子节点向根节点顺次将这个 Tree 交给 Calcite 的 Optimizer 进行优化。而后将优化完的后果从新组装为 DAG,造成 PhysicalPlan 的 DAG,并应用子图复用技术持续改写 PhysicalPlan,进一步打消反复计算。最初将 PhysicalPlan 转换为 ExecPlan。基于 Exec DAG,能够做更多的优化:如 MultipleInput Rewrite,缩小网络 Shuffle 带来的开销;DynamicFiltering Rewrite,缩小有效数据的读取和计算等。

Calcite Optimizer 是经典的数据库中的关系代数优化,有基于规定的优化——RBO,基于代价模型的优化——CBO,同时还定义了大量的优化规定,大量的 Meta 信息的推导(如 PrimaryKey 推导、RowCount 的推导),来帮忙优化器取得最优的执行打算。

在 Calcite Optimizer 里,联合了经典的优化思路:首先是对 LogicalPlan 进行一些确定性的改写,例如 SubQuery 改写、解关联、常量折叠等。而后做一些通用的改写,例如各种 push down,造成 FlinkLogicalPlan。这部分优化是流批通用的。而后流和批依据各自底层的实现,进行特定的优化,例如批上会依据 Cost 抉择不同的 Join 算法,流上会依据配置决定是否将 Retract 解决转换为 Upsert 解决,是否开启 Local/Global 解决数据热点问题等。

DAG 优化器解决了大量后面提到的各种问题,Flink SQL 是否足够 Fast 起到了十分关键作用。

二、Best Practices

接下来将联合生产中一些高频应用的场景和 SQL,介绍一些优化的最佳实际。

2.1 Sub-Plan Reuse

首先是 Sub-Plan Reuse(即子图复用)优化。下图中的两句 SQL 会被优化成两个独立的 Pipeline,见下图中左下角的执行打算,Scan 算子和 Filter 算子中 a>10 会计算两次。

通过开启 Sub-Plan Reuse 优化,优化器会主动发现执行打算中计算逻辑齐全一样的子图并将它们进行合并,从而防止反复计算。这个例子中,因为两个 Scan 算子齐全一样,所以通过优化之后变成一个 Scan 算子,从而减小数据的读取的次数,优化后的 DAG 如上图右下角所示。

逻辑上,优化器也能够复用 Filter 中的 a>10 的局部的计算。然而当遇到非常复杂的 Query 时,同时联合各种 push down 的优化,要找到现实的可复用的执行打算是一件十分麻烦的事件。能够应用基于视图(View)的子图复用的形式来达到最大化子图复用的成果。

这里的 View,能够了解为一个子图复用单元。基于用户定义的 View,优化器可能自动识别哪些 View 可能被复用。下图中的 SQL 会翻译成左下角的执行打算,通过子图复用优化后 Scan 和 Filter a>10 都将被复用(如下图右下角执行打算所示)。

2.2 Fast Aggregation

Aggregation Query 在生产中被高频应用,这里以“select a,sum(b) from my_table group by a;”为例进行介绍 Aggregation 相干优化。该 SQL 将被翻译成下图中右边的逻辑执行打算,而图中左边是执行拓扑:Scan 算子的并发为 3,Aggregation 算子的并发为 2。

图中的一个小方框示意一条数据,用色彩来示意字段 a 的值,用数字示意字段 b 的值,所有的 Scan 算子的数据会按字段 a 进行 Shuffle 输入给 Aggregate 算子。流计算中,当 Aggregate 算子接管到一条数据后,首先会从数据中抽取它对应的 Group Key,而后从 State 里取得对应 Key 之前的聚合后果,并基于之前的聚合后果和以后的数据进行聚合计算。最初将后果更新到 State 里并输入到上游。(逻辑可参考上图中的伪代码)

上图中 Aggregate 算子的输出有 15 条数据,Aggregate 计算会触发 15 次 State 操作(15 次 get 和 put 操作),这么频繁的操作可能会造成性能影响。与此同时,第一个 Aggregate 实例接管了 8 条红色数据,会造成热点 Key。如何解决 State 拜访频繁和热点 Key 的问题在生产中常常遇到。

通过开启 MiniBatch 能缩小 State 的拜访和缩小热点 Key 的拜访。对应的配置为:

  • table.exec.mini-batch.enabled: true
  • table.exec.mini-batch.allow-latency: 5s // 用户按需配置

allow-latency 表明了 MiniBatch 的大小是 5 秒,即 5 秒内的数据会成为为一个 Mini Batch。开启 Mini Batch 之后,Aggregation 的计算逻辑就会发生变化:当 Aggregate 算子接管到一条数据时,不会间接触发计算,而是把它放到一个 Buffer 里。当 Buffer 攒满之后,才开始计算:将 Buffer 的数据按 Key 进行分组,以每个组为单位进行计算。对于每个 Key,先从 State 中获取该 Key 对应的之前的聚合后果,而后逐条计算该 Key 对应的所有数据,计算实现后将后果更新 State 并输入给上游。

这里 State 的拜访次数等于 Key 的个数。如果将上图中所示的数据都放到一个 Buffer 的话,这里 Key 的个数只有 4 个,因而只有 4 次 State 拜访。相比于原来的 15 次访问,开启 MiniBatch 优化后大大减少了 State 拜访的开销。

开启 MiniBatch 实用的场景为:

  • 作业对提早没有严格要求(因为攒 Batch 自身会带来提早);
  • 以后 Aggregate 算子拜访 State 是瓶颈;
  • 上游算子的解决能力有余(开启 MiniBatch 会缩小往上游输入的数据量)。

上图中,咱们察看到第一个 Aggregate 实例中尽管拜访 State 频率很少,然而要解决的数据量还是没变,其整体数据量和单 Key 数据量相比其余 Aggregate 实例都多很多,即存在数据歪斜。咱们能够通过两阶段 Aggregate(Local/Global)来解决数据歪斜防止热点。

开启两阶段 Aggregate 须要开启 MiniBatch 和将 agg-phase-strategy 设置为 TWO PHASE/AUTO,开启后的 Plan 如下图所示。

LocalAgg 和上游算子 chain 在一起,即 Scan 和 LocalAgg 采纳雷同并发,执行两头不存在数据 Shuffle。在 LocalAgg 里,会依照 MiniBatch 的大小进行聚合,并把聚合的后果输入到 GlobalAgg。

上图示例中,通过 LocalAgg 聚合后的红色数据在第一个 GlobalAgg 实例中从 8 条减到 3 条,各个 GlobalAgg 实例之间不存在数据歪斜。

开启 Local/Global 实用的场景为:

  • 所有 Aggregate Function 都实现了 merge 办法,不然没法在 GlobalAgg 在对 LocalAgg 的后果进行合并;
  • LocalAgg 聚合度比拟高,或者 GlobalAgg 存在数据歪斜,否则开启 LocalAgg 会引入额定的计算开销。

当 Query 中有 Distinct Aggreation Function 时,Local/Global 解决不了热点问题。通过下图的例子能够看进去,LocalAgg 按 a、b 字段进行聚合,聚合成果并不显著。因而,第一个 GlobalAgg 实例还是接管到大量的热点数据,共 7 条红色数据。LocalAgg 的聚合 Key(a、b)和上游的 Shuffle Key(a)不统一导致 Local/Global 无奈解决 DistinctAgg 的数据热点问题。

咱们能够通过开启 Partial/Final Aggreation 来解决 DistinctAgg 的数据热点问题。PartialAgg 通过 Group Key + Distinct Key 进行 Shuffle,这样确保雷同的 Group Key + Distinct Key 能被 Shuffle 到同一个 PartialAgg 实例中,实现第一层的聚合,从而缩小 FinalAgg 的输出数据量。能够通过上面配置开启 Partial/Final 优化:

  • table.optimizer.distinct-agg.split.enabled: true;
  • table.optimizer.distinct-agg.split.bucket-num:1024

开启配置之后,优化器会将原来的 plan 翻译成下图的 plan:

上图例子中,通过 PartialAgg 的聚合之后,第一个 FinalAgg 实例的热点 Key 从 7 条缩小到 3 条。开启 Partial/Final 实用的场景为:

  • Query 中存在 Distinct Function,且存在数据歪斜;
  • 非 Distinct Function 只能是如下函数:count,sum,avg,max,min;
  • 数据集比拟大的时候成果更好,因为 Partial/Final 两层 Agg 之间会引入额定的网络 Shuffle 开销;
  • Partial Agg 还会引入额定的计算和 State 的开销。

和 LocalAgg 不一样,PartialAgg 会将后果寄存在 State 中,这会造成 State 翻倍。为了解决 PartialAgg 引入额定的 State 开销的问题,在 Partital/Final 根底上引入 Increment 优化。它同时联合了 Local/Global 和 Partial/Final 优化,并将 Partial 的 GlobalAgg 和 Final 的 LocalAgg 合并成一个 IncrementalAgg 算子,它只寄存 Distinct Value 相干的值,因而缩小了 State 的大小。开启 IncrementalAgg 之后的执行打算如下图所示:

一些 Query 改写也能缩小 Aggregate 算子的 State。很多用户应用 COUNT DISTICNT + CASE WHEN 统计同一个维度不同条件的后果。在 Aggregate 算子里,每个 COUNT DISTICNT 函数都采纳独立的 State 存储 Distinct 相干的数据,而这部分数据是冗余的。能够通过将 CASE WHEN 改写为 FILTER,优化器会将雷同维度的 State 数据共享,从而缩小 State 大小。

2.3 Fast Join

Join 是咱们生产中十分常见的 Query,Join 的优化带来的性能晋升成果非常明显。以下图简略的 Join Query 为例介绍 Join 相干的优化:该 Query 将被翻译成 Regular Join(也常被称为双流 Join),其 State 会保留 left 端和 right 端所有的数据。当数据集较大时,Join State 也会十分大,在生产中可能会造成重大的性能问题。优化 Join State 是流计算十分重要的内容。

针对 Regular Join,有两种常见的优化形式:

  1. 当 Join Key 中带有 PrimaryKey(以下简称 PK)时,State 中存储的数据较少,它只存了输出数据和对应的关联次数。当 Join 的输出存有 PK 时,State 中存储了一个 Map,Map Key 是 PK,Value 是输出数据以及关联的次数。当 Join Key 和 Join 输出都没有 PK 时,State 仍用 Map 存储,但它的 Key 是输出的数据,Value 是这一行数据的呈现次数和关联次数。尽管它们都是用 Map 存储,Join 输出带 PK 时的 State 拜访效率高于没有 PK 的状况。因而倡议用户在 Query 中尽量定义 PK 信息,帮忙优化器更好的优化。
  2. 倡议在 Join 之前只保留必要的字段,让 Join State 外面存储的数据更少。

除了 Regular Join 外,Flink 还提供了其余类型的 Join:Lookup Join、Interval Join、Temporal Join、Window Join。在满足业务需要的条件下,用户能够将 Regular Join 进行改写成其余类型的 Join 以节俭 State。Regular Join 和其余 Join 的转换关系如下图所示:

  • 将 Regular Join 改写成 Lookup Join。支流来一条数据会触发 Join 计算,Join 会依据支流的数据查找维表中相干最新数据,因而 Lookup Join 不须要 State 存储输出数据。目前很多维表 Connector 提供了点查机制和缓存机制,执行性能十分好,在生产中被大量应用。前面章节会独自介绍 Lookup Join 相干优化。Lookup Join 的毛病是当维表数据有更新时,无奈触发 Join 计算。
  • 将 Regular Join 改写为 Interval Join。Interval Join 是在 Regluar Join 根底上对 Join Condition 加了时间段的限定,从而在 State 中只须要存储该时间段的数据即可,过期数据会被及时清理。因而 Interval Join 的 State 相比 Regular Join 要小很多。
  • 把 Regular Join 改写成 Window Join。Window Join 是在 Regluar Join 根底上定义相干 Window 的数据能力被 Join。因而,State 只寄存的最新 Window,过期数据会被及时清理。
  • 把 Regular Join 改写成 Temporal Join。Temporal Join 是在 Regular Join 上定义了相干版本能力被 Join。Temporal Join 保留最新版本数据,过期数据会被及时清理。

2.4 Fast Lookup Join

Lookup Join 在生产中被宽泛应用,因而咱们这里对其做独自的优化介绍。以后 Flink 对 Lookup Join 提供了多种优化形式:

第一种,提供同步和异步查问机制。如下图所示,在同步查问中,当一条数据发送给维表 Collector 后,须要等后果返回,能力解决下一条,两头会有大量的工夫期待。在异步查问过程中,会将一批数据同时发送给维表 Collector,在这批数据实现查问会对立返回给上游。通过异步查问模式模式,查问性能失去极大晋升。Flink 提供了 Query Hint 的形式开启同步和异步模式,请参考下图所示。

第二种,在异步模式下,提供了 Ordered 和 Unordered 机制。在 Order 模式之下,须要等所有数据返回后并对数据排序,以确保输入的程序和输出的程序统一,排序实现能力发送给上游。在 Unordered 模式下,对输入程序没有要求,只有查问到后果,就能够间接发送给上游。因而,相比于 Order 模式,Unordered 模式可能有极大晋升性能。Flink 提供了 Query Hint 的形式开启 Ordered 和 Unordered 模式,请参考下图所示。(Flink 也提供 Job 级别的配置,开启所有维表查问的都采纳雷同输入模式。)

第三种,Cache 机制。这是一种很常见的优化,用 本地 Memory Lookup 进步查问效率。目前,Flink 提供了三种 Cache 机制:

  • Full Caching,将所有数据全副 Cache 到内存中。该形式适宜小数据集,因为数据量过大会导致 OOM。Flink 提供了 Table Hints 开启。同时,还能够通过 Hints 定义 reload 策略。
  • Partial Caching,适宜大数据集应用,框架底层应用 LRU Cache 保留最近被应用的数据。当然,也能够通过 Hint 定义 LRU Cache 的大小和 Cache 的生效工夫。
  • No Caching,即敞开 Cache。

2.5 Fast Deduplication

流计算中常常遇到数据反复,因而“去重”应用频率十分高。在晚期版本,Flink 通过 group by + first_value 的形式查找第一行数据;通过 group by + last_value 查找最初一行数据。示例如下图所示:

上述 Query 会被转换为 Aggregate 算子,它的 State 十分大且无奈保障语义残缺。因为上游的输入数据可能来源于不同行,导致与按行去重的语义语义相悖。

在最近版本中,Flink 提供了更高效的去重形式。因为 Flink 没有专门提供去重语法,目前通过 Over 语句表白去重语义,参考下图所示。对于查找第一行场景,能够依照工夫升序,同时输入 row_number,再基于 row_number 进行过滤。此时,算子 State 只须要存储去重的 Key 即可。对于查找最初一行场景,把工夫程序变为降序,算子 State 只须要存储最初一行数据即可。

2.6 Fast TopN

TopN(也称为 Rank)在生产中也常常遇到,Flink 中没有提供专门的 TopN 的语法,也是通过 Over 语句实现。目前,TopN 提供了三种实现形式(它们性能顺次递加)。

第一种,AppendRank,要求输出是 Append 音讯,此时 TopN 算子中 State 仅须要为每个 Key 寄存满足要求的 N 条数据。

第二种,UpdateFastRank,要求输出是 Update 音讯,同时要求上游的 Upsert Key 必须蕴含 Partition Key,Order-by 字段必须是枯燥的且其枯燥方向须要与 Order-by 的方向相同。以上图中左边的 Query 为例,Group By a,b 会产生以 a,b 为 Key 的更新流。在 TopN 算子中,上游产生的 Upsert Key(a,b)蕴含 Over 中 Partition Key(a,b);Sum 函数的入参都是负数(where c >= 0),所以 Sum 的后果将是枯燥递增的,从而字段 c 是枯燥递增,与排序方向相同。因而这个 Query 就能够被翻译成 UpdateFastRank。

第三种,RetractRank,对输出没有任何要求。State 中须要寄存所有输出数据,其性能也是最差的。

除了在满足业务需要的状况下批改 Query 以便让优化器抉择更优的算子外,TopN 还有一些其余的优化办法:

  1. 不要输入 row_number 字段,这样能够大大减少上游解决的数据量。如果上游须要排序,能够在前端拿到数据后重排。
  2. 减少 TopN 算子中 Cache 大小,缩小对 State 的拜访。Cache 命中率的计算公式为:cache_hit = cache_size * parallelism / top_n_num / partition_key_num。由此可见,减少 Cache 大小能够减少 Cache 命中率(能够通过 table.exec.rank.topn-cache-size 批改 Cache 大小,默认值是 1 万)。须要留神的是,减少 Cache 大小时,TaskManager 的内存也须要相应减少。
  3. 分区字段最好与工夫相干。如果 Partition 字段不与工夫属性关联,无奈通过 TTL 进行清理,会导致 State 有限收缩。(配置了 TTL,数据被过期清理可能导致后果谬误,须要谨慎)

2.7 Efficient User Defined Connector

Flink 提供了多种 Connector 相干接口,帮忙优化器生成更优的执行打算。用户能够依据 Connector 的能力,实现对应的接口从而进步 Collector 的执行性能。

  • SupportsFilterPushDown,将 Filter 条件下推到 Scan 里,从而缩小 Scan 的读 IO,以进步性能。
  • SupportsProjectionPushDown,通知 Scan 只读取必要的字段,缩小有效的字段读取。
  • SupportsPartitionPushDown,在动态优化时,通知 Scan 只须要读取无效分区,防止有效分区读取。
  • SupportsDynamicFiltering,在作业运行时,动静辨认出哪些分区是无效的,防止有效分区读取。
  • SupportsLimitPushDown,将 limit 值下推到 Scan 里,只须要读取 limit 条数据即可,大大减少了 Scan I/O。
  • SupportsAggregatePushDown,间接从 Scan 中读取聚合后果,缩小 Scan 的读 I/O,同时输入给上游的数据更少。
  • SupportsStatisticReport,Connector 汇报统计信息给优化器,以便优化器产生更优的执行打算。

2.8 Use Hints Well

任何优化器都不是完满的,Flink 提供了 Hints 机制来影响优化器以取得更优的执行打算。目前有两种 Hints:

  • Table Hints,次要用于扭转 Table 表的配置,例如 Lookup Table 的缓存策略能够通过 Table Hint 进行批改。
  • Query Hints,以后它能够倡议优化器抉择适合的 Join 策略。Lookup Join 能够通过 Hint 批改 Lookup 策略,请参考上述 Lookup Join 相干优化的介绍;也能够通过 Hint 让优化器为批作业抉择对应的 Join 算法,具体请参考上图所示。

三、Future Works

在将来,咱们将提供更有深度、更多场景、更智能的优化,以进一步提高 Flink 引擎的执行效率。

首先,咱们将会在优化的深度上会继续开掘,例如多个间断的 Join 会有 State 反复的问题,能够将它优化成一个 Join,防止 State 反复。

其次,咱们心愿扩充优化的广度,联合不同的业务场景进行有针对性的优化。

最初,咱们心愿优化更加智能,会摸索做一些动静优化相干工作,例如依据流量变动,在线主动优化 plan。

点击查看原文视频 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

正文完
 0