乐趣区

关于apache:从-Kafka-到-PulsarBIGO-打造实时消息系统之路

对于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。
GitHub 地址:http://github.com/apache/pulsar/

BIGO 于 2014 年成立,是一家高速倒退的科技公司。基于弱小的音视频解决技术、寰球音视频实时传输技术、人工智能技术、CDN 技术,BIGO 推出了一系列音视频类社交及内容产品,包含 Bigo Live(直播)和 Likee(短视频)等,在寰球已领有近 1 亿用户,产品及服务已笼罩超过 150 个国家和地区。

挑战

最后,BIGO 的音讯流平台次要采纳开源 Kafka 作为数据撑持。随着数据规模日益增长,产品一直迭代,BIGO 音讯流平台承载的数据规模呈现了成倍增长,上游的在线模型训练、在线举荐、实时数据分析、实时数仓等业务对音讯流平台的实时性和稳定性提出了更高的要求。开源的 Kafka 集群难以撑持海量数据处理场景,咱们须要投入更多的人力去保护多个 Kafka 集群,这样老本会越来越高,次要体现在以下几个方面:

  • 数据存储和音讯队列服务绑定,集群扩缩容 / 分区平衡须要大量拷贝数据,造成集群性能降落。
  • 当分区正本不处于 ISR(同步)状态时,一旦有 broker 产生故障,可能会造成数据失落或该分区无奈提供读写服务。
  • 当 Kafka broker 磁盘故障 / 空间占用率过高时,须要进行人工干预。
  • 集群跨区域同步应用 KMM(Kafka Mirror Maker),性能和稳定性难以达到预期。
  • 在 catch-up 读场景下,容易呈现 PageCache 净化,造成读写性能降落。
  • Kafka broker 上存储的 topic 分区数量无限,分区数越多,磁盘读写程序性越差,读写性能越低。
  • Kafka 集群规模增长导致运维老本急剧增长,须要投入大量的人力进行日常运维;在 BIGO,扩容一台机器到 Kafka 集群并进行分区平衡,须要 0.5 人 / 天;缩容一台机器须要 1 人 / 天。

如果持续应用 Kafka,老本会一直回升:扩缩容机器、减少运维人力。同时,随着业务规模增长,咱们对音讯零碎有了更高的要求:零碎要更稳固牢靠、便于程度扩大、提早低。为了进步音讯队列的实时性、稳定性和可靠性,升高运维老本,咱们开始思考是否要基于开源 Kafka 做本地化二次开发,或者看看社区中有没有更好的解决方案,来解决咱们在保护 Kafka 集群时遇到的问题。

为什么抉择 Pulsar

2019 年 11 月,咱们开始调研音讯队列,比照以后支流音讯流平台的优缺点,并跟咱们的需要对接。在调研过程中,咱们发现 Apache Pulsar 是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体。Pulsar 可能无缝扩容、提早低、吞吐高,反对多租户和跨地区复制。最重要的是,Pulsar 存储、计算拆散的架构可能完满解决 Kafka 扩缩容的问题。Pulsar producer 把音讯发送给 broker,broker 通过 bookie client 写到第二层的存储 BookKeeper 上。

Pulsar 采纳存储、计算拆散的分层架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐以及低延时的高可扩大流数据存储个性。

  • 程度扩容:可能无缝扩容到成千盈百个节点。
  • 高吞吐:曾经在 Yahoo! 的生产环境中禁受了考验,反对每秒数百万条音讯的公布 - 订阅(Pub-Sub)。
  • 低提早:在大规模的音讯量下仍然可能放弃低提早(小于 5 ms)。
  • 长久化机制:Pulsar 的长久化机制构建在 Apache BookKeeper 上,实现了读写拆散。
  • 读写拆散:BookKeeper 的读写拆散 IO 模型极大施展了磁盘程序写性能,对机械硬盘绝对比拟敌对,单台 bookie 节点撑持的 topic 数不受限制。

