Apache-Pulsar-260-重磅发布新特性独家解读

31次阅读

共计 11149 个字符,预计需要花费 28 分钟才能阅读完成。

在 Pulsar 2.5.2 版本公布后的 1 个月,2020 年 6 月 18 日,Apache Pulsar 正式公布了 2.6.0 版本!

Pulsar 2.6.0 版本新增了诸多性能,并修复了大量破绽,笼罩存储端、Broker 端、客户端、Pulsar Functions、Pulsar IO、Pulsar SQL、Pulsar proxy 和平安等多方面,判若两人地丰盛和欠缺了 Pulsar 作为一个 云原生流数据平台 的能力。

2.6.0 版本总共承受了来自社区约 450 个 commits,越来越多的代码奉献来自于中国开发者,中国力量越发迅猛。

以下是 2.6.0 版本 重要 新增性能的详细信息。

Pulsar 外围

[PIP-37] 反对传输大音讯体的音讯

通过将大音讯体的音讯拆分成多个 chunk,该 PIP 反对生产和生产大音讯体的音讯。目前,该性能仅对 non-shared subscription 无效,并对客户端有改变。如需应用该性能,你须要将 Pulsar 客户端降级至 2.6.0。应用该个性能够在生产端启用音讯 trunking 机制。

client.newProducer()
    .topic("my-topic")
    .enableChunking(true)
    .create();
  • 更多对于 PIP-37 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-4440。

[PIP-39] 新增 system topic,用于存储 namespace 更改事件

在 Pulsar 2.6.0 以前,Pulsar 只能设置 namespace 级策略,属于同一 namespace 的所有 topic 都遵循雷同的 namespace 策略,但许多用户心愿能设置 topic 级别策略。另外,不应用和 namespace 级策略的实现形式是因为更多的 Topic 策略会减轻 ZooKeeper 累赘,而 system topic 的设计初衷是心愿将 topic 策略存储在 topic(而不是 ZooKeeper)中。该 PIP 是实现 topic 级策略的第一步,基于此,将来能实现更多相干性能。

  • 更多对于 PIP-39 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-4955。

[PIP-45] 反对可插拔元数据接口

该 PIP 反对 Pulsar 应用其余 metastore 服务(而不是 ZooKeeper),并反对 ManagedLedger 应用 MetadataStore 接口。通过 MetadataStore 接口,能较容易地减少其余元数据服务(例如 etcd)。

  • 更多对于 PIP-45 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-5358。

[PIP-54] 反对在 batch index 级确认音讯

在 Pulsar 2.6.0 以前,broker 仅在 batch message 级追踪音讯确认状态。如果局部批量音讯中已被确认,音讯从新发送给 consumer 时仍会收到“局部批量音讯已确认”的信息。该 PIP 反对在 batch index 级确认音讯。默认状况下,该性能未开启。如需开启,你能够在 broker.conf 文件中设置。

batchIndexAcknowledgeEnable=true
  • 更多对于 PIP-54 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-6052。

[PIP-58] 反对 consumer 设置自定义音讯重发延时

对于许多在线业务零碎而言,业务逻辑解决时常呈现各种异样,因而须要从新生产音讯,且用户心愿能够自定义设置延迟时间。在 Pulsar 2.6.0 之前,当客户端发送 nack 至 broker,Pulsar 会立即重发消息。从 Pulsar 2.6.0 开始,你能够为每条音讯设置重发延时。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .enableRetry(true)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                   .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
                        .build())
                .subscribe();

consumer.reconsumeLater(message, 10 , TimeUnit.SECONDS);
  • 更多对于 PIP-58 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-6449。

[PIP-60] 反对 SNI 路由,以反对更多 proxy server

在 Pulsar 2.6.0 之前,Pulsar 不反对应用其余 proxy(例如,Apache Traffic Server、HAProxy、Nginx 和 Envoy)的 SNI 路由,这些 proxy 的扩展性和安全性更高,并且大多反对 SNI 路由。SNI 路由能在不中断 SSL 连贯的状况下,将流量路由至目的地。

  • 更多对于 PIP- 60 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-6566。

