对于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。
GitHub 地址:http://github.com/apache/pulsar/
导语:本文是 StreamNative 开发工程师、Apache Pulsar Committer 丛搏在 Pulsar Summit Asia 2020 大会上的演讲《技术探索:Apache Pulsar 的事务型事件流》文字整顿版本,本演讲次要对 Apache Pulsar 事务原理及布局进行了分享,请大家参考。
我叫丛搏,是来自 StreamNative 的开发工程师。明天我所带来的主题是《技术探索:Apache Pulsar 的事务型事件流》。
音讯语义
大家都晓得在所有的音讯零碎流数据平台对于音讯都有不同的语义。个别语义分为三种:At-most once、At-least once、Exactly once。
- At-most once:至少一次,不关怀音讯是否发送胜利、不须要音讯发送的返回值。
- At-least once:至多一次,容许音讯反复但必须保障音讯必达。
- Exactly once:精准一次,保障音讯不丢且不会反复。
At Most Once(最多一次)
Pulsar 在 1.2.0 版本之前,曾经实现 At Most Once 语义。
At Least Once(至多一次)
Pulsar 在设计之初就遵循 At Least Once 语义。发送音讯失败后重试是保障 At Least Once 语义的根本形式。发送重试会导致音讯反复,在某些应用场景下要求 Producer 不能发送反复音讯且 Consumer 不能反复生产,因而产生了 Exactly Once 语义。
Exactly Once(准确一次)
实现 Exactly once 须要实现对生产 / 生产的去重。
Pulsar 中如何去重?
- Producer: 幂等性 Producer;
- Broker: 保障音讯反复数据删除 (PIP-6);
- Consumer: Reader + Checkpoints (Flink / Spark)。
如何开启 Exactly once?
对 topic 的 name space 进行 set-deduplication 的设置。通过 admin 等等的一些操作:
bin/pulsar-admin set-deduplication -e tenant/namespace
;- 创立 Producer 时设置 Producer name、Sequence id;
- 产生音讯时指定递增的序列 id。
限度:
- 仅在向一个分区生成音讯时无效;
- 仅实用于产生一条音讯;
- 在一个分区或多个分区上生成多个音讯时没有原子性;
- Consumer 须要存储 message id 及其状态,并在还原状态时查找 message id。
Transaction 如何处理事件
通过转账的逻辑操作的例子来讲述流音讯零碎中 Transaction 是怎么处理事件的:
当初有 Alice 和 Bob 两个人。Alice 会给 Bob 转账十块钱。通过 Pulsar 如何来实现这个性能呢?
- Transfer Topic : 记录转账的申请;
- Cash Transfer Function: 解决转账的行为;
- BalanceUpdate Topic: 记录余额更新申请。
Alice 转给 Bob。Transfer Function 收到这条转账音讯,会向 BalanceUpdate Topic 发送一条 Bob 余额减少十块钱的音讯,向 BalanceUpdate Topic 发送一条 Alice 余额缩小十块钱的音讯。接管到所有返回值后,Ack 这条转账音讯。在所有的操作不会产生任何失败的时候它是没有问题的。然而往往大失所望,它的所有操作都可能会产生问题。
如图 1 所示,Ack 失败后会从新再生产这条转账的音讯,所带来的结果就是 Alice 再次给 Bob 转了十块钱,Alice 共给 Bob 转了二十块钱。假使每次 Ack 都失败,有可能 Alice 的账户负债累累,Bob 成了亿万富翁。
如图 2 所示,Bob 减少余额的音讯没有胜利发送到所对应的 BalanceUpdate Topic 中,所带来的景象是 Bob 余额没有减少,Alice 的余额却缩小了。
Pulsar Transaction
如何用 Pulsar 的 Transaction 来实现这件事件的?
Transaction 语义:
- 保障多分区原子性音讯写入;
- 保障原子性确认多个订阅;
- 一个事务中进行的所有操作全副胜利或全副失败;
- 容许 Consumers 读取已提交的音讯。
没有 Transaction API 如何实现上述的例子?
Message<String> message = inputConsumer.receive();
CompletableFuture<MessageId> sendFuture1 =
producer1.newMessage().value(“output-message-1”).sendAsync();
CompletableFuture<MessageId> sendFuture2 =
producer2.newMessage().value(“output-message-2”).sendAsync();
inputConsumer.acknowledgeAsync(message.getMessageId());
如图 3 所示:
从 Input Consumer 中接管到音讯之后,Producer1 会发送音讯到 topic1 中,Producer2 会发送一条音讯到 topic2,而后 Ack 接管到的音讯。
Pulsar 的 Transaction API 其实很简略,对于原有需实现的逻辑没有太大的扭转:
Message<String> message = inputConsumer.receive();
Transaction txn = client.newTransaction().withTransactionTimeout(…).build().get();
CompletableFuture<MessageId> sendFuture1 =
producer1.newMessage(txn).value(“output-message-1”).sendAsync();
CompletableFuture<MessageId> sendFuture2 =
producer2.newMessage(txn).value(“output-message-2”).sendAsync();
inputConsumer.acknowledgeAsync(message.getMessageId(), txn);
txn.commit().get();
MessageId msgId1 = sendFuture1.get();
MessageId msgId2 = sendFuture2.get();
inputConsumer.acknowledgeAsync(message.getMessageId(), txn);
txn.commit().get();
Pulsar Transaction 有如下三大组件:
- TC(Transaction Coordinator)负责管理 Transaction 元数据。
- TB(Transaction Buffer)负责解决发送带有 Transaction 的音讯。
- TP(Transaction Pending Ack)负责解决带有 Transaction 的 Ack 申请。
如图 4 所示:创立 Transaction 的操作记录在 TC 中。
如图 5 所示:Pulsar Client 已胜利创立 Txn1,并向 TC 申请 Txn1 将发送音讯到 Topic1 和 Topic2,TC 收到发送申请后记录发送元数据并响应 Client。Client 向 Topic1,Topic2 别离发送一条音讯。
如图 6 所示:与图 5 所形容基本相同。仅发送与签收的区别。
如图 7、图 8 所示:Pulsar Client 期待所有 ACK 和 Produce 实现后 Commit Transaction,TC 接管到 Commit 申请后 Txn1 的状态更改成 Committing 后会解决 TP 和 TB 中 Txn1 的信息。
如图 9 所示:解决 TP、TB 实现后 TC 会更改 Txn1 的状态为 Committed。
以上是一个 Transaction 残缺的生命周期。
再来看一下转账的例子:
当有 Pulsar Transaction 的反对后,所有的操作要么胜利,要么都失败。就保障了对 Alice 和 Bob 余额操作的正确性。
Pulsar Transaction 的将来布局
Pulsar Transaction 的设计目标是让事件流零碎变得更加简略、可靠性更增强。对于许多业务场景来说,在解决业务场景时就可能会少了很多解决幂等性的操作等等。
那么,以下就是 Pulsar Transaction 将来的开发布局:
- Transaction support in other languages (e.g. C++, Go)
- Transaction in Pulsar Functions & Pulsar IO
- Transaction in Kafka-on-Pulsar (KOP)
- Transaction for Flink / Spark job
- Transaction for State storage in Pulsar Functions
大家对于上方的内容感兴趣欢送扫描下方二维码回复「入群」随时在 Pulsar 交换群中与咱们一起探讨。
对于文中的介绍想要理解更多,请扫描下方小程序码查看完整版视频:
相干浏览
- Pulsar 2.7.0 新增个性概览:事务反对、Topic 级别策略配置等
- Apache Pulsar Committer 新成员:Transaction 背地的工程师
点击链接,获取 Apache Pulsar 硬核干货材料!