关于2021总结:一文读懂-Apache-Pulsar

54次阅读

共计 9630 个字符,预计需要花费 25 分钟才能阅读完成。

Pulsar 介绍

Apache Pulsar 作为 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、跨区域复制、具备强一致性、高吞吐、低提早及高可扩展性等流数据存储个性。

Pulsar 诞生于 2012 年,最后的目标是为在 Yahoo 外部,整合其余音讯零碎,构建对立逻辑、撑持大集群和跨区域的音讯平台。过后的其余音讯零碎(包含 Kafka),都不能满足 Yahoo 的需要,比方大集群多租户、稳固牢靠的 IO 服务质量、百万级 Topic、跨地区复制等,因而 Pulsar 应运而生。

Pulsar 的要害个性如下:

  • Pulsar 的单个实例原生反对多个集群,可跨机房在集群间无缝地实现音讯复制。
  • 极低的公布提早和端到端的提早。
  • 可无缝扩大到超过一百万个 Topic。
  • 简略的客户端 API,反对 Java、Go、Python 和 C++.
  • 反对多种 Topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。
  • 通过 Apache BookKeeper 提供的长久化音讯存储机制保障消息传递。
  • 由轻量级的 Serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更容易移入、移出 Apache Pulsar。
  • 分层存储可在数据古老时,将数据从热存储卸载到冷 / 长期存储(如 S3、GCS)中。

社区:

目前 Apache Pulsar 在 Github 的 star 数量是 10K+,共有 470+ 个 contributor。并且正在继续更新,社区的活跃度比拟好。

概念

Producer

音讯的源头,也是音讯的发布者,负责将音讯发送到 topic。

Consumer

音讯的消费者,负责从 topic 订阅并生产音讯。

Topic

音讯数据的载体,在 Pulsar 中 Topic 能够指定分为多个 partition,如果不设置默认只有一个 partition。

Broker

Broker 是一个无状态组件,次要负责接管 Producer 发送过去的音讯,并交付给 Consumer。

BookKeeper

分布式的预写日志零碎,为音讯零碎比 Pulsar 提供存储服务,为多个数据中心提供跨机器复制。

Bookie

Bookie 是为音讯提供长久化的 Apache BookKeeper 的服务端。

Cluster

Apache Pulsar 实例集群,由一个或多个实例组成。

云原生架构

Apache Pulsar 采纳计算存储拆散的一个架构,不与计算逻辑耦合在一起,能够做到数据独立扩大和疾速复原。计算存储分离式的架构随着云原生的倒退,在各个系统中呈现的频次也是越来越多。Pulsar 的 Broker 层就是一层无状态的计算逻辑层,次要负责接管和散发音讯,而存储层由 Bookie 节点组成,负责音讯的存储和读取。

Pulsar 的这种计算存储分离式的架构,能够做到程度扩容不受限制,如果零碎的 Producer、Consumer 比拟多,那么就能够间接扩容计算逻辑层 Broker,不受数据一致性的影响。如果不是这种架构,咱们在扩容的时候,计算逻辑和存储都是实时变动的,就很容易受到数据一致性的限度。同时计算层的逻辑自身就很简单容易出错,而存储层的逻辑绝对简略,出错的概率也比拟小,在这种架构下,如果计算层呈现谬误,能够进行单方面复原而不影响存储层。

Pulsar 还反对数据分层存储,能够将旧音讯移到便宜的存储计划中,而最新的音讯能够存到 SSD 中。这样能够节约老本、最大化利用资源。

集群架构

Pulsar 的集群由多个 Pulsar 实例组成的,其中包含

  • 多个 Broker 实例,负责接管和散发音讯
  • 一个 ZooKeeper 服务,用来协调集群配置
  • BookKeeper 服务端集群 Bookie,用来做音讯的长久化
  • 集群之间通过跨地区复制进行音讯同步

设计原理

pulsar 采纳公布 - 订阅的设计模式(pub-sub), 在该设计模式中 producer 公布音讯到 topic,consumer 订阅 topic 中的音讯并在解决实现之后发送 ack 确认。

