乐趣区

Pulsar-IO-简介

翻译:StreamNative——Sijia

Apache Pulsar 是业界当先的音讯零碎。应用音讯零碎时,一个较为常见的问题就是:将数据移入或移出音讯平台的最佳办法是什么?当然,用户能够应用 Pulsar 的 consumer 和 producer API 编写自定义代码,来传输数据。但除此之外,是否还有其余办法呢?

以下为用户提出的一些相干问题:

  1. 要将数据公布到 Pulsar 或应用 Pulsar 中的数据,我应该在 哪里 运行相应程序?
  2. 要将数据公布到 Pulsar 或应用 Pulsar 中的数据,我应该 怎么 运行相应程序?

用户之所以会提出这些问题,是因为其余音讯 / 公布 - 订阅零碎没有提供有组织且容错的形式来帮忙用户从内部零碎输出数据或将数据输入到内部零碎,因此用户须要寻求自定义解决方案并手动运行。

为了解决上述问题并简化这一过程,咱们推出了 Pulsar IO。Pulsar IO 通过利用现有的 Pulsar Functions 框架来输出 / 输入数据。而 Pulsar Functions 框架的所有劣势(如:容错性、并行性、弹性、负载平衡、按需更新等)都能够间接被 Pulsar 输出 / 输入数据的应用程序所利用。

而且,咱们发现常常会呈现这样的状况,用户花很大功夫(因为他们不是音讯零碎方面的专家,可能也不想成为这一畛域的专家)去编写自定义程序,用于从消息传递零碎拜访数据。自定义编写这些应用程序不仅会很艰难,而且咱们发现,许多用户在尝试实现执行雷同性能的应用程序时,做了雷同的工作。归根结底,音讯零碎只是用于挪动数据的工具,因而,在设计 Pulsar IO 框架时,咱们的次要指标之一就是易用性。咱们心愿用户可能在 不编写任何代码,也不必同时成为 Pulsar 和内部零碎专家的状况下,能够从内部零碎输出数据或将数据输入到内部零碎。咱们将在下文介绍如何达成这一指标。

Pulsar IO 框架是什么样的?

首先,咱们定义两个应用程序,一个作为 source 将数据输出到 Pulsar,另一个作为 sink 从 Pulsar 接收数据。

Source 将数据从内部零碎导入 Pulsar,而 sink 将数据从 Pulsar 导出到内部零碎。具体来看,source 从内部零碎读取数据,并将数据写入 Pulsar topic,而 sink 从一个或多个 Pulsar topic 读取数据,并将数据写入内部零碎。

Pulsar IO 框架在现有的 Pulsar functions 框架上运行。单个 source 和 sink 能够像 function 一样与 Pulsar broker 一起运行,如图 2 所示。

因而,Pulsar Functions 框架的所有劣势都实用于 Pulsar IO 框架,即 sink 和 source 应用程序。

正如后面提到的,咱们的设计指标包含用户无需编写任何自定义应用程序,也无需编写任何代码就能够将数据移入或移出 Pulsar。因而,Pulsar IO 框架中有多种内置 source 和 sink(Kafka、Twitter Firehose、Cassandra、Aerospike 等,还会反对更多),用户只需应用一个命令便可运行。用户因而能够关注于业务逻辑,而无需放心实现细节。

如何应用 Pulsar IO

应用 Pulsar IO 框架很容易。用户能够在命令行界面应用一行简略的命令启动内置 source 或 sink。例如,用户能够用上面的命令来提交 source 到已有的 Pulsar 集群,命令格局如下:

$ ./bin/pulsar-admin source create \    
  --tenant <tenant> \    
  --namespace <namespace> \    
  --name <source-name> \    
  --destinationTopicName <input-topics> \    
  --source-type <source-type>

以下示例为运行 twitter firehose source 的命令,用于将 Twitter 中的数据导入 Pulsar:

$ ./bin/pulsar-admin source create \--tenant test \
  --namespace ns1 \
  --name twitter-source \
  --destinationTopicName twitter_data \
  --sourceConfigFile examples/twitter.yml \
  --source-type twitter

