乐趣区

关于flink:BIGO-使用-Flink-做-OLAP-分析及实时数仓的实践和优化

本文整顿自 BIGO Staff Engineer 邹云鹤在 Flink Forward Asia 2021 的分享。次要内容包含:

  1. 业务背景
  2. 落地实际 & 特色改良
  3. 利用场景
  4. 将来布局

FFA 2021 直播回放 & 演讲 PDF 下载

一、业务背景

BIGO 是一家面向海内的以短视频直播业务为主的公司, 目前公司的次要业务包含 BigoLive (寰球直播服务),Likee (短视频创作分享平台),IMO (收费通信工具) 三局部,在寰球范畴内领有 4 亿用户。随同着业务的倒退,对数据平台解决能力的要求也是越来越高,平台所面临的问题也是日益凸显,接下来将介绍 BIGO 大数据平台及其所面临的问题。BIGO 大数据平台的数据流转图如下所示:

用户在 APP,Web 页面上的行为日志数据,以及关系数据库的 Binlog 数据会被同步到 BIGO 大数据平台音讯队列,以及离线存储系统中,而后通过实时的,离线的数据分析伎俩进行计算,以利用于实时举荐、监控、即席查问等应用场景。然而存在以下几个问题:

  • OLAP 剖析平台入口不对立:Presto/Spark 剖析工作入口并存,用户不分明本人的 SQL 查问适宜哪个引擎执行,自觉抉择,体验不好;另外,用户会在两个入口同时提交雷同查问,以更快的获取查问后果,导致资源节约;
  • 离线工作计算时延高,后果产出太慢:典型的如 ABTest 业务,常常计算到下午才计算出后果;
  • 各个业务方基于本人的业务场景独立开发利用,实时工作烟囱式的开发,短少数据分层,数据血统。

面对以上的问题,BIGO 大数据平台建设了 OneSQL OLAP 剖析平台,以及实时数仓。

  1. 通过 OneSQL OLAP 剖析平台,对立 OLAP 查问入口,缩小用户自觉抉择,晋升平台的资源利用率;
  2. 通过 Flink 构建实时数仓工作,通过 Kafka/Pulsar 进行数据分层;
  3. 将局部离线计算慢的工作迁徙到 Flink 流式计算工作上,减速计算结果的产出;

另外建设实时计算平台 Bigoflow 治理这些实时计算工作,建设实时工作的血缘关系。

二、落地实际 & 特色改良

2.1 OneSQL OLAP 剖析平台实际和优化

OneSQL OLAP 剖析平台是一个集 Flink、Spark、Presto 于一体的 OLAP 查问剖析引擎。用户提交的 OLAP 查问申请通过 OneSQL 后端转发到不同执行引擎的客户端,而后提交对应的查问申请到不同的集群上执行。其整体架构图如下:

该剖析平台整体构造从上到下分为入口层、转发层、执行层、资源管理层。为了优化用户体验,缩小执行失败的概率,晋升各集群的资源利用率,OneSQL OLAP 剖析平台实现了以下性能:

  • 对立查问入口:入口层,用户通过对立的 Hue 查问页面入口以 Hive SQL 语法为规范提交查问;
  • 对立查问语法:集 Flink、Spark、Presto 等多种查问引擎于一体,不同查问引擎通过适配 Hive SQL 语法来执行用户的 SQL 查问工作;
  • 智能路由:在抉择执行引擎的过程中,会依据历史 SQL 查问执行的状况 (在各引擎上是否执行胜利,以及执行耗时),各集群的忙碌状况,以及各引擎对该 SQL 语法的是否兼容,来抉择适合的引擎提交查问;
  • 失败重试:OneSQL 后盾会监控 SQL 工作的执行状况,如果 SQL 工作在执行过程中失败,将抉择其余的引擎执行重试提交工作;

如此一来,通过 OneSQL OLAP 剖析平台,BIGO 大数据平台实现了 OLAP 剖析入口的对立,缩小用户的自觉抉择,同时充分利用各个集群的资源,缩小资源闲暇状况。

2.1.1 Flink OLAP 剖析零碎建设

