关于大数据:Apache-Pulsar-在-BIGO-的性能调优实战下

52次阅读

共计 10842 个字符,预计需要花费 28 分钟才能阅读完成。

作者:陈航,BIGO 大数据音讯平台团队负责人。
本期文章排版:Tango@StreamNative。

对于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。
GitHub 地址:http://github.com/apache/pulsar/

背 景

在上一篇博客中,咱们探讨了 BIGO 在 Pulsar Broker 性能调优过程中遇到的一些问题并提出相应的解决方案。本篇博客,咱们将探讨 BIGO 在 Pulsar 底层分布式存储服务 BookKeeper 的性能调优工作。

对于 BIGO 而言,Apache Pulsar 在 bookie 端(BookKeeper 的单个存储节点)的零碎性能次要存在以下几个问题:

  1. 读申请耗时较长,排队重大;
  2. Bookie 呈现 direct memory Out of Memory (OOM),导致过程挂掉;
  3. 压测的时候经常出现 broker direct memory OOM;
  4. 当 journal 盘为 HDD 时,尽管敞开了 fsync,然而 bookie add entry 99th latency 仍旧很高, 写入性能很差;
  5. 当大量读申请进入 bookie 时,呈现写被反压,add entry latency 回升。当 Ledger 盘为 HDD 时,体现更加显著。

保障 BookKeeper 的稳定性以及高吞吐和低提早是 Pulsar 稳固、吞吐的基石。本文会基于 BookKeeper 基本原理介绍影响读写吞吐和稳定性的因素。咱们打算次要从以下六个方面介绍 bookie 性能调优:

  • Pulsar Topic Message 写入 / 读取流程
  • BookKeeper Request IO 申请调优
  • Ledger Memtable 刷盘策略调优
  • Journal 刷盘策略调优
  • Entry 读取性能调优
  • GarbageCollector (GC) 性能优化

环境部署与监控

在介绍 Bookkeeper 性能调优之前,咱们须要为零碎增加详尽的监控指标,并且要明确各监控指标背地的含意及关联关系。咱们曾经在 Apache Pulsar 在 BIGO 的性能调优实战(上)中详细描述过环境部署与监控这部分内容,这里不再赘述。

Pulsar Topic Message 写入 / 读取流程

为了形容更加清晰易懂,咱们首先从 Pulsar Topic message 写入 / 读取角度介绍音讯流转全貌,而后联合 bookie 外部实现原理再进行介绍。

因为 Pulsar 默认应用 dbLedgerStorage 存储格局,所以本博客选取 dbLedgerStorage 的实现形式进行解说。

Topic Message 写入流程

当客户端向 BookKeeper 中写入一条 entry(每条 entry 具备惟一的 <legerId, entryId> 元组标识)时,流程如下:(为了叙述不便,此处省略写入 journal 的过程)

  1. 将 entry 放入 Netty 线程解决队列中,期待 Netty 线程进行解决。
  2. Netty 线程会顺次从队列中获取每一个 entry,依据该 entry 的 ledgerId 进行取模,抉择写入的指标磁盘(ledger 盘)。取模算法为:ledgerId % numberOfDirs,其中 numberOfDirs 示意 bookie 过程配置的 ledger 目录的个数。
  3. 抉择指标磁盘对象后,将索引写入 cache 和 rocksDB 进行长久化存储,将 payload 写入 memtable(这是一个内存双缓冲),期待排序和回刷。
  4. 当 memtable 的一个缓冲存满之后,会触发 flush,将 payload flush 到 PageCache 中,再由 PageCache 回刷到 disk 中。

Topic Message 读取流程

