共计 9842 个字符,预计需要花费 25 分钟才能阅读完成。
简介:基于 RocketMQ 和 Hudi 零代码构建 Lakehouse 架构,以及 RocketMQ Connector & RocketMQ Stream 助力 ETL 数据分析,为大家提供疾速构建 Lakehouse 的技术计划和低运维老本实现实时计算的解决方案。
本文目录
- 背景常识
- 大数据时代的构架演进
- RocketMQ Connector&Stream
- Apache Hudi
- 构建 Lakehouse 实操
本文题目蕴含三个关键词:Lakehouse、RocketMQ、Hudi。咱们先从整体 Lakehouse 架构动手,随后逐渐剖析架构产生的起因、架构组件特点以及构建 Lakehouse 架构的实操局部。
背景常识
1、Lakehouse 架构
Lakehouse 最后由 Databrick 提出,并对 Lakehouse 架构特色有如下要求:
(1)事务反对
企业外部许多数据管道通常会并发读写数据。对 ACID 事务的反对确保了多方并发读写数据时的一致性问题;
(2)Schema enforcement and governance
Lakehouse 应该有一种形式能够反对模式执行和演进、反对 DW schema 的范式(如星星或雪花模型),可能对数据完整性进行推理,并且具备强壮的治理和审计机制;
(3)开放性
应用的存储格局是开放式和标准化的(如 parquet),并且为各类工具和引擎,包含机器学习和 Python/ R 库,提供 API,以便它们能够间接无效地拜访数据;
(4)BI 反对
Lakehouse 能够间接在源数据上应用 BI 工具。这样能够进步数据新鲜度、缩小提早,并且升高了在数据池和数据仓库中操作两个数据正本的老本;
(5)存储与计算拆散
在实践中,这意味着存储和计算应用独自的集群,因而这些零碎可能扩大到反对更大的用户并发和数据量。一些古代数仓也具备此属性;
(6)反对从非结构化数据到结构化数据的多种数据类型
Lakehouse 可用于存储、优化、剖析和拜访许多数据利用所需的包含 image、video、audio、text 以及半结构化数据;
(7)反对各种工作负载
包含数据迷信、机器学习以及 SQL 和剖析。可能须要多种工具来反对这些工作负载,但它们底层都依赖同一数据存储库;
(8)端到端流
实时报表是许多企业中的规范利用。对流的反对打消了须要构建独自零碎来专门用于服务实时数据利用的需要。
从上述对 Lakehouse 架构的特点形容咱们能够看出,针对繁多性能,咱们能够利用某些开源产品组合构建出一套解决方案。但对于全副性能的反对,目前如同没有一个通用的解决方案。接下来,咱们先理解大数据时代支流的数据处理架构是怎么的。
大数据时代的架构演进
1、大数据时代的开源产品
大数据时代的开源产品种类繁多,音讯畛域的 RocketMQ、Kafka;计算畛域的 flink、spark、storm;存储畛域的 HDFS、Hbase、Redis、ElasticSearch、Hudi、DeltaLake 等等。
为什么会产生这么多开源产品呢?首先在大数据时代数据量越来越大,而且每个业务的需要也各不相同,因而就产生出各种类型的产品供架构师抉择,用于反对各类场景。然而泛滥的品类产品也给架构师们带来一些困扰,比方选型艰难、试错老本高、学习老本高、架构简单等等。
2、以后支流的多层架构
大数据畛域的解决解决场景蕴含数据分析、BI、科学计算、机器学习、指标监控等场景,针对不同场景,业务方会依据业务特点抉择不同的计算引擎和存储引擎;例如交易指标能够采纳 binlog + CDC+ RocketMQ + Flink + Hbase + ELK 组合,用于 BI 和 Metric 可视化。
(1)多层架构的长处:反对宽泛的业务场景;
(2)多层架构的毛病:
- 解决链路长,提早高;
- 数据正本多,老本翻倍;
- 学习老本高;
造成多层架构毛病次要起因是存储链路和计算链路太长。
- 咱们真的须要如此多的解决方案来反对宽泛的业务场景吗?Lakehouse 架构是否能够对立解决方案?
- 多层架构的存储层是否能够合并?Hudi 产品是否可能反对多种存储需要?
- 多层架构的计算层是否能够合并?RocketMQ stream 是否可能交融音讯层和计算层?
以后支流的多层架构
3、Lakehouse 架构产生
Lakehouse 架构是多层架构的降级版本,将存储层复杂度持续升高到一层。再进一步压缩计算层,将音讯层和计算层交融,RocketMQ stream 充当计算的角色。咱们失去如下图所示的新架构。新架构中,音讯出入口通过 RocketMQ connector 实现,音讯计算层由 RocketMQ stream 实现,在 RocketMQ 外部实现音讯计算两头态的流转;计算结果通过 RocketMQ-Hudi-connector 收口落库 Hudi,Hudi 反对多种索引,并提供对立的 API 输入给不同产品。
Lakehouse 架构
上面咱们剖析下该架构的特点。
(1)Lakehouse 架构的长处:
- 链路更短,更适宜实时场景,数据新鲜感高;
- 老本可控,升高了存储老本;
- 学习成本低,对程序员敌对;
- 运维复杂度大幅升高;
(2)Lakehouse 架构的毛病
对音讯产品和数据湖产品的稳定性、易用性等要求高,同时音讯产品须要反对计算场景,数据湖产品须要提供弱小的索引性能。
(3)抉择
在 Lakehouse 架构中咱们抉择音讯产品 RocketMQ 和数据湖产品 Hudi。
同时,能够利用 RocketMQ stream 在 RocketMQ 集群上将计算层放在其中集成,这样就将计算层升高到一层,可能满足绝大部分中小型大数据处理场景。
接下来咱们逐渐剖析 RocketMQ 和 Hudi 两款产品的特点。
RocketMQ Connector & Stream
RocketMQ 倒退历程图
RocketMQ 从 2017 年开始进入 Apache 孵化,2018 年 RocketMQ 4.0 公布实现云原生化,2021 年 RocketMQ 5.0 公布全面交融音讯、事件、流。
1、业务音讯畛域首选
RocketMQ 作为一款“让人睡得着觉的音讯产品”成为业务音讯畛域的首选,这次要源于产品的以下特点:
(1)金融级高牢靠
经验了阿里巴巴双十一的洪峰测验;
(2)极简架构
如下图所示,RocketMQ 的架构次要蕴含两局部包含:源数据集群 NameServer Cluster 和计算存储集群 Broker Cluster。
RocketMQ 构架图
NameServer 节点无状态,能够非常简单的进行横向扩容。Broker 节点采纳主备形式保证数据高可靠性,反对一主多备的场景,配置灵便。
搭建形式:只须要简略的代码就能够搭建 RocketMQ 集群:
Jar:
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &
On K8S:
kubectl apply -f example/rocketmq_cluster.yaml
(3)极低运维老本
RocketMQ 的运维老本很低,提供了很好的 CLI 工具 MQAdmin,MQAdmin 提供了丰盛的命令反对,笼罩集群衰弱状态查看、集群进出流量管控等多个方面。例如,mqadmin clusterList 一条命令能够获取到以后集群全副节点状态(生产生产流量、提早、排队长度、磁盘水位等);mqadmin updateBrokerConfig 命令能够实时设置 broker 节点或 topic 的可读可写状态,从而能够动静摘除长期不可用节点,达到生产生产的流量迁徙成果。
(4)丰盛的音讯类型
RocketMQ 反对的音讯类型包含:一般音讯、事务音讯、提早音讯、定时音讯、程序音讯等。可能轻松反对大数据场景和业务场景。
(5)高吞吐、低提早
压测场景主备同步复制模式,每台 Broker 节点都能够将磁盘利用率打满,同时能够将 p99 提早管制在毫秒级别。
2、RocketMQ 5.0 详情
RocketMQ 5.0 是生于云、长于云的云原生音讯、事件、流超交融平台,它具备以下特点:
(1)轻量级 SDK
全面反对云原生通信规范 gRPC 协定;
无状态 Pop 生产模式,多语言敌对,易集成;
(2)极简架构
无内部依赖,升高运维累赘;
节点间涣散耦合,任意服务节点可随时迁徙;
(3)可分可合的存储计算拆散
- Broker 降级为真正的无状态服务节点,无 binding;
- Broker 和 Store 节点拆散部署、独立扩缩;
- 多协定规范反对,无厂商锁定;
- 可分可合,适应多种业务场景,升高运维累赘;
如下图所示,计算集群(Broker)次要包含形象模型和绝对应的协定适配,以及生产能力和治理能力。存储集群(Store)次要分为音讯存储 CommitLog(多类型音讯存储、多模态存储)和索引存储 Index(多元索引)两局部,如果能够充分发挥云上存储的能力,将 CommitLog 和 Index 配置在云端的文件系统就能够人造的实现存储和计算拆散。
(4)多模存储反对
满足不同根底场景下的高可用诉求;
充分利用云上基础设施,降低成本;
(5)云原生基础设施:
- 可观测性能力云原生化,OpenTelemetry 标准化;
- Kubernetes 一键式部署扩容交付。
RocketMQ 5.02021 年度大事件及将来布局
3、RocketMQConnector
a、传统数据流
(1)传统数据流的弊病
生产者消费者代码须要本人实现,老本高;
数据同步的工作没有对立治理;
反复开发,代码品质参差不齐;
(2)解决方案:RocketMQ Connector
- 单干共建,复用数据同步工作代码;
- 对立的治理调度,进步资源利用率;
b、RocketMQ Connector 数据同步流程
相比传统数据流,RocketMQ connector 数据流的不同在于将 source 和 sink 进行对立治理,同时它开放源码,社区也很沉闷。
4、RocketMQ Connector 架构
如上图所示,RocketMQ Connector 架构次要蕴含 Runtime 和 Worker 两局部,另外还有生态 Source&Sink。
(1)规范:OpenMessaging
(2)生态:反对 ActiveMQ、Cassandra、ES、JDBC、JMS、MongoDB、Kafka、RabbitMQ、Mysql、Flume、Hbase、Redis 等大数据畛域的大部分产品;
(3)组件:Manager 对立治理调度,如果有多个工作能够将所有工作对立进行负载平衡,平均的调配到不同 Worker 上,同时 Worker 能够进行横向扩容。
5、RocketMQ Stream
RocketMQ Stream 是一款将计算层压缩到一层的产品。它反对一些常见的算子如 window、join、维表,兼容 Flink SQL、UDF/UDAF/UDTF。
Apache Hudi
Hudi 是一个流式数据湖平台,反对对海量数据疾速更新。内置表格局,反对事务的存储层、一系列表服务、数据服务(开箱即用的摄取工具)以及欠缺的运维监控工具。Hudi 能够将存储卸载到阿里云上的 OSS、AWS 的 S3 这些存储上。
Hudi 的个性包含:
- 事务性写入,MVCC/OCC 并发管制;
- 对记录级别的更新、删除的原生反对;
- 面向查问优化:小文件主动治理,针对增量拉取优化的设计,主动压缩、聚类以优化文件布局;
Apache Hudi 是一套残缺的数据湖平台。它的特点有:
- 各模块严密集成,自我管理;
- 应用 Spark、Flink、Java 写入;
- 应用 Spark、Flink、Hive、Presto、Trino、Impala、
- AWS Athena/Redshift 等进行查问;
- 进行数据操作的开箱即用工具 / 服务。
Apache Hudi 次要针对以下三类场景进行优化:
1、流式解决栈
(1) 增量解决;
(2) 疾速、高效;
(3) 面向行;
(4) 未优化扫描;
2、批处理栈
(1) 批量解决;
(2) 低效;
(3) 扫描、列存格局;
3、增量解决栈
(1) 增量解决;
(2) 疾速、高效;
(3) 扫描、列存格局。
构建 Lakehouse 实操
该局部只介绍主流程和实操配置项,本机搭建的实操细节能够参考附录局部。
1、筹备工作
RocketMQ version:4.9.0
rocketmq-connect-hudi version:0.0.1-SNAPSHOT
Hudi version:0.8.0
2、构建 RocketMQ-Hudi-connector
(1) 下载:
git clone https://github.com/apache/rocketmq-externals.git
(2) 配置:
/data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/target/distribution/conf/connect.conf 中 connector-plugin 门路
(3) 编译:
cd rocketmq-externals/rocketmq-connect-hudi
mvn clean install -DskipTest -U
rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar 就是咱们须要应用的 rocketmq-hudi-connector
3、运行
(1) 启动或应用现有的 RocketMQ 集群,并初始化元数据 Topic:
connector-cluster-topic(集群信息)connector-config-topic(配置信息)
connector-offset-topic(sink 生产进度)connector-position-topic(source 数据处理进度 并且为了保障音讯有序,每个 topic 能够只建一个 queue)
(2) 启动 RocketMQ connector 运行时
cd /data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime
sh ./run_worker.sh ## Worker 能够启动多个
(3) 配置并启动 RocketMQ-hudi-connector 工作
申请 RocketMQ connector runtime 创立工作
curl http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name} ?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/data/lakehouse/config/user.avsc"\}’启动胜利会打印如下日志:2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient successfully
(4) 此时向 source topic 生产的数据会主动写入到 1Hudi 对应的 table 中,能够通过 Hudi 的 api 进行查问。
4、配置解析
(1) RocketMQ connector 须要配置 RocketMQ 集群信息和 connector 插件地位,蕴含:connect 工作节点 id 标识 workerid、connect 服务命令接管端口 httpPort、rocketmq 集群 namesrvAddr、connect 本地配置贮存目录 storePathRootDir、connector 插件目录 pluginPaths。
RocketMQ connector 配置表
(2) Hudi 工作须要配置 Hudi 表门路 tablePath 和表名称 tableName,以及 Hudi 应用的 Schema 文件。
Hudi 工作配置表
点击此处即可查看 Lakehouse 构建实操视频
附录:在本地 Mac 零碎构建 Lakehouse demo
波及到的组件:rocketmq、rocketmq-connector-runtime、rocketmq-connect-hudi、hudi、hdfs、avro、spark-shell0、启动 hdfs
下载 hadoop 包
https://www.apache.org/dyn/cl…
cd /Users/osgoo/Documents/hadoop-2.10.1
vi core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<!-- 能够通过命令 hostname 查看主机名字 这里的主机名字是 hadoop1-->
<value>hdfs://localhost:9000</value>
</property>
<!-- 笼罩掉 core-default.xml 中的默认配置 -->
</configuration>
vi hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
./bin/hdfs namenode -format
./sbin/start-dfs.sh
jps 看下 namenode,datanode
lsof -i:9000
./bin/hdfs dfs -mkdir -p /Users/osgoo/Downloads
1、启动 rocketmq 集群,创立 rocketmq-connector 内置 topic
QickStart:https://rocketmq.apache.org/docs/quick-start/
sh mqadmin updatetopic -t connector-cluster-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updatetopic -t connector-config-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updatetopic -t connector-offset-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updatetopic -t connector-position-topic -n localhost:9876 -c DefaultCluster
2、创立数据入湖的源端 topic,testhudi1
sh mqadmin updatetopic -t testhudi1 -n localhost:9876 -c DefaultCluster
3、编译 rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar
cd rocketmq-connect-hudi
mvn clean install -DskipTest -U
4、启动 rocketmq-connector runtime
配置 connect.conf
--------------
workerId=DEFAULT_WORKER_1
storePathRootDir=/Users/osgoo/Downloads/storeRoot
## Http port for user to access REST API
httpPort=8082
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# Source or sink connector jar file dir,The default value is rocketmq-connect-sample
pluginPaths=/Users/osgoo/Downloads/connector-plugins
---------------
拷贝 rocketmq-hudi-connector.jar 到 pluginPaths=/Users/osgoo/Downloads/connector-plugins
sh run_worker.sh
5、配置入湖 config
curl http://localhost:8082/connectors/rocketmq-connect-hudi?config='\{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"testhudi1","tablePath":"hdfs://localhost:9000/Users/osgoo/Documents/base-path7","tableName":"t7","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","source-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"\}'
6、发送音讯到 testhudi1
7、## 利用 spark 读取
cd /Users/osgoo/Downloads/spark-3.1.2-bin-hadoop3.2/bin
./spark-shell \
--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "t7"
val basePath = "hdfs://localhost:9000/Users/osgoo/Documents/base-path7"
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select * from hudi_trips_snapshot").show()
原文链接
本文为阿里云原创内容,未经容许不得转载。