通过以上步骤,用户即可向 Pulsar 输出数据,而无需编写或编译任何代码。惟一可能须要的是一个配置文件,用于为该 source 或 sink 指定某些配置。用户能够通过以下格局的命令向现有的 Pulsar 集群中提交待运行的内置 sink:

$ ./bin/pulsar-admin sink create \   
  --tenant <tenant> \   
  --namespace <namespace> \   
  --name <sink-name> \   
  --inputs <input-topics> \   
  --sink-type <sink-type>

以下为运行 Cassandra sink 的示例命令,用于将数据从 Pulsar 导出到 Cassandra:

$ ./bin/pulsar-admin sink create \   
  --tenant public \   
  --namespace default \   
  --name cassandra-test-sink \   
  --sink-type cassandra \   
  --sinkConfigFile examples/cassandra-sink.yml \   
  --inputs test_cassandra

更多对于如何运行 Cassandra source 的信息,参阅疾速入门指南。

以上命令显示了如何在“集群”模式下(即作为现有 Pulsar 集群的一部分)运行 source 和 sink。除此之外,还能够在“本地运行”模式下将 source 和 sink 作为独立过程运行,这一模式会在机器上生成本地过程并且运行 source 或者 sink 的逻辑。本地运行模式有助于测试和调试,然而,须要用户自行监控和监督。以下为在本地运行模式下运行 source 的命令示例:

$ ./bin/pulsar-admin sink localrun \
  --tenant public \   
  --namespace default \   
  --name cassandra-test-sink \   
  --sink-type cassandra \   
  --sinkConfigFile examples/cassandra-sink.yml \   
  --inputs test_cassandra

因为 Pulsar IO 框架在 Pulsar Functions 上运行,因而能够通过更新参数和配置来动静更新 source 或 sink。例如,当心愿利用后面提到的 Twitter firehose source 将数据输出到另一个 Pulsar topic 时,能够执行以下命令:

$ ./bin/pulsar-admin source update \--tenant test \
  --namespace ns1 \
  --name twitter-source \
  --destinationTopicName twitter_data_2 \
  --sourceConfigFile examples/twitter.yml \
  --source-type twitter

也能够应用同样格局的命令更新 sink。大多数 source 和 sink 的更新都能够在运行时进行配置,从而简化批改、测试、部署等流程。

如果要自定义实现一个小众的用例,则能够通过实现一个简略的界面来创立 source 或 sink。然而,Pulsar IO 的目标是帮忙用户间接应用现有的内置 source 或 sink,而不用本人手动实现 source 或 sink。

实现自定义 source

要创立自定义 source,用户须要编写一个实现 source 接口的 Java 类:

public interface Source<T> extends AutoCloseable {
/**
 * Open source with configuration
 *
 * @param config initialization config
 * @throws Exception IO type exceptions when opening a connector
 */

    void open(final Map<String, Object> config) throws Exception;
    /**
     * Reads the next message from source.
     * If source does not have any new messages, this call should block.
     * @return next message from source.  The return result should never be null
     * @throws Exception
    */

    Record<T> read() throws Exception;}

这是一个 source 实现的简略示例:

public class TestSource implements Source<Integer> {
    private int i = 0;
    @Override
    public void open(Map<String, Object> config) throws Exception { }

    @Override
    public Record<Integer> read() throws Exception {return () -> i++;
    }

    @Override
    public void close() throws Exception {}
}

在下面的 source 示例中,枯燥递增的整数被传入到 Pulsar。实现“Record”接口的对象须要通过“read”办法返回,因为“Record”接口蕴含可用于实现不同消息传递语义或保障的字段,例如 exactly-once/effectively-once。在后续文章中,我将具体探讨如何执行此操作。

实现自定义 sink

要创立自定义 sink,用户须要编写一个实现 sink 接口的 Java 类:

public interface Sink<T> extends AutoCloseable{
    /**
    * Open Sink with configuration
    *
    * @param config initialization config
    * @throws Exception IO type exceptions when opening a connector
    */
   void open(final Map<String, Object> config) throws Exception;

