导读:人工智能工程化落地的关键点之一,在于解决实在业务场景的实时批量预估和实时模型更新问题。更好更快地将线上实时数据转化为AI可用的特色,将减速AI利用落地的效率及成果。为此,OpenMLDB 和 Apache Pulsar 单干推出OpenMLDB Pulsar Connector,实现稳固的流式集成,为高效买通实时数据到特色工程提供一条值得期待的清晰门路。

我是黄威,目前是第四范式研发架构师,也是OpenMLDB的外围研发。明天次要为大家介绍三个方面的内容:

  • Pulsar Connector简介
  • OpenMLDB Connector on Pulsar介绍
  • OpenMLDB Connector on Pulsar演示

01 Pulsar Connector简介

Apache Pulsar 是一个云原生的,分布式音讯流平台。它能够作为 OpenMLDB 的在线数据源,将实时的数据流导入到 OpenMLDB 在线。Pulsar 提供了Connector 框架,在此基础上能够与不同零碎的对接。咱们基于Connector框架,开发了 OpenMLDB JDBC Connector,通过它咱们就能够无障碍地连贯 Pulsar与OpenMLDB,Pulsar的音讯将主动地写入OpenMLDB。

02 OpenMLDB-Pulsar Connector介绍

1. 定位

OpenMLDB Pulsar Connector,高效买通实时数据到特色工程,大幅晋升数据应用效率、助力开发者构建实时数据管道、使企业更专一和更高效的摸索数据的商业价值。

在OpenMLDB 的工作流中,Pulsar Connector(地位如下图所示)帮忙开发者轻松地将音讯零碎Pulsar与开源机器学习数据库OpenMLDB连接起来,造成一条实时数据流。

2. 性能

Pulsar能够应用connector来连贯其余零碎。Source connector能够使其余零碎的数据流入Pulsar,sink connector能够将音讯流出至其余零碎。

OpenMLDB Pulsar Connector反对了sink性能,使Pulsar音讯能够写入到OpenMLDB在线存储中。

能够通过 Connector Admin CLI并联合 sinks 子命令来治理 Pulsar connector(例如,创立、更新、启动、进行、重启、重载、删除以及其余操作)。

3. 劣势

想要使OpenMLDB与Pulsar领有稳固的流式集成,咱们举荐间接应用Pulsar OpenMLDB connector 。它具备诸多劣势,包含但不限于:

  • 易上手。无需编写任何代码,只需进行简略配置,便可通过OpenMLDB Pulsar Connector 将Pulsar的音讯流入 OpenMLDB 。简化的数据导入过程能大幅晋升企业的数据应用效率。
  • 易扩大。依据不同的业务需要,能够抉择在单机或集群上运行 OpenMLDB Pulsar Connector ,助力企业构建实时数据管道。
  • 可继续。OpenMLDB Pulsar Connector 简略的装置和部署过程,使企业能更专一和更高效地摸索数据的商业价值。

4.Connector下载地址

OpenMLDB Pulsar Connector:

https://github.com/4paradigm/...

03 Connector演示

1. 流程介绍

Pulsar OpenMLDB connector 用于 OpenMLDB 线上模式的实时数据流接入。应用connector的简要流程,如下图所示。咱们接下来将具体介绍每一步。

整体上,应用流程能够概括为三步:

  • 创立connector前须要启动OpenMLDB集群,并创立表。
  • 创立Pulsar standalone,创立sink,sink配置中应用OpenMLDB集群的JDBC地址。并且,创立用于解析音讯的schema。
  • 向Pulsar发送音讯,来测试音讯是否能主动写入到OpenMLDB。

2. 关键步骤

留神,为了使演示更简略,本文中将应用Pulsar Standalone,OpenMLDB集群和一个简略JSON音讯生产者程序,来演示OpenMLDB JDBC Connector是如何工作的。该connector是齐全能够在Pulsar Cluster中失常应用的。

步骤1 | 在 OpenMLDB 创立数据库和数据表

① 启动 OpenMLDB 集群

应用Docker能够疾速启动OpenMLDB,除此之外,咱们还须要创立测试用的表。

揭示:目前只有OpenMLDB集群版能够作为sink的接收端,数据只会sink到集群的在线存储中。

咱们更举荐你应用host network模式运行docker,以及绑定文件目录files,sql脚本在该目录中。

docker run -dit --network host -v `pwd`/files:/work/taxi-trip/files --name openmldb 4pdosc/openmldb:0.4.4 bash

在OpenMLDB容器中,启动集群:

./init.sh

须要留神的是,在macOS平台上,即便应用host网络,也不反对从容器内部去连贯容器内的 OpenMLDB 服务器。但从容器内,去连贯别的容器内的OpenMLDB服务,是可行的。

