乐趣区

关于flink:Flink-SQL-在字节跳动的优化与实践

整顿 | Aven (Flink 社区志愿者)

摘要:本文由 Apache Flink Committer,字节跳动架构研发工程师李本超分享,以四个章节来介绍 Flink 在字节的利用实战。内容如下:

  • 整体介绍
  • 实际优化
  • 流批一体
  • 将来布局

一、整体介绍

2018 年 12 月 Blink 发表开源,经验了约一年的工夫 Flink 1.9 于 2019 年 8 月 22 公布。在 Flink 1.9 公布之前字节跳动外部基于 master 分支进行外部的 SQL 平台构建。经验了 2~3 个月的工夫字节外部在 19 年 10 月份公布了基于 Flink 1.9 的 Blink planner 构建的 Streaming SQL 平台,并进行外部推广。在这个过程中发现了一些比拟有意思的需要场景,以及一些较为奇怪的 BUG。

基于 1.9 的 Flink SQL 扩大

尽管最新的 Flink 版本曾经反对 SQL 的 DDL,但 Flink 1.9 并不反对。字节外部基于 Flink 1.9 进行了 DDL 的扩大反对以下语法:

  • create table
  • create view
  • create function
  • add resource

同时 Flink 1.9 版本不反对的 watermark 定义在 DDL 扩大后也反对了。

咱们在举荐大家尽量的去用 SQL 表白作业时收到很多“SQL 无奈表白简单的业务逻辑”的反馈。工夫久了发现其实很多用户所谓的简单业务逻辑有的是做一些内部的 RPC 调用,字节外部针对这个场景做了一个 RPC 的维表和 sink,让用户能够去读写 RPC 服务,极大的扩大了 SQL 的应用场景,包含 FaaS 其实跟 RPC 也是相似的。在字节外部增加了 Redis/Abase/Bytable/ByteSQL/RPC/FaaS 等维表的反对。

同时还实现了多个外部应用的 connectors:

  1. source: RocketMQ
  2. sink:
    RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics

并且为 connector 开发了配套的 format:PB/Binlog/Bytes。

在线的界面化 SQL 平台

除了对 Flink 自身性能的扩大,字节外部也上线了一个 SQL 平台,反对以下性能:

  • SQL 编辑
  • SQL 解析
  • SQL 调试
  • 自定义 UDF 和 Connector
  • 版本控制
  • 工作治理

二、实际优化

除了对性能的扩大,针对 Flink 1.9 SQL 的不足之处也做了一些优化。

Window 性能优化

1、反对了 window Mini-Batch

Mini-Batch 是 Blink planner 的一个比拟有特色的性能,其次要思维是积攒一批数据,再进行一次状态拜访,达到缩小拜访状态的次数升高序列化反序列化的开销。这个优化次要是在 RocksDB 的场景。如果是 Heap 状态 Mini-Batch 并没什么优化。在一些典型的业务场景中,失去的反馈是能缩小 20~30% 左右的 CPU 开销。

2、扩大 window 类型

目前 SQL 中的三种内置 window,滚动窗口、滑动窗口、session 窗口,这三种语意的窗口无奈满足一些用户场景的需要。比方在直播的场景,分析师想统计一个主播在开播之后,每一个小时的 UV(Unique Visitor)、GMV(Gross Merchandise Volume) 等指标。天然的滚动窗口的划分形式并不可能满足用户的需要,字节外部就做了一些定制的窗口来满足用户的一些共性需要。

-- my_window 为自定义的窗口,满足特定的划分形式
SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
my_window(ts, INTERVAL '1' HOURS)

3、window offset

这是一个较为通用的性能,在 Datastream API 层是反对的,但 SQL 中并没有。这里有个比拟有意思的场景,用户想要开一周的窗口,一周的窗口变成了从周四开始的非天然周。因为谁也不会想到 1970 年 1 月 1 号那天竟然是周四。在退出了 offset 的反对后就能够反对正确的天然周窗口。

SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)

维表优化

1、提早 Join

维表 Join 的场景下因为维表常常发生变化尤其是新增维度,而 Join 操作产生在维度新增之前,常常导致关联不上。

所以用户心愿如果 Join 不到,则临时将数据缓存起来之后再进行尝试,并且能够管制尝试次数,可能自定义提早 Join 的规定。这个需要场景不单单在字节外部,社区的很多同学也有相似的需要。

基于下面的场景实现了提早 Join 性能,增加了一个能够反对提早 Join 维表的算子。当 Join 没有命中,local cache 不会缓存空的后果,同时将数据临时保留在一个状态中,之后依据设置定时器以及它的重试次数进行重试。

2、维表 Keyby 性能

通过拓扑咱们发现 Cacl 算子和 lookUpJoin 算子是 chain 在一起的。因为它没有一个 key 的语义。

当作业并行度比拟大,每一个维表 Join 的 subtask,拜访的是所有的缓存空间,这样对缓存来说有很大的压力。

