简介:基于RocketMQ和Hudi零代码构建Lakehouse架构,以及RocketMQ Connector & RocketMQ Stream助力ETL数据分析,为大家提供疾速构建Lakehouse的技术计划和低运维老本实现实时计算的解决方案。

本文目录

  • 背景常识
  • 大数据时代的构架演进
  • RocketMQ Connector&Stream
  • Apache Hudi
  • 构建Lakehouse实操

本文题目蕴含三个关键词:Lakehouse、RocketMQ、Hudi。咱们先从整体Lakehouse架构动手,随后逐渐剖析架构产生的起因、架构组件特点以及构建Lakehouse架构的实操局部。

背景常识

1、Lakehouse架构

Lakehouse最后由Databrick提出,并对Lakehouse架构特色有如下要求:

(1)事务反对

企业外部许多数据管道通常会并发读写数据。对ACID事务的反对确保了多方并发读写数据时的一致性问题;

(2)Schema enforcement and governance

Lakehouse应该有一种形式能够反对模式执行和演进、反对DW schema的范式(如星星或雪花模型),可能对数据完整性进行推理,并且具备强壮的治理和审计机制;

(3)开放性

应用的存储格局是开放式和标准化的(如parquet),并且为各类工具和引擎,包含机器学习和Python/R库,提供API,以便它们能够间接无效地拜访数据;

(4)BI反对

Lakehouse能够间接在源数据上应用BI工具。这样能够进步数据新鲜度、缩小提早,并且升高了在数据池和数据仓库中操作两个数据正本的老本;

(5)存储与计算拆散

在实践中,这意味着存储和计算应用独自的集群,因而这些零碎可能扩大到反对更大的用户并发和数据量。一些古代数仓也具备此属性;

(6)反对从非结构化数据到结构化数据的多种数据类型

Lakehouse可用于存储、优化、剖析和拜访许多数据利用所需的包含image、video、audio、text以及半结构化数据;

(7)反对各种工作负载

包含数据迷信、机器学习以及SQL和剖析。可能须要多种工具来反对这些工作负载,但它们底层都依赖同一数据存储库;

(8)端到端流

实时报表是许多企业中的规范利用。对流的反对打消了须要构建独自零碎来专门用于服务实时数据利用的需要。

从上述对Lakehouse架构的特点形容咱们能够看出,针对繁多性能,咱们能够利用某些开源产品组合构建出一套解决方案。但对于全副性能的反对,目前如同没有一个通用的解决方案。接下来,咱们先理解大数据时代支流的数据处理架构是怎么的。

大数据时代的架构演进

1、大数据时代的开源产品

大数据时代的开源产品种类繁多,音讯畛域的RocketMQ、Kafka;计算畛域的flink、spark、storm;存储畛域的HDFS、Hbase、Redis、ElasticSearch、Hudi、DeltaLake等等。

为什么会产生这么多开源产品呢?首先在大数据时代数据量越来越大,而且每个业务的需要也各不相同,因而就产生出各种类型的产品供架构师抉择,用于反对各类场景。然而泛滥的品类产品也给架构师们带来一些困扰,比方选型艰难、试错老本高、学习老本高、架构简单等等。

2、以后支流的多层架构

大数据畛域的解决解决场景蕴含数据分析、BI、科学计算、机器学习、指标监控等场景,针对不同场景,业务方会依据业务特点抉择不同的计算引擎和存储引擎;例如交易指标能够采纳binlog + CDC+ RocketMQ + Flink + Hbase + ELK组合,用于BI和Metric可视化。

(1)多层架构的长处:反对宽泛的业务场景;

(2)多层架构的毛病:

  • 解决链路长,提早高;
  • 数据正本多,老本翻倍;
  • 学习老本高;

造成多层架构毛病次要起因是存储链路和计算链路太长。

  • 咱们真的须要如此多的解决方案来反对宽泛的业务场景吗?Lakehouse架构是否能够对立解决方案?
  • 多层架构的存储层是否能够合并?Hudi产品是否可能反对多种存储需要?
  • 多层架构的计算层是否能够合并?RocketMQ stream是否可能交融音讯层和计算层?


以后支流的多层架构

3、Lakehouse架构产生

