关于sql:OpenMLDB-Pulsar-Connector高效打通实时数据到特征工程

5次阅读

共计 7942 个字符,预计需要花费 20 分钟才能阅读完成。

导读: 人工智能工程化落地的关键点之一,在于解决实在业务场景的实时批量预估和实时模型更新问题。更好更快地将线上实时数据转化为 AI 可用的特色,将减速 AI 利用落地的效率及成果。为此,OpenMLDB 和 Apache Pulsar 单干推出 OpenMLDB Pulsar Connector,实现稳固的流式集成,为高效买通实时数据到特色工程提供一条值得期待的清晰门路。

我是黄威,目前是第四范式研发架构师,也是 OpenMLDB 的外围研发。明天次要为大家介绍三个方面的内容:

  • Pulsar Connector 简介
  • OpenMLDB Connector on Pulsar 介绍
  • OpenMLDB Connector on Pulsar 演示

01 Pulsar Connector 简介

Apache Pulsar 是一个云原生的,分布式音讯流平台。它能够作为 OpenMLDB 的在线数据源,将实时的数据流导入到 OpenMLDB 在线。Pulsar 提供了 Connector 框架,在此基础上能够与不同零碎的对接。咱们基于 Connector 框架,开发了 OpenMLDB JDBC Connector,通过它咱们就能够无障碍地连贯 Pulsar 与 OpenMLDB,Pulsar 的音讯将主动地写入 OpenMLDB。

02 OpenMLDB-Pulsar Connector 介绍

1. 定位

OpenMLDB Pulsar Connector,高效买通实时数据到特色工程,大幅晋升数据应用效率、助力开发者构建实时数据管道、使企业更专一和更高效的摸索数据的商业价值。

在 OpenMLDB 的工作流中,Pulsar Connector(地位如下图所示)帮忙开发者轻松地将音讯零碎 Pulsar 与开源机器学习数据库 OpenMLDB 连接起来,造成一条实时数据流。

2. 性能

Pulsar 能够应用 connector 来连贯其余零碎。Source connector 能够使其余零碎的数据流入 Pulsar,sink connector 能够将音讯流出至其余零碎。

OpenMLDB Pulsar Connector 反对了 sink 性能,使 Pulsar 音讯能够写入到 OpenMLDB 在线存储中。

能够通过 Connector Admin CLI 并联合 sinks 子命令来治理 Pulsar connector(例如,创立、更新、启动、进行、重启、重载、删除以及其余操作)。

3. 劣势

想要使 OpenMLDB 与 Pulsar 领有稳固的流式集成,咱们举荐间接应用 Pulsar OpenMLDB connector。它具备诸多劣势,包含但不限于:

  • 易上手 。无需编写任何代码,只需进行简略配置,便可通过 OpenMLDB Pulsar Connector 将 Pulsar 的音讯流入 OpenMLDB。简化的数据导入过程能大幅晋升企业的数据应用效率。
  • 易扩大 。依据不同的业务需要,能够抉择在单机或集群上运行 OpenMLDB Pulsar Connector,助力企业构建实时数据管道。
  • 可继续 。OpenMLDB Pulsar Connector 简略的装置和部署过程,使企业能更专一和更高效地摸索数据的商业价值。

4.Connector 下载地址

OpenMLDB Pulsar Connector:

https://github.com/4paradigm/…

03 Connector 演示

1. 流程介绍

Pulsar OpenMLDB connector 用于 OpenMLDB 线上模式的实时数据流接入。应用 connector 的简要流程,如下图所示。咱们接下来将具体介绍每一步。

整体上,应用流程能够概括为三步:

  • 创立 connector 前须要启动 OpenMLDB 集群,并创立表。
  • 创立 Pulsar standalone,创立 sink,sink 配置中应用 OpenMLDB 集群的 JDBC 地址。并且,创立用于解析音讯的 schema。
  • 向 Pulsar 发送音讯,来测试音讯是否能主动写入到 OpenMLDB。

2. 关键步骤

留神,为了使演示更简略,本文中将应用 Pulsar Standalone,OpenMLDB 集群和一个简略 JSON 音讯生产者程序,来演示 OpenMLDB JDBC Connector 是如何工作的。该 connector 是齐全能够在 Pulsar Cluster 中失常应用的。

步骤 1 | 在 OpenMLDB 创立数据库和数据表

① 启动 OpenMLDB 集群

应用 Docker 能够疾速启动 OpenMLDB,除此之外,咱们还须要创立测试用的表。

揭示:目前只有 OpenMLDB 集群版能够作为 sink 的接收端,数据只会 sink 到集群的在线存储中。

咱们更举荐你应用 host network 模式运行 docker,以及绑定文件目录 files,sql 脚本在该目录中。

docker run -dit --network host -v `pwd`/files:/work/taxi-trip/files --name openmldb 4pdosc/openmldb:0.4.4 bash

在 OpenMLDB 容器中,启动集群:

./init.sh