当客户端须要读取某一个 entry(每条 entry 具备惟一的 <ledgerId, entryId> 元组标识)时,流程如下:

  1. 从 ZooKeeper 中获取 entry 所在 ledger 的 metadata。metadata 存储该 ledger 正本所在的 bookie 节点地址,如:Ensembles: [bookie1, bookie2]。
  2. 向其中一个 bookie 发送 entry 读取申请(为了叙述不便,此处省略客户端执行的一系列容错、熔断策略)。
  3. bookie1 收到 read entry 申请后,依据 ledgerId 进行 hash,抉择对应的 readerThread,并将申请放入该 readerThread 的申请解决队列。
  4. readerThread 顺次从申请队列中取出申请,依据 ledgerId 取模,抉择该 ledger 所在的磁盘。
  5. 抉择指标磁盘对象后,首先查看 memtable、readAheadCache 中是否曾经缓存指标 entry。如果有,则间接返回。否则,读取 rocksDB 索引,进而读取磁盘上的指标数据,并将读取到的数据加载到 readAheadCache 中。

以上是 message 写入 / 读取的主体逻辑。那么 message 具体解决细节是怎么的?当咱们的生产零碎中呈现 message 写入慢、读取慢等状况时,咱们如何疾速定位问题,并进行针对性优化?这就须要咱们对 BookKeeper 的 IO 模型具备较为深刻的了解,必要时须要联合 Linux IO 协定栈进行针对性调优。

BookKeeper Request IO 申请调优

咱们首先介绍 BookKeeper 应用 Netty 解决 Request IO 模型,包含 Add Entry Request 和 Read Entry Request 解决流程,并附上每一个解决步骤的监控项名称和含意,而后再针对 BIGO 生产环境遇到的性能问题给出相干解决方案。

BookKeeper Request IO 申请流程应用以下监控指标。

  • bookkeeper_server_BookieReadThreadPool_queue_[0..16]:队列中排队的申请个数;
  • bookkeeper_server_READ_ENTRY_REQUEST:Request 从进入申请队列到被解决实现的时延;
  • bookkeeper_server_BookieReadThreadPool_total_tasks_0:正在被 read 线程解决的申请个数(掂量 read 吞吐能力);
  • bookie_read_entry:Read 申请从开始解决到解决实现的耗时。

依据图示的步骤,上面简要介绍从 BookKeeper Client 发动 Request 申请到被 Bookie 解决的整个过程:

  1. Bookie Server 启动过程中,首先启动 Netty Server(epoll)。默认调配两倍 CPU 核数的线程(thread)解决网络申请。每一个线程领有独立的 thread channel(默认长度为 10000),缓存接管到的 Request,并监听网络端口(默认端口:3181)。
  2. 当 Netty Server 接管到 Client 的 request 申请时,会依据 client sessionId 进行哈希取模,映射到对应的 Netty thread Channel 中,期待相应的 Netty thread 进行解决。
  3. 每个网络解决线程会从各自的 thread channel 中取出待处理的 Request,顺次进行如下解决:LengthFieldBasedFrameDecoderLengthFieldPrependerRequestDecoderResponseEncoderServerSideHandlerrequestHandler。其中 requestHandle 是真正解决申请的操作,其余的都是进行解包等预处理。
  4. BookieRequestProcessor##processRequest 办法中, 依据申请类型调用不同办法进行解决,解决办法有:ADD_ENTRYREAD_ENTRYFORCE_LEDGERAUTHWRITE_LACREAD_LACGET_BOOKIE_INFO START_TLS 等。这是 bookie server 解决不同类型 request 的总入口(这里我只应用最罕用的 ADD_ENTRY READ_ENTRY 进行剖析)。
  5. ADD_ENTRY
  • 将 request 和 channel 传给 WriteEntryProcessorV3 生成相应实例,并调用 run 办法启动。
  • run 办法中,首先调用 addEntry 办法将 entry 写入 bookie 中,并返回处理结果。
  • 将处理结果封装到 sendResponse 中,调用 writeChannel 写出到 netty thread channel,并发送给客户端。
  1. READ_ENTRY
  • 将 Read Request 封装成 ReadEntryProcessorV3 实例,应用 LedgerId 依照线程池大小取模抉择一个 Reader 解决线程,并将 Read Request 退出该线程的解决队列中(每个线程领有独立的队列,长度默认配置为 2500。如果排队长度超过最大值,则会被阻塞)。
  • Reader 解决线程一直从本人的队列中取出 Read Request,调用 ReadEntryProcessorV3 实例的 safeRun 办法读取数据。
  • 数据读取过程是依据 LedgerId 抉择对应的 Ledger 实例将 entry data 读取回来,并塞入 readResponse 中。
  • 将 Read Response 返回给 Netty Thread Channel,再由 Netty 对立发送给客户端。

