关于数据湖:DLF-DDI-一站式数据湖构建与分析最佳实践

6次阅读

共计 11148 个字符,预计需要花费 28 分钟才能阅读完成。

简介:本文由阿里云数据湖构建 DLF 团队和 Databricks 数据洞察团队联结撰写,旨在帮忙您更深刻地理解阿里云数据湖构建(DLF)+Databricks 数据洞察(DDI)构建一站式云上数据入湖。

作者

陈鑫伟(熙康),阿里云 计算平台事业部 技术专家
冯加亮(加亮),阿里云 计算平台事业部 技术研发

背景

随着数据时代的一直倒退,数据量爆发式增长,数据模式也变的更加多样。传统数据仓库模式的老本高、响应慢、格局少等问题日益凸显。于是领有老本更低、数据模式更丰盛、剖析计算更灵便的数据湖应运而生。

数据湖作为一个集中化的数据存储仓库,反对的数据类型具备多样性,包含结构化、半结构化以及非结构化的数据,数据起源上蕴含数据库数据、binglog 增量数据、日志数据以及已有数仓上的存量数据等。数据湖可能将这些不同起源、不同格局的数据集中存储管理在高性价比的存储如 OSS 等对象存储中,并对外提供对立的数据目录,反对多种计算剖析形式,无效解决了企业中面临的数据孤岛问题,同时大大降低了企业存储和应用数据的老本。

数据湖架构及关键技术

企业级数据湖架构如下:

数据湖存储与格局

数据湖存储次要以云上对象存储作为次要介质,其具备低成本、高稳定性、高可扩展性等长处。

数据湖上咱们能够采纳反对 ACID 的数据湖存储格局,如 Delta Lake、Hudi、Iceberg。这些数据湖格局有本人的数据 meta 治理能力,可能反对 Update、Delete 等操作,以批流一体的形式解决了大数据场景下数据实时更新的问题。在以后计划中,咱们次要介绍 Delta Lake 的外围能力和利用场景。

Delta Lake 的外围能力

Delta Lake 是一个对立的数据管理系统,为云上数据湖带来数据可靠性和疾速剖析。Delta Lake 运行在现有数据湖之上,并且与 Apache Spark 的 API 齐全兼容。应用 Delta Lake,您能够放慢高质量数据导入数据湖的速度,团队也能够在云服务上疾速应用这些数据,平安且可扩大。

  • ACID 事务性:Delta Lake 在多个写操作之间提供 ACID 事务性。每一次写操作都是一个事务操作,事务日志(Transaction Log)中记录的写操作都有一个程序序列。事务日志(Transaction Log)跟踪了文件级别的写操作,并应用了乐观锁进行并发管制,这十分实用于数据湖,因为尝试批改雷同文件的屡次写操作的状况并不常常产生。当发生冲突时,Delta Lake 会抛出一个并发批改异样,抛给供用户解决并重试其作业。Delta Lake 还提供了最高级别的隔离(可序列化隔离),容许工程师一直地向目录或表写入数据,而使用者一直地从同一目录或表读取数据,读取数据时会看到数据的最新快照。
  • Schema 治理(Schema management):Delta Lake 会主动验证正在写入的 DataFrame 的 Schema 是否与表的 Schema 兼容。若表中存在但 DataFrame 中不存在的列则会被设置为 null。如果 DataFrame 中有额定的列不在表中,那么该操作将会抛出异样。Delta Lake 具备 DDL(数据定义语言)显式增加新列的性能,并且可能自动更新 Schema。
  • 可伸缩的元数据(Metadata)解决:Delta Lake 将表或目录的元数据信息存储在事务日志(Transaction Log)中,而不是元数据 Metastore 中。这使得 Delta Lake 够在固定工夫内列出大目录中的文件,并且在读取数据时效率很高。
  • 数据版本控制和工夫旅行(Time Travel):Delta Lake 容许用户读取表或目录的历史版本快照。当文件在写入过程中被批改时,Delta Lake 会创立文件的新的版本并保留旧版本。当用户想要读取表或目录的较旧版本时,他们能够向 Apach Spark 的 read API 提供工夫戳或版本号,Delta Lake 依据事务日志(Transaction Log)中的信息来构建该工夫戳或版本的残缺快照。这十分不便用户来复现试验和报告,如果须要,还能够将表还原为旧版本。
  • 对立批流一体:除了批处理写入之外,Delta Lake 还能够作为 Apache Spark 的结构化流的高效流接收器(Streaming Sink)。与 ACID 事务和可伸缩元数据处理相结合,高效的流接收器(Streaming Sink)反对大量近实时的剖析用例,而无需保护简单的流和批处理管道。
  • 记录更新和删除:Delta Lake 将反对合并、更新和删除的 DML(数据管理语言)命令。这使得工程师能够轻松地在数据湖中插入和删除记录,并简化他们的变更数据捕捉和 GDPR(个别数据保护条例)用例。因为 Delta Lake 在文件级粒度上进行跟踪和批改数据,因而它比读取和笼罩整个分区或表要高效得多。