[PIP-61] 反对多个 advertised address

该 PIP 容许 broker 裸露多个 advertised listeners,并反对内网和外网流量拆散。你能够在 broker.conf 文件中指定多个 advertised listeners。

advertisedListeners=internal:pulsar://192.168.1.11:6660,external:pulsar://110.95.234.50:6650

你也能够为客户端指定 listener 名称。

PulsarClient.builder()
.serviceUrl(url)
.listenerName("internal")
.build();
  • 更多对于 PIP-61 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-6903。

[PIP-65] Pulsar IO sources 反对 BatchSource

该 PIP 新增了以下性能:新增 BatchSource 接口,用于开发基于 batch 的 connector;新增 BatchSourceTriggerer 接口,用于触发 BatchSource 收集数据;提供了 BatchSourceExecutor 的零碎实现。

  • 更多对于 PIP-65 的信息,参阅这里。
  • 更多对于该性能的代码实现细节, 参阅 PR-7090。

Load balancer 增加 ThresholdShedder 策略

ThresholdShedder 策略比 LoadSheddingStrategy 策略更灵便。ThresholdShedder 策略计算 broker 的均匀资源应用状况。每个 broker 资源应用状况会与该平均值进行比照。如果超过(平均值 + 阈值),则触发 namespace bundle 转移至其余低负载 broker。你能够在 broker.conf 文件中启用该性能。

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

你也能够自定义 ThresholdShedder 策略的参数。

# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average resource usage,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only takes effect in ThresholdSheddler strategy.
loadBalancerBrokerThresholdShedderPercentage=10

# When calculating new resource usage, the history usage accounts for.
# It only takes effect in ThresholdSheddler strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=1.0

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=1.0

# Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
# It only takes effect in ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=10
  • 更多对于该性能的代码实现细节, 参阅 PR-6772。

Key Shared 在 Key_Shared 订阅中减少一致性 hashing 调配

在 Pulsar 2.6.0 之前,Key_Shared 订阅是通过应用 hash range 主动决裂来实现,该办法基于在新 consumer 退出或来到时决裂现有已调配 hash range。

Pulsar 2.6.0 为 Key_Shared 订阅新增一致性 hash 调配。你能够在 broker.conf 文件中启用该性能。主动决裂(auto split)办法仍默认开启。

# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
# consistent hashing to reassign keys to new consumers
subscriptionKeySharedUseConsistentHashing=false

# On KeyShared subscriptions, number of points in the consistent-hashing ring.
# The higher the number, the more equal the assignment of keys to consumers
subscriptionKeySharedConsistentHashingReplicaPoints=100

后续 Pulsar 版本打算默认开启一致性 hash 调配性能。

  • 更多对于该性能的代码实现细节, 参阅 PR-6791。

Key Shared[PR-7108] 解决了新增 consumer 时,KeyShared dispatcher 呈现的乱序问题

该 PR 对 Key_Shared 订阅性能十分重要。在 Pulsar 2.6.0 之前,当新增 consumer(c2)进入且现有 consumer(c1)退出时,程序在 Key_Shared dispatcher 中不能保障。这是因为之前调配至 c1 的音讯和 key 可能路由至 c2,这可能导致 Key_Shared 订阅中同一个 key 的音讯程序散发保障生效。

该 PR 做出了以下改变:为了保障音讯按序调配,在之前的音讯被确认之前,新增 consumer 会变成“暂停”状态。

如果你仍想应用涣散的程序散发保障,能够在 consumer 端进行以下设置。

pulsarClient.newConsumer()
    .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()。setAllowOutOfOrderDelivery(true))
    .subscribe();
  • 更多对于该性能的代码实现细节, 参阅 PR-7106 and PR-7108。

