对于 StreamNative
StreamNative 是一家开源根底软件公司,由 Apache 软件基金会顶级我的项目 Apache Pulsar 开创团队组建而成,围绕 Pulsar 打造下一代云原生批流交融数据平台。StreamNative 作为 Apache Pulsar 商业化公司,专一于开源生态和社区构建,致力于前沿技术畛域的翻新,开创团队成员曾就任于 Yahoo、Twitter、Splunk、EMC 等出名大公司。
导语:本文是 Apache Pulsar PMC 成员,StreamNative 首席架构师李鹏辉在 TGIP-CN 037 直播流动的文字整顿版本。Pulsar 2.10.0 版本行将公布,本场直播为大家带来 Apache Pulsar 2.10.0 的次要新个性及版本解读,解答大家对新版本对于技术细节的疑难。
点击 查看回顾视频
Pulsar 2.10.0 蕴含来自于 99 位贡献者的 1000+ commits,其中诸多奉献来自于国内的贡献者,感激大家对 Pulsar 的反对与奉献。 本次版本公布是一次新的里程碑,如此多的 commit 数量也为文档带来了降级;Apache Pulsar 网站降级中,新网站 Beta 版本对文档进行了从新归档与欠缺,欢送大家试用并提出宝贵意见。
Apache Pulsar 2.10.0 版本新个性内容包含:
- 去除对 ZooKeeper 的强依赖;
- 新的生产类型 TableView;
- 多集群主动故障转移;
- Producer Lazy Loading + Partial RoundRobin;
- Redeliver Backoff;
- Init Subscription for DLQ;
- 引入多集群全局 Topic Policy 设置反对以及 Topic 级别的跨地区复制配置;
- ChunkMessageId;
- 减少批量操作 Metadata 服务的反对:能够在大量 Topic 的场景下晋升 Pulsar 稳定性;
- ...
去除对 ZooKeeper API 强依赖
ZooKeeper 是 Pulsar 中应用十分宽泛的一个 API,旧版对该 API 的依赖无处不在,但这种依赖不利于用户抉择其余类型的元数据服务。为了解决这一问题,Pulsar 通过多个版本的迭代,做了大量筹备和测试工作后终于在 2.10.0 版本去除了对 ZooKeeper 的强依赖。
目前新版反对三种元数据服务:
- ZooKeeper
- Etcd
- RocksDB(standalone)
其中须要留神的是 Etcd 目前没有很好的 Java 客户端,综合考量应用要谨慎。此外从 benchmark 测试问题来看 ZooKeeper 与 Etcd 的性能是相近的,用户能够依据本身状况来抉择。
该个性的提案是 PIP 45 : Pluggable metadata interface(Metadata Store + Coordination Service)。顾名思义,这里的 Metadata Store 是一个元数据存储,而 Coordination Service 则提供了一个中心化服务来取得全局锁。
2.10 版本还减少了 Metadata 批量操作反对,升高客户端与服务端交互需要,大幅加重了 metadata 的操作压力。
# The metadata store URL# Examples:# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181# * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path) metadataStoreUrl=# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl configurationMetadataStoreUrl=
以上就是新个性的 API 实现,能够看到上述配置参数曾经不再须要 ZooKeeper。但去除对 ZooKeeper 的依赖并不代表会将其移除。思考到 ZooKeeper 还有大量用户,应用较为宽泛,所以短期内官网并不会思考删除实现,而只是将其插件化来方便使用。
更多加重 ZooKeeper 依赖细节,能够浏览博客 Apache Pulsar 轻装上阵:迈向轻 ZooKeeper 时代。
新的生产类型 TableView
Pulsar 的生产模式比拟多样化,现在 2.10 版本再引入了 TableView,这是一个相似于 KV 的表格服务。它是一个不反对写入的纯视图,能够间接在客户端内存构建表格视图,适宜数据量不大的场景生成视图。但 TableView 不太适合数据量较大、单台机器的内存都难以承受的场景。
TableView 可用于配合 Topic Compaction,后者能够在服务端做 Key 的压缩。这一个性的原理是只在一个 snapshot 中保留 key 的最新状态,consumer 须要时只需查看这个 snapshot,而无需去耗费更多老本读取原始 backlog。TableView 能够与该个性无缝连接,复原 TableView 时就能够间接利用 broker 生成的 snapshot,从而减小复原开销。原视频 22 分处具体介绍了这种压缩的机制和应用场景。
try (TableView<byte[]> tv = client. newTableViewBuilder (Schema.BYTES) .topic ("public/default/tableview-test") .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .create()) { System.out.println("start tv size: " + tv.size()); tv. forEachAndListen((k, v) -> System.out.println(k + "->"+ Arrays. toString(v))); while (true) f Thread. sleep (20000) ; System.out.println(tv.size)): tv. forEach((k, v) -> System, out.println("checkpoint: "+ k+ "->" + Arrays.toString(v))); }} catch (Exception ex) { System.out.println("Table view failed: " + ex. getCause());}
Cluster 主动故障转移
ServiceUrlProvider provider = AutoClusterFailover.builder() .primary(primary) .secondary(Collections.singletonList(secondary)) .failoverDelay(failoverDelay, TimeUnit.SECONDS) .switchBackDelay(switchBackDelay, TimeUnit.SECONDS) .checkInterval(checkInterval, TimeUnit.MILLISECONDS) .build();
Pulsar 反对多集群,集群间可同步数据,因而用户常常会有集群间的故障转移需要,所以引入主动故障转移个性。过来做故障转移时须要通过域名切换,或者应用自制的辅助节点,但这些办法往往都须要人工染指,SLA 难以保障。新个性的劣势在于自动化与可配置,可设置 primary 与 secondary 集群,配置提早等参数,通过探活实现集群主动按预期切换。但目前该个性在探活时只探测 Pulsar 端口是否接通,将来版本将持续改良探活形式。
Producer Lazy loading + Partial RoundRobin
以后在一个规模较大的集群中,如果 partition 比拟多,则发送音讯时 producer 须要轮询所有 partition,且 partition 可能散布在不同 broker 上可能产生微小的连贯压力,如上图上半局部。为此,该新个性实现了 producer 懒加载,这样一来如果不必某个 partition 就不会创立它,加重了零碎累赘。而局部轮询会先 List 所有 partition 再将它们 shuffle,实现不同客户端写入不同的 partition,缩小 producer 实例与 broker 的连贯数量,同样能够升高零碎压力。须要留神的是 Shared consumer 临时不反对这种机制,将来须要社区独特摸索这一方面是否实现相似的机制。
PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer() .topic(topic) .enableLazyStartPartitionedProducers(true) .enableBatching(false) .messageRoutingMode(MessageRoutingMode.CustomPartition) .messageRouter(new PartialRoundRobinMessageRouterImpl(3)) .create();
Redeliver Backoff
client.newConsumer().negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder() .minDelayMs(1000) .maxDelayMs(60 * 1000) .build()).subscribe();client.newConsumer().ackTimeout(10, TimeUnit.SECOND) .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder() .minDelayMs(1000) .maxDelayMs(60 * 1000) .build()).subscribe();
Pulsar现有 ackTimeout 机制,如果应用 shared 订阅,生产数据时可能在一段时间内无奈签收,则 ackTimeout 能够保障超过肯定工夫仍未签收时客户端主动从新投递音讯,将音讯从新散发到其余 consumer 上。
音讯从新投递的工夫很难确定,长短不一,且随着音讯解决失败次数越来越多,提早须要越来越长。为此引入了该 API,能够逐步缩短提早。相比现有办法,这个个性的劣势在于开销更小,不须要通过另一个 topic,且更加灵便。不足之处是一旦客户端宕机会导致音讯立刻重试。另外该个性应用老本很低,API 简洁,很容易把握。须要留神的是目前只有 Java 客户端反对该个性,另外它能够配合死信队列应用。
初始化死信队列订阅
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .ackTimeout(1, TimeUnit.SECONDS) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) .initialSubscriptionName(my-sub) .build()) .receiverQueueSize(100) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest ) .subscribe();
死信队列的创立是懒创立策略,这就呈现了一个问题:死信音讯尚未产生、topic 尚未创立时,就无奈给 topic 指定数据保留策略。为此只能给 namespace 创立策略,粒度会很大。新版引入 InitialSubscriptionName
,在设置死信队列时能够在 create 时同时创立一个订阅,这样数据就能够保留下来。且对于死信队列,大部分场景仅需一个订阅解决即可,于是 订阅就会和 InitialSubscriptionName
对应,这样无需设置 retention,就能够保留发到死信队列的音讯。
跨集群 topic 策略
bin/pulsar-admin topics set-retention -s 1G -t 1d --global my-topicMessage message = MessageBuilder.create() ... .setReplicationClusters(restrictDatacenters) .build();producer.send(message);
该个性能够跨集群利用 topic policy,通过 -global
参数对所有集群失效。外表上来看一个全局参数很容易实现,其实背地做了很多工作。次要是底层须要将 schema 同步到所有集群,能力做到跨集群利用。须要留神的是 broker 没有重试策略,以下两种形式任选其一:
- 被动告知 broker 重试;
- 断开客户端。
新的音讯 ID 类型 ChunkMessageId
public class ChunkMessageIdImpl extends MessageIdImpl implements Messaged { private final MessageIdImpl firstChunkMsgId; public ChunkMessageIdImpl(MessageIdImplfirstChunkMsgId,MessageIdImpllastChunkMsgId){ super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex()); this. firstChunkMsgId = firstChunkMsgId; } public MessageIdImplgetFirstChunkMessageId(){ return firstChunkMsgId; } public MessageIdImplgetLastChunkMessageId(){ return this; }}
之前版本引入的 ChunkMessage 能够无效加重零碎压力,但存在的问题是 Chunk 中只有最初的 MessageId 返回给客户端,这样客户端就无奈通晓后面的 Id。这个个性解决了 ChunkMessage 开始到完结对应的 Id 缺失问题,不便用户 seek MessageId 并生产。目前该个性只有 Java 客户端反对。
其余个性
- Topic properties:给 Topic 附加 name 之外的更多信息,与 metadata 一并存储;
- Metadata batch operation:晋升性能;
- CPP client:提供 chunk message 反对;
- Broker graceful shutdown:反对 REST API 优雅地敞开 Broker,先将 Broker 从集群拿掉后再敞开 Topic,防止持续发动对 Broker 的连贯;
Support creat a consumer in the paused state
:在 create consumer 时能够指定暂停状态,不向服务端获取音讯;- ...
精选 Q&A
新个性介绍结束后,李老师还对观众弹幕问题一一作了解答,以下为 QA 精选内容概要,详情见视频 54 分钟后。
Q:Pulsar 是否反对 failover?
- Pulsar 反对 failover 模式,且一个 partition 能够有多个 consumer,开销次要存在于音讯签收上。Pulsar 反对保护单条音讯签收状态,因而会有肯定开销;
Q:Pulsar 对 ack 的操作是 exactly once 吗?
- Pulsar 对不开启 transaction 的状况默认是 at least once 实现,而不是 exactly once;
Q:ChunkMessage 反对事务吗?
- 目前暂不反对事务;
Q:Pulsar 的音讯发送是否会有两头失败前面胜利的状况?
- Pulsar 中所有音讯的发送不会呈现两头失败前面胜利的状况,其中一个失败前面都会失败;
Q:元数据导致的集群规模限度问题如何解决?
- 2.10 版本暂未解决元数据导致的集群规模限度问题,将来思考解决;
Q:KoP 反对 Kafka format 吗?
- 当初 KoP 能够反对 Pulsar format 与 Kafka format,防止服务端的序列化/反序列化,把工作交给客户端。客户端加载一个 formatter 就能够解析 Kafka format 数据,加重对 broker 的压力。
对于PPT
请复制链接到浏览器下载PPT:https://pan.baidu.com/s/1sqt9...
明码: 6wtk
相干浏览
- 直播回顾|TGIP-CN 036:Apache Pulsar 最新技术停顿与动静
- 直播回顾| TGIP-CN 035: Apache Pulsar 入手实战第二期:容器部署实战
[博文举荐|Apache Pulsar 轻装上阵:迈向轻 ZooKeeper 时代](https://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng==&mid=2247490737&idx=1&sn=1551d4a31ae062909ce05ee04a6b0541&scene=21#wechat_redirect)
关注公众号「Apache Pulsar」,获取更多技术干货
退出 Apache Pulsar 中文交换群
点击立刻观看 TGIP-CN 37:Apache Pulsar 2.10.0 新特色解析 回顾视频!