为了进一步加深对 Apache Pulsar 的了解,掂量 Pulsar 是否真正满足咱们生产环境大规模音讯 Pub-Sub 的需要,咱们从 2019 年 12 月开始进行了一系列压测工作。因为咱们应用的是机械硬盘,没有 SSD,在压测过程中遇到了一些性能问题,在 StreamNative 的帮助下,咱们别离对 Broker 和 BookKeeper 进行了一系列的性能调优,Pulsar 的吞吐和稳定性均有所提高。

通过 3~4 个月的压测和调优,咱们认为 Pulsar 齐全可能解决咱们应用 Kafka 时遇到的各种问题,并于 2020 年 4 月在测试环境上线 Pulsar。

Apache Pulsar at BIGO:Pub-Sub 生产模式

2020 年 5 月,咱们正式在生产环境中应用 Pulsar 集群。Pulsar 在 BIGO 的场景次要是 Pub-Sub 的经典生产生产模式,前端有 Baina 服务(用 C++ 实现的数据接管服务),Kafka 的 Mirror Maker 和 Flink,以及其余语言如 Java、Python、C++ 等客户端的 producer 向 topic 写入数据。后端由 Flink 和 Flink SQL,以及其余语言的客户端的 consumer 生产数据。

在上游,咱们对接的业务场景有实时数仓、实时 ETL(Extract-Transform-Load,将数据从起源端通过抽取(extract)、转换(transform)、加载(load)至目标端的过程)、实时数据分析和实时举荐。大部分业务场景应用 Flink 生产 Pulsar topic 中的数据,并进行业务逻辑解决;其余业务场景生产应用的客户端语言次要散布在 C++、Go、Python 等。数据通过各自业务逻辑解决后,最终会写入 Hive、Pulsar topic 以及 ClickHouse、HDFS、Redis 等第三方存储服务。

Pulsar + Flink 实时流平台

在 BIGO,咱们借助 Flink 和 Pulsar 打造了实时流平台。在介绍这个平台之前,咱们先理解下 Pulsar Flink Connector 的外部运行机理。在 Pulsar Flink Source/Sink API 中,上游有一个 Pulsar topic,两头是 Flink job,上游有一个 Pulsar topic。咱们怎么生产这个 topic,又怎么解决数据并写入 Pulsar topic 呢?

依照上图左侧代码示例,初始化一个 StreamExecutionEnvironment,进行相干配置,比方批改 property、topic 值。而后创立一个 FlinkPulsarSource 对象,这个 Source 外面填上 serviceUrl(brokerlist)、adminUrl(admin 地址)以及 topic 数据的序列化形式,最终会把 property 传进去,这样就可能读取 Pulsar topic 中的数据。Sink 的应用办法非常简单,首先创立一个 FlinkPulsarSink,Sink 外面指定 target topic,再指定 TopicKeyExtractor 作为 key,并调用 addsink,把数据写入 Sink。这个生产生产模型很简略,和 Kafka 很像。

Pulsar topic 和 Flink 的生产如何联动呢?如下图所示,新建 FlinkPulsarSource 时,会为 topic 的每一个分区新创建一个 reader 对象。要留神的是 Pulsar Flink Connector 底层应用 reader API 生产,会先创立一个 reader,这个 reader 应用 Pulsar Non-Durable Cursor。Reader 生产的特点是读取一条数据后马上提交(commit),所以在监控上可能会看到 reader 对应的 subscription 没有 backlog 信息。

在 Pulsar 2.4.2 版本中,由 Non-Durable Cursor 订阅的 topic,在接管到 producer 写入的数据时,不会将数据保留在 broker 的 cache 中,导致大量数据读取申请落到 BookKeeper 中,升高数据读取效率。BIGO 在 Pulsar 2.5.1 版本中修改了这个问题。

