导读:人工智能工程化落地的关键点之一,在于解决实在业务场景的实时批量预估和实时模型更新问题。更好更快地将线上实时数据转化为AI可用的特色,将减速AI利用落地的效率及成果。为此,OpenMLDB 和 Apache Pulsar 单干推出OpenMLDB Pulsar Connector,实现稳固的流式集成,为高效买通实时数据到特色工程提供一条值得期待的清晰门路。
我是黄威,目前是第四范式研发架构师,也是OpenMLDB的外围研发。明天次要为大家介绍三个方面的内容:
- Pulsar Connector简介
- OpenMLDB Connector on Pulsar介绍
- OpenMLDB Connector on Pulsar演示
01 Pulsar Connector简介
Apache Pulsar 是一个云原生的,分布式音讯流平台。它能够作为 OpenMLDB 的在线数据源,将实时的数据流导入到 OpenMLDB 在线。Pulsar 提供了Connector 框架,在此基础上能够与不同零碎的对接。咱们基于Connector框架,开发了 OpenMLDB JDBC Connector,通过它咱们就能够无障碍地连贯 Pulsar与OpenMLDB,Pulsar的音讯将主动地写入OpenMLDB。
02 OpenMLDB-Pulsar Connector介绍
1. 定位
OpenMLDB Pulsar Connector,高效买通实时数据到特色工程,大幅晋升数据应用效率、助力开发者构建实时数据管道、使企业更专一和更高效的摸索数据的商业价值。
在OpenMLDB 的工作流中,Pulsar Connector(地位如下图所示)帮忙开发者轻松地将音讯零碎Pulsar与开源机器学习数据库OpenMLDB连接起来,造成一条实时数据流。
2. 性能
Pulsar能够应用connector来连贯其余零碎。Source connector能够使其余零碎的数据流入Pulsar,sink connector能够将音讯流出至其余零碎。
OpenMLDB Pulsar Connector反对了sink性能,使Pulsar音讯能够写入到OpenMLDB在线存储中。
能够通过 Connector Admin CLI并联合 sinks 子命令来治理 Pulsar connector(例如,创立、更新、启动、进行、重启、重载、删除以及其余操作)。
3. 劣势
想要使OpenMLDB与Pulsar领有稳固的流式集成,咱们举荐间接应用Pulsar OpenMLDB connector 。它具备诸多劣势,包含但不限于:
- 易上手。无需编写任何代码,只需进行简略配置,便可通过OpenMLDB Pulsar Connector 将Pulsar的音讯流入 OpenMLDB 。简化的数据导入过程能大幅晋升企业的数据应用效率。
- 易扩大。依据不同的业务需要,能够抉择在单机或集群上运行 OpenMLDB Pulsar Connector ,助力企业构建实时数据管道。
- 可继续。OpenMLDB Pulsar Connector 简略的装置和部署过程,使企业能更专一和更高效地摸索数据的商业价值。
4.Connector下载地址
OpenMLDB Pulsar Connector:
https://github.com/4paradigm/...
03 Connector演示
1. 流程介绍
Pulsar OpenMLDB connector 用于 OpenMLDB 线上模式的实时数据流接入。应用connector的简要流程,如下图所示。咱们接下来将具体介绍每一步。
整体上,应用流程能够概括为三步:
- 创立connector前须要启动OpenMLDB集群,并创立表。
- 创立Pulsar standalone,创立sink,sink配置中应用OpenMLDB集群的JDBC地址。并且,创立用于解析音讯的schema。
- 向Pulsar发送音讯,来测试音讯是否能主动写入到OpenMLDB。
2. 关键步骤
留神,为了使演示更简略,本文中将应用Pulsar Standalone,OpenMLDB集群和一个简略JSON音讯生产者程序,来演示OpenMLDB JDBC Connector是如何工作的。该connector是齐全能够在Pulsar Cluster中失常应用的。
步骤1 | 在 OpenMLDB 创立数据库和数据表
① 启动 OpenMLDB 集群
应用Docker能够疾速启动OpenMLDB,除此之外,咱们还须要创立测试用的表。
揭示:目前只有OpenMLDB集群版能够作为sink的接收端,数据只会sink到集群的在线存储中。
咱们更举荐你应用host network模式运行docker,以及绑定文件目录files,sql脚本在该目录中。
docker run -dit --network host -v `pwd`/files:/work/taxi-trip/files --name openmldb 4pdosc/openmldb:0.4.4 bash
在OpenMLDB容器中,启动集群:
./init.sh
须要留神的是,在macOS平台上,即便应用host网络,也不反对从容器内部去连贯容器内的 OpenMLDB 服务器。但从容器内,去连贯别的容器内的OpenMLDB服务,是可行的。
② 创立表
咱们应用一个脚本疾速创立表,脚本内容如下:
create database pulsar_test;use pulsar_test;create table connector_test(id string, vendor_id int, pickup_datetime bigint, dropoff_datetime bigint, passenger_count int, pickup_longitude double, pickup_latitude double, dropoff_longitude double, dropoff_latitude double, store_and_fwd_flag string, trip_duration int);desc connector_test;
执行脚本:
../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/create.sql
目前,Pulsar中JSONSchema和JDBC base connector都不支java.sql.Timestamp。所以咱们应用long作为timestamp列的数据类型(在OpenMLDB能够应用long作为工夫戳)。
步骤2 | 启动Pulsar,创立sink和schem
① 启动 Pulsar Standalone
应用docker,能够更简略疾速地启动Pulsar。咱们举荐你应用host network来运行docker,这样能够防止诸多容器相干的网络连接问题。而且,咱们须要应用pulsar-admin来进行sink创立,这个程序在Pulsar镜像内。所以,咱们应用bash运行容器,在容器外部逐个执行命令。此处,也须要绑定files文件目录。
docker run -dit --network host -v `pwd`/files:/pulsar/files --name pulsar apachepulsar/pulsar:2.9.1 bash
在Pulsar容器中,启动standalone服务端。
bin/pulsar-daemon start standalone --zookeeper-port 5181
OpenMLDB服务曾经应用了端口2181,所以此处咱们为Pulsar从新设置一个zk端口。咱们将应用端口2181来连贯OpenMLDB,但Pulsar standalone内的zk端口不会对外造成影响。
你能够检查一下Pulsar是否失常运行,能够应用ps或者查看日志。
ps axu|grep pulsar
当你启动一个本地standalone集群,会主动创立pulic/default namesapce。这个namespace用于开发,咱们将在此namespace中创立sink。
如果你想要在本地间接启动Pulsar,能够参考Set up a standalone Pulsar locally。
链接:https://pulsar.apache.org/doc...
Q&A
Q1: 碰到以下问题是什么起因
2022-04-07T03:15:59,289+0000 [main] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:51812022-04-07T03:15:59,289+0000 [main] ERROR org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Exception while instantiating ZooKeeperjava.net.BindException: Address already in use
A: Pulsar须要一个未被应用的端口来启动zk,端口5181页曾经被应用,须要再更改一下–zookeeper-port的端口号。
Q2: 8080端口已被应用?
A: 8080是webServicePort默认配置端口,在conf/standalone.conf中,能够更换这个端口。但留神,pulsar-admin会应用conf/client.conf中的webServiceUrl进行连贯,也须要同步更改。
Q3: 6650端口已被应用?
A: 须要同步更改conf/standalone.conf中的brokerServicePort和conf/client.conf中的brokerServiceUrl配置项。
② Connector装置(Optional)
后面的步骤中咱们绑定了files目录,外面曾经提供了connector的nar包。咱们能够应用“非内建connector”模式来设置connector(即在sink配置中指定archive配置项,将在下一个步骤中形容)。
但如果你心愿将OpenMLDB connector作为内建的connector,你须要创立connectors目录,并拷贝nar文件到connectors目录。
mkdir connectorscp files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar connectors
如果在Pulsar运行时,你想扭转或减少connector,你能够告诉Pulsar更新信息:
bin/pulsar-admin sinks reload
当OpenMLDB connector成为内建connector时,它的sink类型名为jdbc-openmldb,你能够间接应用这个类型名来指定应用OpenMLDB connector。
③ 创立sink
咱们应用public/default这个namespace来创立sink, 咱们须要一个sink的配置文件, 它在files/pulsar-openmldb-jdbc-sink.yaml,内容如下:
tenant: "public" namespace: "default" name: "openmldb-test-sink" archive: "files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar" inputs: ["test_openmldb"] configs: jdbcUrl: "jdbc:openmldb:///pulsar_test?zk=localhost:2181&zkPath=/openmldb" tableName: "connector_test"
其中:
- name:sink名。
- archive:咱们应用archive来指定sink connector, 所以这里咱们是将OpenMLDB connector当作非内建connector应用。
- input:能够是多个topic的名字,本文只应用一个。
- config:用于连贯OpenMLDB集群的jdbc配置。
接下来,创立这个sink并查看。留神,咱们设置的输出topic是‘test_openmldb’,后续步骤须要应用到。
./bin/pulsar-admin sinks create --sink-config-file files/pulsar-openmldb-jdbc-sink.yaml./bin/pulsar-admin sinks status --name openmldb-test-sink
④ 创立 Schema
上传schema到topic test_openmldb,schema类型是JSON格局。后续步骤中,咱们将生产一样schema的JSON音讯。schema文件是files/openmldb-table-schema,内容如下:
{ "type": "JSON", "schema":"{\"type\":\"record\",\"name\":\"OpenMLDBSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor_id\",\"type\":\"int\"},{\"name\":\"pickup_datetime\",\"type\":\"long\"},{\"name\":\"dropoff_datetime\",\"type\":\"long\"},{\"name\":\"passenger_count\",\"type\":\"int\"},{\"name\":\"pickup_longitude\",\"type\":\"double\"},{\"name\":\"pickup_latitude\",\"type\":\"double\"},{\"name\":\"dropoff_longitude\",\"type\":\"double\"},{\"name\":\"dropoff_latitude\",\"type\":\"double\"},{\"name\":\"store_and_fwd_flag\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"trip_duration\",\"type\":\"int\"}]}", "properties": {} }
上传并查看schema的命令,如下所示:
./bin/pulsar-admin schemas upload test_openmldb -f ./files/openmldb-table-schema./bin/pulsar-admin schemas get test_openmldb
步骤3 | 测试
① 发送音讯
咱们应用两条OpenMLDB镜像中data/taxi_tour_table_train_simple.csv的样本数据,作为测试用的音讯。数据如下图所示:
测试用Producer要害代码如下:
能够看到,producer将发送两条音讯到topic test_openmldb。这之后,Pulsar将读到音讯,并将其写入OpenMLDB集群的在线存储中。
程序包在files中,你能够间接运行它:
java -cp files/pulsar-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar org.example.Client
② 查看
咱们能够查看Pulsar中的sink状态:
./bin/pulsar-admin sinks status --name openmldb-test-sink
“numReadFromPulsar”: pulsar发送了2条message到sink实例中。
“numWrittenToSink”: sink实例向OpenMLDB写入2条message。
同样,咱们能够在OpenMLDB在线存储中查问到这些音讯数据。查问脚本select.sql内容如下:
set @@execute_mode='online';use pulsar_test;select *, string(timestamp(pickup_datetime)), string(timestamp(dropoff_datetime)) from connector_test;
在OpenMLDB容器中执行脚本:
../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/select.sq
04 写在最初
1.OpenMLDB上下游生态体系
为更好升高开发者应用OpenMLDB的门槛,OpenMLDB社区将继续打造面向上下游技术组件的生态圈,为开发者提供更多简略易用的生态Connector(如下图所示):
- 面向线上数据生态,如Kafka, Flin, RabbitMQ, RocketMQ等
- 面向离线数据生态,如HDFS, HBase, Cassandra, S3等
- 面向模型构建的算法、框架,例如XGBoost, LightGBM, TensorFlow, PyTorch, Scikit Learn等
- 面向机器学习建模全流程的调度框架、部署工具,例如Airflow,Kubeflow,DolphinScheduler,Prometheus,Grafana等
2. OpenMLDB Roadmap v0.5.0
OpenMLDB社区将于4月底公布v0.5.0版本
(链接:https://github.com/4paradigm/...),届时OpenMLDB将具备新个性如下:
- 窗口预聚合技术,指数级晋升长窗口聚合性能
- 欠缺的监控, trace 和 profiling 能力,在企业级应用环境中大幅晋升稳定性、可观测性、和可剖析性
- 线上存储引擎可插拔以适配不同业务需要,既能够反对基于内存的高性能存储引擎,也能够反对基于外存的大容量低成本存储引擎,还能够反对基于长久内存的存储引擎以在性能和老本间保持平衡
- 用户自定义函数(UDF)反对,大幅晋升易用性和适用性
- 上下游数据源生态整合,提供线上数据源的 Kafka, Pulsar connectors
05 相干浏览
① https://github.com/4paradigm/...
(OpenMLDB Pulsar Connector)
② https://openmldb.ai/docs/zh/v...
(OpenMLDB文档)
③ https://pulsar.apache.org/doc...
(Apache Pulsar connector文档, OpenMLDB Pulsar Connector地位如图所示)
心愿这篇文章可能帮忙大家意识Pulsar Connector的开发流程,了解OpenMLDB Connector on Pulsar是什么样的,理解Pulsar如何接入OpenMLDB。
最初,AI 的提高须要付出多方面的致力,而开放式合作是其中的关键环节,咱们期待来自开发者的交换探讨。欢送大家退出OpenMLDB社区,通过下图渠道可退出社区技术交换微信群。
明天的分享就到这里,谢谢大家。