Key Shared 反对 key hash range reader

该 PR 反对 reader 读取某几个 hash range 的音讯。Broker 仅发送 key 的 hash 在 hash range 范畴内的音讯。

另外,你也能够为 reader 指定多个 key hash range。

pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
                    .create();
  • 更多对于该性能的代码实现细节, 参阅 PR-5928。

[PR-5390] 将基于 JNI 的库替换成 AirCompressor(Java 压缩库)

在 Pulsar 2.6.0 之前,基于 JNI 的库用于压缩数据,但这些库有容量开销,且会影响 JNI 开销(通常它在压缩较小 payload 时计算)。

该 PR 将 LZ4、ZStd 和 Snappy 的压缩库替换成 AirCompressor,它是 Presto 应用的纯 Java 压缩库。

  • 更多对于该性能的代码实现细节, 参阅 PR-5390。

[PR-5985] 反对多个 Pulsar 集群应用雷同的 BookKeeper 集群

该 PR 容许多个 Pulsar 集群应用指定 BookKeeper 集群(通过指定 BookKeeper 客户端至 BookKeeper 集群的 ZooKeeper 连贯字符串)。

该 PR 新增配置项 bookkeeperMetadataServiceUri,用于发现 BookKeeper 集群元数据存储和应用元数据服务 URI,以初始化 BookKeeper 客户端。

# Metadata service uri that bookkeeper is used for loading corresponding metadata driver
# and resolving its metadata service location.
# This value can be fetched using `bookkeeper shell whatisinstanceid` command in BookKeeper cluster.
# For example: zk+hierarchical://localhost:2181/ledgers
# The metadata service uri list can also be semicolon separated values like below:
# zk+hierarchical://zk1:2181;zk2:2181;zk3:2181/ledgers
bookkeeperMetadataServiceUri=
  • 更多对于该性能的代码实现细节, 参阅 PR-5985。

[PR-6077] 反对在所有 subscription 已生产至最新消息后,删除非沉闷 topic

在 Pulsar 2.6.0 之前,Pulsar 反对删除非沉闷 topic(这些 topic 不包含 producer 和 subscription)。该 PR 反对当所有 topic 的 subscription 已生产至最新消息后且不存在沉闷 producer 或 consumer 时,删除非沉闷 topic。

你能够在 broker.conf 文件中设置应用该性能。打算未来能在 namespace 级应用该性能。

# Set the inactive topic delete mode. Default is delete_when_no_subscriptions
# 'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers
# 'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)
# and no active producers/consumers
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
  • 更多对于该性能的代码实现细节, 参阅 PR-6077。

[PR-6634] 新增 flag,用于在内存溢出时疏忽 broker 宕机

Topic 呈现高 dispatch rate 时,可能导致 broker 短暂呈现 OOM。一旦内存被开释,broker 能够在几分钟内恢复正常。但在 Pulsar 2.4.0(更多信息,参阅 PR-4196)中,内存溢出时重启 broker 会导致集群不稳固(topic 会在 broker 之间挪动)、重启多个 broker 并扰乱其它 topic。该 PR 新增 flag,在内存溢出时疏忽 broker 宕机,防止集群不稳固。

  • 更多对于该性能的代码实现细节, 参阅 PR-6634。

[PR-6668] 反对配置 ZooKeeper 缓存生效工夫

在 Pulsar 2.6.0 之前,无奈设置 ZooKeeper 缓存生效工夫,但有许多场景须要设置该值。

当初,你能够在 broker.conf 文件中设置 ZooKeeper 缓存生效工夫。

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300
  • 更多对于该性能的代码实现细节, 参阅 PR-6668。

[PR-6719] 优化了 consumer 获取批量音讯性能

当 consumer 向 broker 发送获取音讯申请时,该申请包含获取音讯数量(告知 broker 须要发送多少条音讯至 consumer)。如果 producer 应用批量性能生产音讯,broker 会依据 entry(而不是单条音讯)将数据存储在 BookKeeper 或 broker 缓存。解决 consumer 获取申请时,音讯数量和 entry 数量之间的映射存在问题。