在 OneSQL 剖析平台上,Flink 也作为 OLAP 剖析引擎的一部分。Flink OLAP 零碎分成两个组成部分:Flink SQL Gateway 和 Flink Session 集群;SQL Gateway 作为 SQL 提交的入口,查问 SQL 通过 Gateway 提交到 Flink Session 集群上执行,同时获取 SQL 执行查问的进度,以及返回查问的后果给客户端。其执行 SQL 查问的流程如下:

首先用户提交过去的 SQL,在 SQL Gateway 进行判断:是否须要将后果长久化写入到 Hive 表,如果须要,则会先通过 HiveCatalog 的接口创立一个 Hive 表,用于长久化查问工作的计算结果;之后,工作通过 SQL Gateway 上执行 SQL 解析,设置作业运行的并行度,生成 Pipeline 并提交到 Session 集群上执行。

为了保障整个 Flink OLAP 零碎的稳定性,以及高效的执行 SQL 查问,在这个零碎中,进行了以下性能加强:

  • 稳定性:

    • 基于 zookeeper HA 来保障 Flink Session 集群的可靠性,SQL Gateway 监听 Zookeeper 节点,感知 Session 集群;
    • 管制查问扫描 Hive 表的数据量,分区个数,以及返回后果数据量,避免 Session 集群的 JobManager,TaskManager 因而呈现 OOM 状况;
  • 性能:

    • Flink Session 集群预分配资源,缩小作业提交后申请资源所需的工夫;
    • Flink JobManager 异步解析 Split,Split 边解析工作边执行,缩小因为解析 Split 阻塞工作执行的工夫;
    • 管制作业提交过程中扫描分区,以及 Split 最大的个数,缩小设置工作并行所须要的工夫;
  • Hive SQL 兼容:

    针对 Flink 对于 Hive SQL 语法的兼容性进行改良,目前针对 Hive SQL 的兼容性大抵为 80%;

  • 监控告警:

    监控 Flink Session 集群的 JobManager,TaskManager,以及 SQL Gateway 的内存,CPU 应用状况,以及工作的提交状况,一旦呈现问题,及时告警和解决;

2.1.2 OneSQL OLAP 剖析平台获得的成绩

基于以上实现的 OneSQL OLAP 剖析平台,获得了以下几个收益:

  1. 对立查问入口,缩小用户的自觉抉择,用户执行出错率降落 85.7%,SQL 执行的成功率晋升 3%;
  2. SQL 执行工夫缩短 10%,充分利用了各个集群的资源,缩小工作排队期待的工夫;
  3. Flink 作为 OLAP 剖析引擎的一部分,实时计算集群的资源利用率晋升了 15%;

2.2 实时数仓建设和优化

为了晋升 BIGO 大数据平台上某些业务指标的产出效率,以及更好的治理 Flink 实时工作,BIGO 大数据平台建设了实时计算平台 Bigoflow,并将局部计算慢的工作迁徙到实时计算平台上,通过 Flink 流式计算的形式来执行,通过音讯队列 Kafka/Pulsar 来进行数据分层,构建实时数仓;在 Bigoflow 上针对实时数仓的工作进行平台化治理,建设对立的实时工作接入入口,并基于该平台治理实时工作的元数据,构建实时工作的血缘关系。

2.2.1 建设计划

BIGO 大数据平台次要基于 Flink + ClickHouse 建设实时数仓,大抵计划如下:

依照传统数据仓库的数据分层办法,将数据划分成 ODS、DWD、DWS、ADS 等四层数据:

  • ODS 层:基于用户的行为日志,业务日志等作为原始数据,寄存于 Kafka/Pulsar 等音讯队列中;
  • DWD 层:这部分数据依据用户的 UserId 通过 Flink 工作进行聚合后,造成不同用户的行为明细数据,保留到 Kafka/Pulsar 中;
  • DWS 层:用户行为明细的 Kafka 流表与用户 Hive/MySQL 维表进行流维表 JOIN,而后将 JOIN 之后产生的多维明细数据输入到 ClickHouse 表中;
  • ADS 层:针对 ClickHouse 中多维明细数据依照不同维度进行汇总,而后利用于不同的业务中。

依照以上计划建设实时数据仓库的过程中,遇到了一些问题:

  • 将离线工作转为实时计算工作后,计算逻辑较为简单(多流 JOIN,去重),导致作业状态太大,作业呈现 OOM (内存溢出) 异样或者作业算子背压太大;
  • 维表 Join 过程中,明细流表与大维表 Join,维表数据过多,加载到内存后 OOM,作业失败无奈运行;
  • Flink 将流维表 Join 产生的多维明细数据写入到 ClickHouse,无奈保障 Exactly-once,一旦作业呈现 Failover,就会导致数据反复写入。