②  创立表

咱们应用一个脚本疾速创立表,脚本内容如下:

create database pulsar_test;use pulsar_test;create table connector_test(id string, vendor_id int, pickup_datetime bigint, dropoff_datetime bigint, passenger_count int, pickup_longitude double, pickup_latitude double, dropoff_longitude double, dropoff_latitude double, store_and_fwd_flag string, trip_duration int);desc connector_test;

执行脚本:

../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/create.sql

目前,Pulsar中JSONSchema和JDBC base connector都不支java.sql.Timestamp。所以咱们应用long作为timestamp列的数据类型(在OpenMLDB能够应用long作为工夫戳)。

步骤2 | 启动Pulsar,创立sink和schem

① 启动 Pulsar Standalone

应用docker,能够更简略疾速地启动Pulsar。咱们举荐你应用host network来运行docker,这样能够防止诸多容器相干的网络连接问题。而且,咱们须要应用pulsar-admin来进行sink创立,这个程序在Pulsar镜像内。所以,咱们应用bash运行容器,在容器外部逐个执行命令。此处,也须要绑定files文件目录。

docker run -dit --network host -v `pwd`/files:/pulsar/files --name pulsar apachepulsar/pulsar:2.9.1 bash

在Pulsar容器中,启动standalone服务端。

bin/pulsar-daemon start standalone --zookeeper-port 5181

OpenMLDB服务曾经应用了端口2181,所以此处咱们为Pulsar从新设置一个zk端口。咱们将应用端口2181来连贯OpenMLDB,但Pulsar standalone内的zk端口不会对外造成影响。

你能够检查一下Pulsar是否失常运行,能够应用ps或者查看日志。

ps axu|grep pulsar

当你启动一个本地standalone集群,会主动创立pulic/default namesapce。这个namespace用于开发,咱们将在此namespace中创立sink。

如果你想要在本地间接启动Pulsar,能够参考Set up a standalone Pulsar locally。

链接:https://pulsar.apache.org/doc...

Q&A

Q1: 碰到以下问题是什么起因

2022-04-07T03:15:59,289+0000 [main] INFO  org.apache.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:51812022-04-07T03:15:59,289+0000 [main] ERROR org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Exception while instantiating ZooKeeperjava.net.BindException: Address already in use

A: Pulsar须要一个未被应用的端口来启动zk,端口5181页曾经被应用,须要再更改一下–zookeeper-port的端口号。

Q2: 8080端口已被应用?

A: 8080是webServicePort默认配置端口,在conf/standalone.conf中,能够更换这个端口。但留神,pulsar-admin会应用conf/client.conf中的webServiceUrl进行连贯,也须要同步更改。

Q3: 6650端口已被应用?

A: 须要同步更改conf/standalone.conf中的brokerServicePort和conf/client.conf中的brokerServiceUrl配置项。

② Connector装置(Optional)

后面的步骤中咱们绑定了files目录,外面曾经提供了connector的nar包。咱们能够应用“非内建connector”模式来设置connector(即在sink配置中指定archive配置项,将在下一个步骤中形容)。

但如果你心愿将OpenMLDB connector作为内建的connector,你须要创立connectors目录,并拷贝nar文件到connectors目录。

mkdir connectorscp files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar  connectors

如果在Pulsar运行时,你想扭转或减少connector,你能够告诉Pulsar更新信息:

bin/pulsar-admin sinks reload

当OpenMLDB connector成为内建connector时,它的sink类型名为jdbc-openmldb,你能够间接应用这个类型名来指定应用OpenMLDB connector。

③ 创立sink

咱们应用public/default这个namespace来创立sink, 咱们须要一个sink的配置文件, 它在files/pulsar-openmldb-jdbc-sink.yaml,内容如下:

tenant: "public" namespace: "default" name: "openmldb-test-sink" archive: "files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar" inputs: ["test_openmldb"] configs:     jdbcUrl: "jdbc:openmldb:///pulsar_test?zk=localhost:2181&zkPath=/openmldb"     tableName: "connector_test"

其中:

  • name:sink名。
  • archive:咱们应用archive来指定sink connector, 所以这里咱们是将OpenMLDB connector当作非内建connector应用。
  • input:能够是多个topic的名字,本文只应用一个。
  • config:用于连贯OpenMLDB集群的jdbc配置。

接下来,创立这个sink并查看。留神,咱们设置的输出topic是‘test_openmldb’,后续步骤须要应用到。

./bin/pulsar-admin sinks create --sink-config-file files/pulsar-openmldb-jdbc-sink.yaml./bin/pulsar-admin sinks status --name openmldb-test-sink