在压测过程中,咱们发现某些 ledger 读取很慢。联合监控,咱们发现 reader 解决线程的期待队列排队状况散布不均,某些 reader 解决线程排队重大。

联合 READ_ENTRY 解决模型,ledger 读取是依照总 reader 线程数取模。能够采取以下两种解决方案缓解这个问题:

  • 减少 reader 解决线程数,缓解解决压力。
  • 为 topic 减少 partition,扩散读取压力。

因为所有读写申请在被解决之前都会被退出相应队列中排队,控制参数别离为: maxPendingReadRequestsPerThread maxPendingAddRequestsPerThread。如果上游解决变慢,可能造成期待队列排满,加大 Direct Memory OOM 的危险。缓解计划如下:

  • 管制期待队列长度。对于无奈疾速解决的申请,间接返回 error。
  • 放慢 Journal 写入和 Ledger 写入 / 读取的处理速度。

Ledger Memtable 刷盘策略调优

数据写入 Journal 之前,须要先保障 Memtable 写胜利。Memtable 的设计是一个内存双缓冲,单个缓冲区默认容量大小为 Direct Memory Size * 1/4 * 1/numberOfLedgers * ½

当咱们向 Memtable 中写入一条 Entry 数据时,会间接将 entry 写入 WriteCache,此时有三种状况:

  • 如果 WriteCache 仍有残余空间,间接将 entry 写入 WriteCache,而后返回胜利。
  • 如果 WriteCache 曾经写满,然而 writeCacheBeingFlushed 是空的,则触发双缓冲旋转,将 entry 写入空 WriteCache 中,并启动独立线程触发排序和回刷。
  • 如果 WriteCache 曾经写满,writeCacheBeingFlushed 尚未实现回刷,entry 写入被阻塞直到 writeCacheBeingFlushed 回刷实现。

WriteCache 排序回刷过程如下:

  1. 应用疾速排序算法对 WriteCache(通过非凡设计的 HashMap)中的数据进行排序。排序后,同一 Ledger 的相近 entry 排在一起,便于读取的时候 OS 预读。
  2. 将排序后的索引写入 RockDB 中。
  3. 将排序后的数据 flush 到 PageCache 中。

咱们能够配置参数 flushEntrylogBytes 来管制将 entry 从 PageCache 中 flush 到 Disk 的频率。

须要留神的是,PageCache 中的 entry flush 到 disk 后,不会 evict 掉 PageCache 中的数据,目标是为 entry 读取提供缓存。

在 HDD 作为 Ledger 盘的场景下,如果一次从 PageCache 中 flush 到 Disk 的数据量太大,容易导致磁盘 IO Util 继续打满,PageCache 回刷变慢,WriteCache flush 到 PageCache 的速度也会变慢,最终导致 entry 写入被阻塞。

失常状况下,WriteCache 中的数据排序后,flush 到 PageCache 中即可返回。整个过程都是写内存,无非是将数据从用户态拷贝到内核态,PageCache 回刷和 WriteCache flush 是异步解耦的,PageCache 回刷变慢不应该影响 WriteCache flush 速度。问题在于,WriteCache flush 的数据最终都会写进 entry log 文件中。当 entry log 文件产生滚动时,须要期待所有 PageCache 中相干数据都 flush 到 disk 中才会将 entry log 文件敞开,并创立新 entry log 文件接管新数据的写入。因而,一旦 PageCache 回刷变慢,最终也会影响 WriteCache flush 到 PageCache 的速度。

在 Catchup Read 场景下,因为 Ledger 盘须要提供 entry 读取,会造成磁盘宏观上随机读,宏观上程序读(排序带来的成果)。此时既有数据写入,也有数据读取,对于磁盘而言,这是读写混合的场景。为了提供读写吞吐,咱们须要想方法升高磁盘读的频率,这部分的调优须要从 Linux IO 协定栈登程。

Journal 刷盘策略调优