2.2.2 问题解决 & 优化

优化作业执行逻辑,减小状态

离线的计算工作逻辑较为简单,波及多个 Hive 表之间的 Join 以及去重操作,其大抵逻辑如下:

当将离线的作业转为 Flink 的流式工作之后,原先离线 Join 多个 Hive 表的场景就转变为 Join 多个 Kafka Topic 的场景。因为 Join 的 Kafka topic 的流量较大,且 Join 的窗口工夫较长 (窗口最长的为 1 天),当作业运行一段时间内,Join 算子上就积攒了大量的状态 (一小时后状态就靠近 1T),面对如此大的状态,Flink 作业采取 Rocksdb State Backend 来寄存状态数据,然而依然防止不了 Rocksdb 内存应用超过导致被 YARN kill 的问题,或者是 Rocksdb State 上存的状态太多,吞吐降落导致作业重大背压。

针对这个问题,咱们将这多个 Topic,依照雷同的 Schema 进行 Unoin all 解决,失去一个大的数据流,而后在这个大的数据流中,再依据不同事件流的 event_id 进行判断,就能晓得这条数据来自哪一个事件流的 Topic,再进行聚合计算,获取对应事件流上的计算指标。

这样一来,通过 UNION ALL 代替 JOIN,防止了因为 JOIN 计算带来的大 State 带来的影响。

另外,在计算工作中还存在有比拟多的 count distinct 计算,相似如下:

select
count(distinct if(events['a'] = 1, postid, null))
 as cnt1,
count(distinct if(events['b'] = 1, postid, null))
as cnt2
……
count(distinct if(events['x'] = 1, postid, null))
As cntx
From table_a
Group by uid

这些 count distinct 计算在同一个 group by 中,并基于雷同的 postid 进行去重计算,因此能够让这些 distinct state 能够共享一组 key 来进行去重计算,那么就能够通过一个 MapState 来存储这若干个 count distinct 的状态,如下:

这些 count distinct 函数去重的 key 雷同,因此能够共享 MapState 中的 key 值,从而优化存储空间;而 Mapstate 的 Value 是 Byte 数组,每个 Byte 8 个 bit,每个 bit 为 0 或者 1,第 n 个 bit 对应了 n 个 count distinct 函数在该 key 上的取值:1 示意该 count disitnct 函数在对应的 key 上须要进行计数,0 示意不须要计数;当计算聚合后果的时候,则将所有 key 第 n 位的数字相加,即为第 n 个 count distinct 的取值,这样一来,就更进一步节约了状态的存储空间。

通过以上优化,胜利的将 ABTest 的离线工作迁徙到 Flink 流式计算工作上,将作业的状态管制在 100GB 以内,让作业失常的运行起来。

流维表 JOIN 优化

生成多维明细宽表的过程中,须要进行流维表 JOIN, 应用了 Flink Join Hive 维表的性能:Hive 维表的数据会被加载到工作的 HashMap 的内存数据结构中,流表中的数据再依据 Join Key 与 HashMap 中的数据进行 Join。然而面对上亿,十亿行的 Hive 大维表,加载到内存的数据量太大,很容易导致 OOM (内存溢出)。针对以上问题,咱们将 Hive 大维表依照 Join Key 进行 Hash 分片,如下图:

这样一来,Hive 大维表的数据通过 Hash 函数计算后散布到 Flink 作业的不同并行子工作的 HashMap 中,每个 HashMap 只寄存大维表的一部分数据,只有作业的并行度够大,就可能将大维表的数据拆分成足够多份,进行分片保留;对于一些太大的维表,也能够采取 Rocksdb Map State 来保留分片数据。

Kafka 流表中的数据,当要下发到不同的 subtask 上进行 Join 时,也通过雷同的 Join Key 依照雷同的 Hash 函数进行计算,从而将数据调配到对应的 subtask 进行 Join,输入 Join 后的后果。

通过以上优化,胜利 Join 了一些 Hive 大维表工作来执行流维表 Join 计算,最大的维表超过 10 亿行。

ClickHouse Sink 的 Exactly-Once 语义反对

