关于pulsar:详解-Apache-Pulsar-消息生命周期

5次阅读

共计 10043 个字符,预计需要花费 26 分钟才能阅读完成。

文章摘要

本文整顿自 Pulsar Summit Asia 2022 腾讯云高级研发工程师冉小龙的演讲《Deep Dive into Apache Pulsar Lifecycle》。Apache Pulsar 中形象了 Topic 来承载用户发送的音讯,一条音讯发送到 Topic 中之后会通过 Broker 的计算存储到 Bookie 中。本文将具体论述音讯是如何发送到 Broker 并通过 Broker 的计算以及元数据处理最终存储到 Bookie 中,而后会进一步论述 Bookie 如何利用垃圾回收机制回收 Topic 中的数据,以及 Broker 中的 TTL 和 Retention 策略如何作用到 Bookie Client 来触发垃圾回收的机制。

作者简介

冉小龙,腾讯云高级研发工程师,Apache Pulsar Committer,RoP maintainer,Apache Pulsar Go Client、Pulsarctl 与 Go Functions 作者与次要维护者。

导读

本文分为以下几个局部:

  1. 1. 从用户的视角看音讯收发流程
  2. 2. TTL 与 Retention 策略(与音讯生命周期非亲非故)
  3. 3. 从 Topic 的角度看音讯存储模型
  4. 4. Bookie GC 回收机制
  5. 5. 脏数据如孤儿 Ledger 的产生
  6. 6. 如何清理脏数据
  7. 1、2、3 次要在 Broker 层面剖析原理,5 和 6 依据生产环境中遇到的问题来剖析脏数据的产生与清理。

用户视角下的音讯收发流程

在用户视角下,MQ 能够了解为 Pub-Sub 模型,在 Broker 形象一个 Topic,音讯经由生产者发送到 Topic 中而后进入消费者进行生产。

首先须要理解两个概念,Pending Queue 和 Receive Queue。

  • Pending Queue:发送过程中的概念。音讯发送时并不是每次间接投递给 Broker,而是在本地形象 Pending Queue,所有数据先进入 Pending Queue 再被发送到 Broker。
  • Receive Queue:接管过程中的概念。同 Pending Queue 原理雷同,音讯接管时并不是每次间接从 Broker 要数据,而是在本地形象 Receive Queue,数据按批次进入 Receive Queue,再联合 Pulsar 音讯推拉机制一直地填充 Receive Queue 来调动整体流程。

在 Pulsar 中,Broker 不解析批音讯,因而 Broker 无奈晓得音讯是否是批音讯,这里形象了一个 Entry 的概念,Entry 内可能蕴含批音讯或者非批音讯。

下图是用户视角下更深刻的架构图。生产者和消费者能够了解为 Client 模型,Client 把音讯发送给 Broker。Broker 能够了解为 BookKeeper Client,BookKeeper Client 通过增删改查的操作将数据传递给 Bookie。BookKeeper 和 Broker 都有元数据管理核心,目前应用较多的是 ZooKeeper,其内蕴含所有节点信息,如节点调度信息。

上面解析一下数据从 Client 到 Broker 再到 BookKeeper 是怎么的整体流程。首先,BookKeeper 存储层性能比拟繁多且纯正。作为一个分布式日志文件系统,它裸露给下层零碎的、可能供下层零碎调用的仅仅是增删改查的操作,随同这些操作能够察看从 Client 到 BookKeeper 的操作链路:

  • Send -> Broker -> add Entry -> Bookie:发送 Send 命令到 Broker,Broker 向 BookKeeper addEntry
  • Receive -> Broker -> read Entry -> Bookie:发送 Receive 命令到 Broker,Broker 调用 BookKeeper readEntry 接口从 Bookie 中读取音讯
  • Ack -> Broker (TTL) -> move cursor (markDeletePosition) -> Bookie:发送 Ack 命令到 Broker,Broker 会执行 move cursor 操作。Broker 形象的 Topic 外面有一条条的音讯,Ack 相当于操作 cursor 的行为,指针随着 Ack 行为挪动,此处形象了 markDeletePosition 的指针。在 markDeletePosition 之前,所有的音讯都已被正确生产。
  • Retention -> delete Entry -> Bookie:接管到 Retention 策略后,Broker 触发 Retention 阈值后会调用 Bookie delete Entry 接口,来删除 BookKeeper 中数据。delete Entry 是本文重点探讨的话题,后文将具体介绍触发 Retention 策略后,Entry 如何被从 BookKeeper 中删除。

