作者介绍:
腾讯云中间件专家工程师
Apache Pulsar PMC,《深刻解析Apache Pulsar》作者。
目前专一于中间件畛域,在音讯队列和微服务方向具备丰盛的教训。
负责 CKafka、TDMQ的设计与开发工作,目前致力于打造稳固、高效和可扩大的根底组件与服务。
导语
在 Apache Pulsar 中,为了防止音讯的反复投递,消费者进行音讯确认是十分重要的一步。当一条音讯被消费者生产后,须要消费者发送一个Ack申请给Broker,Broker才会认为这条音讯被真正生产掉。被标记为曾经生产的音讯,后续不会再次反复投递给消费者。在这篇文章中,咱们会介绍Pulsar中音讯确认的模式,以及失常音讯确认在Broker侧是如何实现的。
1 确认音讯的模式
在理解Pulsar音讯确认模式之前,咱们须要先理解一些前置常识 —— Pulsar中的订阅以及游标(Cursor)。Pulsar中有多种生产模式,如:Share、Key_share、Failover等等,无论用户应用哪种生产模式都会创立一个订阅。订阅分为长久订阅和非长久订阅,对于长久订阅,Broker上会有一个长久化的Cursor,即Cursor的元数据被记录在ZooKeeper。Cursor以订阅(或称为生产组)为单位,保留了以后订阅曾经生产到哪个地位了。因为不同消费者应用的订阅模式不同,能够进行的ack行为也不一样。总体来说能够分为以下几种Ack场景:
(1)单条音讯确认(Acknowledge)
和其余的一些音讯零碎不同,Pulsar反对一个Partition被多个消费者生产。假如音讯1、2、3发送给了Consumer-A,音讯4、5、6发送给了Consumer-B,而Consumer-B又生产的比拟快,先Ack了音讯4,此时Cursor中会独自记录音讯4为已Ack状态。如果其余音讯都被生产,但没有被Ack,并且两个消费者都下线或Ack超时,则Broker会只推送音讯1、2、3、5、6,曾经被Ack的音讯4不会被再次推送。
(2)累积音讯确认(AcknowledgeCumulative)
假如Consumer承受到了音讯1、2、3、4、5,为了晋升Ack的性能,Consumer能够不别离Ack 5条音讯,只须要调用AcknowledgeCumulative,而后把音讯5传入,Broker会把音讯5以及之前的音讯全副标记为已Ack。
(3)批音讯中的单个音讯确认(Acknowledge)
这种音讯确认模式,调用的接口和单条音讯的确认一样,然而这个能力须要Broker开启配置项AcknowledgmentAtBatchIndexLevelEnabled。当开启后,Pulsar能够反对只Ack一个Batch外面的某些音讯。假如Consumer拿到了一个批音讯,外面有音讯1、2、3,如果不开启这个选项,咱们只能生产整个Batch再Ack,否则Broker会以批为单位从新全副投递一次。后面介绍的选项开启之后,咱们能够通过Acknowledge办法来确认批音讯中的单条音讯。
(4)否定应答(NegativeAcknowledge)
客户端发送一个RedeliverUnacknowledgedMessages
命令给Broker,明确告知Broker,以后Consumer无奈生产这条音讯,音讯将会被从新投递。
并不是所有的订阅模式下都能用上述这些ack行为,例如:Shared或者Key_shared模式下就不反对累积音讯确认(AcknowledgeCumulative)。因为在Shared或者Key_Shared模式下,后面的音讯不肯定是被以后Consumer生产的,如果应用AcknowledgeCumulative,会把他人的音讯也一起确认掉。订阅模式与音讯确认之间的关系如下所示:
订阅模式 | 单条Ack | 累积Ack | 批量音讯中单个Ack | 否定Ack |
---|---|---|---|---|
Exclusive | 反对 | 反对 | 反对 | 不反对 |
Shared | 反对 | 不反对 | 反对 | 反对 |
Failover | 反对 | 反对 | 反对 | 不反对 |
Key_Shared | 反对 | 不反对 | 反对 | 反对 |
2 Acknowledge与AcknowledgeCumulative的实现
Acknowledge与AcknowledgeCumulative接口不会间接发送音讯确认申请给Broker,而是把申请转交给AcknowledgmentsGroupingTracker解决。这是咱们要介绍的Consumer里的第一个Tracker,它只是一个接口,接口下有两个实现,一个是长久化订阅的实现,另一个是非长久化订阅的实现。因为非长久化订阅的Tracker实现都是空,即不做任何操作,因而咱们只介绍长久化订阅的实现——PersistentAcknowledgmentsGroupingTracker。
在Pulsar中,为了保障音讯确认的性能,并防止Broker接管到十分高并发的Ack申请,Tracker中默认反对批量确认,即便是单条音讯的确认,也会先进入队列,而后再一零售往Broker。咱们在创立Consumer时能够设置参数AcknowledgementGroupTimeMicros,如果设置为0,则Consumer每次都会立刻发送确认申请。所有的单条确认(individualAck)申请会先放入一个名为PendingIndividualAcks
的Set,默认是每100ms或者沉积的确认申请超过1000,则发送一批确认申请。
音讯确认的申请最终都是异步发送进来,如果Consumer设置了须要回执(Receipt),则会返回一个CompletableFuture,胜利或失败都能通过Future感知到。默认都是不须要回执的,此时间接返回一个曾经实现的CompletableFuture。
对于Batch音讯中的单条确认(IndividualBatchAck),用一个名为PendingIndividualBatchIndexAcks
的Map进行保留,而不是一般单条音讯的Set。这个Map的Key是Batch音讯的MessageId,Value是一个BitSet,记录这批音讯里哪些须要Ack。应用BitSet能大幅升高保留音讯Id的能存占用,1KB能记录8192个音讯是否被确认。因为BitSet保留的内容都是0和1,因而能够很不便地保留在堆外,BitSet对象也做了池化,能够循环应用,不须要每次都创立新的,对内存十分敌对。
如下图所示,只用了8位,就示意了Batch外面8条音讯的Ack状况,下图示意EntryId为0、2、5、6、7的Entry都被确认了,确认的地位会被置为1:
对于累计确认(CumulativeAck)实现形式就更简略了,Tracker中只保留最新的确认地位点即可。例如,当初Tracker中保留的CumulativeAck地位为5:10,代表该订阅曾经生产到LedgerId=5,EntryId=10的这条音讯上了。后续又ack了一个5:20,则间接替换后面的5:10为5:20即可。
最初就是Tracker的Flush,所有的确认最终都须要通过触发flush办法发送到Broker,无论是哪种确认,Flush时创立的都是同一个命令并发送给Broker,不过传参中带的AckType会不一样。
3 NegativeAcknowledge的实现
否定应答和其余音讯确认一样,不会立刻申请Broker,而是把申请转交给NegativeAcksTracker进行解决。Tracker中记录着每条音讯以及须要提早的工夫。Tracker复用了PulsarClient的工夫轮,默认是33ms左右一个工夫刻度进行查看,默认延迟时间是1分钟,抽取出曾经到期的音讯并触发从新投递。Tracker次要存在的意义是为了合并申请。另外如果延迟时间还没到,音讯会暂存在内存,如果业务侧有大量的音讯须要提早生产,还是倡议应用ReconsumeLater接口。NegativeAck惟一的益处是,不须要每条音讯都指定工夫,能够全局设置延迟时间。
4 未确认音讯的解决
如果消费者获取到音讯后始终不Ack会怎么样?这要分两种状况,第一种是业务侧曾经调用了Receive办法,或者曾经回调了正在异步期待的消费者,此时音讯的援用会被保留进UnAckedMessageTracker,这是Consumer里的第三个Tracker。UnAckedMessageTracker中保护了一个工夫轮,工夫轮的刻度依据AckTimeout
、TickDurationInMs
这两个参数生成,每个刻度工夫=AckTimeout / TickDurationInMs。新追踪的音讯会放入最初一个刻度,每次调度都会移除队列头第一个刻度,并新增一个刻度放入队列尾,保障刻度总数不变。每次调度,队列头刻度里的音讯将会被清理,UnAckedMessageTracker会主动把这些音讯做重投递。
重投递就是客户端发送一个RedeliverUnacknowledgedMessages
命令给Broker。每一条推送给消费者然而未Ack的音讯,在Broker侧都会有一个汇合来记录(PengdingAck),这是用来防止反复投递的。触发重投递后,Broker会把对应的音讯从这个汇合里移除,而后这些音讯就能够再次被生产了。留神,当重投递时,如果消费者不是Share模式是无奈重投递单条音讯的,只能把这个消费者所有曾经接管然而未Ack的音讯全副从新投递。下图是一个工夫轮的简略示例:
另外一种状况就是消费者做了预拉取,然而还没调用过任何Receive办法,此时音讯会始终沉积在本地队列。预拉取是客户端SDK的默认行为,会事后拉取音讯到本地,咱们能够在创立消费者时通过ReceiveQueueSize参数来管制预拉取音讯的数量。Broker侧会把这些曾经推送到Consumer本地的音讯记录为PendingAck,并且这些音讯也不会再投递给别的消费者,且不会Ack超时,除非以后Consumer被敞开,音讯才会被从新投递。Broker侧有一个RedeliveryTracker接口,临时的实现是内存追踪(InMemoryRedeliveryTracker)。这个Tracker会记录音讯到底被从新投递了多少次,每条音讯推送给消费者时,会先从Tracker的哈希表中查问一下重投递的次数,和音讯一并推送给消费者。
由下面的逻辑咱们能够晓得,创立消费者时设置的ReceiveQueueSize真的要谨慎,防止大量的音讯沉积在某一个Consumer的本地预拉取队列,而其余Consumer又没有音讯可生产。PulsarClient上能够设置启用ConsumerStatsRecorder,启用后,消费者会在固定距离会打印出以后消费者的metrics信息,例如:本地音讯沉积量、承受的音讯数等,不便业务排查性能问题。
序幕
Pulsar中的设计细节十分多,因为篇幅无限,作者会整顿一系列的文章进行技术分享,敬请期待。如果各位心愿系统性地学习Pulsar,能够购买作者出版的新书《深刻解析Apache Pulsar》。