Lakehouse架构是多层架构的降级版本,将存储层复杂度持续升高到一层。再进一步压缩计算层,将音讯层和计算层交融,RocketMQ stream充当计算的角色。咱们失去如下图所示的新架构。新架构中,音讯出入口通过RocketMQ connector实现,音讯计算层由RocketMQ stream实现,在RocketMQ外部实现音讯计算两头态的流转;计算结果通过RocketMQ-Hudi-connector收口落库Hudi,Hudi反对多种索引,并提供对立的API输入给不同产品。


Lakehouse架构

上面咱们剖析下该架构的特点。

(1)Lakehouse架构的长处:

  • 链路更短,更适宜实时场景,数据新鲜感高;
  • 老本可控,升高了存储老本;
  • 学习成本低,对程序员敌对;
  • 运维复杂度大幅升高;

(2)Lakehouse架构的毛病

对音讯产品和数据湖产品的稳定性、易用性等要求高,同时音讯产品须要反对计算场景,数据湖产品须要提供弱小的索引性能。

(3)抉择

在Lakehouse架构中咱们抉择音讯产品RocketMQ和数据湖产品Hudi。

同时,能够利用RocketMQ stream在RocketMQ集群上将计算层放在其中集成,这样就将计算层升高到一层,可能满足绝大部分中小型大数据处理场景。

接下来咱们逐渐剖析RocketMQ和Hudi两款产品的特点。

RocketMQ Connector & Stream


RocketMQ 倒退历程图

RocketMQ从2017年开始进入Apache孵化,2018年RocketMQ 4.0公布实现云原生化,2021年RocketMQ 5.0公布全面交融音讯、事件、流。

1、业务音讯畛域首选

RocketMQ作为一款“让人睡得着觉的音讯产品”成为业务音讯畛域的首选,这次要源于产品的以下特点:

(1)金融级高牢靠

经验了阿里巴巴双十一的洪峰测验;

(2)极简架构

如下图所示, RocketMQ的架构次要蕴含两局部包含:源数据集群NameServer Cluster和计算存储集群Broker Cluster。


RocketMQ 构架图

NameServer节点无状态,能够非常简单的进行横向扩容。Broker节点采纳主备形式保证数据高可靠性,反对一主多备的场景,配置灵便。

搭建形式:只须要简略的代码就能够搭建RocketMQ集群:

Jar:

 nohup sh bin/mqnamesrv & nohup sh bin/mqbroker -n localhost:9876 &

On K8S:

kubectl apply -f example/rocketmq_cluster.yaml

(3)极低运维老本

RocketMQ的运维老本很低,提供了很好的CLI工具MQAdmin,MQAdmin提供了丰盛的命令反对,笼罩集群衰弱状态查看、集群进出流量管控等多个方面。例如,mqadmin clusterList一条命令能够获取到以后集群全副节点状态(生产生产流量、提早、排队长度、磁盘水位等);mqadmin updateBrokerConfig命令能够实时设置broker节点或topic的可读可写状态,从而能够动静摘除长期不可用节点,达到生产生产的流量迁徙成果。

(4)丰盛的音讯类型

RocketMQ反对的音讯类型包含:一般音讯、事务音讯、提早音讯、定时音讯、程序音讯等。可能轻松反对大数据场景和业务场景。

(5)高吞吐、低提早

压测场景主备同步复制模式,每台Broker节点都能够将磁盘利用率打满,同时能够将p99提早管制在毫秒级别。

2、RocketMQ 5.0详情

RocketMQ 5.0是生于云、长于云的云原生音讯、事件、流超交融平台,它具备以下特点:

(1)轻量级SDK

全面反对云原生通信规范 gRPC 协定;
无状态 Pop 生产模式,多语言敌对,易集成;

(2)极简架构

无内部依赖,升高运维累赘;
节点间涣散耦合,任意服务节点可随时迁徙;

(3)可分可合的存储计算拆散

  • Broker 降级为真正的无状态服务节点,无 binding;
  • Broker 和 Store节点拆散部署、独立扩缩;
  • 多协定规范反对,无厂商锁定;
  • 可分可合,适应多种业务场景,升高运维累赘;

如下图所示,计算集群(Broker)次要包含形象模型和绝对应的协定适配,以及生产能力和治理能力。存储集群(Store)次要分为音讯存储CommitLog(多类型音讯存储、多模态存储)和索引存储Index(多元索引)两局部,如果能够充分发挥云上存储的能力,将CommitLog和Index配置在云端的文件系统就能够人造的实现存储和计算拆散。

