关于pulsar:Apache-Pulsar-技术系列-Pulsar事务实现原理

9次阅读

共计 8403 个字符,预计需要花费 22 分钟才能阅读完成。

导语

Apache Pulsar 是一个多租户、高性能的服务间音讯传输解决方案,反对多租户、低延时、读写拆散、跨地区复制、疾速扩容、灵便容错等个性。腾讯云 MQ Oteam Pulsar 工作组对 Pulsar 做了深刻调研以及大量的性能和稳定性方面优化,目前曾经在 TDBank、腾讯云 TDMQ 落地上线。本篇将简略介绍 Pulsar 服务端音讯确认的一些概念和原理,欢送大家浏览。

作者简介

林琳

腾讯云中间件专家工程师

Apache Pulsar PMC,《深刻解析 Apache Pulsar》作者。目前专一于中间件畛域,在音讯队列和微服务方向具备丰盛的教训。负责 TDMQ 的设计与开发工作,目前致力于打造稳固、高效和可扩大的根底组件与服务。

前言

在事务音讯未呈现前,Pulsar 中反对的最高等级的消息传递保障,是通过 Broker 的音讯去重机制,来保障 Producer 在单个分区上的音讯只准确保留一次。当 Producer 发送音讯失败后,即便重试发送音讯,Broker 也能确保音讯只被长久化一次。但在 Partitioned Topic 的场景下,Producer 没有方法保障多个分区的音讯原子性。

当 Broker 宕机时,Producer 可能会发送音讯失败,如果 Producer 没有重试或已用尽重试次数,则音讯不会写入 Pulsar。在消费者方面,目前的音讯确认是尽力而为的操作,并不能确保音讯肯定被确认胜利,如果音讯确认失败,这将导致音讯从新投递,消费者将收到反复的音讯,Pulsar 只能保障消费者至多生产一次。

相似地,Pulsar Functions 仅保障对幂等函数上的单个音讯解决一次,即须要业务保障幂等。它不能保障解决多个音讯或输入多个后果只产生一次。

举个例子,某个 Function 的执行步骤是:从 Topic-A1、Topic-A2 中生产音讯,而后 Function 中对音讯进行聚合解决(如:工夫窗口聚合计算),后果存储到 Topic-B,最初别离确认(ACK)Topic-A1 和 Topic-A2 中的音讯。该 Function 可能会在“输入后果到 Topic-B”和“确认音讯”之间失败,甚至在确认单个音讯时失败。这将导致所有(或局部)Topic-A1、Topic-A2 的音讯被从新传递和重新处理,并生成新的后果,进而导致整个工夫窗口的计算结果谬误。

因而,Pulsar 须要事务机制来保障准确一次的语义(Exactly-once),生产和生产都能保障准确一次,不会反复,也不会失落数据,即便在 Broker 宕机或 Function 解决失败的状况下。

事务简介

Pulsar 事务音讯的设计初衷是用于保障 Pulsar Function 的准确一次语义,能够保障 Producer 发送多条音讯到不同的 Partition 时,能够同时全副胜利或者同时全副失败。也能够保障 Consumer 生产多条音讯在时,能够同时全副确认胜利或同时全副失败。当然,也能够把生产、生产都蕴含在同一个事务中,要么全副胜利,要么全副失败。

咱们以本大节结尾处的 Function 场景为例,演示生产、生产在同一个事务中的场景:

首先,咱们须要在 broker.conf 中启用事务。

\transactionCoordinatorEnabled=true

而后,咱们别离创立 PulsarClient 和事务对象。生产者和消费者 API 中都须要带上这个事务对象,能力确保它们在同一个事务中。

// 创立 client,并启用事务
PulsarClient pulsarClient = PulsarClient.builder() 
        .serviceUrl("pulsar://localhost:6650")   
        .enableTransaction(true)
        .build();
        // 创立事务 Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();
        
String sourceTopic = "public/default/source-topic";
String sinkTopic = "public/default/sink-topic";
// 创立生产者和消费者 Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
        .topic(sourceTopic)
        .subscriptionName("my-sub")
        .subscribe();
        
Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
        .topic(sinkTopic)
        .create();
        
// 从原 Topic 中生产一条音讯,并发送到另外一个 Topic 中,它们在同一个事务内        Message<String> message = sourceConsumer.receive();
sinkProducer.newMessage(txn).value("sink data").sendAsync();
sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);
// 提交事务
txn.commit().get();

咱们以本大节结尾处的 Function 例子来说:

当未开启事务时,如果 Function 先把后果写入 SinkTopic,然而音讯确认失败(下图 Step- 4 失败),这会导致音讯被从新被投递(下图 Step-1),Function 会从新计算一个后果再发送到 SinkTopic,这样就会呈现一条数据被反复计算并投递了两次。