数据湖构建与治理

1. 数据入湖

企业的原始数据存在于多种数据库或存储系统,如关系数据库 MySQL、日志零碎 SLS、NoSQL 存储 HBase、音讯数据库 Kafka 等。其中大部分的在线存储都面向在线事务型业务,并不适宜在线剖析的场景,所以须要将数据以无侵入的形式同步至老本更低且更适宜计算剖析的对象存储。

罕用的数据同步形式有基于 DataX、Sqoop 等数据同步工具做批量同步;同时在对于实时性要求较高的场景下,配合应用 Kafka+spark Streaming / flink 等流式同步链路。目前很多云厂商提供了一站式入湖的解决方案,帮忙客户以更快捷更低成本的形式实现数据入湖,如阿里云 DLF 数据入湖。

2. 对立元数据服务

对象存储自身是没有面向大数据分析的语义的,须要联合 Hive Metastore Service 等元数据服务为下层各种剖析引擎提供数据的 Meta 信息。数据湖元数据服务的设计指标是可能在大数据引擎、存储多样性的环境下,构建不同存储系统、格局和不同计算引擎对立元数据视图,并具备对立的权限、元数据,且须要兼容和扩大开源大数据生态元数据服务,反对主动获取元数据,并达到一次治理屡次应用的目标,这样既可能兼容开源生态,也具备极大的易用性。

数据湖计算与剖析

相比于数据仓库,数据湖以更凋谢的形式对接多种不同的计算引擎,如传统开源大数据计算引擎 Hive、Spark、Presto、Flink 等,同时也反对云厂商自研的大数据引擎,如阿里云 MaxCompute、Hologres 等。在数据湖存储与计算引擎之间,个别还会提供数据湖减速的服务,以进步计算剖析的性能,同时缩小带宽的老本和压力。

Databricks 数据洞察 - 商业版的 Spark 数据计算与剖析引擎

DataBricks 数据洞察(DDI)做为阿里云上全托管的 Spark 剖析引擎,可能简略疾速帮忙用户对数据湖的数据进行计算与剖析。

  • Saas 全托管 Spark:免运维,无需关注底层资源状况,升高运维老本,聚焦剖析业务
  • 残缺 Spark 技术栈集成:一站式集成 Spark 引擎和 Delta Lake 数据湖,100% 兼容开源 Spark 社区版;Databricks 做商业反对,最快体验 Spark 最新版本个性
  • 总成本升高:商业版本 Spark 及 Delta Lake 性能劣势显著;同时基于计算存储拆散架构,存储依靠阿里云 OSS 对象存储,借助阿里云 JindoFS 缓存层减速;可能无效升高集群总体应用老本
  • 高品质反对以及 SLA 保障:阿里云和 Databricks 提供笼罩 Spark 全栈的技术支持;提供商业化 SLA 保障与 7 *24 小时 Databricks 专家反对服务

Databricks 数据洞察 + DLF 数据湖构建与流批一体剖析实际

企业构建和利用数据湖个别须要经验数据入湖、数据湖存储与治理、数据湖摸索与剖析等几个过程。本文次要介绍基于阿里云数据湖构建(DLF)+Databricks 数据洞察(DDI)构建一站式的数据入湖,批流一体数据分析实战。