④ 创立 Schema

上传schema到topic test_openmldb,schema类型是JSON格局。后续步骤中,咱们将生产一样schema的JSON音讯。schema文件是files/openmldb-table-schema,内容如下:

 {    "type": "JSON",     "schema":"{\"type\":\"record\",\"name\":\"OpenMLDBSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor_id\",\"type\":\"int\"},{\"name\":\"pickup_datetime\",\"type\":\"long\"},{\"name\":\"dropoff_datetime\",\"type\":\"long\"},{\"name\":\"passenger_count\",\"type\":\"int\"},{\"name\":\"pickup_longitude\",\"type\":\"double\"},{\"name\":\"pickup_latitude\",\"type\":\"double\"},{\"name\":\"dropoff_longitude\",\"type\":\"double\"},{\"name\":\"dropoff_latitude\",\"type\":\"double\"},{\"name\":\"store_and_fwd_flag\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"trip_duration\",\"type\":\"int\"}]}",    "properties": {} }

上传并查看schema的命令,如下所示:

./bin/pulsar-admin schemas upload test_openmldb -f ./files/openmldb-table-schema./bin/pulsar-admin schemas get test_openmldb

步骤3 | 测试

① 发送音讯

咱们应用两条OpenMLDB镜像中data/taxi_tour_table_train_simple.csv的样本数据,作为测试用的音讯。数据如下图所示: 

测试用Producer要害代码如下:

能够看到,producer将发送两条音讯到topic test_openmldb。这之后,Pulsar将读到音讯,并将其写入OpenMLDB集群的在线存储中。

程序包在files中,你能够间接运行它:

java -cp files/pulsar-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar org.example.Client

② 查看

咱们能够查看Pulsar中的sink状态:

./bin/pulsar-admin sinks status --name openmldb-test-sink

“numReadFromPulsar”: pulsar发送了2条message到sink实例中。

“numWrittenToSink”: sink实例向OpenMLDB写入2条message。

同样,咱们能够在OpenMLDB在线存储中查问到这些音讯数据。查问脚本select.sql内容如下:

set @@execute_mode='online';use pulsar_test;select *, string(timestamp(pickup_datetime)), string(timestamp(dropoff_datetime)) from connector_test;

在OpenMLDB容器中执行脚本:

../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/select.sq

04 写在最初

1.OpenMLDB上下游生态体系

为更好升高开发者应用OpenMLDB的门槛,OpenMLDB社区将继续打造面向上下游技术组件的生态圈,为开发者提供更多简略易用的生态Connector(如下图所示):

  • 面向线上数据生态,如Kafka, Flin, RabbitMQ, RocketMQ等
  • 面向离线数据生态,如HDFS, HBase, Cassandra, S3等
  • 面向模型构建的算法、框架,例如XGBoost, LightGBM, TensorFlow, PyTorch, Scikit Learn等
  • 面向机器学习建模全流程的调度框架、部署工具,例如Airflow,Kubeflow,DolphinScheduler,Prometheus,Grafana等

2. OpenMLDB Roadmap v0.5.0

OpenMLDB社区将于4月底公布v0.5.0版本
(链接:https://github.com/4paradigm/...),届时OpenMLDB将具备新个性如下:

  • 窗口预聚合技术,指数级晋升长窗口聚合性能
  • 欠缺的监控, trace 和 profiling 能力,在企业级应用环境中大幅晋升稳定性、可观测性、和可剖析性
  • 线上存储引擎可插拔以适配不同业务需要,既能够反对基于内存的高性能存储引擎,也能够反对基于外存的大容量低成本存储引擎,还能够反对基于长久内存的存储引擎以在性能和老本间保持平衡
  • 用户自定义函数(UDF)反对,大幅晋升易用性和适用性
  • 上下游数据源生态整合,提供线上数据源的 Kafka, Pulsar connectors

05 相干浏览

① https://github.com/4paradigm/...

(OpenMLDB Pulsar Connector)

② https://openmldb.ai/docs/zh/v...

(OpenMLDB文档)

③ https://pulsar.apache.org/doc...

(Apache Pulsar connector文档, OpenMLDB Pulsar Connector地位如图所示)

心愿这篇文章可能帮忙大家意识Pulsar Connector的开发流程,了解OpenMLDB Connector on Pulsar是什么样的,理解Pulsar如何接入OpenMLDB。

最初,AI 的提高须要付出多方面的致力,而开放式合作是其中的关键环节,咱们期待来自开发者的交换探讨。欢送大家退出OpenMLDB社区,通过下图渠道可退出社区技术交换微信群。

明天的分享就到这里,谢谢大家。