(4)多模存储反对

满足不同根底场景下的高可用诉求;
充分利用云上基础设施,降低成本;

(5)云原生基础设施:

  • 可观测性能力云原生化,OpenTelemetry 标准化;
  • Kubernetes 一键式部署扩容交付。


RocketMQ 5.02021年度大事件及将来布局

3、RocketMQConnector

a、传统数据流

(1)传统数据流的弊病

生产者消费者代码须要本人实现,老本高;
数据同步的工作没有对立治理;
反复开发,代码品质参差不齐;

(2)解决方案:RocketMQ Connector

  • 单干共建,复用数据同步工作代码;
  • 对立的治理调度,进步资源利用率;

b、RocketMQ Connector数据同步流程

相比传统数据流,RocketMQ connector数据流的不同在于将 source 和 sink 进行对立治理,同时它开放源码,社区也很沉闷。

4、RocketMQ Connector架构

如上图所示,RocketMQ Connector架构次要蕴含Runtime和Worker两局部,另外还有生态Source&Sink。

(1)规范:OpenMessaging

(2)生态:反对ActiveMQ、Cassandra、ES、JDBC、JMS、MongoDB、Kafka、RabbitMQ、Mysql、Flume、Hbase、Redis等大数据畛域的大部分产品;

(3)组件:Manager对立治理调度,如果有多个工作能够将所有工作对立进行负载平衡,平均的调配到不同Worker上,同时Worker能够进行横向扩容。

5、RocketMQ Stream

RocketMQ Stream是一款将计算层压缩到一层的产品。它反对一些常见的算子如window、join、维表,兼容Flink SQL、UDF/UDAF/UDTF。

Apache Hudi

Hudi 是一个流式数据湖平台,反对对海量数据疾速更新。内置表格局,反对事务的存储层、一系列表服务、数据服务(开箱即用的摄取工具)以及欠缺的运维监控工具。Hudi 能够将存储卸载到阿里云上的 OSS、AWS 的S3这些存储上。

Hudi的个性包含:

  • 事务性写入,MVCC/OCC并发管制;
  • 对记录级别的更新、删除的原生反对;
  • 面向查问优化:小文件主动治理,针对增量拉取优化的设计,主动压缩、聚类以优化文件布局;

Apache Hudi是一套残缺的数据湖平台。它的特点有:

  • 各模块严密集成,自我管理;
  • 应用 Spark、Flink、Java 写入;
  • 应用 Spark、Flink、Hive、Presto、Trino、Impala、
  • AWS Athena/Redshift等进行查问;
  • 进行数据操作的开箱即用工具/服务。

Apache Hudi次要针对以下三类场景进行优化:

1、流式解决栈

(1) 增量解决;

(2) 疾速、高效;

(3) 面向行;

(4) 未优化扫描;

2、批处理栈

(1) 批量解决;

(2) 低效;

(3) 扫描、列存格局;

3、增量解决栈

(1) 增量解决;

(2) 疾速、高效;

(3) 扫描、列存格局。

构建 Lakehouse 实操

该局部只介绍主流程和实操配置项,本机搭建的实操细节能够参考附录局部。

1、筹备工作

RocketMQ version:4.9.0

rocketmq-connect-hudi version:0.0.1-SNAPSHOT

Hudi version:0.8.0

2、构建RocketMQ-Hudi-connector

(1) 下载:

  git clone https://github.com/apache/rocketmq-externals.git

(2) 配置:

/data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/target/distribution/conf/connect.conf 中connector-plugin 门路

(3) 编译:

cd rocketmq-externals/rocketmq-connect-hudimvn clean install -DskipTest -U

rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar就是咱们须要应用的rocketmq-hudi-connector

3、运行

(1) 启动或应用现有的RocketMQ集群,并初始化元数据Topic:

connector-cluster-topic (集群信息) connector-config-topic (配置信息)

connector-offset-topic (sink生产进度) connector-position-topic (source数据处理进度 并且为了保障音讯有序,每个topic能够只建一个queue)

(2) 启动RocketMQ connector运行时

cd /data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime  sh ./run_worker.sh    ##  Worker能够启动多个