如果没有开启事务,Function 会先确认音讯,再把数据写入 SinkTopic(先执行 Step-4 再执行 Step-3),此时如果写入 SinkTopic 失败,而 SourceTopic 的音讯又曾经被确认,则会造成数据失落,最终的计算结果也不精确。

如果开启了事务,只有最初没有 commit,后面所有的步骤都会被回滚,生产的音讯、确认过的音讯都被回滚,从而让整个流程能够从新再来一遍,不会反复计算,也不会失落数据。整个时序图如下所示:

咱们只须要依据下面步骤,理解每一步具体做了什么,就能分明整个事务的实现形式。在上面的大节中,咱们将逐渐介绍。

事务流程

在理解整个事务流程之前,咱们先介绍 Pulsar 中事务的组件,常见的分布式事务中都会有 TC、TM、RM 等组件:

  1. TM:事务发起者。定义事务的边界,负责告知 TC,分布式事务的开始,提交,回滚。在 Pulsar 事务中,由每个 PulsarClient 来表演这个角色。
  2. RM:每个节点的资源管理者。治理每个分支事务的资源,每一个 RM 都会作为一个分支事务注册在 TC。在 Pulsar 中定义了一个 TopicTransactionBuffer 和 PendingAckHandle 来别离治理生产、生产的资源。
  3. TC:事务协调者。TC 用于解决来自 Pulsar Client 的事务申请以跟踪其事务状态的模块。每个 TC 都有一个 惟一 id (TCID) 标识,TC 之间独立保护本人的事务元数据存储。TCID 用于生成事务 ID,播送告诉不同节点提交、回滚事务。

上面,咱们以一个 Producer 来介绍整个事务的流程,图中灰色局部代表存储,现有内存和 Bookkeeper 两种存储实现:

  1. 抉择 TC。一个 Pulsar 集群中可能存在多个 TC(默认 16 个),PulsarClient 在创立事务时须要先抉择用哪个 TC,后续所有事务的创立、提交、回滚等操作都会发往这个 TC。抉择的规定很简略,因为 TC 的 Topic 是固定的,首先 Lookup 查看所有分区所在的 Broker(每个分区就是一个 TC),而后每次 Client 创立新事务,轮询抉择一个 TC 即可。
  2. 开启事务。代码中通过 pulsarClient.newTransaction()开启一个事务,Client 会往对应的 TC 中发送一个 newTxn 命令,TC 生成并返回一个新事务的 ID 对象,对象里保留了 TC 的 ID(用于后续申请找节点)和事务的 ID,事务 ID 是递增的,同一个 TC 生成 ID 不会反复。
  3. 注册分区。Topic 有可能是分区主题,音讯会被发往不同的 Broker 节点,为了让 TC 晓得音讯会发送到哪些节点(后续事务提交、回滚时 TC 须要告诉这些节点),Producer 在发送音讯之前,会先往 TC 上注册分区信息。这样一来,后续 TC 就晓得要告诉哪些节点的 RM 来提交、回滚事务了。
  4. 发送音讯。这一步和一般的音讯发送没有太大的区别,不过音讯须要先通过每个 Broker 上的 RM,Pulsar 中 RM 被定义为 TopicTransactionBuffer,RM 外面会记录一些元数据,最初音讯还是会被写入原始的 Topic 中。此时尽管音讯曾经被写入了原始 Topic,但消费者是不可见的,Pulsar 中的事务隔离级别是 Read Commit。
  5. 提交事务。Producer 发送完所有的音讯后,提交事务,TC 会收到提交申请后,会播送告诉 RM 节点提交事务,更新对应的元数据,让音讯能够被消费者生产。

Setp- 4 中的音讯是如何保障长久化到 Topic 中又不可见的呢?

每个 Topic 中都会保留一个 maxReadPosition 属性,用来标识以后消费者能够读取的最大地位,当事务还未提交之前,尽管数据曾经长久化到 Topic 中,然而 maxReadPosition 是不会扭转的。因而 Consumer 无奈生产到未提交的数据。

音讯曾经长久化了,最初事务要回滚,这部分数据如何解决?

如果事务要回滚,RM 中会记录这个事务为 Aborted 状态。每条音讯的元数据中都会保留事务的 ID 等信息,Dispatcher 中会依据事务 ID 判断这条音讯是否须要投递给 Consumer。如果发现事务曾经完结,则间接过滤掉(外部确认掉音讯)。

最初提交事务时如果局部胜利、局部失败,如何解决?