但察看 Join 的 SQL,等值 Join 是人造具备 Hash 属性的。间接凋谢了配置,运行用户间接把维表 Join 的 key 作为 Hash 的条件,将数据进行分区。这样就能保障上游每一个算子的 subtask 之间的拜访空间是独立的,这样能够大大的晋升开始的缓存命中率。

除了以上的优化,还有两点目前正在开发的维表优化。

1、 播送维表 :有些场景下维表比拟小,而且更新不频繁,但作业的 QPS 特地高。如果仍然拜访内部零碎进行 Join,那么压力会十分大。并且当作业 Failover 的时候 local cache 会全副生效,进而又对外部零碎造成很大拜访压力。那么改良的计划是定期全量 scan 维表,通过 Join key hash 的形式发送到上游,更新每个维表 subtask 的缓存。
2、Mini-Batch:次要针对一些 I/O 申请比拟高,零碎又反对 batch 申请的能力,比如说 RPC、HBase、Redis 等。以往的形式都是逐条的申请,且 Async I/O 只能解决 I/O 提早的问题, 并不能解决访问量的问题。通过实现 Mini-Batch 版本的维表算子,大量升高维表关联拜访内部存储次数。

Join 优化

目前 Flink 反对的三种 Join 形式;别离是 Interval Join、Regular Join、Temporal Table Function。

前两种语义是一样的流和流 Join。而 Temporal Table 是流和表的的 Join,左边的流会以主键的模式造成一张表,右边的流去 Join 这张表,这样一次 Join 只能有一条数据参加并且只返回一个后果。而不是有多少条都能 Join 到。

它们之间的区别列了几点:

能够看到三种 Join 形式都有它自身的一些缺点。

  1. Interval Join 目前应用上的缺点是它会产生一个 out join 数据和 watermark 乱序的状况。
  2. Regular Join 的话,它最大的缺点是 retract 放大 (之后会具体阐明这个问题)。
  3. Temporal table function 的问题较其它多一些,有三个问题。
  • 不反对 DDl
  • 不反对 out join 的语义 (FLINK-7865 的限度)
  • 右侧数据断流导致 watermark 不更新,上游无奈正确计算 (FLINK-18934)

对于以上的不足之处字节外部都做了对应的批改。

加强 Checkpoint 恢复能力

对于 SQL 作业来说一旦产生条件变动都很难从 checkpoint 中复原。

SQL 作业的确从 checkpoint 复原的能力比拟弱,因为有时候做一些看起来不太影响 checkpoint 的批改,它依然无奈复原。无奈复原次要有两点;

  • 第一点:operate ID 是主动生成的,而后因为某些起因导致它生成的 ID 扭转了。
  • 第二点:算子的计算的逻辑产生了扭转,即算子外部的状态的定义产生了变动。

例子 1:并行度产生批改导致无奈复原。

source 是一个最常见的有状态的算子,source 如果和之后的算子的 operator chain 逻辑产生了扭转,是齐全无奈复原的。

下图左上是失常的社区版的作业会产生的一个逻辑,source 和前面的并行度一样的算子会被 chain 在一起,用户是无奈去扭转的。但算子并行度是常会会产生批改,比如说 source 由原来的 100 批改为 50,cacl 的并发是 100。此时 chain 的逻辑就会发生变化。

针对这种状况,字节外部做了批改,容许用户去配置,即便 source 的并行度跟前面整体的作业的并行度是一样的,也让其不与之后的算子 chain 在一起。

例子 2:DAG 扭转导致无奈复原。

这是一种比拟非凡的状况,有一条 SQL (上图),能够看到 source 没有发生变化,之后的三个聚合相互之间没有关系,状态居然也是无奈复原。

作业之所以无奈复原,是因为 operator ID 生成规定导致的。目前 SQL 中 operator ID 的生成的规定与上游、自身配置以及上游能够 chain 在一起的算子的数量都有关系。因为新增指标,会导致新增一个 Calc 的上游节点,进而导致 operator ID 发生变化。

为了解决这种状况,反对了一种非凡的配置模式,容许用户配置生成 operator ID 的时候能够疏忽上游 chain 在一起算子数量的条件。

例子 3:新增聚合指标导致无奈复原

这块是用户诉求最大的,也是最简单的局部。用户冀望新增一些聚合指标后,原来的指标要能从 checkpoint 中复原。

能够看到图中左局部是 SQL 生成的算子逻辑。count,sum,sum,count,distinct 会以一个 BaseRow 的构造存储在 ValueState 中。distinct 比拟非凡一些,还会独自存储在一个 MapState 中。

这导致了如新增或者缩小指标,都会使原先的状态没方法从 ValueState 中失常复原,因为 VauleState 中存储的状态“schema”和新的(批改指标后)的“schema”不匹配,无奈失常反序列化。

