关于Flink:滴滴-Flink110-升级之路

41次阅读

共计 4305 个字符,预计需要花费 11 分钟才能阅读完成。

作者|Alan

导读:滴滴实时计算引擎从 Flink-1.4 无缝降级到 Flink-1.10 版本,做到了齐全对用户通明。并且在新版本的指标、调度、SQL 引擎等进行了一些优化,在性能和易用性上相较旧版本都有很大晋升。

这篇文章介绍了咱们降级过程中遇到的艰难和思考,心愿能给大家带来启发。

一、背景

在本次降级之前,咱们应用的次要版本为 Flink-1.4.2,并且在社区版本上进行了一些加强,提供了 StreamSQL 和低阶 API 两种服务模式。现有集群规模达到了 1500 台物理机,运行工作数超过 12000,日均解决数据 3 万亿条左右。

不过随着社区的倒退,尤其是 Blink 合入 master 后有很多性能和架构上的降级,咱们心愿能通过版本升级提供更好的流计算服务。往年 2 月份,里程碑版本 Flink-1.10 公布,咱们开始在新版上上进行开发工作,踏上了充斥挑战的降级之路。

二、Flink-1.10 新个性

作为 Flink 社区至今为止的最大的一次版本升级,退出的新个性解决了之前遇到很多的痛点。

1. 原生 DDL 语法与 Catalog 反对

Flink SQL 原生反对了 DDL 语法,比方 CREATE TABLE/CREATE FUNCTION,能够应用 SQL 进行元数据的注册,而不须要应用代码的形式。

也提供了 Catalog 的反对,默认应用 InMemoryCatalog 将信息长期保留在内存中,同时也提供了 HiveCatalog 能够与 HiveMetastore 进行集成。也能够通过本人拓展 Catalog 接口实现自定义的元数据管理。

2.Flink SQL 的加强

  • 基于 ROW_NUMBER 实现的 TopN 和去重语法,拓展了 StreamSQL 的应用场景。
  • 实现了 BinaryRow 类型作为外部数据交互,将数据间接以二进制的形式构建而不是对象数组,比方应用一条数据中的某个字段时,能够只反序列其中局部数据,缩小了不必要的序列化开销。
  • 新增了大量内置函数,例如字符串解决、FIRST/LAST_VALUE 等等,因为不须要转换为内部类型,相较于自定义函数效率更高。
  • 减少了 MiniBatch 优化,通过微批的解决形式晋升工作的吞吐

3. 内存配置优化

之前对 Flink 内存的治理始终是一个比拟头疼的问题,尤其是在应用 RocksDB 时,因为一个 TaskManager 中可能存在多个 RocksDB 实例,不好估算内存使用量,就导致常常产生内存超过限度被杀。

在新版上减少了一些内存配置,例如 state.backend.rocksdb.memory.fixed-per-slot 能够轻松限度每个 slot 的 RocksDB 内存的应用下限,防止了 OOM 的危险。

三、挑战与应答

本次降级最大的挑战是,如何保障 StreamSQL 的兼容性。StreamSQL 的目标就是为了对用户屏蔽底层细节,可能更加专一业务逻辑,而咱们能够通过版本升级甚至更换引擎来提供更好的服务。保障工作的平滑降级是最根本的要求。

1. 外部 patch 如何兼容

因为逾越多个版本架构差距微小,外部 patch 根本无奈间接合入,须要在新版本上从新实现。咱们首先整顿了所有的历史 commit,筛选出那些必要的批改并且在新版上进行从新实现,目标是能笼罩已有的所有性能,确保新版本能反对现有的所有工作需要。

例如:

  • 新增或批改 Connectors 以反对公司外部须要,例如 DDMQ(滴滴开源音讯队列产品),权限认证性能等。
  • 新增 Formats 实现,例如 binlog,外部日志采集格局的解析等。
  • 减少 ADD JAR 语法,能够在 SQL 工作中援用内部依赖,比方 UDF JAR,自定义 Source/Sink。
  • 减少 SET 语法,能够在 SQL 中设置 TableConfig,领导执行打算的生成

2. StreamSQL 语法兼容

社区在 1.4 版本时,FlinkSQL 还处于比拟初始的阶段,也没有原生的 DDL 语法反对,咱们应用 Antlr 实现了一套自定义的 DDL 语法。然而在 Flink1.10 版本上,社区曾经提供了原生的 DDL 反对,而且与咱们外部的语法差异较大。当初摆在咱们背后有几条路能够抉择:

  • 放弃外部语法的反对,批改全副工作至新语法。(违反了平滑迁徙的初衷,而且对已有用户学习老本高)
  • 批改 Flink 内语法解析的模块(sql-parser),反对对外部语法的解析。(实现较为简单,且不利于后续的版本升级)
  • 在 sql-parser 之上封装一层语法转换层,将本来的 SQL 解析提取无效信息后,通过字符串拼接的形式组织成社区语法再运行。

最终咱们选用了第三种计划,这样能够最大限度的缩小和引擎的耦合,作为插件运行,将来再有引擎降级齐全能够复用现有的逻辑,可能升高很多的开发成本。

例如:咱们在旧版本上应用 “json-path” 的库实现了 json 解析,通过在建表语句里定义相似 $.status 的表达式示意如何提取此字段。

新版本上原生的 json 类型解析能够应用 ROW 类型来示意嵌套构造,在转换为新语法的过程中,将本来的表白是解析为树并构建出新的字段类型,再应用计算列的形式提取出原始表中的字段,确保表构造与之前统一。类型名称、配置属性也通过映射转换为社区语法。

3. 兼容性测试

