作者 | 冉小龙
Apache Pulsar 作为云原生时代音讯流零碎,采纳存储计算拆散架构,反对大集群、多租户、百万级 Topic、跨地区数据复制、长久化存储、分层存储、高可扩展性等企业级和金融级性能。Apache Pulsar 提供了对立的生产模型,反对音讯队列和流两种场景,既能为队列场景提供企业级读写服务质量和强一致性保障,又能为流场景提供高吞吐、低提早。
Apache Pulsar 在腾讯云中曾经失去大规模的生产实践,在过来一年中承接了诸多行业生态中不同的应用场景。在理论的生产实践中,腾讯云针对 Apache Pulsar 做了一系列的性能优化和稳定性性能方面的工作,来保障用户在不同的场景下零碎的稳固高效的运行。本文围绕腾讯云近一年在 Pulsar 稳定性和性能方面优化最佳实际。
Pulsar 在腾讯云百万级 Topic 上的利用
为什么抉择在生产环境中应用 Pulsar?
此前该用户应用 Kafka 集群来承载业务,因为业务的特定场景,集群的整体流量绝对不大,然而须要应用的 Topic 较多。此前应用 Kafka 集群时,因为 Kafka 本身架构的限定,用户不能在一套集群中创立较多的 Topic,所以为了满足业务多 Topic 的应用场景,须要部署多套 Kafka 集群来满足业务的应用,导致业务应用的老本较大。
Pulsar 自身除了具备 Pub-Sub 的传统 MQ 性能外,其底层架构计算存储拆散,在存储层分层分片,能够很容易地把 BookKeeper 中的数据 offload 到便宜存储上。Pulsar Functions 是 Serverless 的轻量化计算框架,为用户提供了 Topic 之间直达的能力。在开源之前,Pulsar 已在 Yahoo! 的生产环境中经验 5 年的打磨,并且能够轻松扩缩容,撑持多 Topic 场景。为了升高应用的老本,同时满足多 Topic 的业务场景,该用户切换到了 Pulsar 的集群上。
以后该用户的一套 Pulsar 集群能够承载 60W 左右的 Topic,在很好地满足了业务应用的场景的同时升高了应用老本。
Apache Pulsar 稳定性优化实际
实际 1:音讯空洞的影响及躲避措施
应用 Shared 订阅模式或单条 Ack 音讯模型时,用户常常会遇到 Ack 空洞的状况。Pulsar 中独自形象出了 individuallyDeletedMessages 汇合来记录空洞音讯的状况。该汇合是开闭区间汇合,开区间表明音讯是空洞音讯,闭区间表明音讯已被解决。晚期 Pulsar 反对单条 Ack 和批量 Ack 两种模型,后者对标 Kafka 的 Ack Offset。引入单条 Ack 模型次要针对在线业务场景,但也因而带来了 Ack 空洞问题。Ack 空洞即下图中 individuallyDeletedMessage
所展现的汇合。
如何了解 individuallyDeletedMessage?以下图为例:
该记录中第一个 Ledger id 是 5:1280,该汇合是闭区间,阐明音讯曾经被 Ack;之后的 5:1281 是开区间,阐明音讯没有被 Ack。这里就用开闭区间的模式来辨别音讯是否被 Ack。
Ack 空洞的呈现起因可能因为 Broker 解决失败,源于晚期版本的设计缺点,Ack 解决没有返回值。在 2.8.0 及以上版本中,对事务音讯反对上引入了 AckResponse 概念,反对返回值。因而在晚期版本中,调用 Ack 后无奈确保 Broker 能够正确处理 Ack 申请。第二个起因可能因为客户端出于各种起因没有调用 Ack,在生产实践中呈现较多。
为了躲避 Ack 空洞,一种办法是准确计算 Backlog Size。因为在 Broker 上解析 Batch 音讯会节约性能,在 Pulsar 中对 Batch 音讯的解析在消费者侧,因而一个 Entry 可能是单条音讯也可能是 Batch 音讯的。后者状况下 Batch 内的音讯数量或状态是未知的。为此要准确计算 Backlog Size,但通过调研发现这种办法的复杂性和难度较大。
另一种办法是 Broker 的被动弥补策略。因为 individuallyDeletedMessage 存储在每一个 ManagedCursor,也就是每一个订阅对象到 Broker 理论类中的映射。每一个订阅都能够拿到对应的 individuallyDeletedMessage 汇合,Broker 就能够被动把汇合推送到客户端,也就是被动弥补。
接下来咱们理解一下 Broker 被动弥补机制,即 Backlog 策略。在理解弥补机制之前,先要理解 Topic 可能的散布与形成。
失常来说,生产者向 Topic 公布音讯,消费者从 Topic 接管音讯。如上图,红、灰、蓝色代表音讯在 Topic 中的三种状态。Pulsar 中引入了 Backlog 策略,用来形容生产者和消费者之间的 Gap。该策略提供了三种选项,包含 Producer Exception、Producer Request Hold 和 Consumer Backlog Eviction。
其中,Producer Exception 绝对用户敌对,在生产环境中更加罕用。当音讯沉积到肯定水平,消费者解决音讯的能力有余时,Producer Exception 会告诉生产者呈现了问题。Producer Request Hold 原理雷同,然而 Producer Request Hold 只是会让生产者进行发送,而不会告知其起因(即不会向业务侧返回标识),用户感知为 Producer 进行发送音讯然而无异样抛出。而 Consumer Backlog Eviction 则会主动抛弃最早的音讯来保障音讯继续解决,可能导致丢音讯的状况呈现。
此外,还须要留神的是 Pulsar 计算 Backlog Size 的形式。上图能够了解为一个事件流,生产者源源不断地 append message。Pulsar 计算 Backlog Size 时,是计算从以后 MarkedDeletedPosition 的地位,到 ReadPosition 的地位之前的 Backlog Size,而后联合 Producer Exception 策略裸露进去。如果 Ack 空洞,比方 Broker 侧申请失败,或者客户代码产生异样导致 Ack 永远不会被调用,Backlog Size 会达到肯定速率,就相当于限度生产者。上图中,M4 和 M2 是两条空洞音讯,呈现这样的空洞音讯时,生产者的发送流就迟早会被打断。
Broker 被动弥补机制的实现形式如上图。因为 individuallyDeletedMessage 记录了所有音讯的 Ack 胜利与否的状态,就能够从中获取 MarkedDeletedPosition 地位的音讯,开启一个 Executor Service 定时工作,设置监听频率,距离一段时间将音讯从新推送到客户端侧,实现 Broker 的被动弥补,防止 Ack 空洞导致 Producer Exception 被频繁触发。
实际 2:再谈 TTL、Backlog 及 Retention 策略
咱们先看下这三个概念:
- TTL:示意音讯在指定工夫内没有被用户 Ack 时会在 Broker 被动 Ack。
- Backlog:示意生产者发送的音讯与消费者接管音讯之间的差距。
- Retention:示意当音讯被 Ack 之后,持续在 Bookie 侧保留多久的工夫,以 Ledger 为最小操作单元。
如果 TTL 和 Retention 同时设置,那么一条音讯的生命周期该如何计算?来看以下代码:
void updateCursor (ManagedCursor Impl cursor, PositionImpl newPosition) t
Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated (cursor, newPosition);
if (pair == nulL) {
Cursor has been removed in the meantime
trimConsumedLedgersInBackground();
return;
}
PositionImplpreviousSlowestReader = pair.getLeftO);
PositionImpl currentSlowestReader = pair.getRightO);
if (previousSlowestReader.compareTo(currentSlowestReader)==0){
// The slowest consumer has not changed position. Nothing to do right now
return;
}
//Only trigger a trimming when switching to the next Ledger
if (previousSlowestReader.getLedgerId() != newPosition.getLedgerId0)) f
trimConsumedLedgersInBackground();}
- TTL:依据设置的工夫(默认五分钟)定期检查,依据触发的策略不断更新 cursor 地位,解决音讯过期。
- Retention:查看 Ledger 的创立工夫(通过元数据工夫戳能够理解 Ledger 的生命周期)以及 Entry 的大小两个阈值来决定是否删除某一个 Ledger。
在以上代码中的最初三行中,将之前最慢的 LedgerId 与 newPosition 的 LedgerId 比照,查看 ManagedLedger 是否产生过切换,一旦切换就调用 trimConsumedLedgersInBackground()。该函数办法的外围代码策略就是 Retention 的逻辑。
由此可知:
- 当 TTL 工夫小于 Retention 工夫时,音讯的残缺生命周期就是 TTL 工夫 + Retention 工夫;
- 当 TTL 工夫大于等于 Retention 工夫,音讯的生命周期就是 TTL 工夫。
这里又引出了一个新问题:TTL 策略为什么要抉择在 Ledger 切换的机会来触发 Ledger 的删除操作呢?因为 Retention 删除 Ledger 时是以 Ledger 为最小操作单元。如果 Ledger 不切换,Retention 也不会触发删除。所以上述代码逻辑会抉择切换机会来交给 Retention 执行删除动作。
实际 3:提早音讯与 TTL 的关系
在团队已经遇到的场景中,某用户发送了数十万提早音讯,提早设置为十天,但 TTL 过期工夫设置为五天,五天后所有提早音讯都已被过期。咱们能够从源码层面看一下 TTL 策略。
public boolean expireMessages(int messageTTLInSeconds) {if (expirationCheckInProgressUpdater.compareAndSet( obj: this, FALSE, TRUE)) {log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
messageTTLInSeconds);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
try {long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return Messaqelmpl.isEntryExpired(messageTTLInSeconds. entryTimestamp);
} catch (Exception e) {log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
} finally {entry<release();
}
return false;
}, callback: this, ctx: null);
return true;
public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
return messageTTLInSeconds != 0
&& (System.currentTimeMillis() >
entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
}
TTL 的外围逻辑是通过 cursor 传入的值决定音讯是否过期,即是否能找到 Entry。TTL 只获取了音讯的公布工夫,却没有理睬音讯的提早设置。联合下面两段代码,isEntryExpired 只关怀 PublishedTime 工夫戳元数据属性,FindNewestMatchingEntry 对象时能够从元数据中获取 PublishedTime。所以当提早设置小于 TTL 工夫就会导致提早音讯被过期,在用户侧就会发现音讯失落。
针对这一问题,腾讯团队向社区提供了 PR,次要逻辑是别离查看音讯的公布工夫和延迟时间,达到公布工夫后如果延迟时间大于 TTL 工夫,则 TTL 工夫达到后仍然不能过期音讯。IsEntryExpired 会判断并查看 TTL 工夫与延迟时间。这里公布工夫和延迟时间要一次性从 Entry 中获取,否则每次获取的 Entry 对象是不一样的。此外,延迟时间须要发送工夫点的工夫戳,依据具体计算出提早的工夫长度来做判断。
实际 4:Admin API Block 的优化解决
在 Pulsar 之前的代码逻辑中:
- 如果在异步代码中频繁调用同步逻辑,那么其中的株连关系很可能导致 Pulsar 内部的线程卡住,这时只能重启对应的 Broker 节点来复原工作。
- Pulsar 的 Http Lookup 服务调用的是内部端口,一旦异步调用同步导致阻塞,那么该服务内部端口的数据流也会呈现阻塞。
- Pulsar Web 服务的性能较差,次要是因为 CompletableFuture 的误用。当咱们定义一个 CompletableFuture 对象后,常常调用 thenapply 或者 thencompose 来返回对象。这其实是 CompletableFuture 内对象的同步返回,是由以后线程栈执行的。如果异步工作没有返回,则由回调线程执行工作。
- Pulsar 高版本退出了 Metadata Store 线程池的形象。这个形象会增大 ZooKeeper 的压力。当同一时间内的内部服务调用量增大,ZooKeeper 负载增大会导致音讯提早等指标呈现进化。
腾讯团队针对上述问题,一方面剥离了 Metadata Store 线程池,另一方面通过服务监听来定位和发现 Web 服务的性能较弱的地位,去做进一步的优化解决。此外,团队还退出了超时解决逻辑,所有 Pulsar 内部线程如果在最初限定工夫(30 秒)内无奈解决实现就会抛出超时。尽管单个内部线程超时、重启影响不大,但这样防止了整个数据流阻塞的状况。
实际 5:zk-node 泄露
有时用户正在应用的 Topic 不多,但 zk-node 数量却很大,Pulsar 对 zk-node 的放大倍数较高。上图拐点是 zk-node 脏数据清理的时点,能够看到 zk-node 数据透露的状况十分重大,达到 5 倍之多。
在创立一个 Topic 时,首先要在 zk-path 的六级目录下涵盖所有 Topic 信息,在 ZooKeeper 上创立的资源量很大。此目录下涵盖了所有的 Topic,问题即呈现在六个层级中。为此团队做了以下操作来解决 zk-node 脏数据:
- 首先通过 ZooKeeper client 读取 zk-path,依照指定的格局拼接所有 Topic 名字,获取 Topic 列表;
- 通过 pulsar-admin 查看集群中是否存在该 Topic;如果集群中不存在该 Topic,则相干数据肯定是脏数据;(修复 zk-node 泄露问题的相干代码已 merge 进 2.8 + 的社区版本。)
- 切记在清理 ZookKeeper 脏数据之前备份 ZookKeeper 数据。
实际 6:Bookie Ledger 透露
团队在实践中发现,尽管 Retention 策略设置的音讯生命周期最长应不超过 30 天,但检测扫描到的一些音讯曾经有数百天历史,且难以从 BookKeeper 中删除。针对这一问题,团队剖析如下:
- 触发 Ledger 删除的惟一门路是 Retention 策略。这些音讯产生的起因只能定位到一些 Bookie CLI 命令,这些命令生成了一些 Retention 策略管控不到的 Ledger。
- 每一个 Ledger 都有对应的 LedgerInfo,记录了它的元数据信息,包含创立工夫等。获取元数据后,就能够确定 Ledger 是多久前创立的,还能够确定 Ledger 具体是在哪些 Bookie 节点上。
- 一个 Ledger 惟一归属于一个 Topic,所以能够获取 Topic 中存在 Ledger 的信息,进而确定某个 Ledger 是否存在于 Topic 的 Ledger 列表中,如果不在就是脏数据,能够清理。
- 如果 Ledger 对应的元数据曾经失落,那么 Ledger 自身也能够间接删除。
- 留神 Schema,如果疏忽 Schema 可能会删除用户 Schema。复原用户 Schema 时,Schema 的 Ledger 信息是存在 Bookie 中,Schema 本身的信息存在 Broker 归属的 ZK 中。复原时须要先把 Broker 中存在的 Schema 信息删除,再让用户尝试应用生产端重建 Schema。
留神:执行以上操作前,切记提前备份数据。
实际 7:Apache Pulsar 多级缓存优化
如上图,Pulsar 现有缓存策略会导致显著的毛刺景象,呈现服务周期性的激烈性能稳定和用户端的显著感知。
try {
//We need to check all the segments, starting from the current
//backward to minimize the
//checks for recently inserted entries
int size = cacheSegments.size();
for (int i = 0; i < size; i++)
int segmentIdx = (currentSegmentIdx + (size - i)) % size;
try {int offset = currentSegmentOffset.getAndAdd(entrySize);
if (offset + entrySize > segmentSize) {
// Rollover to next segment
currentSegmentIdx = (currentSegmentIdx + 1) % cacheSegments.size();
currentSegment0ffset. set(alignedSize);
cacheIndexes.get(currentSegmentIdx).clear();
offset = 0;
}
这里腾讯团队次要做了读取缓存的优化。在读取缓存层面,能够看到 Pulsar 在读取缓存时迭代了缓存中的所有音讯,如第一段代码倒数第二行所示。同时,一旦 offset + entrySize 大于 segmentSize,就会革除全副缓存,如第二段代码所示。这也就是为什么之前会呈现显著的性能稳定点的起因所在。
为此团队应用了 OHC + LRU 的策略,防止了缓存状况导致的激烈稳定,成果如下图:
总结与瞻望
本文分享了腾讯云团队在 Apache Pulsar 稳定性上的实践经验,重点介绍了音讯空洞的影响及躲避措施等最佳实际,为更多开发者提供参考。同时,腾讯云团队也在参加社区奉献中,和社区探讨以下重要问题并摸索相干解决方案,如客户端超时工夫内的重试策略,借鉴其余 MQ 的思路进行改良,尝试在客户端退出超时重试策略,通过多次重试机制来防止发送失败的状况产生;优化 Broker 和 Bookie OOM,针对 Ack 空洞对应汇合无奈缩容的问题进行改良;以及优化 Bookie Auto Recover,退出超时重试逻辑,防止 BookKeeper 和 ZooKeeper 之间产生 Session 超时的状况下服务重启。
作者介绍:
冉小龙,腾讯云高级研发工程师,Apache Pulsar Committer,RoP maintainer,Apache Pulsar Go Client、Pulsarctl 与 Go Functions 作者与次要维护者。