   /**
    * Write a message to Sink
    * @param inputRecordContext Context of value
    * @param value value to write to sink
    * @throws Exception
    */
   void write(RecordContext inputRecordContext, T value) throws Exception;
}

例如,一个简略的 sink 实现:

public class TestSink implements Sink<String> {
    private static final String FILENAME = "/tmp/test-out";
    private BufferedWriter bw = null;
    private FileWriter fw = null;
    @Override
    public void open(Map<String, Object> config) throws Exception {File file = new File(FILENAME);
        // if file doesnt exists, then create it
        if (!file.exists()) {file.createNewFile();
        }
        fw = new FileWriter(file.getAbsoluteFile(), true);
        bw = new BufferedWriter(fw);
    }

    @Override
    public void write(RecordContext inputRecordContext, String value) throws Exception {
        try {bw.write(value);
            bw.flush();} catch (IOException e) {throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {if (bw != null)
                bw.close();
            if (fw != null)
                fw.close();} catch (IOException ex) {ex.printStackTrace();
        }
    }
}

以上示例阐明 sink 如何从 Pulsar 读取数据并写入文件。与 source 接口相似,sink 接口中的“write”办法有一个 RecordContext 参数。此参数为 sink 提供须要写入内部零碎的值的 context。RecordContext 参数可用于实现可能提供不同级别的消息传递语义或保障(如:Exactly-once/Effective-once)的 sink。在后续文章中,咱们将对此进行更深刻的探讨。

用户能够通过相似于运行内置 source 和 sink 的形式来提交自定义 source 和 sink:

$ ./bin/pulsar-admin source create \
  --className  <classname> \
  --jar <jar-location> \
  --tenant <tenant> \
  --namespace <namespace> \
  --name <source-name> \
  --destinationTopicName <output-topic>

命令示例如下:

$ ./bin/pulsar-admin source create \
  --className org.apache.pulsar.io.twitter.TwitterFireHose \
  --jar \~/application.jar \
  --tenant test \
  --namespace ns1 \
  --name twitter-source \
  --destinationTopicName twitter_data

在现有 Pulsar 集群中提交待运行的自定义 sink 的命令格局如下:

$ ./bin/pulsar-admin sink create \
  --className  <classname> \
  --jar <jar-location> \
  --tenant test \
  --namespace <namespace> \
  --name <sink-name> \
  --inputs <input-topics>

命令示例:

 $ ./bin/pulsar-admin sink create \
   --className  org.apache.pulsar.io.cassandra \
   --jar \~/application.jar \
   --tenant test \
   --namespace ns1 \
   --name cassandra-sink \
   --inputs test_topic

应用 Pulsar IO 框架的劣势

如上所述,Pulsar IO 框架在现有的 Pulsar Functions 框架上运行。Pulsar IO 充分利用了现有的 Pulsar Functions 框架。作为 Pulsar IO 的组成部分,source 和 sink 领有 Pulsar Functions 的所有劣势:

劣势 具体介绍
执行灵活性 Source 和 sink 都能够作为现有集群的一部分或作为本地过程来运行。
并发性 要减少 source 或 sink 的吞吐量,只需增加简略的配置即可运行更多 source 和 sink 实例。
负载平衡 当 source 和 sink 以集群模式运行时,能达到负载平衡。
容错、监控、metrics 如果 source 和 sink 以“集群”模式运行,则作为 Pulsar function 框架一部分的 worker 服务将主动监控已部署的 source 和 sink。当节点产生故障时,将主动重新部署 source 和 sink 到运作节点,并主动收集 metrics。
动静更新 动静更新多项配置,如:单个 connector 的并行性、源代码、输出 / 输入 topic 等。
数据本地化 因为 broker 为 topic 的读写申请提供服务,因而在 broker 左近运行 source 和 sink 能够缩小网络提早和网络带宽的使用率。

如何试用?

心愿本文分明地向你展示了 Pulsar IO 的框架,易于应用的起因,以及如何将数据导入 / 导出 Pulsar。Pulsar IO 框架随 Pulsar 2.1.0 官网发行版一起正式公布。

更多文档,点击这里

退出移动版