Reader 订阅 Pulsar topic 后,生产 Pulsar topic 中的数据,Flink 如何保障 exactly-once 呢?Pulsar Flink Connector 应用另外一个独立的 subscription,这个 subscription 应用的是 Durable Cursor。当 Flink 触发 checkpoint,Pulsar Flink Connector 会把 reader 的状态(包含每个 Pulsar Topic Partition 的生产地位 ) checkpoint 到文件、内存或 RocksDB 中,当 checkpoint 实现后,会公布一次 Notify Checkpoint Complete 告诉。Pulsar Flink Connector 收到 checkpoint 实现告诉后,把以后所有 reader 的生产 Offset,即 message id 以独立的 SubscriptionName 提交给 Pulsar broker,此时才会把生产 Offset 信息真正记录下来。

Offset Commit 实现后,Pulsar broker 会将 Offset 信息(在 Pulsar 中以 Cursor 示意)存储到底层的分布式存储系统 BookKeeper 中,这样做的益处是当 Flink 工作重启后,会有两层复原保障。第一种状况是从 checkpoint 复原:能够间接从 checkpoint 里取得上一次生产的 message id,通过这个 message id 获取数据,这个数据流就能持续生产。如果没有从 checkpoint 复原,Flink 工作重启后,会依据 SubscriptionName 从 Pulsar 中获取上一次 Commit 对应的 Offset 地位开始生产。这样就能无效避免 checkpoint 损坏导致整个 Flink 工作无奈胜利启动的问题。

Checkpoint 流程如下图所示。

先做 checkpoint N,实现后公布一次 notify Checkpoint Complete,期待肯定工夫距离后,接下来做 checkpoint N+1,实现后也会进行一次 notify Checkpoint Complete 操作,此时把 Durable Cursor 进行一次 Commit,最终 Commit 到 Pulsar topic 的服务端上,这样能确保 checkpoint 的 exactly-once,也能依据本人设定的 subscription 保障 message“keep alive”。

Topic/Partition Discovery 要解决什么问题呢?当 Flink 工作生产 topic 时,如果 Topic 减少分区,Flink 工作须要可能主动发现分区。Pulsar Flink Connector 如何实现这一点呢?订阅 topic 分区的 reader 之间互相独立,每个 task manager 蕴含多个 reader thread,依据哈希函数把单个 task manager 中蕴含的 topic 分区映射过去,topic 中新增分区时,新退出的分区会映射到某个 task manager 上,task manager 发现新增分区后,会创立一个 reader,生产掉新数据。用户能够通过设置 partition.discovery.interval-millis 参数,调配检测频率。

为了升高 Flink 生产 Pulsar topic 的门槛,让 Pulsar Flink Connector 反对更加丰盛的 Flink 新个性,BIGO 音讯队列团队为 Pulsar Flink Connector 减少了 Pulsar Flink SQL DDL(Data Definition Language,数据定义语言)和 Flink 1.11 反对。此前官网提供的 Pulsar Flink SQL 只反对 Catalog,要想通过 DDL 模式生产、解决 Pulsar topic 中的数据不太不便。在 BIGO 场景中,大部分 topic 数据都以 JSON 格局存储,而 JSON 的 schema 没有提前注册,所以只能在 Flink SQL 中指定 topic 的 DDL 后才能够生产。针对这种场景,BIGO 基于 Pulsar Flink Connector 做了二次开发,提供了通过 Pulsar Flink SQL DDL 模式生产、解析、解决 Pulsar topic 数据的代码框架(如下图所示)。

右边的代码中,第一步是配置 Pulsar topic 的生产,首先指定 topic 的 DDL 模式,比方 rip、rtime、uid 等,上面是生产 Pulsar topic 的根底配置,比方 topic 名称、service-url、admin-url 等。底层 reader 读到音讯后,会依据 DDL 解出音讯,将数据存储在 test_flink_sql 表中。第二步是惯例逻辑解决(如对表进行字段抽取、做 join 等),得出相干统计信息或其余相干后果后,返回这些后果,写到 HDFS 或其余零碎上等。第三步,提取相应字段,将其插入一张 hive 表。因为 Flink 1.11 对 hive 的写入反对比 1.9.1 更加优良,所以 BIGO 又做了一次 API 兼容和版本升级,使 Pulsar Flink Connector 反对 Flink 1.11。BIGO 基于 Pulsar 和 Flink 构建的实时流平台次要用于实时 ETL 解决场景和 AB-test 场景。

