共计 9233 个字符,预计需要花费 24 分钟才能阅读完成。
摘要:本文整顿自哔哩哔哩基础架构部资深研发工程师张杨在 Flink Forward Asia 2021 平台建设专场的演讲。次要内容包含:
- 平台建设
- 增量化
- AI On Flink
点击查看直播回放 & 演讲 PDF
在过来的一年里,B 站围绕 Flink 次要做了三个方面的工作:平台建设、增量化和 AI on Flink。实时平台是实时业务的技术底座,也是 Flink 面向用户的窗口,须要保持继续迭代优化,一直加强性能,晋升用户效率。增量化是咱们在增量化数仓和流批一体上的尝试,在实时和离线之间找到一个更好的均衡,减速数仓效率,解决计算口径问题。AI 方向,咱们也正在联合业务做进一步的摸索,与 AIFlow 社区进行单干,欠缺优化机器学习工作流。
一、平台建设
1.1 根底功能完善
在平台的根底性能方面,咱们做了很多新的性能和优化。其中两个重点的是反对 Kafka 的动静 sink 和工作提交引擎的优化。
咱们遇到了大量这样的 ETL 场景,业务的原始实时数据流是一条较大的混合数据流,蕴含了数个子业务数据。数据通过 Kafka 传输,末端的每个子业务都对应独自的解决逻辑,每个子业务都去生产全量数据,再进行过滤,这样的资源耗费对业务来说是难以承受的,Kafka 的 IO 压力也很大。因而咱们会开发一个 Flink 工作,对混合数据流依照子业务进行拆分,写到子业务对应的 topic 里,让业务应用。
技术实现上,晚期 Flink SQL 的写法就是写一个 source 再写多个 sink,每个 sink 对应一个业务的 topic,这的确能够满足短期的业务诉求,但存在的问题也较多:
- 第一是数据的歪斜,不同的子业务数据量不同,数据拆分后,不同 sink 之解决的数据量也存在较大差异,而且 sink 都是独立的 Kafka producer,顶峰期间会造成 sink 之间资源的争抢,对性能会有显著的影响;
- 第二是无奈动静增减 sink,须要扭转 Flink SQL 代码,而后重启工作能力实现增减 sink。过程中,不仅所有上游工作都会抖动,还有一个重大的问题就是无奈从 savepoint 复原,也就意味着数据的一致性无奈保障;
- 第三是保护老本高,局部业务存在上百个子分流需要,会导致 SQL 太长,保护老本极高。
基于以上起因,咱们开发了一套 Kafka 动静 sink 的性能,反对在一个 Kafka sink 外面动静地写多个 topic 数据,架构如上图。咱们对 Kafka 表的 DDL 定义进行了扩大,在 topic 属性里反对了 UDF 性能,它会依据入仓的数据计算出这条数据应该写入哪个 Kafka 集群和 topic。sink 收到数据后会先调用 UDF 进行计算,拿到后果后再进行指标集群和 topic 数据的写入,这样业务就不须要在 SQL 里编写多个 sink,代码很洁净,也易于保护,并且这个 sink 被所有 topic 共用,不会产生歪斜问题。UDF 间接面向业务零碎,分流规定也会平台化,业务方配置好规定后,分流施行主动失效,工作不须要做重启。而且为了防止 UDF 的性能问题,防止用户本人去开发 UDF,咱们提供了一套规范的分流,做了大量的缓存优化,只有依照标准定义好分流,规定的业务表就能够间接应用 UDF。
目前外部几个千亿级别的分流场景,都在这套计划下高效运行中。
根底性能上做的第二个优化就是工作的提交引擎优化。做提交器的优化次要是因为存在以下几个问题:
- 第一,本地编译问题。Flink SQL 工作在 Yarn 上的部署有三种模式:per-job、application 和 yarn-session。早前咱们始终沿用 per-job 模式,然而随着工作规模变大,这个模式呈现了很多的问题。per-job 模式下,工作的编译是在本地进行再提交到近程 app master,编译耗费提交引擎的服务性能,在短时批量操作时很容易导致性能有余;
- 第二,多版本的反对问题。咱们反对多个 Flink 版本,因而在版本与提交引擎耦合的状况下,须要保护多个不同代码版本的提交引擎,保护老本高;
- 第三,UDF 的加载。咱们始终应用 Flink 命令里的 -c 命令进行 UDF 传递,UDF 代码包存在 UDFS 上,通过 Hadoop 的 web HDFS 协定进行 cluster 加载,一些大的工作启动时,web HDFS 的 HTTP 端口压力会霎时增大,存在很大的稳固隐患;
- 第四,代码包的传输效率。用户代码包或者 Flink 引擎代码包都要做屡次的上传下载操作,遇到 HDFS 反馈较慢的场景, 耗时较长,而实时工作心愿做到极致的疾速高低线。
因而咱们做了提交器的优化:
- 首先引入了 1.11 版本以上反对的 application 模式,这个模式与 per-job 最大的区别就是 Flink 工作的编译全副移到了 APP master 里做,这样就解决了提交引擎的瓶颈问题;
- 在多版本的反对下面,咱们对提交引擎也做了革新,把提交器与 Flink 的代码彻底解耦,所有依赖 Flink 代码的操作全副形象了规范的接口放到了 Flink 源码侧,并在 Flink 源码侧减少了一个模块,这个模块会随着 Flink 的版本一起降级提交引擎,对通用接口的调用全副进行反射和缓存,在性能上也是可承受的;
- 而且 Flink 的多版本源码全副依照 maled 模式进行治理,寄存在 HDFS。依照业务指定的工作版本,提交引擎会从近程下载 Flink 相干的版本包缓存到本地,所以只须要保护一套提交器的引擎。Flink 任何变更齐全和引擎无关,降级版本提交引擎也不须要参加;
- 实现 application 模式降级后,咱们对 UDF 和其余资源包的上传下载机制也进行了批改,通过 HDFS 近程间接散发到 GM/TM 上,缩小了上传下载次数,同时也防止了 cluster 的近程加载。
1.2 新工作构建模式
平台之前反对 Flink 的构建模式次要有两种,SQL 和 JAR 包。两者的优劣势都很显著,SQL 简略易用门槛低,然而不够灵便,比方一些定时操作在 SQL 外面无奈进行。JAR 包功能完善也灵便,然而门槛高,须要学习 Flink datastream 一整套 API 的概念,非开发人员难以把握,而咱们大量的用户是数仓,这种 JAR 包的工作难以标准化治理。业务方大多心愿应用 SQL,防止应用 JAR 包。
咱们调研了平台已有的 Datastream JAR 包工作,发现大部分的 JAR 包工作还是以 Table API 为主,只有大量过程用 Datastream 做了一些数据的转换,实现之后还是注册成了 Table 进行 Table 操作。如果平台能够反对在 SQL 外面做一些简单的自定义转换,业务其实齐全不须要编写代码。
因而咱们反对了一种新的工作构建模式——算子化,模块化地构建一个 Flink 工作,混合 JAR 包与 SQL,在进行工作构建时,先定义一段 SQL,再定义一个 JAR 包,再接一段 SQL,每段都称为算子,算子之间互相串联,形成一个残缺的工作。
采纳 Flink 规范的 SQL 语法,对 JAR 包进行了接口的限度,必须继承平台的接口定义进行开发。输入输出都是定义好的 Datastream。它比 UDF 的扩展性更强,灵活性也更好。而且整个工作的输入输出根本能够做到和 SQL 同级别的管控力,算子的开发也比纯 JAR 包简略得多,不须要学习太多 Flink API 的操作,只须要对 Datastream 进行变换。而且对于一些罕用的公共算子,平台能够对立开发提供,领有更业余的性能优化,业务方只有援用即可。
目前在实时数仓等一些偏固定业务的场景,咱们都在尝试进行标准化算子的推广和应用。
1.3 智能诊断
平台建设的第三点是流工作的智能诊断。目前实时反对的业务场景包含 ETL、AI、数据集成等,且工作规模增长速度很快。越来越大的规模对平台的服务能力也提出了更高的要求。
此前,平台人员须要破费很多的工夫在帮助业务解决资源或各种业务问题上,次要存在以下几个方面的问题:
- 资源配置:初始资源确认艰难,碎片化重大,应用资源周期性变动;
- 性能调优:数据歪斜,网络资源优化,state 性能调优,gc 性能调优;
- 错误诊断:工作失败起因剖析,修复倡议。
这些问题日常都靠平台人员兜底,规模小的时候大家勉强能够累赘,然而规模疾速变大后曾经齐全有力消化,须要一套自动化的零碎来解决这些问题。
因而咱们做了一套流工作的智能诊断系统,架构如上图。
零碎会继续抓取工作运行时的 metrics 进行性能剖析,剖析实现后推给用户,让用户本人执行具体的优化改良操作;也会实时抓取工作失败的日志,并与词库进行匹配,将谬误进行翻译,使用户更容易了解,同时也会给出更好了解的解决方案,让用户自行进行故障解决;同时还会依据工作的历史运行资源进行自动化缩容解决,解决资源节约和资源有余的问题。
目前此性能曾经节俭了整个队列 10% 的资源左右,分担了相当一部分平台的运维压力,在将来咱们会继续进行优化迭代,更进一步进步这套零碎在自动化运维下面的能力以及覆盖度。
将来,在提交引擎方面,咱们心愿交融 Yarn session 模式与 application 模式做 session 的复用,解决工作上线的资源申请效率问题。同时心愿大 state 工作也可能在 session 的根底上复用本地的 state,启动时无需从新下载 state。
智能诊断方面,咱们心愿实现更多自动化的操作,实现主动进行优化改良,而不须要用户手动操作,做到用户低感知;扩容缩容也会继续提速,目前缩容的频率只在天级,扩容还未实现自动化。将来咱们心愿整个操作的周期和频率做到分钟级的自动化。
算子方面,咱们心愿能对立目前的 SQL 和 JAR 包两种模式,对立工作构建形式,让用户以更低的老本更多简单的操作,平台也更方便管理。
二、增量化
上图是咱们晚期的数据架构,是典型的 Lambda 架构。实时和离线从源头上就齐全拆散、互不干涉,实时占较低,离线数仓是外围的数仓模型,占次要的比例,但它存在几个显著的问题。
- 第一,时效性。数仓模型是分层架构,层与层之间的转换靠调度零碎驱动,而调度零碎是有周期的,常见的根本都是天或小时。源头生产的数据,数仓各层根本须要隔一天或几个小时才可见,无奈满足实时性要求稍高的场景;
- 第二,数据的应用效率低。ETL 和 adhoc 的数据应用齐全一样,没有针对性的读写优化,也没有依照用户的查问习惯进行从新组织,不足数据布局优化的能力。
针对第一个问题,是否全副实时化即可?然而实时数仓的老本高,而且不太好做大规模的数据回溯。大部分业务也不须要做到 Kafka 的秒级时效。第二个问题也不好解决,流式写入为了谋求效率,对数据的布局能力较弱,不具备数据的从新组织能力。因而咱们在实时和离线之间找到了一个均衡——做分钟级的增量化。
咱们采纳 Flink 作为计算引擎,它的 checkpoint 是一个人造的增量化机制,实时工作进行一次 checkpoint,产出一批增量数据进行增量化解决。数仓起源次要有日志数据和 binlog 数据,日志数据应用 Append 传统的 HDFS 存储即可做到增量化的生产;binlog 数据是 update 模式,但 HDFS 对 update 的反对并不好,因而咱们引入了 Hudi 存储,它可能反对 update 操作,并且具备肯定的数据布局能力,同时它也能够做 Append 存储,并且可能解决 HDFS 的一些小文件问题。因而日志数据也抉择了 Hudi 存储,采纳 Append 模式。
最终咱们的增量化计划由 Flink 计算引擎 + Hudi 存储引擎形成。
增量化场景的落地上,思考到落地的复杂性,咱们先选取了业务逻辑绝对简略、没有简单聚合逻辑的 ODS 和 DWD 层进行落地。目前的数据是由 Flink 间接写到 Hive 的 ODS 层,咱们对此进行了针对性的适配,反对了 Hive 表的增量化读取,开发了 HDFSStreamingSource,同时为了防止对 HDFS 门路频繁扫描的压力,ODS 层写入时会进行索引创立,记录写入的文件门路和工夫,只须要追踪索引文件即可。
source 也是分层架构,有文件散发层和读取层,文件散发层进行协调,调配读取文件数,避免读取层某个文件读取过慢沉积过多文件,两头的转换可能反对 FlinkSQL 操作,具备残缺的实时数仓的能力。
sink 侧咱们引入了 Hudi connector,反对数据 Append 写入 Hudi,咱们还对 Hudi 的 compaction 机制进行了一些扩大,次要有三个:DQC 检测、数据布局的优化以及映射到 Hive 表的分区目录。目前数据的布局仍旧还很弱,次要依赖 Hudi 自身的 min、max 和 bloom 的优化。
实现所有上述操作后,ODS 到 DWD 的数据时效性有了显著晋升。
从数据生产到 DWD 可见,进步到了分钟级别;DWD 层的生产实现工夫也从传统的 2:00~5:00 提前到了凌晨 1 点之前。此外,采纳 Hudi 存储也为日后的湖仓一体打下了以一个好的根底。
除了日志数据,咱们对 CDC 也采纳这套计划进行减速。基于 Flink 的 CDC 能力,针对 MySQL 的数据同步实现了全增量一体化操作。依赖 Hudi 的 update 能力,单任务实现了 MySQL 的数据同步工作,并且数据只提早了一个 checkpoint 周期。CDC 临时不反对全量拉取,须要额定进行一次全量的初始化操作,其余的流程则完全一致。
Hudi 自身的模型和离线的分区全量有较大的区别,为了兼容离线调度须要的分区全量数据,咱们也批改了 Hudi 的 compaction 机制。在做划分区的 compaction 时会做一次数据的全量拷贝,生成全量的历史数据分区,映射到 Hive 表的对应分区。同时对于 CDC 场景下的数据品质,咱们也做了很多的保障工作。
为了保障 CDC 数据的一致性,咱们从以下 4 个方面进行了欠缺和优化:
- 第一,binlog 条数的一致性。依照工夫窗口进行 binlog 生产侧和生产侧的条数校验,防止中间件丢数据;
- 第二,数据内容抽样检测。思考到老本,咱们在 DB 端和源端、Hudi 存储端抽样增量数据进行内容的准确比拟,防止 update 出错;
- 第三,全链路的黑盒测试。测试库表模仿了线上状况,进行 7×24 小时不间断的 Kafka 生产 MySQL 数据,而后串通整套流程避免链路故障;
- 第四,定期的全量比照。业务的库表个别比拟大,历史数据会低频地定期进行全量比对,避免抽样观测漏掉的谬误。
刚开始应用 Hudi 的时候,Hudi on Flink 还是处于高级的阶段,因而存在大量问题,咱们也一起和 Hudi 社区做了大量优化工作,次要有 4 个方面:Hudi 表的冷启动优化、checkpoint 一致性问题解决、Append 效率低的优化以及 get list 的性能问题。
首先是冷启动的问题。Hudi 的索引存储在 Flink state 里,一张存在的 Hudi 表如果要通过 Flink 进行增量化更新写入,就必然面临一个问题:如何把 Hudi 表已有的信息写入到 Flink state 里。
MySQL 能够借助 Flink CDC 实现全量 + 增量的过程构建,能够绕开从已有 Hudi 表冷启动的过程,然而 TiDB 不行,它的存量表在借助别的伎俩构建完之后,想要增量化就会面临如何从 FlinkSQL 冷启动的问题。
社区有个原始计划,在记录所有的算子 BucketAssigner 外面读取全副的 Hudi 表数据,而后进行 state 构建,从性能上是可行的,然而在性能上根本无法承受,尤其是大表,因为 Flink 的 key state 机制原理,BucketAssigner 每个并发度都要读取全表数据,而后挑选出属于以后这个并发的数据存储到本人的 state 外面,每个并计划都要去读全量的表,这在性能上难以满足。
业务能启动的工夫太长了,很多百亿级别的表能启动的工夫可能是在几个小时,而且读取的数据太多,很容易失败。
和社区进行了沟通交流后,他们提供了一套全新的计划,新增了独立的 Bootstrap 机制,专门负责冷启动过程。Bootstrap 由 coordinator 和 IndexBootstrap 两个算子组成,IndexBootstrap 负责读取工作,coordinator 负责协调调配文件读取,避免单个 IndexBootstrap 读取速度慢而升高整个初始化流程的效率。
IndexBootstrap 算子读取到数据后,会依照与业务数据一样的 Keyby 规定,Keyby 到对应的 BucketAssigner 算子上,并在数据下面打标,告知 BucketAssigner 这条数据是有 Bootstrap 的,不须要往上游 writer 发送。整个流程里,原始数据只需读取一遍,而且是多并发一起读,效率取得了极大的晋升。而且 BucketAssigner 只须要解决本人应该解决的数据,不再须要解决全表的数据。
其次是 Hudi 的 checkpoint 一致性问题。Hudi on checkpoint 在每次 checkpoint 实现的时候会进行一次 commit 操作,具体流程是 writer 算子在 checkpoint 的时候 flush 内存数据,而后给 writer coordinator 算子汇报汇总信息,writer coordinor 算子收到汇报信息时会将其缓存起来,checkpoin 实现后,收到 notification 信息时会进行一次 commit 操作。
然而在 Flink 的 checkpoint 机制里,notification 无奈保障肯定胜利,因为它并不在 checkpoint 的生命周期里,而是一个回调操作,是在 checkpoin 胜利后执行。checkpoin 胜利后,如果这个接口还没有执行实现,commit 操作就会失落,也就意味着 checkpoint 周期内的数据会失落。
针对上述问题,咱们进行了重构。Writer 算子在 cehckpoint 时,会对汇报的 writer coordinator 的信息进行 state 长久化,工作重启后从新汇报给 writer coordinator 算子。writer coordinator 算子再收集所有 writer 算子信息并做一次 commit 判断,确保对应的 commit 曾经实现。此时,Writer 算子也会放弃阻塞,确保上次长久化的 commit 实现之后才会解决最新的数据,这样就对齐了 Hudi 与 Flink 的 checkpoint 机制,保障了边界场景数据的一致性。
第三是针对 Hudi 在 Append 写入场景下的优化。
因为 Append 模式是复用 update 模式的代码,所以在没有反复 key 的 Append 场景下,很多操作是能够简化的,因为 update 为了解决反复,须要做很多额定的操作。如果可能简化这些操作,吞吐能力能够有较大的晋升。
- 第一个操作是小文件的查找,每次 checkpoint 后,update 都会从新 list 文件,而后从文件中找到大小不达标的文件持续 open 并写入。update 场景存在歪斜,会造成很多文件大小不平均,然而 Append 场景不存在这种问题,它所有的文件大小都很平均;
- 第二个是 keyby。在 update 的模式上面,单个 key 只能被一个节点解决,因而上游须要依照 Hudi key 进行 keyby 操作。然而 Append 场景没有反复 key,能够间接用 chain 代替 keyby,大大减少了节点之间序列化传输的开销。同时 Append 场景下不存在内存合并,整体效率也会更高。
最初一个是 GetListing 的优化。Hudi 表与底层 HDFS 文件的映射是通过 ViewManager 来做的,Hudi table 对象和 TimelineService 都会本人去初始化一个 ViewManager,每个 ViewManager 在初始化的时候都会进行 HDFS 目录的 list 操作,因为每个并发都持有多个 Hudi table 或 TimelineService,会造成大并发工作启动时 HDFS 的压力很大。咱们对 TimelineService 进行了单例化的优化,保障每个过程只有一 TimelineService,可能数倍地升高 HDFS list 的压力。后续咱们还会基于 Flink 的 coordinator 机制做工作级别的单例化。
将来,咱们会持续开掘增量的能力,给业务带来更多的价值。
三、AI on Flink
传统的机器学习链路里数据的传输、特色的计算以及模型的训练,都是离线解决的,存在两个大的问题。
第一个是时效性低,模型和特色的更新周期根本是 t+1 天或者 t+1 小时,在谋求时效性的场景下体验并不好。第二个是计算训练的效率很低,必须等天或小时的分区数据全副筹备好之后能力开始特色计算和训练。全量分区数据导致计算和训练的压力大。
在实时技术成熟后,大部分模型训练流程都切换到实时架构上,数据传输、特色计算和训练都能够做到简直实时,从全量变成了短时的小批量增量进行,训练的压力也大大加重。同时因为实时对离线的兼容性,在很多场景比方特色回补上,也能够尝试应用 Flink 的流批一体进行落地。
上图是咱们典型的机器学习链路图。从图上能够看出,样本数据生产特色的计算、模型的训练和成果的评估都大量实时化,两头也夹杂着大量离线过程,比方一些超长周期的特色计算。
同时也能够看出,残缺的业务的模型训练链路长,须要治理和保护大量的实时工作和离线工作。呈现故障的时候,具体问题的定位也异样艰巨。如何在整个机器学习的链路中同时治理号这么多实时和离线工作,并且让工作之间的协同和调度有序进行、高效运维,是咱们始终在思考的问题。
因而咱们引入了 Flink 生态下 AIFlow 零碎。AIFlow 自身的定位就是做机器学习链路的治理,外围的机器计算引擎是 Flink,这和咱们的诉求不约而同。这套零碎有三个次要的个性合乎咱们的业务需要。
- 第一,流批的混合调度。在咱们理论的业务生产上,一套残缺的实时链路都会夹杂着实时和离线两种类型的工作。AIFlow 反对流批的混合调度,反对数据依赖与管制依赖,可能很好地反对咱们现有的业务状态。并且将来在 Flink 流批一体方面也会有更多的施展空间;
- 第二,元数据的治理,AIFlow 对所有数据和模型都反对版本治理。有了版本治理,各种试验成果和试验参数就都可追溯;
- 第三,凋谢的 notification 机制。整个链路中存在很多的内部零碎节点,难以归纳到平台外部,然而通过 notification 机制,能够买通 AIFlow 外部节点与内部节点的依赖。整套零碎的部署分为三局部,notification service、meta service 以及 scheduler,扩展性也很好,咱们在内部化的过程中实现了很多本人的扩大。
实时平台在往年引入 AIFlow 的之后曾经经验了两个版本的迭代,V2 版本是社区 release 之前的一个外部版本,咱们进行了分装提供试用。V3 版本是往年 7 月社区正式 release 之后,咱们进行了版本的对接。
AIFlow 的构建应用 Python 进行形容,运行时会有可视化的节点展现,能够很不便地追踪各个节点的状态,运维也能够做到节点级的治理,不须要做整个链路级别的运维。
将来咱们会对这套零碎在流批一体、特色治理以及模型训练三个方向进行重点的迭代与开发,更好地施展它的价值。
点击查看直播回放 & 演讲 PDF
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~