ADD_ENTRY 操作最终采纳调用 Bookie#addEntry 办法执行 entry 写入操作。Entry 写入首先会写 Memtable,待写胜利后再写 Journal,这个过程是串行的。

假如咱们配置了多个 journal 目录,那么咱们具体抉择哪一个 journal 进行写入?咱们采纳 ledgerId % numberOfJournalDirs 取模算法,抉择 journal 实例。而后将 add request(蕴含数据 payload)放入待处理队列中,由专门的 journal thread 进行解决。用户能够通过配置 bookkeeper.conf numJournalCallbackThreads 参数,管制 journal thread 线程数,默认为 8。

写申请放入 journal 待处理队列后,解决步骤如下:

  1. Journal 实例有独立运行的后盾线程,一直从 queue 中取出 QueueEntry 进行以下后续解决(如果 queue 中没有 QueueEntry,则阻塞)。
  2. 将 QueueEntry 中 entry data 剥离进去,写入 BufferedChannel(每个 journal file 对应一个 BufferedChannel。当上一个 journal file 滚动时,须要创立下一个 journal file 并与新的 BufferedChannel 进行关联)。
  3. 将已剥离 entry data 的 request meta 放入 toFlush qeueue 中。一旦满足以下任一条件,toFlush queue 就会进行 flush 操作:
  • toFlush queue 中的 entry 等待时间超过了阈值。
  • toFlush queue 大小超过了 buffWrite 阈值或者 bufferedEntries 阈值。
  1. 将数据 flush 到 PageCache。
  • 将 BufferedChannel 中的数据 flush 到 OS 的 PageCache 中。
  • 满足以下任一条件,就生成 forceWriteRequest 申请,并将 forceWriteRequest 申请退出 forceWriteRequests 队列(BookKeeper 4.9.2 版本,只有满足前两个条件中的任一条件就会 flush;BookKeeper 4.10 版本每次都会 flush;以后 master 分支代码须要满足以下任一条件就会 flush):
  • 开启 journalSyncData。
  • Journal file 达到了最大 size,须要进行文件滚动。
  • 间隔上一次 flush PageCache 距离达到了最长距离(默认 1s)。
  1. 将 PageCache 中数据 flush 到 Disk。forceWriteRequests queue 领有独立的后盾线程,一直从队列中取出 forceWriteRequest,而后将 PageCache 中数据 flush 到 Disk。这里会波及到 ADD_ENTRY ACK 机会的问题,次要区别在于是否开启了 JournalSyncData(Pulsar 默认开启,即每次刷盘实现之后才返回 ACK)。
  • 如果开启 JournalSyncData,则在 flush Disk(步骤 5)之后才返回 ACK。
  • 如果敞开 JournalSyncData,则在 flush PageCache(步骤 4)之后就会执行回调, 返回 ACK。

为了更加清晰地发现 journal 写入过程中的瓶颈,咱们须要弄清楚每一个监控项的含意,并配置 Grafana 监控指标,便于疾速定位问题。下图是每一个 queue 的长度监控以及每一阶段的解决耗时。

须要留神的是:

  • BufferedChannel 与 journal transection file 间接关联。如果没有创立新的 journal file,journal add request 解决会被阻塞。尽管上一个 journal file roll 与下一个 journal file 创立是异步进行,然而如果此刻 journal 盘 io util 继续打满,新 journal file 创立被阻塞(IO 等待时间较长),journal 写入耗时就会上涨。须要关注的指标为:bookie_journal_JOURNAL_CREATION_LATENCY
  • 当 journal 盘为 HDD 时,咱们个别会将 journalSyncData 开关敞开,让数据写入到 PagaCache 中就返回 ACK,从而升高 entry 写入提早。然而,在压测过程中,咱们发现敞开 journalSyncData 后,add entry 99th latency 非常不稳固,偶然会到几秒甚至十几秒。这是因为 PageCache 回刷带来的磁盘 IO 抖动。触发 PageCache 中数据回刷到磁盘有三个机会:
  • OS 每距离 30s 回刷一次。
  • PageCache dirty page 超过阈值会触发回刷。
  • Roll File 会触发回刷。