在探讨解决方案之前,咱们先回顾一下失常的复原流。先从 checkpoint 中复原出状态的 serializer,再通过 serializer 把状态复原。接下来 operator 去注册新的状态定义,新的状态定义会和原先的状态定义进行一个兼容性比照,如果是兼容则状态复原胜利,如果不兼容则抛出异样工作失败。

不兼容的另一种解决状况是容许返回一个 migration(实现两个不匹配类型的状态复原)那么也能够复原胜利。

针对下面的流程做出对应的批改:

  1. 第一步使新旧 serializer 相互晓得对方的信息,增加一个接口,且批改了 statebackend resolve compatibility 的过程,把旧的信息传递给新的,并使其获取整个 migrate 过程。
  2. 第二步判断新老之间是否兼容,如果不兼容是否须要做一次 migration。而后让旧的 serializer 去复原一遍状态,并应用新的 serializer 写入新的状态。
  3. 对 aggregation 的代码生成进行解决,当发现 aggregation 拿到的是指标是 null,那么将做一些初始化的工作。

通过以上的批改根本就能够做到失常的,新增的聚合指标从拆开的计划复原。

三、流批一体摸索

业务现状

字节跳动外部对流批一体和业务推广之前,技术团队提前做了大量技术方面的摸索。整体判断是 SQL 这一层是能够做到流批一体的语义,但实际中却又发现不少不同。

比如说流计算的 session window,或是基于解决工夫的 window,在批计算中无奈做到。同时 SQL 在批计算中一些简单的 over window,在流计算中也没有对应的实现。

但这些特地的场景可能只占 10% 甚至更少,所以用 SQL 去落实流批一体是可行的。

流批一体

这张图是比拟常见的和大多数公司里的架构都相似。这种架构有什么缺点呢?

  1. 数据不同源:批工作个别会有一次前置解决工作,不论是离线的也好实时的也好,事后进过一层加工后写入 Hive。而实时工作是从 kafka 读取原始的数据,可能是 json 格局,也可能是 avro 等等。间接导致批工作中可执行的 SQL 在流工作中没有后果生成或者执行后果不对。
  2. 计算不同源:批工作个别是 Hive + Spark 的架构,而流工作根本都是基于 Flink。不同的执行引擎在实现上都会有一些差别,导致后果不统一。不同的执行引擎有不同的 API 定义 UDF,它们之间也是无奈被专用的。大部分状况下都是保护两套基于不同 API 实现的雷同性能的 UDF。

鉴于下面的问题,提出了基于 Flink 的流批一体架构来解决。

  1. 数据不同源:流式解决先通过 Flink 解决之后写入 MQ 供上游流式 Flink job 去生产,对于批式解决由 Flink 解决后流式写入到 Hive,再由批式的 Flink job 去解决。
  2. 引擎不同源:既然都是基于 Flink 开发的流式,批式 job,天然没有计算不同源问题,同时也防止了保护多套雷同性能的 UDF。

基于 Flink 实现的流批一体架构:

业务收益

  1. 对立的 SQL:通过一套 SQL 来表白流和批计算两种场景,缩小开发保护工作。
  2. 复用 UDF:流式和批式计算能够共用一套 UDF。这对业务来说是有积极意义的。
  3. 引擎对立:对于业务的学习老本和架构的保护老本都会升高很多。
  4. 优化对立:大部分的优化都是能够同时作用在流式和批式计算上,比方对 planner、operator 的优化流和批能够共享。

四、将来工作和布局

优化 retract 放大问题

什么是 retract 放大?

上图有 4 张表,第一张表进行去重操作 (Dedup),之后别离和另外三张表做 Join。逻辑比较简单,表 A 输出 (A1),最初产出 (A1,B1,C1,D1) 的后果。

当表 A 输出一个 A2,因为 Dedup 算子,导致数据须要去重,则向上游发送一个撤回 A1 的操作 -(A1) 和一个新增 A2 的操作 +(A2)。第一个 Join 算子收到 -(A1) 后会将 -(A1) 变成 -(A1,B1) 和 +(null,B1)(为了放弃它认为的正确语义) 发送到上游。之后又收到了 +(A2),则又向上游发送 -(null,B1) 和 +(A2,B1) 这样操作就放大了两倍。再经由上游的算子操作会始终被放大,到最终的 sink 输入可能会被放大 1000 倍之多。

如何解决?

将原先 retract 的两条数据变成一条 changelog 的格局数据,在算子之间传递。算子接管到 changelog 后处理变更,而后仅仅向上游发送一个变更 changelog 即可。

将来布局

1. 性能优化
  • 反对所有类型聚合指标变更的 checkpoint 恢复能力
  • window local-global
  • 事件工夫的 Fast Emit
  • 播送维表
  • 更多算子的 Mini-Batch 反对: 维表,TopN,Join 等
  • 全面兼容 Hive SQL 语法
2. 业务扩大
  • 进一步推动流式 SQL 达到 80%
  • 摸索落地流批一体产品状态
  • 推动实时数仓标准化
退出移动版