实时 ETL 解决场景

实时 ETL 解决场景次要使用 Pulsar Flink Source 及 Pulsar Flink Sink。这个场景中,Pulsar topic 实现几百甚至上千个 topic,每个 topic 都有独立的 schema。咱们须要对成千盈百个 topic 进行惯例解决,如字段转换、容错解决、写入 HDFS 等。每个 topic 都对应 HDFS 上的一张表,成千盈百个 topic 会在 HDFS 上映射成千盈百张表,每张表的字段都不一样,这就是咱们遇到的实时 ETL 场景。

这种场景的难点在于 topic 数量多。如果每个 topic 保护一个 Flink 工作,保护老本太高。之前咱们想通过 HDFS Sink Connector 把 Pulsar topic 中的数据间接 sink 到 HDFS 上,但解决外面的逻辑却很麻烦。最终咱们决定应用一个或多个 Flink 工作去生产成千盈百个 topic,每个 topic 配本人的 schema,间接用 reader 来订阅所有 topic,进行 schema 解析后处理,将解决后的数据写到 HDFS 上。

随着程序运行,咱们发现这种计划也存在问题:算子之间压力不平衡。因为有些 topic 流量大,有些流量小,如果齐全通过随机哈希的形式映射到对应的 task manager 下来,有些 task manager 解决的流量会很高,而有些 task manager 解决的流量很低,导致有些 task 机器上积塞十分重大,拖慢 Flink 流的解决。所以咱们引入了 slot group 概念,依据每个 topic 的流量状况进行分组,流量会映射到 topic 的分区数,在创立 topic 分区时也以流量为根据,如果流量很高,就多为 topic 创立分区,反之少一些。分组时,把流量小的 topic 分到一个 group 中,把流量大的 topic 独自放在一个 group 中,很好地隔离了资源,保障 task manager 总体上流量平衡。

AB-test 场景

实时数仓须要提供小时表或天表为数据分析师及举荐算法工程师提供数据查问服务,简略来讲就是 app 利用中会有很多打点,各种类型的打点会上报到服务端。如果间接裸露原始打点给业务方,不同的业务应用方就须要拜访各种不同的原始表从不同维度进行数据抽取,并在表之间进行关联计算。频繁对底层根底表进行数据抽取和关联操作会重大节约计算资源,所以咱们提前从根底表中抽取用户关怀的维度,将多个打点合并在一起,形成一张或多张宽表,笼罩下面举荐相干的或数据分析相干的 80% ~ 90% 场景工作。

在实时数仓场景下还需实时两头表,咱们的解决方案是,针对 topic A 到 topic K,咱们应用 Pulsar Flink SQL 将生产到的数据解析成相应的表。通常状况下,将多张表聚合成一张表的罕用做法是应用 join,如把表 A 到 K 依照 uid 进行 join 操作,造成十分宽的宽表;但在 Flink SQL 中 join 多张宽表效率较低。所以 BIGO 应用 union 来代替 join,做成很宽的视图,以小时为单位返回视图,写入 ClickHouse,提供给上游的业务方实时查问。应用 union 来代替 join 减速表的聚合,可能把小时级别的两头表产出管制在分钟级别。

输入天表可能还须要 join 寄存在 hive 上的表或其余存储介质上的离线表,即流表和离线表之间 join 的问题。如果间接 join,checkpoint 中须要存储的中间状态会比拟大,所以咱们在另外一个维度上做了优化。

左侧局部相似于小时表,每个 topic 应用 Pulsar Flink SQL 生产并转换成对应的表,表之间进行 union 操作,将 union 失去的表以天为单位输出到 HBase(此处引入 HBase 是为了做代替它的 join)。