最初是测试阶段,须要进行欠缺的测试确保所有工作都能做到平滑降级。咱们本来的打算是筹备进行回归测试,对已有的所有工作替换配置后进行回放,然而在实际操作中有很多问题:

  • 测试流程过长,一次运行可能须要数个小时。
  • 呈现问题时不好定位,可能产生在工作的整个生命周期的任何阶段。
  • 无奈验证计算结果,即新旧版本语义是否统一

所以咱们按工作的提交流程分成多个阶段进行测试,只有在以后阶段可能全副测试通过后后进入下一个阶段测试,提前发现问题,将问题定位范畴放大到以后阶段,进步测试效率。

  • 转换测试:对所有工作进行转换,测试后果合乎预期,形象典型场景为单元测试。
  • 编译测试:确保所有工作能够通过 TablePlanner 生成执行打算,在编译成 JobGraph,真正提交运行前完结。
  • 回归测试:在测试环境对工作替换配置后进行回放,确认工作能够提交运行
  • 对照测试:对采样数据以文件的模式提交至新旧两个版本中运行,比照后果是否完全一致(因为局部工作后果不具备确定性,所以应用旧版本间断运行 2 次,筛选出确定性工作,作为测试用例)

四、引擎加强

除了对旧版本的兼容,咱们也联合了新版本的个性,对引擎进行了加强。

1. Task-Load 指标

咱们始终心愿能准确掂量工作的负载情况,应用反压指标指标只能粗略的判断工作的资源够或者不够。

联合新版的 Mailbox 线程模型,所有互斥操作全副运行在 TaskThread 中,只需统计出线程的占用工夫,就能够准确计算工作负载的百分比。

将来能够应用指标进行工作的资源举荐,让工作负载维持在一个比拟衰弱的程度。

2. SubTask 平衡调度

在 FLIP-6 后,Flink 批改了资源调度模型,移除了 –container 参数,slot 按需申请确保不会有闲置资源。然而这也导致了一个问题,Source 的并发数经常是小于最大并发数的,而 SubTask 调度是按 DAG 的拓扑程序调度,这样 SourceTask 就会集中在某些 TaskManager 中导致热点。

咱们退出了 ” 最小 slot 数 ” 的配置,保障在 Flink session 启动后立刻申请相应数量的 slot,且闲置时也不被动退出,搭配 cluster.evenly-spread-out-slots 参数能够保障在 slot 数短缺的状况下,SubTask 会均匀分布在所有的 TaskManager 上。

3. 窗口函数加强

以滚动窗口为例 TUMBLE(time_attr, INTERVAL ‘1’ DAY),窗口为一天时开始和完结工夫固定为每天 0 点 -24 点,无奈做到生产每天 12 点 - 次日 12 点的窗口。

对于代码能够通过指定偏移量实现,然而 SQL 目前还未实现,通过减少参数 TUMBLE(time_attr, INTERVAL ‘1’ DAY, TIME ’12:00:00′) 示意偏移工夫为 12 小时。

还有另外一种场景,比方统计一天的 UV,同时心愿展现以后时刻的计算结果,例如每分钟触发窗口计算。对于代码开发的形式能够通过自定义 Trigger 的形式决定窗口的触发逻辑,而且 Flink 也内置了一些 Tigger 实现,比方 ContinuousTimeTrigger 就很适宜这种场景。所以咱们又在窗口函数里减少了一种可选参数,代表窗口的触发周期,TUMBLE(time_attr, INTERVAL ‘1’ DAY, INTERVAL ‘1’ MINUTES)。

通过减少 offset 和 tiggger 周期参数(TUMBLE(time_attr, size,offset_time)),拓展了 SQL 中窗口的应用场景,相似下面的场景能够间接应用 SQL 开发而不须要应用代码的形式。

4. RexCall 后果复用

在很多 SQL 的应用场景里,会屡次应用上一个计算结果,比方将 JSON 解析成 Map 并提取多个字段。

尽管通过子查问,看起来 json 解析只调用一次,然而通过引擎的优化后,通过后果表的投影 (Projection) 生成函数调用链 (RexCall),后果相似:

这样会导致 json 解析的计算反复运行了 3 次,即便应用视图宰割成两步操作,通过 Planner 的优化一样会变成上边的样子。

对于确定性 (isDeterministic=true) 的函数来说,雷同的输出肯定代表雷同的后果,反复执行 3 次 json 解析其实是没有意义的,如何优化能力实现对函数后果的复用呢?

在代码生成时,将 RexCall 生成的惟一标识(Digest)和变量符号的映射保留在 CodeGenContext 中,如果遇到 Digest 雷同的函数调用,则能够复用曾经存在的后果变量,这样解析 JSON 只须要执行第一次,之后就能够复用第一次的后果。

五、总结

通过几个月的致力,新版本曾经上线运行, 并且作为 StreamSQL 的默认引擎,工作重启后间接应用新版本运行。兼容性测试的通过率达到 99.9%,能够根本做到对用户的通明降级。对于新接触 StreamSQL 用户能够应用社区 SQL 语法进行开发,已有工作也能够批改 DML 局部语句来应用新个性。当初新版本曾经反对了公司内许多业务场景,例如公司实时数据仓库团队依靠于新版本更强的表达能力和性能,承接了多种多样的数据需要做到稳固运行且与离线口径保持一致。

版本升级不是咱们的起点,随着实时计算的倒退,公司内也有越来越多团队须要应用 Flink 引擎, 也向咱们提出了更多的挑战,例如与 Hive 的整合做到将后果间接写入 Hive 或间接应用 Flink 作为批处理引擎,这些也是咱们摸索和倒退的方向,通过一直的迭代向用户提供更加简略好用的流计算服务。

正文完
 0