这三种回刷策略的共同之处在于一次回刷的数据量很大。一次回刷大量数据会造成 HDD 磁盘短时间内继续 IO Util 打满,内核下发的其余 IO 申请会在调度队列中排队,包含 new journal file create 申请。为此,咱们在 PR 2287 中提出了分时 flush PageCache 的策略,管制单次从 PageCache 中 flush 到 disk 的数据量,从而管制 disk io util 打满工夫。从 BIGO 压测实际来看,开启此策略后,HDD 达到了近似 SATA SSD 的性能。

因为所有 Journal 写申请首先都会被放到 Journal Queue 中,如果上游处理速度变慢,Journal Queue 中可能积攒大量 Add Request 期待解决,且耗费大量内存,减少 Direct Memory OOM 的危险。缓解计划如下:

  • 应用 journalQueueSize 参数管制 Journal Queue 大小,默认为 10000。
  • 放慢 Journal 线程数据写入处理速度。

Entry 读取性能调优

Entry 读取解决流程如下图所示,传入参数为(ledgerId, entryId)。

  1. 依据 ledgerId 依照 ledgerId % numberOfLedgers 取模,抉择 entry 所在的指标 Ledger 磁盘实例。
  2. 查看该 Ledger 实例对应的 MemTable(writeCache 和 WriteCacheBeingFlushed)中是否存在想要读取的 entry(依照 <ledgerId, entryId> 为 Key,从索引中查问,工夫复杂度为 O(1))。如果 cache hit,间接返回 entry。
  3. 查看 Read Cache 中是否有想要读取的 entry(依照 <ledgerId,entryId> 为 Key,从索引中查问,工夫复杂度为 O(1))。如果 Read Cache 命中,则间接返回 entry。
  4. 如果 Read Cache 也没有命中,就从主存中读取数据,并且启动预读(预读是一次读一个 entry,屡次循环读取)并将读取的所有 entry 退出 Read Cache 进行缓存。整个预读流程是串行的,预读的 entry 数量由参数 dbStorage_readAheadCacheBatchSize 管制。这个参数配置的越大,Read Cache Miss 时带来的长尾提早越高。

预读流程如下:

  1. 获取 LedgerId 对应的 Ledger File 实例,并查问 BufferedLogChannel 中是否存在想要读取的 entry。
  • 如果有,则间接返回 entry。
  • 如果没有,启动 RandomAccessFile 并套上 internalNioBuffer,批量从文件中读取一个 batch 数据到 readBuffer 中。batch 大小由 readBufferSizeBytes 参数管制。
  1. 将预读取的 entry 放入 BufferedLogChannel Cache 中缓存,并将后果返回。

预读过程中,如果读取的是热数据,仍在 PageCache 中,则间接从 PageCache 读取,否则从 Disk 读取。从 Disk 读取数据时,OS 是有预读性能的,会将读取的数据缓存在 PageCache 中。因为 Bookie 并不齐全依赖 PageCache 做缓存命中,所以 Catch up 读带来的 PageCache 净化对整体影响较小。

当读取某一个 entry 时,须要依据指标 entry 在 entry log file 中的偏移量(索引)进行读取。Bookie 为了减速索引读取,将索引保留在 RocksDB 中,咱们须要保障索引在 RocksDB 中查问的命中率,那么 RocksDB 缓存大小配置成为要害,相干配置参数为 dbStorage_rocksDB_blockCacheSize

GarbageCollector(GC)性能优化

Bookie 在解决数据写入时,会将同一段时间内写入的数据(可能归属于多个 ledger 的 entry)通过排序后 flush 到同一个 entry log 文件中,将索引寄存在 RocksDB 中。这就带来了一个问题:当某些 topic 数据过期或者被删除时,过期数据关联的所有 ledger 都应该被清理掉。然而,因为多个 ledger 同一段时间内的数据被写入到同一个 entry log 文件,清理过程就会变得略微简单些。

