关于pulsar:Apache-Pulsar-技术系列-Pulsar事务实现原理

导语

Apache Pulsar 是一个多租户、高性能的服务间音讯传输解决方案,反对多租户、低延时、读写拆散、跨地区复制、疾速扩容、灵便容错等个性。腾讯云MQ Oteam Pulsar工作组对 Pulsar 做了深刻调研以及大量的性能和稳定性方面优化,目前曾经在TDBank、腾讯云TDMQ落地上线。本篇将简略介绍Pulsar服务端音讯确认的一些概念和原理,欢送大家浏览。

作者简介

林琳

腾讯云中间件专家工程师

Apache Pulsar PMC,《深刻解析Apache Pulsar》作者。目前专一于中间件畛域,在音讯队列和微服务方向具备丰盛的教训。负责 TDMQ的设计与开发工作,目前致力于打造稳固、高效和可扩大的根底组件与服务。

前言

在事务音讯未呈现前,Pulsar中反对的最高等级的消息传递保障,是通过Broker的音讯去重机制,来保障Producer在单个分区上的音讯只准确保留一次。当Producer发送音讯失败后,即便重试发送音讯,Broker也能确保音讯只被长久化一次。但在Partitioned Topic的场景下,Producer没有方法保障多个分区的音讯原子性。

当Broker 宕机时,Producer可能会发送音讯失败,如果Producer没有重试或已用尽重试次数,则音讯不会写入 Pulsar。在消费者方面,目前的音讯确认是尽力而为的操作,并不能确保音讯肯定被确认胜利,如果音讯确认失败,这将导致音讯从新投递,消费者将收到反复的音讯, Pulsar 只能保障消费者至多生产一次。

相似地,Pulsar Functions 仅保障对幂等函数上的单个音讯解决一次,即须要业务保障幂等。它不能保障解决多个音讯或输入多个后果只产生一次。

举个例子,某个Function的执行步骤是:从Topic-A1、Topic-A2中生产音讯,而后Function中对音讯进行聚合解决(如:工夫窗口聚合计算),后果存储到Topic-B,最初别离确认(ACK) Topic-A1和Topic-A2中的音讯。该Function可能会在“输入后果到Topic-B”和“确认音讯”之间失败,甚至在确认单个音讯时失败。这将导致所有(或局部)Topic-A1、Topic-A2的音讯被从新传递和重新处理,并生成新的后果,进而导致整个工夫窗口的计算结果谬误。

因而,Pulsar须要事务机制来保障准确一次的语义(Exactly-once),生产和生产都能保障准确一次,不会反复,也不会失落数据,即便在Broker宕机或Function解决失败的状况下。

事务简介

Pulsar事务音讯的设计初衷是用于保障Pulsar Function的准确一次语义,能够保障Producer发送多条音讯到不同的Partition时,能够同时全副胜利或者同时全副失败。也能够保障Consumer生产多条音讯在时,能够同时全副确认胜利或同时全副失败。当然,也能够把生产、生产都蕴含在同一个事务中,要么全副胜利,要么全副失败。

咱们以本大节结尾处的Function场景为例,演示生产、生产在同一个事务中的场景:

首先,咱们须要在broker.conf中启用事务。

\transactionCoordinatorEnabled=true

而后,咱们别离创立PulsarClient和事务对象。生产者和消费者API中都须要带上这个事务对象,能力确保它们在同一个事务中。

//创立client,并启用事务
PulsarClient pulsarClient = PulsarClient.builder() 
        .serviceUrl("pulsar://localhost:6650")   
        .enableTransaction(true)
        .build();
        // 创立事务Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();
        
String sourceTopic = "public/default/source-topic";
String sinkTopic = "public/default/sink-topic";
//创立生产者和消费者Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
        .topic(sourceTopic)
        .subscriptionName("my-sub")
        .subscribe();
        
Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
        .topic(sinkTopic)
        .create();
        
// 从原Topic中生产一条音讯,并发送到另外一个Topic中,它们在同一个事务内        Message<String> message = sourceConsumer.receive();
sinkProducer.newMessage(txn).value("sink data").sendAsync();
sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);
// 提交事务
txn.commit().get();

咱们以本大节结尾处的Function例子来说:

当未开启事务时,如果Function先把后果写入SinkTopic,然而音讯确认失败(下图Step-4失败),这会导致音讯被从新被投递(下图Step-1),Function会从新计算一个后果再发送到SinkTopic,这样就会呈现一条数据被反复计算并投递了两次。