TTL 与 Retention 策略

首先须要明确 TTL 策略和 Retention 策略的概念。

TTL 策略

TTL 策略指音讯在指定工夫内没有被用户 Ack 时会在 Broker 被动 Ack 掉。

Client 在消费者侧裸露两个接口 Receive 和 Ack。当用户消费者接管到音讯时,Broker 并不知道此时用户曾经正确接管到音讯,须要用户手动调用 Ack 通知 Broker 本人胜利接管到了以后音讯,所以 Client 要发动 Oneway 的 Ack 申请告诉 Broker 进行下一步解决。不管音讯是否被推送到 Broker,生产者发送到 Topic 的音讯都会产生 TTL(生命周期)。所有音讯都在 TTL 内受管控,超出这个工夫后 Broker 会代替用户把音讯 Ack 掉。

此处需注意,在上述过程中没有任何与删除相干的操作,因为 TTL 不波及与删除相干的操作。TTL 的作用仅仅是用于 Ack 掉在 TTL 范畴内应被 Ack 的音讯,真正删除的操作与 Pulsar 中形象进去的 Retention 策略相干。

Retention 策略

Retention 策略指音讯被 Ack 之后(消费者 Ack 或者 TTL Ack)持续在 Bookie 侧保留的工夫,以 Ledger 为最小操作单元。

音讯被 Ack 之后(消费者 Ack 或者 TTL Ack)就归属于 Retention 策略,即在 BookKeeper 保留肯定工夫,比方在离线音讯场景下会将数据保留一段时间来进行回查等操作。Retention 以 Ledger 为最小操作单元,删除即是删除整个 Ledger。

上面是在 TTL 内 Ack 音讯的示意图。在 T1 时间段有 10 条音讯,m1 – m5 是被 Ack 的音讯,m6 – m10 是未被 Ack 的音讯。在 T2 时间段,假如达到 TTL 的 3 分钟阈值后音讯还没有被 Ack,m6 – m8 就会被 TTL 策略查看到,Broker 被动将其 Ack。在 T3 时间段,m6 – m8 已被 Broker Ack。这就是 TTL 策略操作行为与作用范畴。

Pulsar 内的所有策略都在 Broker 形象了线程池,周期性地执行线程,比方 TTL 策略或者 Retention 策略默认 5 分钟查看一轮。TTL 策略就是依据设置的工夫,定期检查,不断更新 Cursor 的地位(等价于 Consumer 侧裸露的 Ack 接口),将音讯过期掉;Retention 策略是查看 Ledger 的创立工夫以及 Entry 的大小来决定是否要删除某一个 Ledger。

TTL 策略和 Retention 策略的生命周期在时限上有如下规定:

  • TTL 工夫 < Retention 工夫,音讯的生命周期等于 TTL 工夫 + Retention 工夫。
  • TTL 工夫 ≥ Retention 工夫,音讯的生命周期等于 TTL 工夫。在 TTL 查看时,有一个判断规范是 Ledger 是否进行切换,如产生切换且达到 TTL 工夫,Ledger 会进入 Retention 策略删除动作。所以如果 TTL 工夫 ≥ Retention 工夫,音讯生命周期就是 TTL 工夫。

从 Topic 的角度看音讯存储模型

讲到音讯存储模型,首先接触到的是 Topic,生产者向这个 Topic 发送音讯、消费者从 Topic 生产音讯。Topic 外部形象了 Partition 的概念,一个 Topic 内能够创立多个 Partition,作用是减少并发解决的能力,即一个 Topic 中的音讯能够散发到多个 Partition,由多个 Partition 承载 Topic 的服务。