流解决场景:

实时场景保护更新两张 Delta 表:

  • delta_aggregates_func 表:RDS 数据实时入湖。
  • delta_aggregates_metrics 表:工业 metric 数据通过 IoT 平台采集到云 Kafka,经由 Spark Structured Streaming 实时入湖。

批处理场景:

以实时场景生成两张 Delta 作为数据源,进行数据分析执行 Spark jobs,通过 Databrick 数据洞察作业调度定时执行。

前置条件

1. 服务开明

确保 DLF、OSS、Kafka、DDI、RDS、DTS 等云产品服务已开明。留神 DLF、RDS、Kafka、DDI 实例均需在同一 Region 下。

2. RDS 数据筹备

RDS 数据筹备,在 RDS 中创立数据库 dlfdb。在账户核心创立可能读取 engine_funcs 数据库的用户账号,如 dlf_admin。

通过 DMS 登录数据库,运行一下语句创立 engine_funcs 表,及插入大量数据。

CREATE TABLE `engine_funcs` (`emp_no` int(11) NOT NULL,
  `engine_serial_number` varchar(20) NOT NULL,
  `engine_serial_name` varchar(20) NOT NULL,
  `target_engine_serial_number` varchar(20) NOT NULL,
  `target_engine_serial_name` varchar(20) NOT NULL,
  `operator` varchar(16) NOT NULL,
  `create_time` DATETIME NOT NULL,
  `update_time` DATETIME NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

INSERT INTO `engine_funcs` VALUES (10001,'1107108133','temperature','1107108144','temperature','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10002,'1107108155','temperature','1107108133','temperature','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10003,'1107108155','runTime','1107108166','speed','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10004,'1107108177','pressure','1107108155','electricity','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10005,'1107108188','flow' ,'1107108111','runTime','/', now(), now());

RDS 数据实时入湖

1. 创立数据源

进入 DLF 控制台界面:https://dlf.console.aliyun.co…,点击菜单 数据入湖 -> 数据源治理。
点击 新建数据源。填写连贯名称,抉择数据筹备中的应用的 RDS 实例,填写账号密码,点击“连贯测试”验证网络连通性及账号可用性。

点击下一步,确定,实现数据源创立。

2. 创立元数据库

在 OSS 中新建 Bucket,databricks-data-source;

点击左侧菜单“元数据管理”->“元数据库”,点击“新建元数据库”。填写名称,新建目录 dlf/,并抉择。

3. 创立入湖工作

  • 点击菜单“数据入湖”->“入湖工作治理”,点击“新建入湖工作”。
  • 抉择“关系数据库实时入湖”,依照下图的信息填写数据源、指标数据湖、工作配置等信息。并保留。
  • 配置数据源,抉择方才新建的“dlf”连贯,应用表门路“dlf/engine_funcs”,抉择新建 dts 订阅,填写名称。

  • 回到工作治理页面,点击“运行”新建的入湖工作。就会看到工作进入“初始化中”状态,随后会进入“运行”状态。
  • 点击“详情”进入工作详情页,能够看到相应的数据库表信息。

该数据入湖工作,属于全量 + 增量入湖,大概 3 至 5 分钟后,全量数据会实现导入,随后主动进入实时监听状态。如果有数据更新,则会自动更新至 Delta Lake 数据中。

数据湖摸索与剖析

DLF 数据查问摸索

DLF 产品提供了轻量级的数据预览和摸索性能,点击菜单“数据摸索”->“SQL 查问”进入数据查问页面。

在元数据库表中,找到“fjl_dlf”,开展后能够看到 engine_funcs_delta 表曾经主动创立实现。双击该表名称,右侧 sql 编辑框会呈现查问该表的 sql 语句,点击“运行”,即可取得数据查问后果。

回到 DMS 控制台,运行下方 DELETE 和 INSERT SQL 语句。

DELETE FROM `engine_funcs` where `emp_no` = 10001;
UPDATE `engine_funcs` SET `operator` = '+', `update_time` = NOW() WHERE `emp_no` =10002;
INSERT INTO `engine_funcs` VALUES (20001,'1107108199','speed','1107108122','runTime','*', now(), now());

大概 1 至 3 分钟后,在 DLF 数据摸索再次执行方才的 select 语句,所有的数据更新曾经同步至数据湖中。

创立 Databricks 数据洞察(DDI)集群

集群创立实现后,点击“详情”进入详情页,增加以后拜访机器 ip 白名单。

点击 Notebook 进入交互式剖析页查问同步至 Delta Lake 中 engine_funcs_delta 表数据。

IoT 平台采集到云 Kafka 数据实时写入 Delta Lake

1. 引入 spark-sql-kafka 三方依赖

%spark.conf

spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

2. 应用 UDF 函数定义流数据写入 Delta Lake 的 Merge 规定

发往 Kafka 的测试数据的格局:

{"sn": "1107108111","temperature": "12" ,"speed":"1115", "runTime":"160","pressure":"210","electricity":"380","flow":"740","dia":"330"}
{"sn": "1107108122","temperature": "13" ,"speed":"1015", "runTime":"150","pressure":"220","electricity":"390","flow":"787","dia":"340"}
{"sn": "1107108133","temperature": "14" ,"speed":"1215", "runTime":"140","pressure":"230","electricity":"377","flow":"777","dia":"345"}
{"sn": "1107108144","temperature": "15" ,"speed":"1315", "runTime":"145","pressure":"240","electricity":"367","flow":"730","dia":"430"}
{"sn": "1107108155","temperature": "16" ,"speed":"1415", "runTime":"155","pressure":"250","electricity":"383","flow":"750","dia":"345"}
{"sn": "1107108166","temperature": "10" ,"speed":"1515", "runTime":"145","pressure":"260","electricity":"350","flow":"734","dia":"365"}
{"sn": "1107108177","temperature": "12" ,"speed":"1115", "runTime":"160","pressure":"210","electricity":"377","flow":"733","dia":"330"}
{"sn": "1107108188","temperature": "13" ,"speed":"1015", "runTime":"150","pressure":"220","electricity":"381","flow":"737","dia":"340"}
{"sn": "1107108199","temperature": "14" ,"speed":"1215", "runTime":"140","pressure":"230","electricity":"378","flow":"747","dia":"345"}

%spark
import org.apache.spark.sql._
import io.delta.tables._
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF.createOrReplaceTempView(“dataStream”)
// 对流数据 DF 执行列转行的操作;

val df=microBatchOutputDF.sparkSession.sql(s"""
            select `sn`,
             stack(7, 'temperature', `temperature`, 'speed', `speed`, 'runTime', `runTime`, 'pressure', `pressure`, 'electricity', `electricity`, 'flow', `flow` , 'dia', `dia`) as (`name`, `value`)
             from  dataStream  
""")
df.createOrReplaceTempView("updates")
// 实现实时更新动静的数据,后果 merge 到表外面
val mergedf=df.sparkSession.sql(s"""
MERGE INTO delta_aggregates_metrics t
USING updates s
ON  s.sn = t.sn and s.name=t.name
WHEN MATCHED THEN UPDATE SET 
t.value = s.value,
t.update_time=current_timestamp()
WHEN NOT MATCHED THEN INSERT 
(t.sn,t.name,t.value ,t.create_time,t.update_time)

values (s.sn,s.name,s.value,current_timestamp(),current_timestamp())
“””)
}

3. 应用 Spark Structured Streaming 实时流写入 Delta Lake

%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

def getquery(checkpoint_dir:String,servers:String,topic:String){
    var streamingInputDF =  
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("subscribe", topic)     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "10")  
    .option("failOnDataLoss", "true")
    .load()
var streamingSelectDF = 
  streamingInputDF
   .select(get_json_object(($"value").cast("string"), "$.sn").alias("sn"),
   get_json_object(($"value").cast("string"), "$.temperature").alias("temperature"),
   get_json_object(($"value").cast("string"), "$.speed").alias("speed"),
   get_json_object(($"value").cast("string"), "$.runTime").alias("runTime"),
   get_json_object(($"value").cast("string"), "$.electricity").alias("electricity"),
   get_json_object(($"value").cast("string"), "$.flow").alias("flow"),
   get_json_object(($"value").cast("string"), "$.dia").alias("dia"),
   get_json_object(($"value").cast("string"), "$.pressure").alias("pressure")
   )
val query = streamingSelectDF
      .writeStream
      .format("delta")
      .option("checkpointLocation", checkpoint_dir)
      .trigger(Trigger.ProcessingTime("5 seconds")) // 执行流解决工夫距离
      .foreachBatch(upsertToDelta _) // 援用 upsertToDelta 函数
      .outputMode("update")
      .start()}

4. 执行程序

%spark
val my_checkpoint_dir="oss://databricks-data-source/checkpoint/ck"
val servers= "***.***.***.***:9092"
val topic= "your-topic"
getquery(my_checkpoint_dir,servers,topic)

5. 启动 Kafka 并向生产里发送测试数据

查问数据实时写入并更新

查问从 MySQL 实时同步入湖的 engine_funcs_delta 数据

%spark
val rds_dataV=spark.table("fjl_dlf.engine_funcs_delta")
rds_dataV.show()

批处理作业

联合业务,须要将对应的 delta_aggregates_metrics 里的 Value 参数 join 到 engine_funcs_delta 表里

%spark
// 读取实时更新的 delta_aggregates_metrics 数据表
val aggregateDF=spark.table("log_data_warehouse_dlf.delta_aggregates_metrics")
// 读取实时更新的 engine_funcs_delta 函数表
val rds_dataV=spark.table("fjl_dlf.engine_funcs_delta").drop("create_time","update_time")
// rds_dataV.show()
val aggregateSDF= aggregateDF.withColumnRenamed("value","esn_value").withColumnRenamed("name","engine_serial_name").withColumnRenamed("sn","engine_serial_number")
// aggregateSDF.show()
val aggregateTDF=aggregateDF.withColumnRenamed("value","tesn_value").withColumnRenamed("name","target_engine_serial_name").withColumnRenamed("sn","target_engine_serial_number").drop("create_time","update_time")
// aggregateTDF.show()
// 将对应的 delta_aggregates_metrics 里的 Value 参数 join 到 engine_funcs_delta 表里;val  resdf=rds_dataV.join(aggregateSDF,Seq("engine_serial_name","engine_serial_number"),"left").join(aggregateTDF,Seq("target_engine_serial_number","target_engine_serial_name"),"left")
            .selectExpr("engine_serial_number","engine_serial_name","esn_value","target_engine_serial_number","target_engine_serial_name","tesn_value","operator","create_time","update_time")

// 数据展现
resdf.show(false)
// 将后果写入到 Delta 表外面
resdf.write.format("delta")
    .mode("append")
    .saveAsTable("log_data_warehouse_dlf.delta_result")

性能优化:OPTIMIZE & Z-Ordering

在流解决场景下会产生大量的小文件,大量小文件的存在会重大影响数据系统的读性能。Delta Lake 提供了 OPTIMIZE 命令,能够将小文件进行合并压缩,另外,针对 Ad-Hoc 查问场景,因为波及对单表多个维度数据的查问,咱们借助 Delta Lake 提供的 Z-Ordering 机制,能够无效晋升查问的性能。从而极大晋升读取表的性能。DeltaLake 自身提供了 Auto Optimize 选项,然而会就义大量写性能,减少数据写入 delta 表的提早。相同,执行 OPTIMIZE 命令并不会影响写的性能,因为 Delta Lake 自身反对 MVCC,反对 OPTIMIZE 的同时并发执行写操作。因而,咱们采纳定期触发执行 OPTIMIZE 的计划,每小时通过 OPTIMIZE 做一次合并小文件操作,同时执行 VACCUM 来清理过期数据文件:

OPTIMIZE log_data_warehouse_dlf.delta_result ZORDER by engine_serial_number;

VACUUM log_data_warehouse_dlf.delta_result RETAIN 1 HOURS;

原文链接
本文为阿里云原创内容,未经容许不得转载。

正文完
 0