共计 4421 个字符,预计需要花费 12 分钟才能阅读完成。
编者荐语:
本文转载自腾讯云中间件公众号。作者林琳,Apache Pulsar PMC,腾讯云中间件专家工程师,《深刻解析 Apache Pulsar》作者。本系列第一篇文章为大家解读 Pulsar 中音讯确认的模式。
导语
在 Apache Pulsar 中,为了防止音讯的反复投递,消费者进行音讯确认是十分重要的一步。当一条音讯被消费者生产后,须要消费者发送一个 Ack 申请给 Broker,Broker 才会认为这条音讯被真正生产掉。被标记为曾经生产的音讯,后续不会再次反复投递给消费者。在这篇文章中,咱们会介绍 Pulsar 中音讯确认的模式,以及失常音讯确认在 Broker 侧是如何实现的。
确认音讯的模式
在理解 Pulsar 音讯确认模式之前,咱们须要先理解一些前置常识 —— Pulsar 中的订阅以及游标(Cursor)。Pulsar 中有多种生产模式,如:Shared(共享)、Key_shared(键共享)、Failover(灾备)等等,无论用户应用哪种生产模式都会创立一个订阅。订阅分为长久订阅和非长久订阅,对于长久订阅,Broker 上会有一个长久化的 Cursor,即 Cursor 的元数据被记录在 ZooKeeper。Cursor 以订阅(或称为生产组)为单位,保留了以后订阅曾经生产到哪个地位了。因为不同消费者应用的订阅模式不同,能够进行的 Ack 行为也不一样。总体来说能够分为以下几种 Ack 场景:
1. 单条音讯确认(Acknowledge)
和其余的一些音讯零碎不同,Pulsar 反对一个 Partition 被多个消费者生产。假如音讯 1、2、3 发送给了 Consumer-A,音讯 4、5、6 发送给了 Consumer-B,而 Consumer-B 又生产的比拟快,先 Ack 了音讯 4,此时 Cursor 中会独自记录音讯 4 为已 Ack 状态。如果其余音讯都被生产,但没有被 Ack,并且两个消费者都下线或 Ack 超时,则 Broker 会只推送音讯 1、2、3、5、6,曾经被 Ack 的音讯 4 不会被再次推送。
2. 累积音讯确认(AcknowledgeCumulative)
假如 Consumer 承受到了音讯 1、2、3、4、5,为了晋升 Ack 的性能,Consumer 能够不别离 Ack 5 条音讯,只须要调用 AcknowledgeCumulative,而后把音讯 5 传入,Broker 会把音讯 5 以及之前的音讯全副标记为已 Ack。
3. 累积音讯确认(AcknowledgeCumulative)
这种音讯确认模式,调用的接口和单条音讯的确认一样,然而这个能力须要 Broker 开启配置项 AcknowledgmentAtBatchIndexLevelEnabled
。当开启后,Pulsar 能够反对只 Ack 一个 Batch 外面的某些音讯。假如 Consumer 拿到了一个批音讯,外面有音讯 1、2、3,如果不开启这个选项,咱们只能生产整个 Batch 再 Ack,否则 Broker 会以批为单位从新全副投递一次。后面介绍的选项开启之后,咱们能够通过 Acknowledge 办法来确认批音讯中的单条音讯。
4. 否定应答(NegativeAcknowledge)
客户端发送一个 RedeliverUnacknowledgedMessages
命令给 Broker,明确告知 Broker,以后 Consumer 无奈生产这条音讯,音讯将会被从新投递。
并不是所有的订阅模式下都能用上述这些 Ack 行为,例如:Shared 或者 Key_shared 模式下就不反对累积音讯确认(AcknowledgeCumulative)。因为在 Shared 或者 Key_shared 模式下,后面的音讯不肯定是被以后 Consumer 生产的,如果应用 AcknowledgeCumulative,会把他人的音讯也一起确认掉。订阅模式与音讯确认之间的关系如下所示:
Acknowledge 与 AcknowledgeCumulative 的实现
Acknowledge 与 AcknowledgeCumulative 接口不会间接发送音讯确认申请给 Broker,而是把申请转交给 AcknowledgmentsGroupingTracker 解决。这是咱们要介绍的 Consumer 里的第一个 Tracker,它只是一个接口,接口下有两个实现,一个是长久化订阅的实现,另一个是非长久化订阅的实现。因为非长久化订阅的 Tracker 实现都是空,即不做任何操作,因而咱们只介绍长久化订阅的实现——PersistentAcknowledgmentsGroupingTracker。
在 Pulsar 中,为了保障音讯确认的性能,并防止 Broker 接管到十分高并发的 Ack 申请,Tracker 中默认反对批量确认,即便是单条音讯的确认,也会先进入队列,而后再一零售往 Broker。咱们在创立 Consumer 时能够设置参数 AcknowledgementGroupTimeMicros
,如果设置为 0,则 Consumer 每次都会立刻发送确认申请。所有的单条确认 (IndividualAck) 申请会先放入一个名为 PendingIndividual Acks 的 Set,默认是每 100ms 或者沉积的确认申请超过 1000,则发送一批确认申请。
音讯确认的申请最终都是异步发送进来,如果 Consumer 设置了须要回执(Receipt),则会返回一个 CompletableFuture,胜利或失败都能通过 Future 感知到。默认都是不须要回执的,此时间接返回一个曾经实现的 CompletableFuture。
对于 Batch 音讯中的单条确认(IndividualBatchAck),用一个名为 PendingIndividualBatchIndexAcks
的 Map 进行保留,而不是一般单条音讯的 Set。这个 Map 的 Key 是 Batch 音讯的 MessageId,Value 是一个 BitSet,记录这批音讯里哪些须要 Ack。应用 BitSet 能大幅升高保留音讯 Id 的能存占用,1KB 能记录 8192 个音讯是否被确认。因为 BitSet 保留的内容都是 0 和 1,因而能够很不便地保留在堆外,BitSet 对象也做了池化,能够循环应用,不须要每次都创立新的,对内存十分敌对。
如下图所示,只用了 8 位,就示意了 Batch 外面 8 条音讯的 Ack 状况,下图示意 EntryId 为 0、2、5、6、7 的 Entry 都被确认了,确认的地位会被置为 1:
对于累计确认(CumulativeAck)实现形式就更简略了,Tracker 中只保留最新的确认地位点即可。例如,当初 Tracker 中保留的 CumulativeAck 地位为 5:10,代表该订阅曾经生产到 LedgerId=5,EntryId=10 的这条音讯上了。后续又 Ack 了一个 5:20,则间接替换后面的 5:10 为 5:20 即可。
最初就是 Tracker 的 Flush,所有的确认最终都须要通过触发 Flush 办法发送到 Broker,无论是哪种确认,Flush 时创立的都是同一个命令并发送给 Broker,不过传参中带的 AckType 会不一样。
NegativeAcknowledge 的实现
否定应答和其余音讯确认一样,不会立刻申请 Broker,而是把申请转交给 NegativeAcksTracker 进行解决。Tracker 中记录着每条音讯以及须要提早的工夫。Tracker 复用了 PulsarClient 的工夫轮,默认是 33ms 左右一个工夫刻度进行查看,默认延迟时间是 1 分钟,抽取出曾经到期的音讯并触发从新投递。Tracker 次要存在的意义是为了合并申请。另外如果延迟时间还没到,音讯会暂存在内存,如果业务侧有大量的音讯须要提早生产,还是倡议应用 ReconsumeLater 接口。NegativeAck 惟一的益处是,不须要每条音讯都指定工夫,能够全局设置延迟时间。
未确认音讯的解决
如果消费者获取到音讯后始终不 Ack 会怎么样?这要分两种状况,第一种是业务侧曾经调用了 Receive 办法,或者曾经回调了正在异步期待的消费者,此时音讯的援用会被保留进 UnAckedMessageTracker,这是 Consumer 里的第三个 Tracker。UnAckedMessageTracker 中保护了一个工夫轮,工夫轮的刻度依据 AckTimeout
、TickDurationInMs
这两个参数生成,每个刻度工夫=AckTimeout / TickDurationInMs
。新追踪的音讯会放入最初一个刻度,每次调度都会移除队列头第一个刻度,并新增一个刻度放入队列尾,保障刻度总数不变。每次调度,队列头刻度里的音讯将会被清理,UnAckedMessageTracker 会主动把这些音讯做重投递。
重投递就是客户端发送一个 RedeliverUnacknowledgedMessages
命令给 Broker。每一条推送给消费者然而未 Ack 的音讯,在 Broker 侧都会有一个汇合来记录(PengdingAck),这是用来防止反复投递的。触发重投递后,Broker 会把对应的音讯从这个汇合里移除,而后这些音讯就能够再次被生产了。留神,当重投递时,如果消费者不是 Share 模式是无奈重投递单条音讯的,只能把这个消费者所有曾经接管然而未 ack 的音讯全副从新投递。下图是一个工夫轮的简略示例:
另外一种状况就是消费者做了预拉取,然而还没调用过任何 Receive 办法,此时音讯会始终沉积在本地队列。预拉取是客户端 SDK 的默认行为,会事后拉取音讯到本地,咱们能够在创立消费者时通过 ReceiveQueueSize 参数来管制预拉取音讯的数量。Broker 侧会把这些曾经推送到 Consumer 本地的音讯记录为 PendingAck,并且这些音讯也不会再投递给别的消费者,且不会 Ack 超时,除非以后 Consumer 被敞开,音讯才会被从新投递。Broker 侧有一个 Redelivery Tracker 接口,临时的实现是内存追踪(InMemoryRedeliveryTracker)。这个 Tracker 会记录音讯到底被从新投递了多少次,每条音讯推送给消费者时,会先从 Tracker 的哈希表中查问一下重投递的次数,和音讯一并推送给消费者。
由下面的逻辑咱们能够晓得,创立消费者时设置的 Receive QueueSize 真的要谨慎,防止大量的音讯沉积在某一个 Consumer 的本地预拉取队列,而其余 Consumer 又没有音讯可生产。
PulsarClient 上能够设置启用 ConsumerStatsRecorder
,启用后,消费者会在固定距离会打印出以后消费者的 metrics 信息,例如:本地音讯沉积量、承受的音讯数等,不便业务排查性能问题。
后记
Pulsar 中的设计细节十分多,因为篇幅无限,作者会整顿一系列的文章进行技术分享,敬请期待。
关注 公众号「Apache Pulsar」, 获取干货与动静
退出 Apache Pulsar 中文交换群 👇🏻