如果没有开启事务,Function会先确认音讯,再把数据写入SinkTopic(先执行Step-4 再执行Step-3),此时如果写入SinkTopic失败,而SourceTopic的音讯又曾经被确认,则会造成数据失落,最终的计算结果也不精确。

如果开启了事务,只有最初没有commit,后面所有的步骤都会被回滚,生产的音讯、确认过的音讯都被回滚,从而让整个流程能够从新再来一遍,不会反复计算,也不会失落数据。整个时序图如下所示:

咱们只须要依据下面步骤,理解每一步具体做了什么,就能分明整个事务的实现形式。在上面的大节中,咱们将逐渐介绍。

事务流程

在理解整个事务流程之前,咱们先介绍Pulsar中事务的组件,常见的分布式事务中都会有TC、TM、RM等组件:

  1. TM:事务发起者。定义事务的边界,负责告知 TC,分布式事务的开始,提交,回滚。在Pulsar事务中,由每个PulsarClient来表演这个角色。
  2. RM:每个节点的资源管理者。治理每个分支事务的资源,每一个 RM 都会作为一个分支事务注册在 TC。在Pulsar中定义了一个TopicTransactionBuffer和PendingAckHandle来别离治理生产、生产的资源。
  3. TC :事务协调者。TC用于解决来自Pulsar Client的事务申请以跟踪其事务状态的模块。每个TC都有一个 惟一id (TCID) 标识,TC之间独立保护本人的事务元数据存储。TCID 用于生成事务 ID,播送告诉不同节点提交、回滚事务。

上面,咱们以一个Producer来介绍整个事务的流程,图中灰色局部代表存储,现有内存和Bookkeeper两种存储实现:

  1. 抉择TC。一个Pulsar集群中可能存在多个TC(默认16个),PulsarClient在创立事务时须要先抉择用哪个TC,后续所有事务的创立、提交、回滚等操作都会发往这个TC。抉择的规定很简略,因为TC的Topic是固定的,首先Lookup查看所有分区所在的Broker(每个分区就是一个TC),而后每次Client创立新事务,轮询抉择一个TC即可。
  2. 开启事务。代码中通过pulsarClient.newTransaction()开启一个事务,Client会往对应的TC中发送一个newTxn命令,TC生成并返回一个新事务的ID对象,对象里保留了TC的ID(用于后续申请找节点)和事务的ID,事务ID是递增的,同一个TC生成ID不会反复。
  3. 注册分区。Topic有可能是分区主题,音讯会被发往不同的Broker节点,为了让TC晓得音讯会发送到哪些节点(后续事务提交、回滚时TC须要告诉这些节点),Producer在发送音讯之前,会先往TC上注册分区信息。这样一来,后续TC就晓得要告诉哪些节点的RM来提交、回滚事务了。
  4. 发送音讯。这一步和一般的音讯发送没有太大的区别,不过音讯须要先通过每个Broker上的RM,Pulsar中RM被定义为TopicTransactionBuffer,RM外面会记录一些元数据,最初音讯还是会被写入原始的Topic中。此时尽管音讯曾经被写入了原始Topic,但消费者是不可见的,Pulsar中的事务隔离级别是Read Commit。
  5. 提交事务。Producer发送完所有的音讯后,提交事务,TC会收到提交申请后,会播送告诉RM节点提交事务,更新对应的元数据,让音讯能够被消费者生产。

Setp-4中的音讯是如何保障长久化到Topic中又不可见的呢?

每个Topic中都会保留一个maxReadPosition属性,用来标识以后消费者能够读取的最大地位,当事务还未提交之前,尽管数据曾经长久化到Topic中,然而maxReadPosition是不会扭转的。因而Consumer无奈生产到未提交的数据。

音讯曾经长久化了,最初事务要回滚,这部分数据如何解决?

如果事务要回滚,RM中会记录这个事务为Aborted状态。每条音讯的元数据中都会保留事务的ID等信息,Dispatcher中会依据事务ID判断这条音讯是否须要投递给Consumer。如果发现事务曾经完结,则间接过滤掉(外部确认掉音讯)。

最初提交事务时如果局部胜利、局部失败,如何解决?