该 PR 新增了 avgMessagesPerEntry 变量,用于记录存储在每条 entry 中的均匀音讯数量,并在 broker 发送音讯至 consumer 时更新均匀音讯数量。解决 consumer 获取申请时,avgMessagesPerEntry 变量映射获取申请数量至 entry 数量。另外,该 PR 向 consumer stats 中减少了 avgMessagePerEntry 指标信息。

你能够在 broker.conf 中启用 preciseDispatcherFlowControl

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
  • 更多对于该性能的代码实现细节, 参阅 PR-6719。

[PR-7078] 为 topic 新增准确公布速率限度

在 Pulsar 2.6.0 之前,Pulsar 反对公布速率限度,但它并非准确管制。而当初,你能够在 broker.conf 文件中启用 topic 准确公布速率限度性能。

preciseTopicPublishRateLimiterEnable=true
  • 更多对于该性能的代码实现细节, 参阅 PR-7078。

[PR-7154] 反对 entry 查看提早

在 Pulsar 2.6.0 之前,新增 entry 查看提早是 10 ms(且用户无奈设置该值)。当初,对于生产提早敏感的场景,你能够在 broker.conf 文件中将新增 entry 查看提早设置成更小值(可能升高生产吞吐)或 0。

managedLedgerNewEntriesCheckDelayInMillis=10
  • 更多对于该性能的代码实现细节, 参阅 PR-7154。

[Schema] [PR-7139] KeyValue schema 反对 null key 和 null value。

  • 更多对于该性能的代码实现细节, 参阅 PR-7139。

[PR-7116] 反对设置 maxLedgerRolloverTimeMinutes 参数,用于触发 ledger 切换

该 PR 实现了一个监测线程,用于查看以后 topic ledger 是否满足 managedLedgerMaxLedgerRolloverTimeMinutes 条件并触发 ledger 切换使配置失效。如果触发 ledger 切换,你能够敞开以后 ledger 并开释以后 ledger 的存储空间。对于不常应用的 topic,以后 ledger 数据可能生效,且以后切换操作仅实用于新增 entry 时。很显然,这会节约磁盘空间。

监测线程在固定间隔时间查看 ledger 是否须要切换。你能够通过 managedLedgerMaxLedgerRolloverTimeMinutes 设置该工夫距离。

  • 更多对于该性能的代码实现细节, 参阅 PR-7116。

Proxy

[PR-6473] 新增用于获取 connection 和 topic stats 的 REST API

在 Pulsar 2.6.0 之前,Pulsar proxy 没有获取 proxy 外部信息的 stats,例如,无效连贯、topic stats(log 级)和其它信息。

该 PR 新增用于获取 connection 和 topic stats 的 REST API。

  • 更多对于该性能的代码实现细节, 参阅 PR-6473。

Admin

[PR-6331] 新增 get-message-by-id 命令

该 PR 新增 get-message-by-id 命令,反对通过 ledger ID 和 entry ID 检查单条信息。

  • 更多对于该性能的代码实现细节, 参阅 PR-6331。

[PR-6383] 新增强制删除 subscription 性能

该 PR 新增 deleteForcefully 办法,用于强制删除 subscription。

  • 更多对于该性能的代码实现细节, 参阅 PR-6383。

Functions

[PR-6895] 内置函数

反对像创立内置 connector 一样创立内置 function。

  • 更多对于该性能的代码实现细节, 参阅 PR-6895。

[PR-6031] 新增 Go function 心跳和 gRPC 服务

  • 更多对于该性能的代码实现细节, 参阅 PR-6031。

[PR-6348] 反对自定义系统配置

该 PR 反对在提交 function 时自定义零碎变量。该性能能够通过零碎变量传递认证信息。

  • 更多对于该性能的代码实现细节, 参阅 PR-6348。