在 Bookie 存储层,一个 Partition 由多个 Ledger 形成。如图,Partition 3 上面有 5 个 Ledger。Ledger 外面存储的是多条 Entry。如前文所说的 Entry 概念,依据音讯是否是批音讯,Entry 就能够分为批和非批两种。如果音讯是批音讯,那么 Entry 外面有多条 Message;如果音讯是非批的,那么一条 Entry 等于一条 Message。这就是 Topic 视角下的存储模型。

Bookie GC 回收机制

后面三个局部都围绕 Broker 层,Broker 作为计算层,实质是 Bookie Client,调用 Bookie 侧裸露的增删查的接口来进行相干的操作,操作逻辑简略。上面将重点介绍 BookKeeper 层如何将数据进行压缩和回收。

Bookie 压缩类型

压缩类型分为两种:

  • 主动压缩:Bookie 有周期性执行的 GC Compaction 线程,GC 分为 Minor GC 和 Major GC,后文会具体介绍两种 GC 的区别。
  • 手动压缩:通过 BookKeeper 裸露的 Http 调用 Admin Rest API 接口来触发 GC 申请。这个操作在日常急救运维中很常见,比方 Bookie 磁盘内存忽然大幅度上涨,用户想要紧急回收数据,那么就能够跳过 Minor GC 和 Major GC 查看周期,手动触发 GC 来开释磁盘空间。

Bookie 压缩形式

Bookie 的压缩形式分为两种:

  • 依照 Entry 大小

    • compactionRateByEntries
    • isThrottleByBytes
  • 依照 Entry 数量(默认)

    • compactionRateByEntries
  • 生产环境中举荐依照 Entry 大小压缩,从理论生产环境的教训来看,每次压缩 100MB,曲线绝对安稳。为什么不举荐依照 Entry 数量压缩呢?首先如前文提到的 Entry 的概念,一个 Entry 可能是单条音讯,也可能是批音讯(蕴含很多 Message),因而如果依照数量压缩的话,每次压缩的 Message 数量是不肯定的。另外,每一个 Message 的 Payload 不同,音讯大小不统一会导致每次压缩大小不同,GC 压缩回收的曲线不安稳。Bookie GC 占用磁盘 IO,每一台机器的磁盘 IO 恒定,极其状况下,不安稳的压缩会映射到 Bookie 主链路读写流程,影响稳定性。依照 Entry 大小压缩,压缩曲线安稳,对稳定性影响较小。

Minor GC 和 Major GC

从代码实现逻辑上来看,Minor GC 和 Major GC 完全相同,二者区别在于触发机会和触发阈值。

Minor GC Major GC
压缩工夫 1h 24h
压缩阈值比例 20%(minorCompactionThreshold) 80%(majorCompactionThreshold)
GC 执行最大耗时 minorCompactionMaxTimeMillis majorCompactionMaxTimeMillis
  • Minor GC 压缩工夫是 1h,Major GC 压缩工夫是 24h。
  • 压缩阈值比例的含意是 Bookie 外面有用数据的占比。在 Minor GC 内,Bookie 有用数据占比为 20%;在 Major GC 内,Bookie 有用数据占比为 80%。当有用数据占比超过 20% 和 80% 时,不对数据进行回收。Entrylog 里文件大小固定为 1.1 GB,假如 Major GC 有用数据超过 80%,那么能够了解为大部分数据都是有用的且不可被删除,Entrylog 全副保留。剩下的 20% 数据没必要消耗磁盘 IO 进行回收,通过多占用肯定空间的形式升高磁盘 IO 的损耗。
  • 为了防止一次 GC 执行工夫过长,因而设定了 GC 执行最大耗时。超过规定的耗时就会强行停止 GC。

留神:

  • 压缩阈值比例不能够超过 100%。
  • Minor GC 的阈值必须小于 Major GC。
  • 压缩时,必须要保障磁盘还有肯定的可用空间。

Bookie 压缩