(3) 配置并启动RocketMQ-hudi-connector工作

申请RocketMQ connector runtime创立工作

curl http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name} ?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/data/lakehouse/config/user.avsc"\}’  启动胜利会打印如下日志:2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient successfully

(4) 此时向source topic生产的数据会主动写入到1Hudi对应的table中,能够通过Hudi的api进行查问。

4、配置解析

(1) RocketMQ connector须要配置RocketMQ集群信息和connector插件地位,蕴含:connect工作节点id标识workerid、connect服务命令接管端口httpPort、rocketmq集群namesrvAddr、connect本地配置贮存目录storePathRootDir、connector插件目录pluginPaths 。


RocketMQ connector配置表

(2) Hudi工作须要配置Hudi表门路tablePath和表名称tableName,以及Hudi应用的Schema文件。


Hudi工作配置表

点击此处即可查看Lakehouse构建实操视频

附录:在本地Mac零碎构建Lakehouse demo

波及到的组件:rocketmq、rocketmq-connector-runtime、rocketmq-connect-hudi、hudi、hdfs、avro、spark-shell0、启动hdfs

下载hadoop包

https://www.apache.org/dyn/cl...

cd /Users/osgoo/Documents/hadoop-2.10.1vi core-site.xml<configuration><property> <name>fs.defaultFS</name> <!-- 能够通过命令hostname 查看主机名字  这里的主机名字是hadoop1--> <value>hdfs://localhost:9000</value></property><!--笼罩掉core-default.xml中的默认配置--></configuration>vi hdfs-site.xml<configuration><property>        <name>dfs.replication</name>        <value>1</value>  </property></configuration>./bin/hdfs namenode -format./sbin/start-dfs.sh jps 看下namenode,datanodelsof -i:9000./bin/hdfs dfs -mkdir -p /Users/osgoo/Downloads1、启动rocketmq集群,创立rocketmq-connector内置topicQickStart:https://rocketmq.apache.org/docs/quick-start/sh mqadmin updatetopic -t connector-cluster-topic -n localhost:9876 -c DefaultClustersh mqadmin updatetopic -t connector-config-topic -n localhost:9876 -c DefaultClustersh mqadmin updatetopic -t connector-offset-topic -n localhost:9876 -c DefaultClustersh mqadmin updatetopic -t connector-position-topic -n localhost:9876 -c DefaultCluster2、创立数据入湖的源端topic,testhudi1sh mqadmin updatetopic -t testhudi1 -n localhost:9876 -c DefaultCluster3、编译rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jarcd rocketmq-connect-hudimvn clean install -DskipTest -U4、启动rocketmq-connector runtime配置connect.conf--------------workerId=DEFAULT_WORKER_1storePathRootDir=/Users/osgoo/Downloads/storeRoot## Http port for user to access REST APIhttpPort=8082# Rocketmq namesrvAddrnamesrvAddr=localhost:9876# Source or sink connector jar file dir,The default value is rocketmq-connect-samplepluginPaths=/Users/osgoo/Downloads/connector-plugins---------------拷贝 rocketmq-hudi-connector.jar 到 pluginPaths=/Users/osgoo/Downloads/connector-pluginssh run_worker.sh5、配置入湖configcurl http://localhost:8082/connectors/rocketmq-connect-hudi?config='\{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"testhudi1","tablePath":"hdfs://localhost:9000/Users/osgoo/Documents/base-path7","tableName":"t7","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","source-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"\}'6、发送音讯到testhudi17、## 利用spark读取cd /Users/osgoo/Downloads/spark-3.1.2-bin-hadoop3.2/bin./spark-shell \  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 \  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'import org.apache.hudi.QuickstartUtils._import scala.collection.JavaConversions._import org.apache.spark.sql.SaveMode._import org.apache.hudi.DataSourceReadOptions._import org.apache.hudi.DataSourceWriteOptions._import org.apache.hudi.config.HoodieWriteConfig._val tableName = "t7"val basePath = "hdfs://localhost:9000/Users/osgoo/Documents/base-path7"val tripsSnapshotDF = spark.  read.  format("hudi").  load(basePath + "/*")tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")spark.sql("select * from hudi_trips_snapshot").show()

原文链接
本文为阿里云原创内容,未经容许不得转载。