Producer

发送模式

Producer 发送音讯有两种模式,能够同步(sync)或异步(async)的形式公布音讯到 broker。

  • 同步发送音讯是 Producer 发送音讯当前要等到 broker 的确认当前才会认为音讯发送胜利,如果没有收到确认就认为音讯发送失败。

    MessageId messageId = producer.send("同步发送的音讯".getBytes(StandardCharsets.UTF_8));
  • 异步发送音讯是 Producer 发送音讯,将音讯放到阻塞队列中并立刻返回。不须要期待 broker 的确认。

    CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync("异步发送的音讯".getBytes(StandardCharsets.UTF_8));

拜访形式

Pulsar 为 Producer 提供多种不同类型的 Topic 拜访模式:

  • Shared

    默认状况下多个生产者能够公布音讯到同一个 Topic。

  • Exclusive

    要求生产者以独占模式拜访 Topic,在此模式下 如果 Topic 曾经有了生产者,那么其余生产者在连贯就会失败报错。

    Topic has an existing exclusive producer: standalone-0-12

  • WaitForExclusive

    如果主题曾经连贯了生产者,则将以后生产者挂起,直到生产者取得了 Exclusive 拜访权限。

能够通过以下形式来设置拜访模式:

Producer<byte[]> producer = pulsarClient.newProducer().accessMode(ProducerAccessMode.Shared).topic("test-topic-1").create();

压缩

Pulsar 反对对 Producer 发送的音讯进行压缩,Pulsar 反对以下类型的压缩:

  • LZ4

    LZ4 是无损压缩算法,提供每核 > 500 MB/s 的压缩速度,可通过多核 CPU 进行扩大。它具备极快的解码器,每个内核的速度达数 GB/s,通常达到多核零碎上的 RAM 速度限制。

  • ZLIB

    zlib 旨在成为一个收费的、通用的、不受法律束缚的——也就是说,不受任何专利爱护——无损数据压缩库,简直能够在任何计算机硬件和操作系统上应用。zlib 数据格式自身能够跨平台移植。

  • ZSTD

    Zstandard 是一种疾速压缩算法,提供高压缩率。它还为小数据提供了一种非凡模式,称为字典压缩。参考库提供了十分宽泛的速度 / 压缩衡量,并由极快的解码器提供反对。

  • snappy

    Snappy 是一个压缩 / 解压库。它不以最大压缩为指标,也不与任何其余压缩库兼容; 相同,它的指标是十分高的速度和正当的压缩。

批处理

Producer 反对在单次申请发送批量音讯,能够通过(acknowledgmentAtBatchIndexLevelEnabled = true) 来开启批处理。当一个批次的所有音讯都被 Consumer 生产确认后,这个批次的音讯才会被确认发送胜利。如果呈现了意外故障,可能会导致这个批次的所有音讯从新投递,包含曾经被确认生产过得音讯。

为了防止这个问题,Pulsar 2.6.0 当前开始引入了批量索引确认,由 broker 来保护每个索引的确认状态,防止将已确认的音讯发送给 Consumer。当这个批次的所有音讯索引都被确认后,将会删除批音讯。

音讯分块

Producer 反对将音讯分块,能够应用 chunkingEnabled=true 启动分块,启用分块时,要留神一下几点:

  • 不能同时启动批处理和分块,要启动分块,必须提前禁用批处理。
  • 仅长久化主题反对分块。
  • 仅对独占和故障转移订阅类型反对分块。

当分块被启用的时候,如果 Producer 发送的音讯超过最大负载大小,则 Producer 会将原始音讯拆分成多个块音讯,而后将每个块发送给 broker,分块音讯在 broker 内的存储和失常的音讯是一样的。只是在 Consumer 生产的时候,发现这是一个分块音讯,就须要在缓存分块音讯,当收集完一个音讯的所有分块组合成原始放到接收者队列外面给客户端生产。如果 Producer 未能发送所有的分块音讯,Consumer 有过期解决机制,默认的过期工夫为一个小时。

一个生产者和一个有序的消费者的分块音讯模型:

多个生产者和一个有序消费者的分块音讯模型:

Consumer

Consumer 是音讯的消费者,通过订阅指定的 Topic 从 broker 中获取音讯。

Consumer 向 broker 发送流申请以获取音讯。在 Consumer 端有一个队列来接管从 broker 推送的音讯。您能够应用 receiverQueueSize 参数配置队列大小。默认大小为 1000)。每次 consumer.receive()调用时,都会从缓冲区中取出一条音讯。

接管形式

音讯能够同步(sync)或异步(async)的形式从 broker 接管,还能够通过 MessageListener 返回音讯:接管音讯后回调用户的 MessageListener。

  • 同步接管音讯将被阻塞,直到有音讯可用。

    Message<byte[]> message = consumer.receive();
    System.out.println("接管音讯内容:" + new String(message.getData()));
    consumer.acknowledge(message);  // 确认生产音讯
  • 异步接管音讯将会立刻返回一个将来值。应用 CompletableFuture。如果 CompletableFuture 实现音讯接管,应该随后调用 receiveAsync(),否则,它会在应用程序中创立接管申请的积压。

    能够通过调用.cancel(false) (CompletableFuture.cancel(boolean))在实现之前勾销返回的future,以将其从接管申请的积压中删除。

    CompletableFuture<Message<byte[]>> messageCompletableFuture = consumer.receiveAsync();
    Message<byte[]> message = messageCompletableFuture.get();
    System.out.println("接管音讯内容:" + new String(message.getData()));
    consumer.acknowledge(message); // 确认生产音讯
  • 客户端库为消费者提供侦听器实现。例如,Java 客户端提供了一个 MesssageListener 接口,每当收到新音讯时都会调用 received 办法。

    pulsarClient.newConsumer().topic("test-topic-1").messageListener((MessageListener<byte[]>) (consumer, msg) -> {System.out.println("承受音讯内容:" + new String(msg.getData()));
                try {consumer.acknowledge(msg);  // 确认生产音讯
                } catch (PulsarClientException e) {consumer.negativeAcknowledge(msg);  // 音讯生产失败
                }
            }).subscriptionName("test-subscription-1").subscribe();

生产确认

胜利确认

当 Consumer 胜利生产一条音讯当前,须要向 broker 发送音讯确认。只有在所有的订阅都确认后,音讯才会被删除。如果须要存储曾经确认生产胜利的音讯,须要设置音讯保留策略。否则 Pulsar 会立刻删除所有确认生产胜利的音讯。

对于批处理的音讯,能够通过以下两种形式确认音讯:

  • 音讯被独自确认。通过独自确认,消费者须要确认每条音讯并向代理发送确认申请。
  • 音讯被累积确认。通过累积确认,消费者只须要确认它收到的最初一条音讯。流中直到提供的音讯的所有音讯都不会从新传递给该消费者。

失败确认:

当 Consumer 生产失败一条音讯,想要再次生产该音讯时,须要向 broker 发送否定确认。阐明音讯没有生产胜利,这样 broker 会从新投递这个音讯。音讯会被独自或累积地否定确认,具体取决于生产订阅类型:

  • 在独占和故障转移订阅类型中,消费者仅否定他们收到的最初一条音讯。
  • 在 shared 和 Key_Shared 订阅类型中,您能够独自否定确认音讯。

确认超时:

如果一条音讯没有生产胜利,想要触发 broker 主动重发消息,能够采纳未确认音讯主动重发机制。倡议优先应用失败确认,能够更加精准的管制单个音讯的从新投递。

死信队列

Apache Pulsar 内置死信队列个性,当音讯解决失败收到否定 Ack 时,Apache Pulsar 能够主动重试。超过重试次数能够将音讯寄存至死信队列,以确保新音讯能够失去解决。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .build())
              .subscribe();

生产模型

Apache Pulsar 提供了队列模型和流模型的对立,在 Topic 级别只须要保留一份数据,同一份数据可屡次生产,以流、队列等形式计算不同的订阅模型,大大晋升了灵便度。Apache Pulsar 中有四种订阅类型:exclusive、shared、failover 和 key_shared。这些类型如下图所示。