右侧须要 join 离线数据,应用 Spark 聚合离线的 Hive 表(如表 a1、a2、a3),聚合后的数据会通过精心设计的 row-key 写入 HBase 中。数据聚合后状态如下:假如右边数据的 key 填了宽表的前 80 列,前面 Spark 工作算出的数据对应同样一个 key,填上宽表的后 20 列,在 HBase 中组成一张很大的宽表,把最终数据再次从 HBase 抽出,写入 ClickHouse,供下层用户查问,这就是 AB-test 的主体架构。

业务收益

从 2020 年 5 月上线至今,Pulsar 运行稳固,日均解决音讯数百亿,字节入流量为 2~3 GB/s。Apache Pulsar 提供的高吞吐、低提早、高可靠性等个性极大进步了 BIGO 音讯解决能力,升高了音讯队列运维老本,节约了近 50% 的硬件老本。目前,咱们在几十台物理主机上部署了上百个 Pulsar broker 和 bookie 过程,采纳 bookie 和 broker 在同一个节点的混部模式,曾经把 ETL 从 Kafka 迁徙到 Pulsar,并逐渐将生产环境中生产 Kafka 集群的业务(比方 Flink、Flink SQL、ClickHouse 等)迁徙到 Pulsar 上。随着更多业务的迁徙,Pulsar 上的流量会继续上涨。

咱们的 ETL 工作有一万多个 topic,每个 topic 均匀有 3 个分区,应用 3 正本的存储策略。之前应用 Kafka,随着分区数减少,磁盘由程序读写逐步进化为随机读写,读写性能进化重大。Apache Pulsar 的存储分层设计可能轻松反对百万 topic,为咱们的 ETL 场景提供了优雅反对。

将来瞻望

BIGO 在 Pulsar broker 负载平衡、broker cache 命中率优化、broker 相干监控、BookKeeper 读写性能优、BookKeeper 磁盘 IO 性能优化、Pulsar 与 Flink、Pulsar 与 Flink SQL 联合等方面做了大量工作,晋升了 Pulsar 的稳定性和吞吐,也升高了 Flink 与 Pulsar 联合的门槛,为 Pulsar 的推广打下了坚实基础。

将来,咱们会减少 Pulsar 在 BIGO 的场景利用,帮忙社区进一步优化、欠缺 Pulsar 性能,具体如下:

  1. 为 Apache Pulsar 研发新个性,比方反对 topic policy 相干个性。
  2. 迁徙更多任务到 Pulsar。这项工作波及两方面,一是迁徙之前应用 Kafka 的工作到 Pulsar。二是新业务间接接入 Pulsar。
  3. BIGO 筹备应用 KoP 来保证数据迁徙平滑过渡。因为 BIGO 有大量生产 Kafka 集群的 Flink 工作,咱们心愿可能间接在 Pulsar 中做一层 KoP,简化迁徙流程。
  4. 对 Pulsar 及 BookKeeper 继续进行性能优化。因为生产环境中流量较高,BIGO 对系统的可靠性和稳定性要求较高。
  5. 继续优化 BookKeeper 的 IO 协定栈。Pulsar 的底层存储自身是 IO 密集型零碎,保障底层 IO 高吞吐,才可能晋升下层吞吐,保障性能稳固。

作者简介

陈航,Apache Pulsar Committer,BIGO 大数据音讯平台团队负责人,负责创立与开发承载大规模服务与利用的集中公布 - 订阅音讯平台。他将 Apache Pulsar 引入到 BIGO 音讯平台,并买通上下游零碎,如 Flink、ClickHouse 和其余实时举荐与剖析零碎。他目前次要负责 Pulsar 性能调优、新性能开发及 Pulsar 生态集成。

相干浏览

  • Apache Pulsar 在 BIGO 的性能调优实战(上)
  • Apache Pulsar 在 BIGO 的性能调优实战(下)
  • Apache Pulsar 在能源互联网畛域的落地实际

点击 链接,获取 Apache Pulsar 硬核干货材料!

退出移动版