[PR-6602] 拆散 function worker 和 broker 的 TLS 配置

  • 更多对于该性能的代码实现细节, 参阅 PR-6602。

[PR-6954] 反对在 function 和 source 中创立 consumer

在 Pulsar 2.6.0 之前,你能通过 function context 和 source context 创立 publisher,但不能创立 consumer。该 PR 修复了这一问题。

  • 更多对于该性能的代码实现细节, 参阅 PR-6954。

Pulsar SQL

[PR-6325] 反对 KeyValue schema

在 Pulsar 2.6.0 之前,Pulsar SQL 无奈读取 KeyValue schema 数据。

该 PR 反对 KeyValue schema,并为 key field 名称新增前缀 key.,为 value field 名称新增前缀 value.

  • 更多对于该性能的代码实现细节, 参阅 PR-6325。

[PR-4847] 反对多个 Avro schema 版本

在 Pulsar 2.6.0 之前,如果 topic 有多个 Avro schema,应用 Pulsar SQL 查问 topic 的数据会引起一些问题。从 Pulsar 2.6.0 开始,如需查问 topic 中的数据,能够更新 schema,该 schema 能够兼容 topic 中的所有 schema。

  • 更多对于该性能的代码实现细节, 参阅 PR-4847。

Java client

[PR-6648] 新增 API,用于在敞开 producer 时,Pulsar 客户端会持续期待正在传输的音讯传输实现

在 Pulsar 2.6.0 之前,当敞开 producer 时,Pulsar 客户端会立即让正在传输的音讯生效(即便音讯已在 broker 长久化)。大部分状况下,用户心愿能在敞开 producer 前期待正在传输的音讯实现(而不是使这些音讯生效),但 Pulsar 客户端 lib 未实现该性能。

该 PR 反对通过 flag(管制是否期待正在传输的音讯)敞开 API。你能够在敞开 producer 之前期待正在传输的音讯,Pulsar 客户端不会立刻使这些音讯生效。

  • 更多对于该性能的代码实现细节, 参阅 PR-6648。

[PR-6760] 反对从输出流中动静加载 TLS certs/key

在 Pulsar 2.6.0 之前,Pulsar 客户端提供 TLS 认证性能,默认 TLS provider AuthenticationTls 的值为 cert 和 key 文件的文件门路,但在有些利用场景中很难为 TLS 认证存储 cert 或 key。

该 PR 为 AuthenticationTls 新增流反对,以提供 X509Certs 和 PrivateKey(当指定 provider 呈现流变动时,PrivateKey 会自动更新)。

  • 更多对于该性能的代码实现细节, 参阅 PR-6760。

[PR-6825] 异步发送音讯时,如果程序抛出异样,零碎会返回 sequence ID

在 Pulsar 2.6.0 之前,如果异步发送音讯失败,程序会抛出异样,但并不显示哪些音讯不失常,用户也无奈理解须要重试哪些音讯。

此次改变更新了客户端。当抛出异样时,程序会设置 sequenceId org.apache.pulsar.client.api.PulsarClientException

  • 更多对于该性能的代码实现细节, 参阅 PR-6825。

参考信息

Pulsar

  • 如需下载 Apache Pulsar 2.6.0,点击这里。
  • 更多对于 Apache Pulsar 2.6.0 的信息,查阅 2.6.0 版本阐明 和 2.6.0 PR 列表。

任何问题或倡议,欢送通过 Pulsar mailing list 或 Slack 分割咱们:

  • Pulsar mailing list: users@pulsar.apache.org and dev@pulsar.apache.org。
  • Pulsar Slack: https://apache-pulsar.slack.c…

期待你为 Pulsar 的倒退添砖加瓦。

如果你对 Pulsar 示例、demo、工具或扩大感兴趣,欢送查阅 StreamNative GitHub。

正文完
 0