Topic

Topic 命名

在 Pulsar 中 Topic 负责将音讯从生产者传递到消费者,主题名称是具备明确定义构造的 URL:

{persistent|non-persistent}://tenant/namespace/topic
  • persistent / non-persistent

    示意主题的类型,主题分为长久化和非长久化主题,默认是长久化的类型。长久化的主题会将音讯保留到磁盘上,而非长久化的主题就不会将音讯保留到磁盘。

  • tenant

    Pulsar 实例中主题的租户,租户对于 Pulsar 中的多租户至关重要,并且散布在集群中。

  • namespace

    将相关联的 Topic 作为一个组来治理,是治理 Topic 的根本单元。每个租户能够有一个或多个命名空间。

  • topic

    Pulsar 中的主题被命名为通道,用于将音讯从生产者传输到消费者。

在 Pulsar 中不须要显示的创立 Topic,如果尝试向不存在的 Topic 发送或接管音讯,会在默认租户和命名空间中创立 Topic。

Topic 分区

一般的 Topic 被保留在单个 broker 中,而 Topic 能够分为多个 partition,别离存储到不同的 broker 中,由多个 broker 来解决,这大大的晋升了 Topic 的吞吐量。

如上图,Topic1 被分为了 5 个分区,别离是 P0、P1、P2、P3、P4。这 5 个分区被分到了 3 个 Broker(Broker1、Broker2、Broker3)中,因为分区的数量多于代理的数量,前两个代理别离解决两个分区,第三个代理只解决一个,Pulsar 会主动解决这种分区散布。

公布到分区主题时,必须指定 路由模式。路由模式决定了每条音讯应该公布到哪个分区。

共有三种 MessageRoutingMode 可用:

  • RoundRobinPartition

    如果没有提供密钥,生产者将以循环形式跨所有分区公布音讯,以实现最大吞吐量。请留神,循环不是针对单个音讯进行的,而是设置为与批处理提早雷同的边界,以确保批处理无效。而如果在音讯上指定了一个键,分区的生产者将散列键并将音讯调配给特定的分区。

  • SinglePartition

    如果没有提供密钥,分区生产者将随机抉择一个分区并将所有音讯公布到该分区中。如果在音讯上提供了密钥,则分区生产者将散列密钥并将音讯调配给特定分区。

  • CustomPartition

    应用将被调用的自定义音讯路由器实现来确定特定音讯的分区。

多租户

Apache Pulsar 的多租户个性能够满足企业的治理需要,租户、命名空间是 Apache Pulsar 反对多租户的 2 个外围概念。

  • 在租户级别,Pulsar 为特定的租户预留适合的存储空间、利用受权与认证机制。
  • 在命名空间级别,Pulsar 提供一些列的配置策略,包含存储配额、流控、音讯过期策略和命名空间之间的隔离策略。

跨地区复制

跨地区复制机制为大型分布式系统多数据中心提供了冗余,避免服务无奈失常运行。也为跨地区生产、跨地区生产提供了根底。

分层存储

Pulsar 的 分层存储 性能容许将较旧的积压数据从 BookKeeper 挪动到长期且更便宜的存储,升高存储老本,同时依然容许客户端拜访积压,就如同没有任何变动一样。管理员可配置命名空间大小阈值策略,实现数据主动迁徙到长期存储。

组件

Pulsar Schema Registry

Schema registry 使 Producer 和 Consumer 通过 broker 即可沟通 Topic 的数据结构,而不须要借助内部协调机制,从而防止各种潜在的问题如序列化、反序列化问题。

Pulsar Functions

Pulsar Functions 是一个轻量化计算框架,它给用户提供一个部署简略、运维简略、API 简略的 FAAS(Function as a service)平台,指标是帮忙用户轻松创立各种级别的简单解决逻辑,而无需部署独自的计算零碎。

Pulsar IO

Pulsar IO 反对 Apache Pulsar 与内部零碎如数据库、其余音讯零碎进行交互,如 Apache Cassandra 等零碎,用户无需关注实现细节,用一个命令就能够疾速运行。