将流维表 Join 生成的多维明细数据输入到 ClickHouse 表的过程中,因为社区的 ClickHouse 不反对事务,所以没方法保证数据 sink 到 ClickHouse 过程中的 Exactly-Once 语义。在此过程中,一旦呈现作业 Failover,数据就会反复写入到 ClickHouse。

针对这个问题,BIGO ClickHouse 实现了一个二阶段提交事务机制:当须要写入数据到 ClickHouse 时,能够先设置写入的模式为 temporary,表明当初写入的数据是长期数据;当数据执行插入实现后,返回一个 Insert id,而后依据该 Insert id 执行 Commit 操作,那么长期数据就转为正式数据。

基于 BIGO ClickHouse 的二阶段提交事务机制,并联合 Flink 的 checkpoint 机制,实现了一个 ClickHouse Connector,保障 ClickHouse Sink 的 Exactly Once 写入语义,如下:

  • 在失常写入的状况下,Connector 随机抉择 ClickHouse 的某一个 shard 写入,依据用户配置写单正本,或者双副原本执行 insert 操作,并记录写入后的 insert id;在两次 checkpoint 之间就会有屡次这种 insert 操作,从而产生多个 insert id,当 checkpoint 实现时,再将这些 insert id 批量提交,将长期数据转为正式数据,即实现了两次 checkpoint 间数据的写入;
  • 一旦作业呈现 Failover,Flink 作业 Failover 重启实现后,将从最近一次实现的 checkpoint 来复原状态,此时 ClickHouse Sink 中的 Operator State 可能会蕴含上一次还没有来得及提交实现的 Insert id,针对这些 insert id 进行重试提交;针对那些数据曾经写入 ClickHouse 中之后,然而 insert id 并没有记录到 Opeator State 中的数据,因为是长期数据,在 ClickHouse 中并不会被查问到,一段时间后,将会由 ClickHouse 的过期清理机制,被清理掉,从而保障了状态回滚到上一次 checkpoint 之后,数据不会反复。

通过以上机制,胜利保障了数据从 Kafka 通过 Flink 计算后写入到 ClickHouse 整个链路中端到端的 Exactly-Once 语义,数据不反复也不失落。

2.2.3 平台建设

为了更好的治理 BIGO 大数据平台的实时计算工作,公司外部建设了 BIGO 实时计算平台 Bigoflow,为用户提供对立的 Flink 实时工作接入,平台建设如下:

  • 反对 Flink JAR、SQL、Python 等多种类型作业;反对不同的 Flink 版本,笼罩公司外部大部分实时计算相干业务;
  • 一站式治理:集作业开发、提交、运行、历史展现、监控、告警于一体,便于随时查看作业的运行状态和发现问题;
  • 血缘关系:不便查问每个作业的数据源、数据目标、数据计算的前因后果。

三、利用场景

3.1 Onesql OLAP 剖析平台利用场景

Onesql OLAP 剖析平台在公司外部的利用场景是:利用于 AdHoc 查问,如下:

用户通过 Hue 页面提交的 SQL,通过 OneSQL 后端转发给 Flink SQL Gateway,并提交到 Flink Session 集群上执行查问工作,Flink SQL Gateway 获取查问工作的执行进度返回给 Hue 页面,并返回查问后果。

3.2 实时数据仓库利用场景

实时数据仓库利用场景目前次要是 ABTest 业务,如下:

用户的原始行为日志数据通过 Flink 工作聚合后生成用户明细数据,而后与维表数据进行流维表 JOIN,输入到 ClickHouse 生成多维明细宽表,依照不同维度汇总后,利用于不同的业务。通过革新 ABTest 业务,将该业务的后果指标的生成工夫提前了 8 个小时,同时缩小了应用资源一倍以上。

四、将来布局

为了更好的建设 OneSQL OLAP 剖析平台以及 BIGO 实时数据仓库,实时计算平台的布局如下:

  • 欠缺 Flink OLAP 剖析平台,欠缺 Hive SQL 语法反对,以及解决计算过程中呈现的 JOIN 数据歪斜问题;
  • 欠缺实时数仓建设,引入数据湖技术,解决实时数仓中工作数据的可重跑回溯范畴小的问题;
  • 基于 Flink 打造流批一体的数据计算平台。

FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

退出移动版