共计 7003 个字符,预计需要花费 18 分钟才能阅读完成。
对于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。
GitHub 地址:http://github.com/apache/pulsar/
我的项目背景介绍
拉卡拉领取成立于 2005 年,是国内当先的第三方领取企业,致力于整合信息科技,服务线下实体,从领取切入,全维度为中小微商户的经营赋能。2011 年成为首批取得《领取业务许可证》企业的一员,2019 年上半年服务商户超过 2100 万家。2019 年 4 月 25 日,登陆创业板。
性能需要
因为拉卡拉的项目组数量较多,各个我的项目在建设时,别离依据须要抉择了本人的音讯零碎。这就导致一方面很多零碎的业务逻辑和具体的音讯零碎之间存在耦合,为后续系统维护和降级带来麻烦;另一方面业务团队成员对音讯零碎的治理和应用程度存在差别,从而使得整体零碎服务质量和性能不稳固;此外,同时保护多套零碎,物理资源利用率和治理老本都比拟高。
因而,咱们打算建设一套分布式根底音讯平台,同时为各个团队提供服务。该平台须要具备以下个性:高牢靠、低耦合、租户隔离、易于程度扩大、易于经营保护、对立治理、按需申请应用,同时反对传统的音讯队列和流式队列。表 1 展现了这两类服务应该具备的个性。
为什么抉择 Apache Pulsar
大厂开源背书
当初可供用户抉择的大厂开源音讯平台有很多,架构设计大多相似,比方 Kafka 和 RocketMQ 都采纳存储与计算一体的架构,只有 Pulsar 采纳存储与计算拆散的多层架构。咱们比拟选型的音讯零碎有三个:Kafka、RocketMQ 和 Pulsar。测试之前,咱们通过网上的公开数据,对三者的性能和性能进行了简略的比照,表 2 为比照后果。从中能够看出 Pulsar 更合乎咱们的需要。
Pulsar 的架构劣势
Pulsar 是云原生的分布式音讯流平台,源于 Yahoo!,反对 Yahoo! 利用,服务 140 万个 topic,日解决超过 1000 亿条音讯。2016 年 Yahoo! 开源 Pulsar 并将其捐献给 Apache 软件基金会,2018 年 Pulsar 成为 Apache 软件基金会的顶级我的项目。
作为一种高性能解决方案,Pulsar 具备以下个性:反对多租户,通过多租户可为每个租户独自设置认证机制、存储配额、隔离策略等;高吞吐、低提早、高容错;原生反对多集群部署,集群间反对无缝数据复制;高可扩大,可能撑持上百万个 topic;反对多语言客户端,如 Java、Go、Python、C++ 等;反对多种音讯订阅模式(独占、共享、灾备、Key_Shared)。
架构正当
Kafka 采纳计算与存储一体的架构,当 topic 数量较多时,Kafka 的存储机制会导致缓存净化,升高性能。Pulsar 采纳计算与存储拆散的架构(如图 1)。无状态计算层由一组接管和投递音讯的 broker 组成,broker 负责与业务零碎进行通信,承当协定转换,序列化和反序列化、选主等性能。有状态存储层由一组 bookie 存储节点组成,能够长久存储音讯。
Broker 架构
Broker 次要由四个模块组成。咱们能够依据理论需要对相应的性能进行二次开发。
- Dispatcher:调度散发模块,承当协定转换、序列化反序列化等。
- Load balancer:负载平衡模块,对拜访流量进行管制治理。
- Global replicator:跨集群复制模块,承当异步的跨集群音讯同步性能。
- Service discovery:服务发现模块,为每个 topic 抉择无状态的主节点。
长久层(BookKeeper)架构
图 3 为 Pulsar 中长久层的架构图。Bookie 是 BookKeeper 的存储节点,提供独立的存储服务。ZooKeeper 为元数据存储系统,提供服务发现以及元数据管理服务。BookKeeper 架构属于典型的 slave-slave 架构,所有 bookie 节点的角色都是 slave,负责长久化数据,每个节点的解决逻辑都雷同;BookKeeper 客户端为 leader 角色,承当协调工作,因为其自身无状态,所以能够疾速实现故障转移。
隔离架构
保障了 Pulsar 的低劣性能,次要体现在以下几个方面:
- IO 隔离:写入、追尾读和追赶读隔离。
- 利用网络流入带宽和磁盘程序写入的个性实现高吞吐写:传统磁盘在程序写入时,带宽很高,零散读写导致磁盘带宽升高,采取程序写入形式能够晋升性能。
- 利用网络流出带宽和多个磁盘独特提供的 IOPS 解决能力实现高吞吐读:收到数据后,写到性能较好的 SSD 盘里,进行一级缓存,而后再应用异步线程,将数据写入到传统的 HDD 硬盘中,升高存储老本。
- 利用各级缓存机制实现低提早投递:生产者发送音讯时,将音讯写入 broker 缓存中;实时生产时(追尾读),首先从 broker 缓存中读取数据,防止从长久层 bookie 中读取,从而升高投递提早。读取历史音讯(追赶读)场景中,bookie 会将磁盘音讯读入 bookie 读缓存中,从而防止每次都读取磁盘数据,升高读取延时。
比照总结
左侧为 Kafka、RabbitMQ 等音讯零碎采纳的架构设计,broker 节点同时负责计算与存储,在某些场景中应用这种架构,能够实现高吞吐;但当 topic 数量减少时,缓存会受到净化,影响性能。
右侧为 Pulsar 的架构,Pulsar 对 broker 进行了拆分,减少了 BookKeeper 长久层,尽管这样会减少零碎的设计复杂性,但能够升高零碎的耦合性,更易实现扩缩容、故障转移等性能。表 3 总结了分区架构和分片架构的次要个性。
基于对 Pulsar 的架构和性能特点,咱们对 Pulsar 进行了测试。在操作系统层面应用 NetData 工具进行监控,应用不同大小的数据包和频率进行压测,测试的几个重要指标是磁盘、网络带宽等的稳定状况。
测试论断如下:
- 部署形式:混合部署优于离开部署。broker 和 bookie 能够部署在同一个节点上,也能够离开部署。节点数量较多时,离开部署较好;节点数量较少或对性能要求较高时,将二者部署在同一个节点上较好,能够节俭网络带宽,升高提早。
- 负载大小:随着测试负载的增大,tps 升高,吞吐量稳固。
- 刷盘形式:异步刷盘优于同步刷盘。
- 压缩算法:压缩算法举荐应用 LZ4 形式。咱们别离测试了 Pulsar 自带的几种压缩形式,应用 LZ4 压缩算法时,CPU 使用率最低。应用压缩算法能够升高网络带宽使用率,压缩比率为 82%。
- 分区数量:如果单 topic 未达到单节点物理资源下限,倡议应用单分区;因为 Pulsar 存储未与分区耦合,能够依据业务倒退状况,随时调整分区数量。
- 主题数量:压测过程中,减少 topic 数量,性能不受影响。
- 资源束缚:如果网络带宽为千兆,网络会成为性能瓶颈,网络 IO 能够达到 880 MB/s;在网络带宽为万兆时,磁盘会成为瓶颈,磁盘 IO 使用率为 85% 左右。
- 内存与线程:如果应用物理主机,需注意内存与线程数目的比例。默认配置参数为 IO 线程数等于 CPU 核数的 2 倍。这种状况下,实体机核数为 48 核,如果内存设置得较小,比拟容易呈现 OOM 的问题。
除了上述测试以外,咱们还复测了 Jack Vanlightly(RabbitMQ 的测试工程师)的破坏性测试用例,失去如下论断:
- 所有测试场景中,没有呈现音讯失落与音讯乱序;
- 开启音讯去重的场景中,没有呈现音讯反复。
反对团队业余
另外,咱们与 Apache Pulsar 我的项目的外围开发人员交换沟通工夫较早,他们在 Yahoo! 和推特有过丰盛的实践经验,准备成立公司在全世界范畴内推广应用 Pulsar,并且会将中国作为最重要的基地,这为咱们的应用提供了强有力的保障。当初大家也都晓得,他们成立了 StreamNative 公司,并且已取得多轮融资,队伍也在一直壮大。
Pulsar 在根底音讯平台的实际
咱们基于 Pulsar 构建的根底音讯平台架构如下图,图中绿色局部为基于 Pulsar 实现的性能或开发的组件。本节将结合实际应用场景,具体介绍咱们如何在理论应用场景中利用 Pulsar 及基于 Pulsar 开发的组件。
场景 1:流式队列
1. OGG For Pulsar 适配器
源数据存储在 Oracle 中,咱们心愿实时抓取 Oracle 的变更数据,进行实时计算、数据分析、提供给上游业务零碎查问等场景。
咱们应用 Oracle 的 OGG(Oracle Golden Gate)工具进行实时抓取,它蕴含两个模块:源端 OGG 和指标 OGG。因为 OGG 官网没有提供 Sink 到 Pulsar 的组件,咱们依据须要开发了 OGG For Pulsar 组件。下图为数据处理过程图,OGG 会抓取到表中每条记录的增删改操作,并且把每次操作作为一条音讯推送给 OGG For Pulsar 组件。OGG For Pulsar 组件会调用 Pulsar 客户端的 producer 接口,进行音讯投递。投递过程中,须要严格保障音讯程序。咱们应用数据库表的主键作为音讯的 key,数据量大时,能够依据 key 对 topic 进行分区,将雷同的 key 投递到同一分区,从而保障对数据库表中主键雷同的记录所进行的增删改操作有序。
2. Pulsar To TiDB 组件
咱们通过 Pulsar To TiDB 组件将抓取到的变更音讯存储到 TiDB 中,对上游零碎提供查问服务。这一组件的解决逻辑为:
- 应用灾备订阅形式,生产 Pulsar 音讯。
- 依据音讯的 key 进行哈希运算,将雷同的 key 散列到同一长久化线程中。
- 启用 Pulsar 的音讯去重性能,防止音讯反复投递。假如 MessageID2 反复投递,那么数据一致性将被毁坏。
3. Pulsar 的音讯长久化过程剖析
Pulsar 的音讯长久化过程包含以下四步:
- OGG For Pulsar 组件调用 Pulsar 客户端的 producer 接口,投递音讯。
- Pulsar 客户端依据配置文件中的 broker 地址列表,获取其中一个 broker 的地址,而后发送 topic 归属查问服务,获取服务该 topic 的 broker 地址(下图示例中为 broker2)。
- Pulsar 客户端将音讯投递给 Broker2。
- Broker2 调用 BookKeeper 的客户端做长久化存储,存储策略包含本次存储可抉择的 bookie 总数、正本数、胜利存储确认回复数。
4. 数据库表构造动静传递
OGG 应用 AVRO 形式进行序列化操作时,如果将多个表投递到同一个 topic 中,AVRO Schema 为二级构造:wrapper schema 和 table schema。wrapper schema 构造始终不变,蕴含 table_name、schema_fingerprint、payload 三局部信息;OGG 在抓取数据时,会感知数据库表构造的变动并告诉给 OGG For Pulsar,即表构造决定其 table schema,再由 table schema 生成对应的 schema_fingerprint。
咱们将获取到的 table schema 发送并存储在指定的 Schema topic 中。Data topic 中的音讯只蕴含 schema_fingerprint 信息,这样能够升高序列化后音讯包的大小。Pulsar To TiDB 启动时,从 Schema topic 生产数据,应用 schema_fingerprint 为 Key 将 table schema 缓存在内存中。反序列化 Data Topic 中的音讯时,从缓存中依据 schema_fingerprint 提取 table schema,对 payload 进行反序列化操作。
5. 一致性保障
要保障音讯有序和去重,须要从 broker、producer、consumer 三方面进行设置。
Broker
- 在 namespace 级别开启去重性能:bin/pulsar-admin namespaces set-deduplication namespace –enable
- 修复 / 优化 Pulsar 客户端死锁问题。2.7.1 版本已修复,详细信息可参考 PR 9552。
Producer
- pulsar.producer.batchingEnabled=false
在 producer 设置中,敞开批量发送。如果开启批量发送音讯,则音讯可能会乱序。
- pulsar.producer.blocklfQueueFull=true
为了提高效率,咱们采纳异步发送音讯,须要开启阻塞队列解决,否则可能会呈现音讯失落。
调用异步发送超时,发送至异样 topic。如果在异步超时重发消息时,呈现音讯反复,能够通过开启主动去重性能进行解决;其它状况下呈现的音讯发送超时,须要独自解决,咱们将这些音讯存储在异样 topic 中,后续通过对账程序从源库间接获取终态数据。
Consumer
实现拦截器:ConsumerInterceptorlmpl implements ConsumerInterceptor 配置确认超时:pulsarClient.ackTimeout(3000, TimeUnit.MILLISECONDS).ackTimeoutTickTime(500, TimeUnit.MILLISECONDS) 应用累积确认:consumer.acknowledgeCumulative(sendMessageID)
备注:配置确认超时参数,如果没有在 ackTimeout 工夫内进行生产确认的话,音讯将从新投递。为了严格保障一致性,咱们须要应用累计确认形式进行确认。
6. 音讯生产的确认形式
如果在 MessageID 为 1 的音讯已确认生产胜利,开始采纳累积确认形式,此时正在确认 MessageID 为 3 的音讯,则已生产但未确认的 MessageID 为 2 的音讯也会被确认胜利。如果在“确认超时”工夫内始终未收到确认,则会依照原程序从新投递 MessageID 为 2、3、4、5 的音讯。
如果采纳单条确认形式,图中 MessageID 为 1、3、4 的音讯确认生产胜利,而 MessageID 为 2 的音讯“确认超时”。在这种状况下,如果应用程序处理不当,未依照生产程序逐条确认,则呈现音讯“确认超时”时,只有产生超时的音讯(即 MessageID 为 2 的音讯)会被从新投递,导致生产程序产生错乱。
总结:队列生产模式倡议应用单条确认形式,流式生产模式倡议应用累积确认形式。
7. 音讯确认超时(客户端)检测机制
确认超时机制中有两个参数,超时工夫和轮询距离。超时检测机制通过一个双向队列 + 多个 HashSet 实现。HashSet 的个数为(超时工夫)除以(轮询距离)后取整,因而每次轮询解决一个 HashSet,从而无效躲避全局锁带来的性能损耗。
场景 2:音讯队列:OpenMessaging 协定实现(通明层协定)
咱们过来应用的很多业务零碎都和音讯零碎强耦合,导致后续降级和保护很麻烦,因而咱们决定应用 OpenMessaging 协定作为中间层进行解耦。
- 通过 Pulsar 实现 OpenMessaging 协定。
- 开发框架(基于 spring boot)调用 OpenMessaging 协定接口,发送和接管音讯。
场景 3:流式队列:自定义 Kafka 0.8-Source(Source 开发)
Pulsar IO 能够轻松对接到各种数据平台。咱们的局部业务零碎应用的是 Kafka 0.8,官网没有提供对应的 Source,因而咱们依据 Pulsar IO 的接口定义,开发了 Kafka 0.8 Source 组件。
场景 4:流式队列:Function 音讯过滤(音讯过滤)
咱们通过 Pulsar Functions 把 Pulsar IDC 集群音讯中的敏感字段(比方身份证号,手机号)脱敏后实时同步到星散群中,供云上利用生产。
场景 5:流式队列:Pulsar Flink Connector 流式计算(流式计算)
商户经营剖析场景中,Flink 通过 Pulsar Flink Connector 连贯到 Pulsar,对流水数据依据不同维度,进行实时计算,并且将计算结果再通过 Pulsar 长久化到 TiDB 中。从目前的应用状况来看,Pulsar Flink Connector 的性能和稳定性均体现良好。
场景 6:流式队列:TiDB CDC 适配(TiDB 适配)
咱们须要基于 TiDB 数据变更进行实时抓取,但 TiDB CDC For Pulsar 序列化形式不反对 AVRO 形式,因而咱们针对这一应用场景进行了定制化开发,即先封装从 TiDB 收回的数据,再投递到 Pulsar 中。TiDB CDC For Pulsar 组件的开发语言为 Go 语言。
将来布局
咱们基于 Pulsar 构建的根底音讯平台无效进步了物理资源的应用效率;应用一套音讯平台简化了系统维护和降级等操作,整体服务质量也得以晋升。咱们对 Pulsar 的将来应用布局次要包含以下两点:
- 陆续下线其它音讯零碎,最终全副接入到 Pulsar 根底音讯平台;
- 深度应用 Pulsar 的资源隔离和流控机制。
在实际过程中,借助 Pulsar 诸多原生个性和基于 Pulsar 开发的组件,新音讯平台完满实现了咱们预期的性能需要。
作者简介
姜殿璟,基础架构部架构师,负责根底音讯平台及其生态的建设与经营,团队单干将 Apache Pulsar 引入到拉卡拉外围架构中,并且在强一致性的流式生产场景和队列生产场景中获得了成功实践,目前次要负责 Pulsar 性能调优、新性能开发及 Pulsar 生态集成。
点击链接,获取 Apache Pulsar 硬核干货材料!