Bookie 压缩时,首先须要理解以下几个概念。(生产环境中配置 DBLedgerStorage,社区目前应用居多。后文所有 GC 回收流程和 BookKeeper 相干内容都在默认此配置的前提下开展。)

  • Metadata Store:元数据存储核心默认应用 ZooKeeper。我应用的是社区提供的工具 ZK-Web,能够看到 Ledger 门路下存储了很多 Ledger。

  • LedgerIndex:RocksDB 中存储的 Ledger 汇合。应用 DBLedgerStorage 即相当于用 RocksDB 做 Entrylog 的索引存储,读取数据时先读取 RocksDB 来找到索引数据,而后去 Entrylog 读 Value。这是一个拿 Key 取“V”的操作。

  • LedgersMap:以后的单个 EntryLog 中存储的 Ledger 汇合。
  • EntryLogMetaMap:以后 Bookie 下所有 EntryLog,Key 是 Entrylog ID,Value 是 Entrylog Metadata。EntryLogMetaMap 是 EntryLogMeta 的汇合,EntryLogMetaMap 中蕴含 LedgersMap 汇合。

有了下面的形象后,咱们就能够进行判断。EntryLogMetaMap 的 Key 是 Entrylog ID,映射到 LedgersMap 汇合。

在整个压缩过程中,有三个外围的解决逻辑与函数:

  1. doGcLedgers():解决 LedgerIndex 的汇合(RocksDB),通过汇合判断数据是否能够删除。
  2. doGcEntryLogs():解决 LedgersMap 和 EntryLogMetaMap 的汇合,以 doGcLedgers() 得出的汇合为基准来判断以后 LedgersMap 中哪些 Ledger 能够删除,以及以后 EntryLogMetaMap 中哪些 Entrylog 能够删除。
  3. doCompactionEntryLogs():在进行完下面两个步骤后就能够进行具体的删除操作。
    doCompactionEntryLogs() 解决 EntryLog 文件自身是否能够被删除,对于一个 Key Value 库来说如何进行删除也是一门学识。删除操作不能间接从 Key Value 汇合删除,这样会造成很多音讯空洞(音讯不间断)。BookKeeper 中删除操作是从旧的 EntryLog 文件读取不可删除的数据写入到新的 EntryLog 文件中,相当于在新的 EntryLog 文件中进行备份,因而旧的 EntryLog 文件能够一次性删除。

前文屡次提到了 EntryLog,上面将介绍 BookKeeper 中 EntryLog 如何存储、存储了什么。Entrylog 的形成从上至下外围数据分为三局部。下图能够帮忙大家理解 Entrylog 的大抵构造,如需准确理解,能够浏览相干源码。

  • Header:蕴含指纹信息(BKLO,标识 Entrylog 文件,用于校验)、BookKeeper 版本、Ledgers Map Offset(Offset 偏移量、如何读取等)与 Ledgers Count(一个 Entrylog 内 Ledger 的数量)。
  • LedgerEntry List:LedgerEntry 对象,蕴含 Entry Size、Ledger ID、Entry ID 和 Count。
  • Ledgers Map:蕴含 Ledgers Map Size、Ledgers Count 和 Ledgers Map Entries。每一个 Ledgers Map Entries 是 Key Value 构造,由 Ledger 映射到 Size。

数据回收全流程

有了下面介绍的根底概念,咱们就能够把数据从 Broker 到 BookKeeper 的回收流程串联起来。

首先 Client 触发流程。创立 Topic 倡议设置 Retention 策略,不设置的话默认策略是生产实现即删除该音讯。设置 Retention 策略后,Broker 有定期检查的线程,周期性针对 Topic 执行 Retention 策略。到期可删除的 Ledger 调用裸露的 Delete Ledger 接口,如图 Ledger 0 可删除,即调用 Delete Ledger 删除 Ledger 0。删除 Ledger 0 后 ZooKeeper 中移除 Ledger 0 的 ZooKeeper 门路。这就是残缺的删除流程,上图不蕴含返回逻辑。

Delete Ledger 从调用到返回胜利的过程中没有应用 BookKeeper 磁盘上的数据。用户可能会困惑调用接口删除 Ledger 为何没有开释磁盘空间,起因在此,因为删除操作和 BookKeeper 回收磁盘的操作是齐全异步化的。BookKeeper 回收磁盘的操作由 GC Compaction 线程固定进行解决。