须要留神的是,在 macOS 平台上,即便应用 host 网络,也不反对从容器内部去连贯容器内的 OpenMLDB 服务器。但从容器内,去连贯别的容器内的 OpenMLDB 服务,是可行的。

②  创立表

咱们应用一个脚本疾速创立表,脚本内容如下:

create database pulsar_test;
use pulsar_test;
create table connector_test(id string, vendor_id int, pickup_datetime bigint, dropoff_datetime bigint, passenger_count int, pickup_longitude double, pickup_latitude double, dropoff_longitude double, dropoff_latitude double, store_and_fwd_flag string, trip_duration int);
desc connector_test;

执行脚本:

../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/create.sql

目前,Pulsar 中 JSONSchema 和 JDBC base connector 都不支 java.sql.Timestamp。所以咱们应用 long 作为 timestamp 列的数据类型(在 OpenMLDB 能够应用 long 作为工夫戳)。

步骤 2 | 启动 Pulsar,创立 sink 和 schem

① 启动 Pulsar Standalone

应用 docker,能够更简略疾速地启动 Pulsar。咱们举荐你应用 host network 来运行 docker,这样能够防止诸多容器相干的网络连接问题。而且,咱们须要应用 pulsar-admin 来进行 sink 创立,这个程序在 Pulsar 镜像内。所以,咱们应用 bash 运行容器,在容器外部逐个执行命令。此处,也须要绑定 files 文件目录。

docker run -dit --network host -v `pwd`/files:/pulsar/files --name pulsar apachepulsar/pulsar:2.9.1 bash

在 Pulsar 容器中,启动 standalone 服务端。

bin/pulsar-daemon start standalone --zookeeper-port 5181

OpenMLDB 服务曾经应用了端口 2181,所以此处咱们为 Pulsar 从新设置一个 zk 端口。咱们将应用端口 2181 来连贯 OpenMLDB,但 Pulsar standalone 内的 zk 端口不会对外造成影响。

你能够检查一下 Pulsar 是否失常运行,能够应用 ps 或者查看日志。

ps axu|grep pulsar

当你启动一个本地 standalone 集群,会主动创立 pulic/default namesapce。这个 namespace 用于开发,咱们将在此 namespace 中创立 sink。

如果你想要在本地间接启动 Pulsar,能够参考 Set up a standalone Pulsar locally。

链接:https://pulsar.apache.org/doc…

Q&A

Q1: 碰到以下问题是什么起因

2022-04-07T03:15:59,289+0000 [main] INFO  org.apache.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:5181

2022-04-07T03:15:59,289+0000 [main] ERROR org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Exception while instantiating ZooKeeper

java.net.BindException: Address already in use

A: Pulsar 须要一个未被应用的端口来启动 zk,端口 5181 页曾经被应用,须要再更改一下–zookeeper-port 的端口号。

Q2: 8080 端口已被应用?

A: 8080 是 webServicePort 默认配置端口,在 conf/standalone.conf 中,能够更换这个端口。但留神,pulsar-admin 会应用 conf/client.conf 中的 webServiceUrl 进行连贯,也须要同步更改。

Q3: 6650 端口已被应用?

A: 须要同步更改 conf/standalone.conf 中的 brokerServicePort 和 conf/client.conf 中的 brokerServiceUrl 配置项。

② Connector 装置 (Optional)

后面的步骤中咱们绑定了 files 目录,外面曾经提供了 connector 的 nar 包。咱们能够应用“非内建 connector”模式来设置 connector(即在 sink 配置中指定 archive 配置项,将在下一个步骤中形容)。

但如果你心愿将 OpenMLDB connector 作为内建的 connector,你须要创立 connectors 目录,并拷贝 nar 文件到 connectors 目录。

mkdir connectors
cp files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar  connectors

如果在 Pulsar 运行时,你想扭转或减少 connector,你能够告诉 Pulsar 更新信息:

bin/pulsar-admin sinks reload

当 OpenMLDB connector 成为内建 connector 时,它的 sink 类型名为 jdbc-openmldb,你能够间接应用这个类型名来指定应用 OpenMLDB connector。

③ 创立 sink

咱们应用 public/default 这个 namespace 来创立 sink, 咱们须要一个 sink 的配置文件, 它在 files/pulsar-openmldb-jdbc-sink.yaml,内容如下:


tenant: "public"
 namespace: "default"
 name: "openmldb-test-sink"
 archive: "files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar"
 inputs: ["test_openmldb"]
 configs:
     jdbcUrl: "jdbc:openmldb:///pulsar_test?zk=localhost:2181&zkPath=/openmldb"
     tableName: "connector_test"

其中:

  • name:sink 名。
  • archive:咱们应用 archive 来指定 sink connector, 所以这里咱们是将 OpenMLDB connector 当作非内建 connector 应用。
  • input:能够是多个 topic 的名字,本文只应用一个。
  • config:用于连贯 OpenMLDB 集群的 jdbc 配置。

接下来,创立这个 sink 并查看。留神,咱们设置的输出 topic 是‘test_openmldb’,后续步骤须要应用到。