BookKeeper 的解决计划是:启动一个独立的 GarbageCollector(GC)线程来实现清理工作。GC 清理线程分为 minorCompaction majorCompaction,两者的区别在于阈值不同。默认状况下, minorCompaction 清理距离为 1 小时,阈值为 0.2; majorCompaction 清理距离为 24 小时,阈值为 0.5。这里阈值指一个 entry log file 中无效数据占比。当 Topic 中有数据过期时,零碎会将对应 ledger 在 ZooKeeper 中的 metadata 标记为已过期。GC 线程会定时扫描每个 entry log 中所有 ledger 的过期状况,并统计残余无效数据比例。

GC 的解决形式为顺次读取 entry log 文件中每一个 entry,判断 entry 是否过期。如果曾经过期,则间接抛弃,否则将其写入新 entry log 文件中,并更新 entry 在 RocksDB 中的索引信息。

尽管 GC 过程是顺次读取 entry log 文件中的 entry,对磁盘而言是程序读,然而如果此时有大量数据写入,则变成了读写混合场景,机械硬盘在读写混合场景下性能会急剧下降。从景象来看,当 bookie 产生 GC 时,数据读写吞吐会呈现抖动。

为了升高 GC 带来的影响,BookKeeper 提供了两种限速策略:依照 entry 限速和依照 bytes 限速。默认依照 entry 来限速,即每秒读取 entry 的最大数量,由参数 compactionRate 进行管制,默认值是 1000。也能够依照 bytes 来限速,即每秒读取多少字节数据,由参数 isThrottleByBytes compactionRateByBytes 一起管制,默认值是 1000000。

因为每个 entry 承载的数据量大小各异,依照 entry 限速可能会引起从磁盘读取的数据量抖动,从而影响失常数据读写。因而倡议依照 bytes 进行限速。

Linux IO 协定栈优化

对于 Linux IO 协定栈,咱们倡议从以下方面进行优化:

  1. HDD 磁盘应用 CFQ(Completely Fair Queuing,齐全偏心队列调度算法)调度算法,SSD 磁盘应用 NOOP 调度算法。
  2. 应用基于 OpenCAS 的 SSD 进行读缓存减速。在缓存层采纳不同的替换策略,升高读申请下沉到 ledger 盘的概率,尽可能保障 ledger 盘的程序读写,从而进步读写吞吐。这一部分 BIGO 正在进行相干压测工作,后续停顿会及时同步。

总 结

本文从 message 写入和读取角度介绍了 BookKeeper 音讯解决运行机制,并具体阐明对性能产生影响的要害要点。在叙述过程中,咱们着重联合每一个环节的监控指标对运行机制进行解说。在遇到性能或者稳定性问题时,大家能够先依据监控指标进行排查,疾速定位出问题所在,并联合原理进行剖析和解决。

在机械硬盘场景下,如何优化 IO 性能是保障 BookKeeper 稳定性和吞吐量的要害。在设计上,BookKeeper 尽可能保障了 Journal 和 Ledger 的程序写,但仍旧无奈防止读写混合的烦扰,如 Catch up 读、Compaction、Auto Recovery 等,机械硬盘在读写混合场景下性能会急剧下降。对于 Compaction 和 Auto Recovery,咱们能够通过限速来缓解这个问题,对于 Catch up 读,咱们能够思考采纳 SSD 缓存来进步读命中率,升高随机 IO 下沉到机械硬盘的概率。BIGO 音讯队列团队会继续在 IO 层进行一系列优化,定制相干缓存策略,保证系统吞吐和稳固。

对于作者

陈航,BIGO 大数据音讯平台团队负责人,负责承载大规模服务与利用的集中公布 - 订阅音讯平台的创立与开发。他将 Apache Pulsar 引入到 BIGO 音讯平台,并买通上下游零碎,如 Flink、ClickHouse 和其余实时举荐与剖析零碎。他目前聚焦 Pulsar 性能调优、新性能开发及 Pulsar 生态集成方向。

相干浏览

  • Apache Pulsar 在 BIGO 的性能调优实战(上)
  • Apache Pulsar 在能源互联网畛域的落地实际
  • Apache Pulsar 在腾讯 Angel PowerFL 联邦学习平台上的实际

点击 链接,获取 Apache Pulsar 硬核干货材料!

正文完
 0