那么,GC Compaction 周期性执行线程如何运行?GC Compaction 周期性执行线程就是 Minor GC 和 Major GC。在操作流程上,首先会获取 ZooKeeper 内所有 Ledger 列表。因为创立 Ledger 须要向 ZooKeeper 注册对应的 ZooKeeper 门路,删除 Ledger 也须要从 ZooKeeper 上删除门路。ZooKeeper 上的 Ledger 门路最全面也最精确,因而以 Metadata Store (zk) 为基准来获取所有 Ledger 列表的汇合。而后进行 doGcLedgers() 操作,把 RocksDB 中所有 Ledger 列表汇合与 ZooKeeper 上获取的 Ledger 列表汇合做比拟,找出能够删除的 Ledger。删除后进行 doGcEntryLogs() 操作,解决 LedgersMap 和 EntryLogMetaMap 的汇合,判断 EntryLog 中哪些 Ledger 能够删除。进一步删除后进行 doCompactionEntryLogs() 操作,最现实的状况下,Entrylog 外面所有的 Ledger 都能够被删除,那么就能够间接革除这个 Entrylog。大部分状况是 Entrylog 里局部数据可删、另一部分不可删,那么如何判断是否保留 Entrylog 呢?由 Minor GC 和 Major GC 的压缩阈值比例决定。

咱们联合下图理解如何通过 doGcEntryLogs() 来 doCompactionEntryLogs()。假如 doCompactionEntryLogs() 时通过 Major GC 的阈值断定一部分未达标的数据能够进行回收,那么 GC Compaction 线程首先从旧的 Entrylog 中查看 Ledger 是否能够删除。假设 Ledger 0 和 Ledger 2 能够删除,Ledger 1 和 Ledger 3 不能够删除,查看到可用性占比后依据阈值判断 Entrylog 能够删除,那么就把 Ledger 1 和 Ledger 3 的有用数据写入新的 Entrylog 文件,有用数据有备份后就能够删除旧的 Entrylog 文件。

此处须要补充一点,创立新的 Entrylog 文件时还有一个动作叫做 Flush。旧的 Entrylog 文件在创立时会产生索引信息,Bookie 里 Entrylog 在读取 Entry 时,比方读取 Entry 0、Ledger 1 的数据,会依据索引信息来追溯对应的 Entrylog。在删除旧的 Entrylog 文件并创立新的 Entrylog 文件操作实现之后,新的 Entrylog 文件索引信息须要更新到 RocksDB,告诉下层的读申请去寻找新的 Entrylog 文件中生成的十六进制的 ID 来读取 Entry 0、Ledger 1 的数据。

以上是音讯残缺的生命周期,蕴含从 TTL 与 Retention 策略到 Bookie GC 回收机制的全流程。

脏数据的产生