Source 将数据从内部零碎导入 Apache Pulsar,Sink 将数据从 Apache Pulsar 导出到内部零碎。

Pulsar SQL

Pulsar SQL 是构建在 Apache Pulsar 之上的查问层,反对用户动静查问存储在 Pulsar 外部的所有旧数据流。用户能够在同一零碎上注入数据的同时进行数据流的清理、转化和查问,极大简化了数据管道。

疾速上手

二进制装置

这里就简略介绍一个 Pulsar Demo 的案例,只装置一个 Pulsar 的服务端,首先通过以下命令下载 Pulsar:

wget https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz

下载到本地当前,通过以下命令解压 apache-pulsar-2.8.1-bin.tar.gz 压缩包:

tar xvfz apache-pulsar-2.8.1-bin.tar.gz

而后 cd 到 apache-pulsar-2.8.1 文件夹下,蕴含以下目录:

  • bin:Pulsar 的命令行工具,例如 [pulsar](https://pulsar.apache.org/docs/en/reference-cli-tools#pulsar)[pulsar-admin](https://pulsar.apache.org/tools/pulsar-admin/)
  • conf : Pulsar 的配置文件,包含 broker 配置、ZooKeeper 配置等。
  • examples : 蕴含 Pulsar 函数示例的 Java JAR 文件。
  • lib : Pulsar 应用的 JAR)文件。
  • licenses:.txt以 Pulsar 代码库的各种组件的模式存在的许可证文件。

独立启动 Pulsar

当您在本地装置好 Pulsar 当前,您就能够应用 [pulsar](https://pulsar.apache.org/docs/en/reference-cli-tools#pulsar) 存储在 bin 目录中的命令启动本地集群,并指定您要以独立模式启动 Pulsar。

$ bin/pulsar standalone

如果你曾经胜利启动了 Pulsar,你会看到这样的INFOlevel 日志音讯:

2017-06-01 14:46:29,192 - INFO  - [main:WebSocketService@95] - Configuration Store cache started
2017-06-01 14:46:29,192 - INFO  - [main:AuthenticationService@61] - Authentication is disabled
2017-06-01 14:46:29,192 - INFO  - [main:WebSocketService@108] - Pulsar WebSocket Service started

一个发送 / 接管音讯案例

public class PulsarDemo {

    private static PulsarClient PULSAR_CLIENT = null;

    static {
        try {
            // 创立 pulsar 客户端
            PULSAR_CLIENT = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();} catch (PulsarClientException e) {e.printStackTrace();
        }
    }

    public static void main(String[] args) throws PulsarClientException {

        // 创立生产者
        Producer<byte[]> producer = PULSAR_CLIENT.newProducer().topic("test-topic-1").create();
        // 同步发送音讯
        MessageId messageId = producer.send("同步发送的音讯".getBytes(StandardCharsets.UTF_8));
        System.out.println("音讯发送胜利,音讯 id:" + messageId);

        // 创立消费者
        Consumer<byte[]> consumer = PULSAR_CLIENT.newConsumer().topic("test-topic-1")
                .subscriptionName("test-subscription-1").subscribe();
        // 获取一个音讯内容
        Message<byte[]> message = consumer.receive();
        System.out.println("接管的音讯内容:" + new String(message.getData()));
        // 确认生产胜利,以便 pulsar 删除生产胜利的音讯
        consumer.acknowledge(message);

        // 敞开客户端
        producer.close();
        consumer.close();
        PULSAR_CLIENT.close();}
}

输入:

音讯发送胜利,音讯 id: 66655:0:-1:0
接管的音讯内容: 同步发送的音讯

参考

Apache Pulsar 官网

StreamNative 产品手册

Apache Pulsar 与 Apache Kafka 在金融场景下的性能比照剖析

Pulsar 与 Kafka 全方位比照(上篇)

Pulsar 与 Kafka 全方位比照(下篇)

Kafka 已掉队,转角遇见 Pulsar!

正文完
 0