TC中有一个名为TransactionOpRetryTimer的定时对象,所有未全副胜利播送的事务都会交给它来重试,直到所有节点最终全副胜利或超过重试次数。那这个过程不会呈现一致性问题吗?首先咱们想想,呈现这种状况的场景是什么。通常是某些Broker节点宕机导致这些节点不可用,或是网络抖动导致临时不可达。在Pulsar中如果呈现Broker宕机,Topic的归属是会转移的,除非整个集群不可用,否则总是能够找到一个新的Broker,通过重试来解决。在Topic归属转移过程中,maxReadPosition没有扭转,消费者也生产不到音讯。即便整个集群不可用,后续等到集群复原后,Timer还是会通过重试让事务提交。

如果事务未实现,会阻塞一般音讯的生产吗?

会。假如咱们开启事务,发送了几条事务音讯,然而并未提交或回滚事务。此时持续往Topic中发送一般音讯,因为事务音讯始终没有提交,maxReadPosition不会变动,消费者会生产不到新的音讯,会阻塞一般音讯的生产。这是合乎预期的行为,为了保障音讯的程序。而不同Topic之间不会相互影响,因为每个Topic都有本人的maxReadPosition。

事务的实现

咱们能够把事务的实现分为五局部:环境、TC、生产者RM、消费者RM、客户端。因为生产和生产资源的治理是离开的,因而咱们会别离介绍。

环境设定

事务协调者的设置,须要从Pulsar集群的初始化时开始,咱们在第一章中有介绍如何搭建集群,第一次须要执行一段命令,初始化ZooKeeper中的集群元数据。此时,Pulsar会主动创立一个SystemNamespace,并在外面创立一个Topic,残缺的Topic如下所示:

persistent://pulsar/system/transaction_coordinator_assign

这是一个PartitionedTopic,默认有16个分区,每个分区就是一个独立的TC。咱们能够通过–initial-num-transaction-coordinators参数来设置TC的数量。

TC与RM

接下来,咱们看看服务端的事务组件,如下图所示:

  • TransactionMetadataStoreService 是Broker上事务的总体协调者,咱们能够认为它是TC。
  • TransactionMetadataStore 被TC用来保留事务的元数据,如:新创建的事务,Producer注册上来的分区。这个接口有两个实现类,一个是把数据保留到Bookkeeper的实现,另外一个则间接把数据保留在内存中。
  • TransactionTimeoutTracker 服务端用于追踪超时的事务。
  • 各种Provider,它们都属于工厂类,无需特地关注。
  • TopicTransactionBuffer 生产者的RM,当事务音讯被发送到Broker,RM作为代理会记录一些元数据,而后把音讯存入原始Topic。外部蕴含了TopicTransactionBufferRecover和TransactionBufferSnapshotService,RM的元数据会被结构化为快照并定时刷盘,这两个对象别离负责快照的复原和快照的保留。因为生产音讯是以Topic为单位,因而每个Topic/Partition都会有一个。
  • PendingAckHandle 消费者的RM,因为生产是以订阅为单位的,因而每个订阅都有一个。

因为线上环境通常会应用长久化的事务,因而上面的原理都基于长久化实现。

所有事务相干的服务,在BrokerService启动时会初始化。TC主题中,每个Partition都是一个Topic,TransactionMetadataStoreService在初始化时,会依据以后Broker纳管的TC Partition,从Bookkeeper中复原之前长久化的元数据。每个TC会保留以下元数据:

  • newTransaction。新建一个事务,返回一个惟一的事务ID对象。
  • addProducedPartitionToTxn。注册生产者要发送音讯的Partition信息,用于后续TC告诉对应节点的RM提交/回滚事务。
  • addAckedPartitionToTxn。注册消费者要生产音讯的Partition信息,用于后续TC告诉对应节点的RM提交/回滚事务。
  • endTransaction。完结一个事务,能够是提交、回滚或者超时等。

咱们在初始化PulsarClient时,如果设置了enableTransaction=true,则Client初始化时,还会额定初始化一个TransactionCoordinatorClient。因为TC的Tenant、Namespace以及Topic名称都是固定的,因而TC客户端能够通过Lookup发现所有的Partition信息并缓存到本地,后续Client创立事务时,会轮询从这个缓存列表中选取下一个事务要应用的TC。

Producer事务管理

接下来咱们会开启一个事务:

// 创立事务
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();

下面这段代码中,会发送一个newTxn给某个TC,并失去一个Transaction对象。

开启事务时,TransactionCoordinatorClient会从缓存中选取一个TC,而后往选定的TC所在的Broker发送一个newTxn命令,命令的构造定义如下所示:

message CommandNewTxn {
    required uint64 request_id = 1;
    optional uint64 txn_ttl_seconds = 2 [default = 0];
    optional uint64 tc_id = 3 [default = 0];
}

因为命令中蕴含了TCID,因而即便多个TC被同一个Broker纳管也没有问题。Broker会依据TCID找到对应的TC并解决申请。

Producer发送音讯之前,会先发送一个AddPartitionToTxn命令给Broker,只有胜利后,才会持续发送实在的音讯。事务音讯达到Broker后,传递给TransactionBuffer进行解决。期间Broker必定会对音讯进行去重校验,通过校验后,数据会保留到TransactionBuffer里,而TransactionBuffer只是一个代理(会保留一些元数据),它最终会调用原始Topic保留音讯,TransactionBuffer在初始化时,构造方法须要传入原始Topic对象。咱们能够把TransactionBuffer看作是Producer端的RM。

TransactionBuffer中会保留两种信息,一种是原始音讯,间接应用Topic保留。另外一种是快照,快照中保留了Topic名称,最大可读地位信息(防止Consumer读到未提交的数据)、该Topic中曾经中断(aborted)的事务列表。

其中,中断的事务,是由TC播送告知其余Broker节点的,TransactionBuffer接到信息后,会间接在原始Topic中写入一个abortMarker,标记事务曾经中断,而后更新内存中的列表。abortMarker也是一条一般的音讯,然而音讯头中的元数据和一般音讯不一样。这些数据保留在快照中,次要是为了Broker重启后数据能疾速复原。如果快照数据失落,TopicTransactionBufferRecover会从尾到头读取Topic中的所有数据,每遇到一个abortMarker都会更新内存中的中断列表。如果有了快照,咱们只须要从快照处的终点开始读即可复原数据。

Consumer事务管理

消费者须要在音讯确认时带上事务对象,标识应用事务Ack:

\consumer.acknowledge(message, txn);

服务端每个订阅都有一个PendingAckHandle对象用于治理事务Ack信息,咱们能够认为它是治理消费者数据的RM。当Broker发现音讯确认申请中带有事务信息,则会把这个申请转交给对应的PendingAckHandle解决。

所有开启了事务的音讯确认,不会间接批改游标上的MarkDeleted地位,而是先长久化到一个额定的Ledger中,Broker内存中也会缓存一份。这个Ledger由pendingAckStore治理,咱们能够认为是Consumer RM的日志。

事务提交时,RM会调用消费者对应的Subscription,执行方才所有的音讯确认操作。同时,也会在日志Ledger中写入一个非凡的Marker,标识事务须要提交。在事务回滚时,也会先在日志中记录一个AbortMarker,而后触发Message从新投递。

pendingAckStore中保留的日志是redo log,该组件在初始化时,会先从日志Ledger中读取所有redo log,从而在内存中重建先前的音讯确认信息。因为音讯确认是幂等操作,如果Broker不慎宕机,只须要把redo log中的操作从新执行一遍。当订阅中的音讯被真正确认掉后,pendingAckStore中对应的redo log也能够被清理了。清理形式很简略,只须要挪动pendingAckStore中Ledger的MarkDelete地位即可。

再谈TC

所有的事务提交、回滚,因为Client端告知TC,或者因为超时TC主动感知。TC的日志中保留了Producer的音讯要发往哪些Partition,也保留了Consumer会Ack哪些Partition。RM扩散在每个Broker上,记录了整个事务中发送的音讯和要确认的音讯。当事务完结时,TC则以TCID为key,找到所有的元数据,通过元数据得悉须要告诉哪些Broker上的RM,最初发动播送,告诉这些Broker上的RM,事务须要提交/回滚。

序幕

Pulsar中的设计细节十分多,因为篇幅无限,作者会整顿一系列的文章进行技术分享,敬请期待。如果各位心愿系统性地学习Pulsar,能够购买作者出版的新书《深刻解析Apache Pulsar》。

one more thing

目前,腾讯云音讯队列 TDMQ Pulsar版(TDMQ for Pulsar,简称 TDMQ Pulsar 版)已开始正式商业化。音讯队列 Pulsar 版是一款基于 Apache Pulsar 自研的消息中间件,具备极好的云原生和 Serverless 个性,兼容 Pulsar 的各个组件与概念,具备计算存储拆散,灵便扩缩容的底层劣势。

各位想要理解的请点击官网

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理