共计 9337 个字符,预计需要花费 24 分钟才能阅读完成。
本文整顿自 BIGO Staff Engineer 陈航在 Flink Forward Asia 2020 分享的议题《借助 Flink 与 Pulsar,BIGO 打造实时音讯解决零碎》。次要内容包含:
- 对于 BIGO
- BIGO 为什么会抉择 Apache Pulsar
- Apache Pulsar 在 BIGO 中的角色
- BIGO 借助 Apache Pulsar 和 Flink 结构本人的实时音讯流解决零碎。
- 将来打算
一、对于 BIGO
借助于大数据和人工智能技术,BIGO 基于视频的服务和产品取得了宽泛的欢送,在 150 多个国家和地区取得了大量的用户。BIGO 次要有两款十分风行的产品,第一款是 BIGO Live,另外一款是 Likee。BIGO Live 是一个直播平台,而 Likee 是一个短视频平台。
二、为什么抉择 Apache Pulsar
在过来的几年里,BIGO 的音讯平台次要还是以开源的 Kafka 集群为主,然而随着业务的一直增长、用户一直裁减,整个音讯流平台所承载的音讯量和数据量也呈现了成倍的增长,同时也对整个音讯流零碎提出了更高的要求。
次要体现在以下几个方面:
- 第一,它对整个零碎的稳定性、可扩展性以及鲁棒性提出了更高的要求。
- 第二,因为咱们是短视频举荐相干的服务,所以对整个音讯流的低提早也提出了十分高的要求。
随着数量增长,整个 BIGO 的音讯流平台的团队在保护多个 Kafka 集群上付出了大量的工作,摆在咱们背后的有很多 Kafka 集群运维相干的问题。这个时候,咱们就在思考,咱们是抉择开源 Kafka 的一个基线版本进行本人的迭代开发呢?还是看一下开源社区外面有哪些能够借鉴的计划,来打造一个合乎咱们利用场景需要的音讯流平台。
于是咱们进行了一系列调研。在调研的过程中,咱们的眼光留神到了 Apache Pulsar,它有以下几点 feature 比拟 match 咱们的利用场景:
- 首先,它可能程度地扩大。咱们晓得对于 Kafka 而言,它是一个服务和存储绑定的零碎。当咱们须要去扩容一个集群的时候,单单把机器上线是不可能满足需要的,咱们须要对整个 topic 的 partition 进行相应操作,这个时候就是耗人力去运维的。所以,咱们须要有一个可能程度扩大的零碎。而 Apache Pulsar 提供的是存储和服务拆散的一个架构,应用的是 bookkeeper 作为底层的数据存储,下层有一个 broker 来提供相干的服务。
- 另外,就是它的 low latency 还有高吞吐、低提早以及在雅虎的生产环境下面禁受了大数据量的考验。
- 跨集群的复制等一系列的 feature 对于咱们而言也是十分须要的。
- 并且,这样一个存储和服务拆散的架构也极大地缩小了人工运维的老本。
所以咱们抉择了 Apache Pulsar。
三、Apache Pulsar 在 BIGO 中的角色
1. 引入 Pulsar 的历程
在 2019 年 11 月,咱们从新开始思考 BIGO 的利用场景上面所须要的音讯流平台到底是什么样的。是基于一个开源的 Kakfa 框架去开发,还是抉择另外一套整个音讯流零碎?
在 2019 年 11 月份,咱们做了一次整个音讯流平台的调研工作。在调研过程中,咱们比照了 Kafka、RocketMQ、Apache Pulsar 等业界相近的绝对的音讯队列的实现。而后咱们做了一系列的横向比照,并且跟咱们的业务需要进行了相应的比拟。最终发现应用 Apache Pulsar 可能解决咱们生产上的一些问题,可能为咱们的音讯流平台提供十分好的运维相干的累赘的加重,以及整个零碎的稳定性和吞吐的晋升,所以咱们就抉择了 Apache Pulsar。
在 2019 年 12 月份,咱们进行了一系列的压测。任何一个开源的框架,如果没有通过公司外部的大流量场景下的压测,是不敢上线的。所以从 2019 年 12 月份始终到 2020 年 4 月份,通过了一系列的长时间的压测工作。
在压测的过程中,咱们同时也发现了 Apache Pulsar 的一些问题,并且给社区修了一系列的 bug。在 2020 年 4 月份,咱们把 Apache Pulsar 部署在了咱们的生产测试环境;在稳固运行一个月之后,咱们就把它部署到了生产环境;在 2020 年 5 月份,正式上线。
现有的 Apache Pulsar 集群规模,目前有十几个 Apache Pulsar 的节点。整个集群的入流量是在 2~3 GB/s。随着工夫的推移,也有越来越多的利用会一直地迁徙到 Apache Pulsar 来代替现有的 Kafka 集群。
2. Apache Pulsar 的角色
Apache Pulsar 在整个流处理过程中提供的是一个 PUB-SUB 的角色。
- 首先,有 BIGO 这边的 Baina,一个 C++ 实现的音讯收集服务,把相干的数据写到 Apache Pulsar 相干的 topic 外面去,这是第一种场景。
- 第二场景就是 KMM,也就是 Kafka 的 Mirror Maker。
- 第三种场景是 Flink。另外就是一些各种语言的客户端所实现的 producer。它的上游次要有 Flink、Flink SQL 以及各个语言所实现的 consumer,比如说 golang、JAVA,C++ 的等等。
3. 上游撑持的业务场景
第一个是实时数仓,第二个是实时的 ETL,第三个是实时数据分析,另外就是实时举荐,还有更多的业务场景也在逐步的染指。上游的数据会写到 HIVE、Pulsar 的 topic、ClickHouse、Hadoop、redis 等一系列上游的相干存储系统。
四、Apache Pulsar 和 Flink 结构实时音讯流解决零碎。
这里须要分为以下三个方面来讲:
- 第一,是对于 Pulsar-Flink-Connector 的一些底细。我置信在介绍 Pulsar-Flink-Connector 的一些底细之后,大家会对整个 Flink 与 Pulsar 之间联合的关系会更加地清晰亮堂,在应用过程中也会更加地清晰;
- 第二,是 BIGO 的一个 use case,就是应用 Apache Pulsar 和 Flink 来打造本人的实时 ETL 解决零碎;
- 第三,是借助 Apache Pulsar 和 Flink 打造 AB-test 零碎。
首先看一下 Pulsar-Flink-Connector 整个生产和生产的逻辑。它次要包含一个 source 的 API 和 sink 的 API。对于生产的时候,也就是应用一个 Pulsar-Flink-Connector 的 source 来订阅 Pulsar 的一个 topic。另外一个就是咱们写一个 sink,会把 Flink 外面的数据写出到 Pulsar 的 topic 外面。下图右边的代码展现怎么去订阅这样一个 topic,实际上只须要 new 一个 FlinkPulsarSource 的一个流,而后把这条流退出到 DataStream 外面去就能够了。
对于 Flink 数据的写出而言,只须要 new 一个 FlinkPulsar 的 Sink,而后咱们调用第二个 DataStream 的 sink 就能够把数据给写出去了。实际上,整个的实现而言,跟 Kafka 的 API 是十分相似的。这里须要留神的几点就是,对于 FlinkPulsarSource 外面须要传入的是 serviceUrl 以及 adminUrl。
- serviceUrl 相似于 Kafka 的 broker_list;
- adminUrl 就是咱们去以管理员的形式来管制 Pulsar 的一些相干的操作。
Pulsar Flink 怎么样来订阅 Pulsar 的 topic,怎么样生产以及它的 offset 是怎么样 commit 回去的?
这里就会波及到 Pulsar Flink 的 exactly-once source。咱们首先来看一下图右边局部。这个图外面有一个 Pulsar 的 topic,当咱们 new 一个 PulsarFlinkSource 的时候,实际上会对每一个 Pulsar topic 的 partition 创立一个 reader。这个 reader 应用的是 Non-Durable Cursor,当这个 reader 订阅了这个 topic 之后,这个 topic 的数据流就会源源不断地流到这个 reader 的线程外面去。当 reader 的线程触发一次 checkpoint 的时候,这个 Flink 工作就会把本人的一些状态 checkpoint 起来。当 checkpoint 实现的时候,就会调用一次 Notify checkpoint complete 这样的一个办法。触发的是另外一个 subscription 的一个 commit。
这个 subscription 实际上是一个 durable cursor。当它 commit offset 的时候,这个 offset 会保留在 bookkeeper 外面,这是一个永恒保留的 offset。这样做的益处是,当 checkpoint 失败或者 checkpoint 丢了的时候,咱们须要以一个 subscription name 从 Pulsar 外面去复原的时候,就能够从 bookkeeper 外面去把 message id 读出来,而后从这边复原。
实际上对于 Pulsar-Flink-Connector 的生产而言,它是由一条数据流和一条控制流来组成的:
- 对于数据流,就是 Pulsar Topic 的数据源源不断的会流入到 reader 的这样一个线程外面,由 reader 线程进行相应的解决。
- 控制流就是通过 subscription name 来提交生产的 message id,也就是相似于 Kafka 的一个 offset,会提交到 Pulsar 的客户端来保障生产的地位。
接下来看一下 checkpoint 实现的流程。
- 首先,当咱们去做 checkpoint N 的时候,当 N 完结了之后,它会把 durable cursor 进行一次 commit;
- 当咱们去做 checkpoint N+1 的时候,N+1 实现之后,会接着去 commit N+1 的 durable cursor。
这样做的益处是,当这个工作失败之后,如果须要从某一个 checkpoint 复原,只须要从 checkpoint 外面去读到上一次 checkpoint 胜利的 offset 的 durable cursor 的 message id 的地位,就能够从上一次的地位去生产了。这也是保障 source 的 exactly once 的实现。
Topic/Partition 的 Discovery
- 第一点是,在 Pulsar-Flink-Connector 实现的逻辑里,会为每一个 Topic/Partition 调配一个 reader 的线程。
- 第二点是,每一个 task manager 会包含多个 reader 的线程,这中央会有一个什么样的映射关系?
举个例子:假如咱们订阅的 Topic 外面,有 10 个 partition,Flink 外面只给它调配 5 个 task manager,那么怎么将 partition 映射到 5 个 task manager 外面去?这就会波及到一个调配的逻辑。整个调配的逻辑,实际上是应用一个哈希的形式把某一个 Topic/Partition hash 到指标的 task manager 下面。
这就会存在一些隐患:当咱们订阅了几百个甚至上千个 topic 的时候,可能会存在肯定的调配不平衡。成千盈百个 Topic/Partition 外面,并不是每一个 partition 的流量都是平衡的。假如咱们订阅了十个 Topic,其中有九个 Topic 的流量很小,另外一个 Topic 的流量很大,那么均摊到某一个 partition 时候也是这样的。这个很大的 topic 的 Partition 的流量很大,另外 Topic/Partition 的流量很小。如果咱们只是单纯地进行一次 hash 的话,就会造成某些 task manager 下面的流量不平衡,可能会导致频繁 GC 的问题。这个问题在下一个 use case 里会具体地提到,以及怎么样去解决它。
另外就是当某一个 Topic/Partition 进行一次分区扩容时,怎么样去主动订阅这样一个新的分区?在 Pulsar-Flink-Connector 外面会启动一个定时 check 的线程的逻辑。假如咱们每一分钟 check 一次,是否有新的 partition 的退出,并且这个新 Topic/Partition 调配到了某一个 task manager 下面,那么这个 task manager 就会主动地新创建一个 reader 的线程,而后把这个 partition 订阅下来。
这整个的流程,会有一个 discover 会一直的去 check。当有新的 partition 的时候就会 new 一个 reader 起来。每一个 reader 独立生产某一个 Topic/Partition,把数据拿过去之后会定期进行本人的反序列化操作以及后续的解决。
下面讲到的是整个 connector 的一个逻辑。在 Pulsar-Flink-Connector 外面提供了 job 的形式,还提供了 catalog 的形式来生产 Pulsar 的 topic。然而目前它是没有提供 SQL DDL 的形式,在 BIGO 的利用场景外面大部分的 topic 都是以 json 的格局。大部分的数据,都是以 json 格局写入的。
对于这一类 json 格局的 topic,它可能没有当时配置本人的 schema 就写进来了,那么咱们在生产的时候,如果想用 SQL,怎么办呢?这里就须要用到 Flink DDL 的框架,所以 BIGO 的音讯流平台团队在咱们的应用过程中为 Pulsar-Flink-Connector 开发了 Flink SQL DDL 的反对。接下来看一下 Flink SQL DDL 的框架。
- 第一步,图右边就是 fetch message from Pulsar topic,首先会定义这个 topic 的外面数据的一个字段信息,也就是 create table test_Flink_SQL,这外面有 rid 等字段。上面的地位外面蕴含的是怎么去和 Pulsar 的服务端建设连贯的,这里会指定 topic 名称,指定 service url,admin url 以及 subscribe name,还有一些一系列相干的配置操作。这样一段 SQL 的代码就可能很好地实现把数据从 Pulsar topic 外面给生产进去。
- 第二步,就能够进行一系列应用层相干逻辑的解决。比方做 join,count、union 等操作。另外就是一些应用层逻辑的解决,比如说去做统计相干的一些操作。在第二步操作完了之后,咱们须要将最终的后果写出到第三方存储外面。第三方存储会包含 Hive 表、HDFS 和 Pulsar 的 topic 等。
- 对于最终的写入写出就会进入到第三步,咱们会调用一个 insert into 的办法,间接把咱们解决的后果,写出到相干的 Hive 表外面去,这就是整个 Flink SQL DDL 的一个解决逻辑。咱们借助 Flink SQL DDL 可能很好地来实现咱们的 AB test 相干的操作。那么在后面的解说外面,咱们可能会应用一个 job 的形式来提交,有了 Flink SQL DDL 的反对,咱们就能够很不便地应用一个 SQL 的形式来生产 Pulsar 的 topic,会进行一系列逻辑解决,最终把后果写出去。
当初来看一下基于 SQL 形式的 use case。
Case 1
首先来看一下 BIGO reall-time ETL 的实现。这个实时 ETL 的背景,是咱们在 Pulsar 外面,会有成千盈百个 topic,每一个 topic 会有本人独立的 schema。咱们当初的一个需要是想要把每一个 topic 应用本人的 schema 进行一次解析,把最终解析的后果以 bucket 的格局落到 HDFS 的 Hive 表下面去。对于这样一个需要,咱们可能会有几种计划:
- 第一种计划,咱们会间接应用 Pulsar 的 HDFS 的 connector,会把 topic 外面的数据会生产进去而后落到 HDFS 下面去,这样做的话,当咱们须要对 topic 外面进行一系列的解决的时候,可能就不大好办了。另外一个就是咱们有成千盈百个 topic,那么也会有成千盈百个 schema,也就是说咱们可能要保护成千盈百个线程,去解相应的 topic 外面的数据,而后把它落进来。这样对于整个工作的保护老本可能会比拟高。
- 第二种计划。咱们能够用 Flink SQL 去生产每个 topic,每一个 SQL 指令本人的 schema,而后把这个 topic 给生产进去,之后进行一系列的解决,而后写出去。这种形式,实际上也会带来几百个甚至上千个 SQL 工作的保护工作。
- 第三个计划,咱们想到了应用一个 Flink 工作来生产成千盈百个 Pulsar 的 topic。而后进行一系列的 ETL 解决,首先进行 schema 的解析,而后进行一系列逻辑解决,最终把它写出到 HDFS 下面去。上面这张图,就是咱们采纳的第三种计划:应用一个 Flink 的 job 把成千盈百个 topic 订阅了。订阅完了之后,获取相应的线程去生产。解析完了之后会通过一系列逻辑解决,最终显示到 HDFS 下面去。
这个 case 可能存在数据分布不均的问题。假如,咱们有 500 个 topic,其中 400 个 topic 的流量很小,另外 100 个 topic 的流量很大。那么咱们在订阅的时候,假如我起了 100 个 task manager 去生产。那么这可能就会按均匀来算,有 5-10 个 topic partition 会落到同一个 task manager 下面去。如果咱们不干涉的话,因为这个 partition 本身的流量不平衡,可能会导致它从运行工作的过程的流量也是不平衡的,带来了频繁 GC 的问题。
为了解决生产端下面的 task manager 流量不平衡的状况。咱们引入了一个 slot group 的概念。咱们会当时对 topic partition 的流量进行一个预估,预估完了之后,会通过人工计算的形式把几个低流量的 topic 组合成同一个 group 外面。同一个 group 的 topic 会被放在同一个 slot 外面,而后去进行调度,这样就可能很好的去把 task manager 下面的生产流量不均的问题解决掉了,整个 Flink job 就会运行的很好。
Case 2
第二个 case 是一个 AB test 的利用场景,做这个 AB test 场景的一个初衷是什么呢?在咱们实时的数仓外面,须要去产出小时级别的两头表,以及天级的两头表,给举荐算法的工程师以及数据分析师来应用。对于小时级别的两头表以及天级的两头表的产生,须要通过实时的去计算底层的各种类型的打点,比方用户观看的打点、某个视频的下发打点,还有用户其余行为的打点等等,会依照某一个维度进行聚合。聚合了之后会进行相干的一些统计,最终会造成一张宽带供举荐算法工程师以及数据分析师来应用。
如果咱们不提供这样一个宽表的话,那么对于下层的业务方而言,可能要一直的去拜访底层的表,对底层表进行各种相应的操作。这样岂但会节约数据分析师以及举荐算法工程师的工夫,也会造成整个集群计算资源的节约。那么在 BIGO 这边,之前的解决方案是应用 Hive。应用 Map Reduce 的形式,来把每张底层的表进行一次聚合操作。聚合完了之后会提供一个小时级别两头表以及天级的两头表给下层业务应用,这样做的弊病是:Hive Map Reduce 的时效性是没法保障的。所以咱们就在想是否应用 Flink 流式计算的形式来进步实时数仓的数据产出效率。
接下来就是咱们这边的一个解决方案:首先咱们会用 Flink SQL 去生产 Pulsar 的 topic。从下图的右边来看,咱们有 Topic A、Topic B 和 Topic K。每个 topic 有本人的 DDL。咱们首先会应用 Flink SQL 加上每一个 topic 的 scanner,也就是 DDL 会把 topic 的数据从 Pulsar 外面加载进去,而后把它做成每个 topic 的一个视图。
这个中央咱们就会有 Table A、Table B 和 Table K 的一个表。假如有 K 张表,那么咱们须要对 K 张表进行一次聚合操作。假如是依照 uid 进行一次聚合,那么这个聚合有两种形式:
第一种形式是做 join。对于 Flink 而言,它的流式 join 可能耗时会比拟长,整个计算资源的耗费也是十分大的。所以咱们这边做了一个比拟奇妙的计划就是应用 union 代替 join。咱们会把 Table A、Table B 和 Table K 通过 union 的形式会生成一个 View X。而后把 View X 间接写出以小时为粒度,到 ClickHouse 供用户查问。在 union 的过程当中,咱们还会做一些相干的聚合的操作。来把相干的指标给聚合起来供用户应用。这个就是小时级别的两头表。
对于天级的两头表而言,咱们所遇到的挑战是:它并不是单单的只依赖了 Table A、Table B 和 Table K,可能还依赖了离线的表。假如有 Table a1、Table a2 和 Table a3 三张表。咱们怎么样把实时的表和离线的表做一个关联?这里咱们也是应用的一个比拟奇妙的形式。
首先。在右边 Table A、Table B 和 Table K 会应用 Flink SQL 把数据从 Pulsar 生产进去,而后做成一个独立的 table。而后同样也是以 union 的形式把实时的流表给 union 起来,做一些统计相干的解决生成一个视图,一个 View X。这个 View X 会依据咱们精心设计过的一个 row-key,把它以天为维度写出到 HBase 外面去。
另外,对于离线而言。因为咱们 Table A、Table B 和 Table K 只是代表了咱们实时的一些数据,对于离线的数据,也是须要 join 进来的,那么就会应用一个 Spark 来把 Table a1、Table a2 和 Table a3 相干的数据给 join 起来,而后也以雷同的规定生成一个 row-key 写在 HBase 外面去。
对于 HBase 而言,它实际上提供的就是一个 join 操作,写到 HBase 就很好的防止了咱们将 View X 以及 Spark 所生成的这样一张表做 join 了。因为如果是有雷同的 key,那么假如 HBase 这样一张宽表有 100 列,View X 占了前 80 列,那么前面的 Spark 所算进去的这个表会主动地填充到那个后 20 列外面去,那么最终会生成同一个 row-key 的一个 100 维的一张宽表。那么咱们接下来会把 HBase 外面这样一张宽表读出来,而后写到 ClickHouse 供下层用户去查问。这样就可能很好的去防止表之间的 join 操作,极大地提高 join 的效率。
五、将来工作
- 首先,咱们会接着在 Pulsar-Flink-Connector 下面持续的去开发新的 feature 并且继续的去进行一系列的 bug 修复;
- 第二点,咱们会更多的将 Flink 工作继续地从 Kakfa 迁徙到 Apache Pulsar 下面去;
- 第三点,在咱们整个的音讯流平台里,之前应用的是 Kakfa,可能有成千盈百个 Flink 的工作或者是其余的工作,应用 Kafka 的 API 来生产 Kafka 的 topic。如果不提供一个简略的形式让用户来生产 Pulsar 的 topic 的话,这个迁徙工作是十分难进行的。所以咱们会借助于 KOP,也就是 Kakfa on Pulsar,不便下层利用的迁徙,有了这样一层 KOP 的一个 proxy,对于下面应用程序是不须要改任何的代码就可能主动的从 Kafka 切到 Pulsar 下面的;
- 第四点,咱们打算实现一个批流对立的数据的生产,从 Pulsar topic 外面以批或者是流的形式来生产 topic 里的数据;
- 第五点,咱们会继续增强 Pulsar 以及 bookkeeper 的稳定性以及吞吐的调优;
- 第六点,咱们会继续的去优化 Bookkeeper 的 IO 协定栈。
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群;
第一工夫获取最新技术文章和社区动静,请关注公众号~