关于数据库:数据生态第三弹-RocketMQ-OpenMLDB-Connector实时数据到特征工程的高速传输

5次阅读

共计 6866 个字符,预计需要花费 18 分钟才能阅读完成。

导读:
基于实在的企业业务场景,将线上实时数据更好更快地转化为 AI 可用特色是减速人工智能落地的无效门路之一。因而,OpenMLDB 踊跃买通数据生态上游,继面向实时音讯队列 Pulsar、分布式流解决平台 Kafka 的两款 connector 公布后,OpenMLDB 和 RocketMQ 单干推出 RocketMQ OpenMLDB Connector,助力实时数据到特色工程的高速传输,减速人工智能工程化落地。
将来 OpenMLDB 社区也将推出面向 Flink 的 connector,为开发者提供丰盛的实时数据生态,为各类利用场景赋予高效特色工程能力。

为什么抉择 RocketMQ OpenMLDB Connector

为了使 OpenMLDB 与 RocketMQ 领有高效稳固的传输通道,RocketMQ OpenMLDB connector 具备诸多优良个性,包含但不限于:

易上手。无需编写任何代码,只需进行简略配置,便可通过 RocketMQ OpenMLDB Connector 将 RocketMQ 的音讯流入 OpenMLDB。简化的数据导入过程能大幅晋升企业数据的无效使用率。

易部署。可能依据不同场景的理论业务需要,抉择在单机或集群上运行 RocketMQ OpenMLDB Connector,助力企业构建实时数据管道。

高牢靠。RocketMQ OpenMLDB Connector 集群部署的形式具备 Failover 能力,能够将有问题节点的任务调度到失常节点并保障集群负载平衡,使企业能更专一和更高效地摸索数据的商业价值。

低延时。秒级提早,满足实时数据及特色开发场景。

RocketMQ OpenMLDB Connector

Connector 概述

定位

RocketMQ Connect 是 RocketMQ 数据集成重要组件,它具备具备低延时,可靠性,高性能,低代码,扩展性强等特点,能够实现各种异构数据系统的连贯,构建数据管道,ETL,CDC,数据湖等能力。RocketMQ OpenMLDB Connector 是一个用于在 RocketMQ 和 OpenMLDB 之间可扩大的、牢靠的流式传输数据的工具。让 RocketMQ 及 RocketMQ connect 生态组件导入数据到 OpenMLDB 变得简略。

性能

能够使 RocketMQ 的音讯流入 OpenMLDB 在线存储。

Connector 插件编译

RocketMQ OpenMLDB Connector

$ git clone git@github.com:apache/rocketmq-connect.git
$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
$ mvn clean package -Dmaven.test.skip=true

最终将编译好的插件包放在 RocketMQ connect 指定的加载地址。

Connector 演示

流程介绍 RocketMQ OpenMLDB Connector 用于 OpenMLDB 线上模式的实时数据流接入。应用 connector 的简要流程,如下图所示。咱们接下来将具体介绍每一步。整体上,应用流程能够概括为四步:

• 启动 OpenMLDB 并创立数据库

• 启动 RocketMQ 并创立 topic

• 启动 RocketMQ OpenMLDB Connector

• 进行测试或者失常应用

关键步骤
以下仅列出应用此 connector 的关键步骤

步骤 1 | 启动 OpenMLDB
启动 OpenMLDB,并创立数据库 rocketmq_test,用于测试。表能够被 RocketMQ Connector 主动创立,所以这里不须要手动创立表。

cd /work
./init.sh
echo "create database rocketmq_test;" | /work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client

步骤 2 | 启动 RocketMQ

RocketMQ 搭建,启动 RocketMQ
1、下载 RocketMQ
$ wget https://dlcdn.apache.org/rocketmq/4.9.3/rocketmq-all-4.9.3-source-release.zip


2、编译 RocketMQ
如果是曾经编译好的请间接执行第 3 部启动 RocketMQ


$ unzip rocketmq-all-4.9.3-source-release.zip  
$ cd rocketmq-all-4.9.3/  
$ mvn -Prelease-all -DskipTests clean install -U  
$ cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3


3、启动 RocketMQ
启动 namesrv
$ nohup sh bin/mqnamesrv &  
查看 namesrv 是否启动胜利
$ tail -f ~/logs/rocketmqlogs/namesrv.log  
The Name Server boot success...


启动 broker
$ nohup sh bin/mqbroker -n localhost:9876 &
查看 broker 是否启动胜利
$ tail -f ~/logs/rocketmqlogs/broker.log    
The broker[%s, 172.30.30.233:10911] boot success...

步骤 3 | 启动 RocketMQ OpenMLDB Connector
首先, 搭建 RocketMQ connect runtime
环境我的项目下载

$ git clone git@github.com:apache/rocketmq-connect.git

构建我的项目

$ cd rocketmq-connect
$ mvn -Prelease-connect -DskipTests clean install -U

批改配置connect-standalone.conf,重点配置如下

$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
$ vim conf/connect-standalone.conf
# 以后的节点的独特 Id
workerId=DEFAULT_WORKER_1


# REST API 的端口地址
httpPort=8081


# 本地存储门路
storePathRootDir=~/storeRoot


# 须要批改为本人的 rocketmq NameServer 的端口地址
# Rocketmq namesrvAddr
namesrvAddr=127.0.0.1:9876  


#须要批改为 connector-plugins 文件夹所在的地位
# Source or sink connector jar file dir
pluginPaths=/usr/local/connector-plugins/

咱们须要将 OpenMLDB RocketMQ Connector 编译好的包放入这个目录。命令如下:

mkdir -p /usr/local/connector-plugins/rocketmq-connect-jdbc
cd ../../../../
cp connectors/rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins/rocketmq-connect-jdbc