./bin/pulsar-admin sinks create --sink-config-file files/pulsar-openmldb-jdbc-sink.yaml
./bin/pulsar-admin sinks status --name openmldb-test-sink

④ 创立 Schema

上传 schema 到 topic test_openmldb,schema 类型是 JSON 格局。后续步骤中,咱们将生产一样 schema 的 JSON 音讯。schema 文件是 files/openmldb-table-schema,内容如下:


 {
    "type": "JSON",
     "schema":"{\"type\":\"record\",\"name\":\"OpenMLDBSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor_id\",\"type\":\"int\"},{\"name\":\"pickup_datetime\",\"type\":\"long\"},{\"name\":\"dropoff_datetime\",\"type\":\"long\"},{\"name\":\"passenger_count\",\"type\":\"int\"},{\"name\":\"pickup_longitude\",\"type\":\"double\"},{\"name\":\"pickup_latitude\",\"type\":\"double\"},{\"name\":\"dropoff_longitude\",\"type\":\"double\"},{\"name\":\"dropoff_latitude\",\"type\":\"double\"},{\"name\":\"store_and_fwd_flag\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"trip_duration\",\"type\":\"int\"}]}",
    "properties": {}}

上传并查看 schema 的命令,如下所示:


./bin/pulsar-admin schemas upload test_openmldb -f ./files/openmldb-table-schema
./bin/pulsar-admin schemas get test_openmldb

步骤 3 | 测试

① 发送音讯

咱们应用两条 OpenMLDB 镜像中 data/taxi_tour_table_train_simple.csv 的样本数据,作为测试用的音讯。数据如下图所示:

测试用 Producer 要害代码如下:

能够看到,producer 将发送两条音讯到 topic test_openmldb。这之后,Pulsar 将读到音讯,并将其写入 OpenMLDB 集群的在线存储中。

程序包在 files 中,你能够间接运行它:

java -cp files/pulsar-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar org.example.Client

② 查看

咱们能够查看 Pulsar 中的 sink 状态:

./bin/pulsar-admin sinks status --name openmldb-test-sink

“numReadFromPulsar”: pulsar 发送了 2 条 message 到 sink 实例中。

“numWrittenToSink”: sink 实例向 OpenMLDB 写入 2 条 message。

同样,咱们能够在 OpenMLDB 在线存储中查问到这些音讯数据。查问脚本 select.sql 内容如下:

set @@execute_mode='online';
use pulsar_test;
select *, string(timestamp(pickup_datetime)), string(timestamp(dropoff_datetime)) from connector_test;

在 OpenMLDB 容器中执行脚本:

../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/select.sq

04 写在最初

1.OpenMLDB 上下游生态体系

为更好升高开发者应用 OpenMLDB 的门槛,OpenMLDB 社区将继续打造面向上下游技术组件的生态圈,为开发者提供更多简略易用的生态 Connector(如下图所示):

  • 面向线上数据生态,如 Kafka, Flin, RabbitMQ, RocketMQ 等
  • 面向离线数据生态,如 HDFS, HBase, Cassandra, S3 等
  • 面向模型构建的算法、框架,例如 XGBoost, LightGBM, TensorFlow, PyTorch, Scikit Learn 等
  • 面向机器学习建模全流程的调度框架、部署工具,例如 Airflow,Kubeflow,DolphinScheduler,Prometheus,Grafana 等

2. OpenMLDB Roadmap v0.5.0

OpenMLDB 社区将于 4 月底公布 v0.5.0 版本
(链接:https://github.com/4paradigm/…),届时 OpenMLDB 将具备新个性如下:

  • 窗口预聚合技术,指数级晋升长窗口聚合性能
  • 欠缺的监控, trace 和 profiling 能力,在企业级应用环境中大幅晋升稳定性、可观测性、和可剖析性
  • 线上存储引擎可插拔以适配不同业务需要,既能够反对基于内存的高性能存储引擎,也能够反对基于外存的大容量低成本存储引擎,还能够反对基于长久内存的存储引擎以在性能和老本间保持平衡
  • 用户自定义函数(UDF)反对,大幅晋升易用性和适用性
  • 上下游数据源生态整合,提供线上数据源的 Kafka, Pulsar connectors

05 相干浏览

① https://github.com/4paradigm/…

(OpenMLDB Pulsar Connector)

② https://openmldb.ai/docs/zh/v…

(OpenMLDB 文档)

③ https://pulsar.apache.org/doc…

(Apache Pulsar connector 文档, OpenMLDB Pulsar Connector 地位如图所示)

心愿这篇文章可能帮忙大家意识 Pulsar Connector 的开发流程,了解 OpenMLDB Connector on Pulsar 是什么样的,理解 Pulsar 如何接入 OpenMLDB。

最初,AI 的提高须要付出多方面的致力,而开放式合作是其中的关键环节,咱们期待来自开发者的交换探讨。欢送大家退出 OpenMLDB 社区,通过下图渠道可退出社区技术交换微信群。

明天的分享就到这里,谢谢大家。

正文完
 0