上面介绍在理论生产中遇到的问题。在下图中,咱们监控了每个 Bookie 上的 Entrylog 文件发现,假如设置的 Retention 策略周期为 1 天或 5 天,然而这些 Entrylog 文件曾经存在超出 200 天还没有被删除。这是异常情况,文件不删除会始终占用磁盘空间。通过剖析,以下三个状况可能导致脏数据的产生:

  • Ledger 删除逻辑出错,导致孤儿 Ledger 产生:回顾数据回收全流程,Ledger 删除操作分为两个局部:从 ZooKeeper 中清理门路和 GC Compaction 线程清理 Entrylog。社区发动了 PIP[1] 进行双阶段删除,来保障删除过程中不会产生孤儿 Ledger。
  • Broker 不会加载不沉闷的 Topic,导致 Retention 策略没有失效:目前社区正在改良该逻辑。BookKeeper 惟一裸露的 Delete Ledger 操作只有在设置 Retention 策略后能力掉入行为。因而如果 Retention 策略没有失效,Broker 不沉闷 Topic 产生的 Ledger 就无奈被删除。
  • GC 回收阈值设置不合理,导致一部分数据无奈从 EntryLog 移除:这是上图中产生存在 200 多天的 Entrylog 的次要起因。依据对用户数据的调配发现,零碎没有依照 80% 的有用数据占比来设置回收阈值,而是调整为 50%,导致一半的数据始终存在于 Entrylog 中,无奈删除 Entrylog。
  • 存在不沉闷的 Cursor(不沉闷即是 Sub 下没有对应的消费者),这些 Cursor 对应的 Ledger 无奈被删除:目前提出的计划是减少校验逻辑,如果 Cursor 一段时间内不更新则删除,此计划还有待商讨与验证。无论以上哪一种状况,都会导致 Ledger 脏数据无奈删除。因而上面咱们开展解说如何删除脏数据。在理解删除脏数据前,须要理解一个概念叫 Custom Metadata。在 Broker 生成或者创立 Ledger 时,能够给 Ledger 设置一部分元数据,即自定义 Ledger 的元数据属性。下图是 Pulsar 默认提供的 Custom Metadata,通过 BookKeeper Admin ctl 获取到的 Pulsar Managed Ledger Base64 信息。这一串属性反写进去就是一个 Topic 的信息,只有领有 Topic 信息能力进行前面的操作。

通过 Ledger Metadata 能够获取 Topic 信息,即 Ledger 的 Owner Topic。而后咱们就能够开始革除这些脏数据。

革除孤儿 Ledger

革除孤儿 Ledger 应用 Clear Tool 革除工具。过程如下:

  • 从 ZooKeeper Snapshot 中获取所有的 Ledger 列表(如果线上环境压力不大,也能够间接连贯 ZooKeeper 读取,不须要应用 Snapshot。)从 ZooKeeper Snapshot 中获取所有的 Ledger 列表后,通过 BookKeeper Admin 工具获取 Ledger 的 Custom Metadata。
  • 通过 Custom Metadata 找到该 Ledger 的 Owner Topic,并在 Broker 内查看是否存在该 Topic。

    • 如果 Broker 内 Topic 不存在,Client 首先拜访 Broker 就无奈胜利。BookKeeper 存储数据没有意义,能够间接删除。
    • 如果 Broker 内 Topic 存在,就会进一步查看 Ledger 是否存在,Topic Stats Internal 列表展现了 Topic 内所有 Ledger 的状况,用来确认该 Ledger 是否蕴含在该 Topic 中。留神,Topic Stats Internal 命令有时候能够能够获取到 Ledger 列表,有时无奈获取,解决办法是反复获取,如果仍获取不到,那么将断定为列表不存在。

      ![图片](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/889dc444f8c2467bb8f07aa5f0accd29~tplv-k3u1fbpfcp-zoom-1.image "null")
      

Topic 所有的属性以及 Topic Stats Internal 等指标信息都是 Broker 向 ZooKeeper 获取的。以上查看都过后就能够从 BookKeeper 中删除 Ledger。Ledger 删除逻辑和前文回收流程雷同,首先删除 Ledger 的 ZooKeeper 门路,Ledger 占用的磁盘空间通过 GC Compaction 线程走异步流程进行删除。

此外,Schema 和 Cursor 信息也会应用 Ledger 来存储。下图中有一个信息是 Pulsar Schema ID,如果用户指定了 Schema 是 String、Json,那么就会产生也对应 Ledger 的 Schema 属性,ZooKeeper 上面也会存储 Schema 信息。查看 Stats Internal 时能够获取到 Schema Ledger 和 Cursor Ledger,须要认真查看。

留神:清理脏数据时肯定要备份。ZooKeeper Snapshot 备份能够在谬误删除后复原数据。

总结

文章从用户视角登程,讲述了音讯存储到 Bookie 中的流程,并论述 Bookie 的垃圾回收机制,以及 TTL 和 Retention 策略如何作用到 Bookie Client 触发垃圾回收机制。心愿能够为用户在生产环境中的操作提供参考。

援用链接

[1] PIP: https://github.com/apache/pulsar/issues/16569

正文完
 0