TC 中有一个名为 TransactionOpRetryTimer 的定时对象,所有未全副胜利播送的事务都会交给它来重试,直到所有节点最终全副胜利或超过重试次数。那这个过程不会呈现一致性问题吗?首先咱们想想,呈现这种状况的场景是什么。通常是某些 Broker 节点宕机导致这些节点不可用,或是网络抖动导致临时不可达。在 Pulsar 中如果呈现 Broker 宕机,Topic 的归属是会转移的,除非整个集群不可用,否则总是能够找到一个新的 Broker,通过重试来解决。在 Topic 归属转移过程中,maxReadPosition 没有扭转,消费者也生产不到音讯。即便整个集群不可用,后续等到集群复原后,Timer 还是会通过重试让事务提交。

如果事务未实现,会阻塞一般音讯的生产吗?

会。假如咱们开启事务,发送了几条事务音讯,然而并未提交或回滚事务。此时持续往 Topic 中发送一般音讯,因为事务音讯始终没有提交,maxReadPosition 不会变动,消费者会生产不到新的音讯,会阻塞一般音讯的生产。这是合乎预期的行为,为了保障音讯的程序。而不同 Topic 之间不会相互影响,因为每个 Topic 都有本人的 maxReadPosition。

事务的实现

咱们能够把事务的实现分为五局部:环境、TC、生产者 RM、消费者 RM、客户端。因为生产和生产资源的治理是离开的,因而咱们会别离介绍。

环境设定

事务协调者的设置,须要从 Pulsar 集群的初始化时开始,咱们在第一章中有介绍如何搭建集群,第一次须要执行一段命令,初始化 ZooKeeper 中的集群元数据。此时,Pulsar 会主动创立一个 SystemNamespace,并在外面创立一个 Topic,残缺的 Topic 如下所示:

persistent://pulsar/system/transaction_coordinator_assign

这是一个 PartitionedTopic,默认有 16 个分区,每个分区就是一个独立的 TC。咱们能够通过 –initial-num-transaction-coordinators 参数来设置 TC 的数量。

TC 与 RM

接下来,咱们看看服务端的事务组件,如下图所示:

  • TransactionMetadataStoreService 是 Broker 上事务的总体协调者,咱们能够认为它是 TC。
  • TransactionMetadataStore 被 TC 用来保留事务的元数据,如:新创建的事务,Producer 注册上来的分区。这个接口有两个实现类,一个是把数据保留到 Bookkeeper 的实现,另外一个则间接把数据保留在内存中。
  • TransactionTimeoutTracker 服务端用于追踪超时的事务。
  • 各种 Provider,它们都属于工厂类,无需特地关注。
  • TopicTransactionBuffer 生产者的 RM,当事务音讯被发送到 Broker,RM 作为代理会记录一些元数据,而后把音讯存入原始 Topic。外部蕴含了 TopicTransactionBufferRecover 和 TransactionBufferSnapshotService,RM 的元数据会被结构化为快照并定时刷盘,这两个对象别离负责快照的复原和快照的保留。因为生产音讯是以 Topic 为单位,因而每个 Topic/Partition 都会有一个。
  • PendingAckHandle 消费者的 RM,因为生产是以订阅为单位的,因而每个订阅都有一个。

因为线上环境通常会应用长久化的事务,因而上面的原理都基于长久化实现。

所有事务相干的服务,在 BrokerService 启动时会初始化。TC 主题中,每个 Partition 都是一个 Topic,TransactionMetadataStoreService 在初始化时,会依据以后 Broker 纳管的 TC Partition,从 Bookkeeper 中复原之前长久化的元数据。每个 TC 会保留以下元数据:

  • newTransaction。新建一个事务,返回一个惟一的事务 ID 对象。
  • addProducedPartitionToTxn。注册生产者要发送音讯的 Partition 信息,用于后续 TC 告诉对应节点的 RM 提交 / 回滚事务。
  • addAckedPartitionToTxn。注册消费者要生产音讯的 Partition 信息,用于后续 TC 告诉对应节点的 RM 提交 / 回滚事务。
  • endTransaction。完结一个事务,能够是提交、回滚或者超时等。

咱们在初始化 PulsarClient 时,如果设置了 enableTransaction=true,则 Client 初始化时,还会额定初始化一个 TransactionCoordinatorClient。因为 TC 的 Tenant、Namespace 以及 Topic 名称都是固定的,因而 TC 客户端能够通过 Lookup 发现所有的 Partition 信息并缓存到本地,后续 Client 创立事务时,会轮询从这个缓存列表中选取下一个事务要应用的 TC。

Producer 事务管理

接下来咱们会开启一个事务:

// 创立事务
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();

下面这段代码中,会发送一个 newTxn 给某个 TC,并失去一个 Transaction 对象。

开启事务时,TransactionCoordinatorClient 会从缓存中选取一个 TC,而后往选定的 TC 所在的 Broker 发送一个 newTxn 命令,命令的构造定义如下所示:

message CommandNewTxn {
    required uint64 request_id = 1;
    optional uint64 txn_ttl_seconds = 2 [default = 0];
    optional uint64 tc_id = 3 [default = 0];
}

因为命令中蕴含了 TCID,因而即便多个 TC 被同一个 Broker 纳管也没有问题。Broker 会依据 TCID 找到对应的 TC 并解决申请。

Producer 发送音讯之前,会先发送一个 AddPartitionToTxn 命令给 Broker,只有胜利后,才会持续发送实在的音讯。事务音讯达到 Broker 后,传递给 TransactionBuffer 进行解决。期间 Broker 必定会对音讯进行去重校验,通过校验后,数据会保留到 TransactionBuffer 里,而 TransactionBuffer 只是一个代理(会保留一些元数据),它最终会调用原始 Topic 保留音讯,TransactionBuffer 在初始化时,构造方法须要传入原始 Topic 对象。咱们能够把 TransactionBuffer 看作是 Producer 端的 RM。

TransactionBuffer 中会保留两种信息,一种是原始音讯,间接应用 Topic 保留。另外一种是快照,快照中保留了 Topic 名称,最大可读地位信息(防止 Consumer 读到未提交的数据)、该 Topic 中曾经中断(aborted)的事务列表。

其中,中断的事务,是由 TC 播送告知其余 Broker 节点的,TransactionBuffer 接到信息后,会间接在原始 Topic 中写入一个 abortMarker,标记事务曾经中断,而后更新内存中的列表。abortMarker 也是一条一般的音讯,然而音讯头中的元数据和一般音讯不一样。这些数据保留在快照中,次要是为了 Broker 重启后数据能疾速复原。如果快照数据失落,TopicTransactionBufferRecover 会从尾到头读取 Topic 中的所有数据,每遇到一个 abortMarker 都会更新内存中的中断列表。如果有了快照,咱们只须要从快照处的终点开始读即可复原数据。

Consumer 事务管理

消费者须要在音讯确认时带上事务对象,标识应用事务 Ack:

\consumer.acknowledge(message, txn);

服务端每个订阅都有一个 PendingAckHandle 对象用于治理事务 Ack 信息,咱们能够认为它是治理消费者数据的 RM。当 Broker 发现音讯确认申请中带有事务信息,则会把这个申请转交给对应的 PendingAckHandle 解决。

所有开启了事务的音讯确认,不会间接批改游标上的 MarkDeleted 地位,而是先长久化到一个额定的 Ledger 中,Broker 内存中也会缓存一份。这个 Ledger 由 pendingAckStore 治理,咱们能够认为是 Consumer RM 的日志。

事务提交时,RM 会调用消费者对应的 Subscription,执行方才所有的音讯确认操作。同时,也会在日志 Ledger 中写入一个非凡的 Marker,标识事务须要提交。在事务回滚时,也会先在日志中记录一个 AbortMarker,而后触发 Message 从新投递。

pendingAckStore 中保留的日志是 redo log,该组件在初始化时,会先从日志 Ledger 中读取所有 redo log,从而在内存中重建先前的音讯确认信息。因为音讯确认是幂等操作,如果 Broker 不慎宕机,只须要把 redo log 中的操作从新执行一遍。当订阅中的音讯被真正确认掉后,pendingAckStore 中对应的 redo log 也能够被清理了。清理形式很简略,只须要挪动 pendingAckStore 中 Ledger 的 MarkDelete 地位即可。

再谈 TC

所有的事务提交、回滚,因为 Client 端告知 TC,或者因为超时 TC 主动感知。TC 的日志中保留了 Producer 的音讯要发往哪些 Partition,也保留了 Consumer 会 Ack 哪些 Partition。RM 扩散在每个 Broker 上,记录了整个事务中发送的音讯和要确认的音讯。当事务完结时,TC 则以 TCID 为 key,找到所有的元数据,通过元数据得悉须要告诉哪些 Broker 上的 RM,最初发动播送,告诉这些 Broker 上的 RM,事务须要提交 / 回滚。

序幕

Pulsar 中的设计细节十分多,因为篇幅无限,作者会整顿一系列的文章进行技术分享,敬请期待。如果各位心愿系统性地学习 Pulsar,能够购买作者出版的新书《深刻解析 Apache Pulsar》。

one more thing

目前,腾讯云音讯队列 TDMQ Pulsar 版(TDMQ for Pulsar,简称 TDMQ Pulsar 版)已开始正式商业化。音讯队列 Pulsar 版是一款基于 Apache Pulsar 自研的消息中间件,具备极好的云原生和 Serverless 个性,兼容 Pulsar 的各个组件与概念,具备计算存储拆散,灵便扩缩容的底层劣势。

各位想要理解的请点击 官网

正文完
 0