关于数据库:技术干货|如何将-Pulsar-数据快速且无缝接入-Apache-Doris

47次阅读

共计 7348 个字符,预计需要花费 19 分钟才能阅读完成。

KoP 架构介绍:

KoP 是 Kafka on Pulsar 的简写,顾名思义就是如何在 Pulsar 上实现对 Kafka 数据的读写。KoP 将 Kafka 协定解决插件引入 Pulsar Broker 来实现 Apache Pulsar 对 Apache Kafka 协定的反对。将 KoP 协定解决插件增加到现有 Pulsar 集群后,用户不必批改代码就能够将现有的 Kafka 应用程序和服务迁徙到 Pulsar。

Apache Pulsar 次要特点如下:

  • 利用企业级多租户个性简化经营。
  • 防止数据搬迁,简化操作。
  • 利用 Apache BookKeeper 和分层存储长久保留事件流。
  • 利用 Pulsar Functions 进行无服务器化事件处理。

KoP 架构如下图,通过图能够看到 KoP 引入一个新的协定解决插件,该协定解决插件利用 Pulsar 的现有组件(例如 Topic 发现、分布式日志库 -ManagedLedger、cursor 等)来实现 Kafka 传输协定。

Routine Load 订阅 Pulsar 数据思路

Apache Doris Routine Load 反对了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的音讯公布和订阅零碎,曾经在很多线上服务应用。那么 Apache Pulsar 用户如何将数据数据接入 Apache Doris 呢,答案是通过 KoP 实现。

因为 Kop 间接在 Pulsar 侧提供了对 Kafka 的兼容,那么对于 Apache Doris 来说能够像应用 Kafka 一样应用 Plusar。整个过程对于 Apache Doris 来说无需工作扭转,就能将 Pulsar 数据接入 Apache Doris,并且能够取得 Routine Load 的事务性保障。

--------------------------
|     Apache Doris       |
|     ---------------    |
|     | Routine Load |   |
|     ---------------    |
--------------------------
            |Kafka Protocol(librdkafka)
------------v--------------
|     ---------------    |
|     |     KoP      |   |
|     ---------------    |
|       Apache Pulsar    |
--------------------------

操作实战

1. Pulsar Standalone 装置环境筹备:
  1. JDK 装置:略
  2. 下载 Pulsar 二进制包,并解压:
# 下载
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#解压并进入装置目录
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0
2. KoP 组件编译和装置:
  1. 下载 KoP 源码
git clone https://github.com/streamnative/kop.git
cd kop
  1. 编译 KoP 我的项目:
mvn clean install -DskipTests
  1. protocols 配置:在解压后的 apache-pulsar 目录下创立 protocols 文 件夹,并把编译好的 nar 包复制到 protocols 文件夹中。
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
  1. 增加后的后果查看:
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar
3. KoP 配置增加:
  1. 在 standalone.conf 或者 broker.conf 增加如下配置
#kop 适配的协定
messagingProtocols=kafka
#kop 的 NAR 文件门路
protocolHandlerDirectory=./protocols
#是否容许主动创立 topic
allowAutoTopicCreationType=partitioned
  1. 增加如下服务监听配置
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false

当呈现如下谬误:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

增加如下配置,开启 transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true

这个谬误肯定要修复,不然看到的景象就是应用 Kafka 自带的工具:bin/kafka-console-producer.sh 和 bin/kafka-console-consumer.sh 在 Pulsar 上进行数据的生产和生产失常,然而在 Apache Doris 中数据无奈同步过去。

4. Pulsar 启动
# 前台启动
#bin/pulsar standalone
#后盾启动
pulsar-daemon start standalone
5. 创立 Doris 数据库和建表
# 进入 Doris
mysql -u root  -h 127.0.0.1 -P 9030
# 创立数据库
create database pulsar_doris;
#切换数据库
use pulsar_doris;
#创立 clicklog 表
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
    `clickTime` DATETIME NOT NULL COMMENT "点击工夫",
    `type` String NOT NULL COMMENT "点击类型",
    `id`  VARCHAR(100) COMMENT "惟一 id",
    `user` VARCHAR(100) COMMENT "用户名称",
    `city` VARCHAR(50) COMMENT "所在城市"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