应用 standalone 的模式启动 RocketMQ Connect Runtime 环境。

$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
$ sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

以下示意 Rocketmq connect runtime 运行胜利:

步骤 4 | 测试

• 创立 Mysql 数据表,并初始化测试数据

• 创立 mysql source,从测试表中拉取数据

• 创立 OpenMLDB sink,将 source 拉取的数据写入到 OpenMLDB 中

初始化 Mysql 测试数据;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;


-- ----------------------------
-- Table structure for employee_test
-- ----------------------------
DROP TABLE IF EXISTS `employee_test`;
CREATE TABLE `employee_test` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(128) DEFAULT NULL,
  `howold` int DEFAULT NULL,
  `male` int DEFAULT NULL,
  `company` varchar(128) DEFAULT NULL,
  `money` double DEFAULT NULL,
  `begin_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;


-- ----------------------------
-- Records of employee_test
-- ----------------------------
BEGIN;
INSERT INTO `employee_test` VALUES (2, 'name-02', 19, 7, 'company', 32232, '2021-12-29 08:00:00');
INSERT INTO `employee_test` VALUES (4, 'gjk', 25, 8, 'company', 3232, '2021-12-24 20:43:36');
INSERT INTO `employee_test` VALUES (12, 'name-06', 19, 3, NULL, NULL, NULL);
INSERT INTO `employee_test` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02-08 19:06:39');
COMMIT;


SET FOREIGN_KEY_CHECKS = 1;

创立并启动 RocketMQ conect mysql source connector,如下所示:

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbc-mysql-source-test
-d  '{"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","max-task":"1","connection.url":"jdbc:mysql://127.0.0.1:3306","connection.user":"*****","connection.password":"*****","table.whitelist":"test_database.employee_test","mode":"incrementing",     // 增量拉取形式"incrementing.column.name":"id",   // 指定增量拉取的字段"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}'st

确认工作启动并开始拉取数据:

建设一个 OpenMLDB RocketMQ sink connector 将数据写入到 OpenMLDB 表中,信息如下。(注:监听的 Topic 为 source 拉取表的表名)

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbc-openmldb-sink-test
-d '{"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector","max-task":"1","connect-topicname":"employee_test","connection.url":"jdbc:openmldb:///rocketmq_test?zk=127.0.0.1:2181&zkPath=/openmldb_cluster","insert.mode":"INSERT","db.timezone":"UTC","table.types":"TABLE","auto.create":"true","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}'

察看数据是否入库而后,咱们能够在 OpenMLDB 中查问是否插入胜利。内容如下:

set @@execute_mode='online';
use rocketmq_test;
select * from employee_test;

后果如下:

写在最初

对于 OpenMLDB

OpenMLDB 是一个开源机器学习数据库,致力于闭环解决 AI 工程化落地的数据治理难题。自 2021 年 6 月开源以来,OpenMLDB 优先公布了特色数据治理能力,依靠 SQL 的开发能力,为企业提供全栈性能的、低门槛特色数据计算和治理平台。

OpenMLDB 蕴含 Feature Store 的全副性能,并且提供更为残缺的 FeatureOps 全栈计划。除了提供特色存储性能,还具备基于 SQL 的低门槛数据库开发体验、面向特色计算优化的 OpenMLDB Spark 发行版,针对实时特色计算优化的索引构造,特色上线服务、企业级运维和治理等性能,让特色工程开发回归于实质——专一于高质量的特色计算脚本开发,不再被工程化效率落地所羁绊。

对于 RocketMQ

Apache RocketMQ 是一款由阿里巴巴开源的低提早、高并发、高可用、高牢靠,可撑持万亿级数据洪峰的分布式消息中间件。2016 年阿里巴巴正式将 RocketMQ 募捐给 Apache 基金会,2017 年毕业成为 Apache 顶级开源我的项目。在中国,目前 Apache RocketMQ 曾经被 75% 以上的互联网及金融公司所应用,国内包含阿里云在内的 10+ 云厂商均提供了 RocketMQ 商业服务,寰球超过 500 位贡献者参加其中,近乎成为成为业务音讯畛域首选音讯平台。

随着云原生时代的到来,Apache RocketMQ 进行了全面的架构降级,5.0 版本的公布,标记着 RocketMQ 从 Messaging 平台降级为云原生事件、音讯流交融解决平台,用以帮忙用户轻松构建事件驱动服务,满足轻量级计算诉求,放大数据价值。

OpenMLDB 上下游生态体系

为更好升高开发者应用 OpenMLDB 的门槛,OpenMLDB 社区将继续打造面向上下游技术组件的生态圈,为开发者提供更多简略易用的生态 Connector。

• 面向线上数据生态,如 Pulsar(已实现),Kafka(已实现),RocketMQ(已实现),Flink,RabbitMQ 等

• 面向离线数据生态,如 HDFS,HBase,Cassandra,S3 等

• 面向模型构建的算法、框架,如 XGBoost,LightGBM,TensorFlow,PyTorch,Scikit Learn 等

• 面向机器学习建模全流程的调度框架、部署工具,如 DolphinScheduler(已实现),Airflow,Kubeflow,Prometheus,Grafana 等

相干浏览

Apache RocketMQ Quick Start
​​
​​https://rocketmq.apache.org/d…​​

Apache RocketMQ Connect
​​
​​https://github.com/apache/roc…​​

OpenMLDB 官网
​​
​​https://openmldb.ai​​
​​​
OpenMLDB github 主页

​​https://github.com/4paradigm/…​​
​​​
OpenMLDB 文档 ​​疾速上手

​​https://openmldb.ai/docs/zh/v…​

正文完
 0