摘要:本文整顿自网易游戏资深开发工程师林小铂在 Flink Forward Asia 2021 平台建设专场的演讲。次要内容包含:
- 网易游戏 Flink SQL 倒退历程
- 基于模板 jar 的 StreamflySQL v1
- 基于 SQL Gateway 的 StreamflySQL v2
- 将来工作
点击查看直播回放 & 演讲 PDF
一、网易游戏 Flink SQL 倒退历程
网易游戏实时计算平台叫做 Streamfly,这个名字取名自电影《驯龙高手》中的 Stormfly。因为咱们曾经在从 Storm 迁徙到 Flink,所以将 Stormfly 中的 Storm 替换成了更为通用的 Stream。
Streamfly 前身是离线作业平台 Omega 下的名为 Lambda 的子系统,它负责了所有实时作业的调度,最开始开始反对 Storm 和 Spark Streaming,起初改为只反对 Flink。在 2019 年的时候咱们将 Lambda 独立进去以此为根底建设了 Streamfly 计算平台。随后,咱们在 2019 年底开发并上线了第一个版本 Flink SQL 平台 StreamflySQL。这个版本基于模板 jar 提供了根本 Flink SQL 的性能,然而用户体验还有待晋升,因而咱们在 2021 年年初从零开始从新建设了第二个版本的 StreamflySQL,而第二个版本是基于 SQL Gateway。
要理解这两个版本的不同,咱们须要先回顾下 Flink SQL 的根本工作流程。
用户提交的 SQL 首先会被 Parser 解析为逻辑执行打算;逻辑执行打算通过 Planner Optimizer 优化,会生成物理执行打算;物理执行打算再通过 Planner CodeGen 代码生成,翻译为 DataStream API 常见的 Transformation;最初 StreamGraphGenerator 会将这些 Transformation 转换为 Flink 作业的最终示意 JobGraph 提交到 Flink 集群。
上述一系列过程都产生在 TableEnvironment 外面。取决于部署模式的不同,TableEnvironment 可能运行在 Flink Client 或者 JobManager 里。Flink 当初反对 3 种集群部署模式,包含 Application、Per-Job 和 Session 模式。在 Application 模式下,TableEnvironment 会在 JobManager 端运行,而在其余两种模式下,TableEnvironment 都运行在 Client 端。不过这三种模式都有一个独特的特点,TableEnvironment 都是一次性的,会在提交 JobGraph 之后主动退出。
为了更好地复用 TableEnvironment 提高效率和提供有状态的操作,有的我的项目会将 TableEnvironment 放到一个新的独立 Server 端过程外面去运行,由此产生了一种新的架构,咱们称之为 Server 端 SQL 编译。绝对地,还有 Client 端 SQL 编译。
有同学可能会问,为什么没有 JobManager 端 SQL 编译,这是因为 JobManager 是绝对关闭的组件,不适宜拓展,而且即便做了达到的成果跟 Client 端编译成果根本一样。所以总体来看,个别就有 Client 和 Server 两种常见的 Flink SQL 平台架构。
Client 端 SQL 编译,顾名思义就是 SQL 的解析翻译优化都在 Client 端里进行(这里的 Client 是狭义的 Client,并不一定是 Flink Client)。典型的案例就是通用模板 jar 和 Flink 的 SQL Client。这种架构的长处是开箱即用,开发成本低,而且应用的是 Flink public 的 API,版本升级比拟容易;毛病是难以反对高级的性能,而且每次都要先启动一个比拟重的 TableEnvironment 所以性能比拟差。
而后是 Server 端 SQL 编辑。这种架构将 SQL 解析翻译优化逻辑放到一个独立的 Server 过程去进行,让 Client 变得十分轻,比拟靠近于传统数据库的架构。典型的案例是 Ververica 的 SQL Gateway。这种架构的长处是可拓展性好,能够反对很多定制化性能,而且性能好;毛病则是当初开源界没有成熟的解决方案,像下面提到 SQL Gateway 只是一个比拟初期的原型零碎,不足很多企业级个性,如果用到生产环境须要通过肯定的革新,而且这些革新波及比拟多 Flink 外部 API,须要比拟多 Flink 的背景常识,总体来说开发成本比拟高,而且后续版本升级工作量也比拟大。
编者按:Apache Flink 社区目前正在开发 SQL Gateway 组件,将原生提供 Flink SQL 服务化的能力,并兼容 HiveServer2 协定,打算于 1.16 版本中公布,敬请期待。感兴趣的同学能够关注 FLIP-91 [1] 和 FLIP-223 [2] 理解更多,也十分欢送大家参加奉献。
回到咱们 Flink SQL 平台,咱们 StreamflySQL v1 是基于 Client 端 SQL 编译,而 v2 是基于 Server 端的 SQL 编译。上面就让我一一介绍一下。
二、基于模板 jar 的 StreamflySQL v1
StreamflySQL v1 抉择 Client 端 SQL 编译的次要起因有三个:
- 首先是平台集成。不同于很多公司的作业调度器用大数据中比拟支流的 Java 编写,咱们的 Lambda 调度器是用 Go 开发的。这是因为 Lambda 在设计之初反对了多种实时计算框架,出于松耦合和公司技术栈的思考,Lambda 以 Go 作为开发语言,会采纳与 YARN 相似的动静生成 Shell 脚本的形式来调用不同框架的命令行接口。这样松耦合的接口方式给咱们带来很大的灵活性,比方咱们能够轻松反对多个版本的 Flink,不须要强制用户随着零碎版本升级,但同时也导致没方法间接去调用 Flink 原生的 Java API。
- 第二个起因是松耦合。开发的时候 Flink 版本是 1.9,过后 Client API 比较复杂,不太适宜平台集成,并且过后社区也在推动 Client 的重构,所以咱们尽量避免依赖 Client API 去开发 Flink SQL 平台。
- 第三个起因是实践经验。因为模板 jar + 配置核心模式在网易游戏外部曾经有了比拟多的利用,所以咱们在这方面积攒了很多实践经验。综合之下咱们很天然地采纳了模板 jar + 配置核心的架构来实现 v1 版本。
上图是 v1 版本的整体架构图。咱们在次要在 Lambda 作业平台的根底上新增了 StreamflySQL 后端作为配置核心,负责依据用户提交的 SQL 和作业运行配置加上通用的模板 jar 来生成一个 Lambda 作业。
总体的作业提交流程如下:
- 用户在前端的 SQL 编辑器提交 SQL 和运行配置。
- StreamflySQL 后端收到申请后生成一个 Lambda 作业并传递配置 ID。
- 而后 Lambda 启动作业,背地是执行 Flink CLI run 命令来提交作业。
- Flink CLI run 命令会启动 Flink Client 来加载并执行模版 jar 的 main 函数,这时会读取 SQL 和配置,并初始化 TableEnvironment。
- TableEnvironment 会从 Catalog 读取必要的 Database/Table 等元信息。这里顺带一提是,在网易游戏咱们没有应用对立的 Catalog 来保护不同组件的元信息,而是不同组件有本人的元数据中心,对应不同的 Catalog。
- 最初 TableEnvironment 编译好 JobGraph,以 Per-Job Cluster 的形式部署作业。
StreamflySQL v1 实现了 Flink SQL 平台从零到一的建设,满足了局部业务需要,但仍有不少痛点。
第一个痛点是响应慢。
以一个比拟典型的 SQL 来说,以模板 jar 的形式启动作业须要筹备 TableEnviroment,这可能会破费 5 秒钟,而后执行 SQL 的编译优化包含与 Catalog 交互去获取元数据,也可能会破费 5 秒钟;编译失去 jobgraph 之后还须要筹备 per-job cluster,一般来说也会破费 20 秒以上;最初还须要期待 Flink job 的调度,也就是作业从 scheduled 变成 running 的状态,这个可能也须要 10 秒钟。
总体来说,v1 版本启动一个 Flink SQL 作业至多须要 40 秒的工夫,这样的耗时相对来说是比拟长的。然而仔细分析这些步骤,只有 SQL 的编译优化和 job 调度是不可避免的,其余的比方 TableEnvironment 和 Flink cluster 其实都能够提前准备,这里的慢就慢在资源是懒初始化的,而且简直没有复用。
第二个痛点是调试难。
咱们对 SQL 调试的需要有以下几点:
- 第一点是调试的 SQL 与线上的 SQL 要基本一致。
- 第二点是调试 SQL 不能对线上的数据产生影响,它能够去读线上的数据,但不能去写。
- 第三点,因为调试的 SQL 通常只须要抽取大量的数据样本就能够验证 SQL 的正确性,所以咱们心愿限度调试 SQL 的资源,一方面是出于老本的思考,另外一方面也是为了避免调试的 SQL 与线上作业产生资源竞争。
- 第四点,因为调试 SQL 解决的数据量比拟少,咱们心愿以更快更便捷的形式获取到后果。
在 v1 版本中,咱们对上述需要设计了如下解决方案:
- 首先对于调试的 SQL,零碎会在 SQL 翻译的时候将原来的一个 Sink 替换为专用的 PrintSink,这解决了需要中的前两点。
- 而后对 PrintSink 进行限流,通过 Flink 的反压机制达到总体的限流,并且会限度作业的最长执行工夫,超时之后零碎会主动把作业完结掉,这解决了需要中的资源限度这点。
- 最初为了更快地响应,调试的作业并不会提交到 YARN 集群下来运行,而是会在 Lamdba 服务器本地开启开启一个 MiniCluster 去执行,同时也不便咱们从规范输入去提取 PrintSink 的后果,这点解决了需要中的最初一点。
调试模式的架构如上图所示,比起个别的 SQL 提交流程,次要区别在于作业不会提交到 YARN 上,而是在 Lambda 服务器的本地执行,从而节俭了筹备 Flink 集群的开销,并且更容易管控资源和获取后果。
上述调试解决方案根本可用,然而理论应用过程中仍然存在不少问题。
- 第一,如果用户提交的 SQL 比较复杂,那么 SQL 的编译优化可能会消耗比拟久的工夫,这会导致作业很容易超时,在有后果输入之前可能就被零碎完结掉,同时这样的 SQL 也会给服务器造成比拟大的压力。
- 第二,该架构没法去调试工夫窗口比拟长的作业或者须要 Bootstrap State 的作业。
- 第三,因为执行后果是在作业完结之后才批量返回的,不是在作业执行过程中就流式返回,因而用户须要等到作业完结——通常是 10 分钟以上才能够看到后果。
- 第四,在 SQL 的翻译阶段把调试 SQL 的 Sink 替换掉,这个性能是通过革新 Flink 的 Planner 来实现的,相当于业务逻辑入侵到了 Planner 外面,这样并不优雅。
第三个痛点是 v1 版本只容许单条 DML。
相比传统的数据库,咱们反对的 SQL 语句是很无限的,比方,MySQL 的 SQL 能够分成 DML、DQL、DDL 和 DCL。
- DML 用于 操控数据,常见的语句有 INSERT / UPDATE / DELETE。StreamflySQL v1 只反对了 INSERT,这和 Flink SQL 是保持一致的。Flink SQL 用 Retract 模式 — 也就是相似 Changelog 的形式 — 来示意 UPDATE/DELETE,所以只反对 INSERT,这点其实没有问题。
- DQL 用于 查问数据,常见语句是 SELECT。这在 Flink SQL 是反对的,但因为不足 Sink 不能生成一个有意义的 Flink 作业,所以 StreamflySQL v1 不反对 DQL。
- DDL 用于 定义元数据,常见语句是 CREATE / ALTER /DROP 等。这在 StreamflySQL v1 版本是不反对的,因为模板 jar 调用 SQL 的入口是 sqlUpdate,不反对纯元数据的操作,而且为纯元数据的操作独自启动一个 TableEnvironment 来执行也是齐全不划算。
- 最初是 DCL,用于 治理数据权限,比方 GRANT 跟 REVOKE 语句。这个 Flink SQL 是不反对的,起因是 Flink 目前只是数据的用户而不是管理者,DCL 并没有意义。
综合来看,v1 版本只反对了单条 DML,这让咱们很漂亮的 SQL 编辑器变得空有其表。基于以上这些痛点,咱们在往年调研并开发了 StreamflySQL v2。v2 采纳的是 Server 端 SQL 编译的架构。
三、基于 SQL Gateway 的 StreamflySQL v2
咱们的外围需要是解决 v1 版本的几个痛点,包含改善用户体验和提供更残缺的 SQL 反对。总体的思路是采纳 Server 端的 SQL 编译的架构,进步可拓展性和性能。此外,咱们的集群部署模式也改成 Session Cluster,事后筹备好集群资源,省去启动 YARN application 的工夫。
这里会有两个关键问题。
- 首先是咱们要齐全自研还是基于开源我的项目?在调研期间咱们发现 Ververica 的 SQL Gateway 我的项目很合乎咱们需要,容易拓展而且是 Flink 社区 FLIP-91 SQL Gateway 的一个根底实现,后续也容易与社区的倒退方向交融。
- 第二个问题是,SQL Gateway 自身有提交作业的能力,这点跟咱们已有的 Lambda 平台是重合的,会造成反复建设和难以对立治理的问题,比方认证受权、资源管理、监控告警等都会有两个入口。那么两者该当如何进行分工?咱们最终的解决方案是,利用 Session Cluster 的两阶段调度,即资源初始化和作业执行是拆散的,所以咱们能够让 Lambda 负责 Session Cluster 的治理,而 StreamflySQL 负责 SQL 作业的治理,这样能复用 Lambda 大部分的根底能力。
这是 StreamflySQL v2 的架构图。咱们将 SQL Gateway 内嵌到 SpringBoot 利用中,开发了新的后端。总体看起来比 v1 版本要简单,起因是本来的一级调度变成了会话和作业的两级调度。
首先用户须要创立一个 SQL 会话,StreamflySQL 后端会生成一个会话作业。在 Lambda 看来会话作业是一种非凡作业,启动时会应用 yarn-session 的脚本来启动一个 Flink Session Cluster。在 Session Cluster 初始化之后,用户就能够在会话内去提交 SQL。StreamflySQL 后端会给每个会话开启一个 TableEnvironment,负责执行 SQL 语句。如果是只波及元数据的 SQL,会间接调用 Catalog 接口实现,如果是作业类型的 SQL,会编译成 JobGraph 提交到 Session Cluster 去执行。
v2 版本很大水平上解决了 v1 版本的几个痛点:
- 在响应工夫方面,v1 经常会须要 1 分钟左右,而 v2 版本通常在 10 秒内实现。
- 在调试预览方面,v2 不须要等作业完结,而是在作业运行时,将后果通过 socket 流式地返回。这点是依赖了 SQL gateway 比拟奇妙的设计。对于 select 语句,SQL Gateway 会主动注册一个基于 socket 的长期表,并将 select 后果写入到这个表。
- 在 SQL 反对方面,v1 只反对 DML,而 v2 借助于 SQL Gateway 能够反对 DML/DQL/DDL。
不过 SQL Gateway 尽管有不错的外围性能,但咱们应用起来并不是一帆风顺,也遇到一些挑战。
首先最为重要的是元数据的长久化。
SQL Gateway 自身的元数据只保留在内存中,如果过程重启或是遇到异样解体,就会导致元数据失落,这在企业的生产环境外面是不可承受的。因而咱们将 SQL Gateway 集成到 SpringBoot 程序之后,很天然地就将元数据保留到了数据库。
元数据次要是会话元数据,包含会话的 Catalog、Function、Table 和作业等等。这些元数据依照作用范畴能够分为 4 层。底下的两层是全局的配置,以配置文件的模式存在;下面两层是运行时动静生成的元数据,存在数据库中。下层的配置项优先级更高,能够用于笼罩上层的配置。
咱们从下往上看这些元数据:
- 最底层是全局的默认 Flink Configuration,也就是咱们在 Flink Home 下的 flink-conf yaml 配置。
- 再下面一层是 Gateway 本身的配置,比方部署模式(比方是 YARN 还是 K8S),比方默认要出册的 Catalog 和 Function 等等。
- 第三层是 Session 会话级别的 Session Configuraion,比方会话对应的 Session Cluster 的集群 ID 或者 TaskManager 的资源配置等等。
- 最下面一层是 Job 级别的配置,包含作业动静生成的元数据,比方作业 ID、用户设置 checkpoint 周期等等。
这样比拟灵便的设计除了解决了元数据长久化的问题,也为咱们的多租户个性奠定了根底。
第二个挑战是多租户。
多租户分为资源和认证两个方面:
- 在资源方面,StreamflySQL 利用 Lambda 作业平台能够在不同的队列启动 Session Cluster,它们的 Master 节点和资源很天然就是隔离的,所以没有像 Spark Thrift Server 那样不同用户共用一个 Master 节点和混用资源的问题。
- 在认证方面,因为 Session Cluster 属于不同用户,所以 StreamflySQL 后端须要实现多租户的假装。在网易游戏,组件个别会应用 Kerberos 认证。咱们采纳多租户实现的形式是应用 Hadoop 的 Proxy User,先登录为超级用户,而后伪装成我的项目用户来向不同组件获取 delegation token,这里的组件次要是 Hive MetaStore 跟 HDFS,最初把这些 token 存到 UGI 外面并用 doAS 的形式来提交作业。
第三个挑战是程度拓展。
为了高可用和拓展服务能力,StreamflySQL 很天然须要以多实例的架构部署。因为咱们曾经将次要的状态元数据存到数据库,咱们能够随时从数据库构建出一个新的 TableEnvironment,所以 StreamflySQL 实例相似一般 Web 服务一样十分轻,能够很容易地扩容缩容。
然而并不是所有状态都能够长久化的,另外有些状态咱们成心会不长久化。比方用户应用 SET 命令来扭转 TableEnvironment 的属性,比方开启 Table Hints,这些属于长期属性,会在重建 TableEnvironment 后被重置。这是合乎预期的。再比方用户提交 select 查问做调试预览时,TaskManager 会与 StreamflySQL 后端建设 socket 链接,而 socket 链接显然也是不可长久化的。因而咱们在 StreamflySQL 的多实例前加了亲和性的负载平衡,依照 Session ID 来调度流量,让在失常状况下同一个用户的申请都落到同一个实例上,确保用户应用体验的连续性。
第四个挑战是作业状态治理。
其实这里的状态一词是双关,有两个含意:
- 第一个含意是作业的运行状态。SQL gateway 目前只是提交 SQL 并不监控后续的运行状态。因而,StreamflySQL 设置了监控线程池来定时轮询并更新作业状态。因为 StreamflySQL 是多实例的,它们的监控线程同时操作同一个作业的话,可能会有更新失落的问题,所以咱们这里应用了 CAS 乐观锁来保障过期的更新不会失效。而后咱们会在作业异样退出或者无奈获取状态时进行告警,比方 JobManager 进行 failover 的状况下,咱们无奈得悉 Flink 作业的状态,这时零碎就会收回 disconnected 的异样状态告警。
- 第二个含意是 Flink 的长久化状态,即 Flink State。原生的 SQL gateway 并没有治理 Flink 的 Savepoint 和 Checkpoint,因而咱们加上了 stop 和 stop-with-savepoint 的性能,并强制开启 retained checkpoint。这使得在作业遇到异样终止或者简略 stop 之后,再次重启时零碎能够主动查找到最新的 checkpoint。
这里我能够分享下咱们的算法。其实主动查找最新 checkpoint 的性能 Lambda 也有提供,然而 Lambda 假如作业都是 Per-Job Cluster,因而只有查找集群 checkpoint 目录里最新的一个 checkpoint 就能够了。但这样的算法对 StreamflySQL 却不实用,因为 Session Cluster 有多个作业,最新的 checkpoint 并不一定是咱们指标作业的。因而,咱们改为了应用相似 JobManager HA 的查找形式,先读取作业归档目录元数据,从外面提取最新的一个 checkpoint。
四、将来工作
- 将来咱们首先要解决的一个问题是 State 迁徙的问题,即用户对 SQL 进行变更后,如何从原先的 Savepoint 进行复原。目前只能通过变更类型来告知用户危险,比方通常而言加减字段不会造成 Savepoint 的不兼容,但如果新增一个 join 表,造成的影响就很难说了。因而后续咱们打算通过剖析 SQL 变更前后的执行打算,来事后告知用户变更前后的状态兼容性。
- 第二个问题是细粒度的资源管理。目前咱们并不能在作业编译时去指定 SQL 的资源,比方 TaskManager 的 CPU 和内存在 Session Cluster 启动之后就确定了,是会话级别的。目前调整资源只能通过作业并行度调整,很不灵便并且容易造成节约。当初 Flink 1.14 曾经反对了 DataStream API 的细粒度资源管理,能够在算子级别设置资源,但 SQL API 当初还没有打算,后续咱们可能参加进去推动相干议案的停顿。
- 最初是社区奉献。咱们对 SQL Gateway 有肯定应用教训,而且也对其进行了不少的改良,后续心愿这些改良能回馈给 Flink 社区,推动 FLIP-91 SQL Gateway 的停顿。
点击查看直播回放 & 演讲 PDF
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…