6. 创立 Routine Load 工作
CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "test",
    "property.group.id" = "doris"
 );

上述命令中的参数解释如下:

  • pulsar_doris:Routine Load 工作所在的数据库
  • load_from_pulsar_test:Routine Load 工作名称
  • clicklog:Routine Load 工作的指标表,也就是配置 Routine Load 工作将数据导入到 Doris 哪个表中。
  • strict_mode:导入是否为严格模式,这里设置为 False。
  • format:导入数据的类型,这里配置为 Json。
  • kafka_broker_list:Kafka Broker 服务的地址
  • kafka_broker_list:Kafka Topic 名称,也就是同步哪个 Topic 上的数据。
  • property.group.id:生产组 ID
7. 数据导入和测试
  1. 数据导入

    结构一个 ClickLog 的数据结构,并调用 Kafka 的 Producer 发送 5000 万条数据到 Pulsar。

ClickLog 数据结构如下:

public class ClickLog {
    private String id;
    private String user;
    private String city;
    private String clickTime;
    private String type;
    ... // 省略 getter 和 setter
   }

音讯结构和发送的外围代码逻辑如下:

       String strDateFormat = "yyyy-MM-dd HH:mm:ss";
       @Autowired
       private Producer producer;
        try {for(int j =0 ; j<50000;j++){
              int batchSize = 1000;
                for(int i = 0 ; i<batchSize ;i++){ClickLog clickLog  = new ClickLog();
                    clickLog.setId(UUID.randomUUID().toString());
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
                    clickLog.setClickTime(simpleDateFormat.format(new Date()));
                    clickLog.setType("webset");
                    clickLog.setUser("user"+ new Random().nextInt(1000) +i);
                    producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
                }
            }
        } catch (Exception e) {e.printStackTrace();
        }
  1. ROUTINE LOAD 工作查看 执行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G; 命令,查看导入工作的状态。
mysql>  SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
                  Id: 87873
                Name: load_from_pulsar_test
          CreateTime: 2022-05-31 12:03:34
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:pulsar_doris
           TableName: clicklog1
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
    CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
           Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
            Progress: {"0":"51139566"}
                 Lag: {"0":0}
ReasonOfStateChanged: 
        ErrorLogUrls: 
            OtherMsg: 
1 row in set (0.00 sec)
ERROR: 
No query specified

从下面后果能够看到 totalRows 为 50000000,errorRows 为 0。阐明数据不丢不重的导入 Apache Doris 了。

  1. 数据统计验证 执行如下命令统计表中的数据,发现统计的后果也是 50000000,合乎预期。
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql> 

通过 KoP 咱们实现了将 Apache Pulsar 数据无缝接入 Apache Doris,无需对 Routine Load 工作进行任何批改,并保障了数据导入过程中的事务性。与此同时,Apache Doris 社区曾经启动了 Apache Pulsar 原生导入反对的设计,置信在不久后就能够间接订阅 Pulsar 中的音讯数据,并保证数据导入过程中的 Exactly-Once 语义。

退出社区

如果你对 Apache Doris 感兴趣,请退出 Doris 社区交换群。欢送更多的开源技术爱好者退出 Apache Doris 社区,携手成长,共建社区生态。

SelectDB 是一家开源技术公司,致力于为 Apache Doris 社区提供一个由全职工程师、产品经理和反对工程师组成的团队,凋敝开源社区生态,打造实时剖析型数据库畛域的国内工业界规范。基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为用户和客户提供开箱即用的能力。

相干链接:

SelectDB 官方网站:

https://selectdb.com

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

mailto:dev@doris.apache.org

正文完
 0