关于spark:SparkSQL数据抽象与执行过程分享

122次阅读

共计 4416 个字符,预计需要花费 12 分钟才能阅读完成。

SparkSQL 数据抽象
引入 DataFrame
就易用性而言,比照传统的 MapReduce API,Spark 的 RDD API 有了数量级的飞跃并不为过。然而,对于没有 MapReduce 和函数式编程教训的老手来说,RDD API 依然存在着肯定的门槛。
另一方面,数据科学家们所相熟的 R、Pandas 等传统数据框架尽管提供了直观的 API,却局限于单机解决,无奈大数据培训胜任大数据场景。
为了解决这一矛盾,Spark SQL 1.3.0 在原有 SchemaRDD 的根底上提供了与 R 和 Pandas 格调相似的 DataFrame API。
新的 DataFrame AP 不仅能够大幅度降低一般开发者的学习门槛,同时还反对 Scala、Java 与 Python 三种语言。更重要的是,因为脱胎自 SchemaRDD,DataFrame 人造实用于分布式大数据场景。
留神:
DataFrame 它不是 Spark SQL 提出来的,而是晚期在 R、Pandas 语言就曾经有了的。
DataFrame 是什么
在 Spark 中,DataFrame 是一种以 RDD 为根底的分布式数据集,相似于传统数据库中的二维表格。DataFrame 与 RDD 的次要区别在于,前者带有 schema 元信息,即 DataFrame 所示意的二维表数据集的每一列都带有名称和类型。
使得 Spark SQL 得以洞察更多的构造信息,从而对藏于 DataFrame 背地的数据源以及作用于 DataFrame 之上的变换进行针对性的优化,最终达到大幅晋升运行时效率。反观 RDD,因为无从得悉所存数据元素的具体内部结构,Spark Core 只能在 stage 层面进行简略、通用的流水线优化。

上图中左侧的 RDD[Person]尽管以 Person 为类型参数,但 Spark 框架自身不理解 Person 类的内部结构。而两头的 DataFrame 却提供了具体的构造信息,使得 Spark SQL 能够分明地晓得该数据集中蕴含哪些列,每列的名称和类型各是什么。理解了这些信息之后,Spark SQL 的查问优化器就能够进行针对性的优化。后者因为在编译期有详尽的类型信息,编译期就能够编译出更加有针对性、更加优化的可执行代码。官网定义:
• Dataset:A DataSet is a distributed collection of data. (分布式的数据集)
• DataFrame:A DataFrame is a DataSet organized into named columns.(以列(列名,列类型,列值)的模式形成的分布式的数据集,依照列赋予不同的名称)

DataFrame 有如下个性:
1)分布式的数据集,并且以列的形式组合的,相当于具备 schema 的 RDD;
2)相当于关系型数据库中的表,然而底层有优化;
3)提供了一些形象的操作,如 select、filter、aggregation、plot;
4)它是因为 R 语言或者 Pandas 语言解决小数据集的教训利用到解决分布式大数据集上;
5)在 1.3 版本之前,叫 SchemaRDD;
Schema 信息
查看 DataFrame 中 Schema 是什么,执行如下命令:
df.schema
Schema 信息封装在 StructType 中,蕴含很多 StructField 对象,源码。
StructType 定义,是一个样例类,属性为 StructField 的数组
StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填

自定义 Schema 构造,官网提供的示例代码:

Row
DataFrame 中每条数据封装在 Row 中,Row 示意每行数据。
如何构建 Row 对象:要么是传递 value,要么传递 Seq
形式一:下标获取,从 0 开始,相似数组下标获取如何获取 Row 中每个字段的值呢?

形式二:指定下标,晓得类型

形式三:通过 As 转换类型

Dataset
引入
Spark 在 Spark 1.3 版本中引入了 Dataframe,DataFrame 是组织到命名列中的分布式数据汇合,然而有如下几点限度:
编译时类型不平安:Dataframe API 不反对编译时安全性,这限度了在构造不晓得时操纵数据。以下示例在编译期间无效。然而,执行此代码时将呈现运行时异样。
无奈对域对象(失落域对象)进行操作:将域对象转换为 DataFrame 后,无奈从中从新生成它;上面的示例中,一旦咱们从 personRDD 创立 personDF,将不会复原 Person 类的原始 RDD(RDD [Person])。
基于上述的两点,从 Spark 1.6 开始呈现 Dataset,至 Spark 2.0 中将 DataFrame 与 Dataset 合并,其中 DataFrame 为 Dataset 非凡类型,类型为 Row。

针对 RDD、DataFrame 与 Dataset 三者编程比拟来说,Dataset API 无论语法错误和剖析谬误在编译时都能发现,然而 RDD 和 DataFrame 有的须要在运行时能力发现。
此外 RDD 与 Dataset 相比较而言,因为 Dataset 数据应用非凡编码,所以在存储数据时更加节俭内存。
总结:
Dataset 是在 Spark1.6 中增加的新的接口,是 DataFrame API 的一个扩大,是 Spark 最新的数据抽象,联合了 RDD 和 DataFrame 的长处。
与 RDD 相比:保留了更多的形容信息,概念上等同于关系型数据库中的二维表;
与 DataFrame 相比:保留了类型信息,是强类型的,提供了编译时类型查看,调用 Dataset 的办法先会生成逻辑打算,而后被 Spark 的优化器进行优化,最终生成物理打算,而后提交到集群中运行;
Dataset 是什么
Dataset 是一个强类型的特定畛域的对象,这种对象能够函数式或者关系操作并行地转换。
从 Spark 2.0 开始,DataFrame 与 Dataset 合并,每个 Dataset 也有一个被称为一个 DataFrame 的类型化视图,这种 DataFrame 是 Row 类型的 Dataset,即 Dataset[Row]。
Dataset API 是 DataFrames 的扩大,它提供了一种类型平安的,面向对象的编程接口。它是一个强类型,不可变的对象汇合,映射到关系模式。在数据集的外围 API 是一个称为编码器的新概念,它负责在 JVM 对象和表格示意之间进行转换。表格示意应用 Spark 外部 Tungsten 二进制格局存储,容许对序列化数据进行操作并进步内存利用率。Spark 1.6 反对主动生成各种类型的编码器,包含根本类型(例如 String,Integer,Long),Scala 案例类和 Java Bean。
针对 Dataset 数据结构来说,能够简略的从如下四个要点记忆与了解:

Spark 框架从最后的数据结构 RDD、到 SparkSQL 中针对结构化数据封装的数据结构 DataFrame,最终应用 Dataset 数据集进行封装,倒退流程如下。

所以在理论我的项目中倡议应用 Dataset 进行数据封装,数据分析性能和数据存储更加好。
SparkSQL 底层如何执行
RDD 的运行流程

大抵运行步骤:
• 先将 RDD 解析为由 Stage 组成的 DAG, 后将 Stage 转为 Task 间接运行

问题:
• 工作会依照代码所示运行, 依赖开发者的优化, 开发者的会在很大水平上影响运行效率

解决办法:
• 创立一个组件, 帮忙开发者批改和优化代码, 但这在 RDD 上是无奈实现的

为什么 RDD 无奈自我优化?
• RDD 没有 Schema 信息
• RDD 能够同时解决结构化和非结构化的数据

SparkSQL 提供了什么?

和 RDD 不同, SparkSQL 的 Dataset 和 SQL 并不是间接生成打算交给集群执行, 而是通过了一个叫做 Catalyst 的优化器, 这个优化器可能主动帮忙开发者优化代码。也就是说, 在 SparkSQL 中, 开发者的代码即便不够优化, 也会被优化为绝对较好的模式去执行。
为什么 SparkSQL 提供了这种能力?
首先, SparkSQL 大部分状况用于解决结构化数据和半结构化数据, 所以 SparkSQL 能够获知数据的 Schema, 从而依据其 Schema 来进行优化。
Catalyst
为了解决过多依赖 Hive 的问题, SparkSQL 应用了一个新的 SQL 优化器代替 Hive 中的优化器, 这个优化器就是 Catalyst, 整个 SparkSQL 的架构大抵如下:

1.API 层简略的说就是 Spark 会通过一些 API 承受 SQL 语句 2. 收到 SQL 语句当前, 将其交给 Catalyst, Catalyst 负责解析 SQL, 生成执行打算等 3.Catalyst 的输入应该是 RDD 的执行打算 4. 最终交由集群运行
具体流程:

Step 1 : 解析 SQL, 并且生成 AST (形象语法树)

Step 2 : 在 AST 中退出元数据信息, 做这一步次要是为了一些优化, 例如 col = col 这样的条件, 下图是一个简略图, 便于了解

• score.id → id#1#L 为 score.id 生成 id 为 1, 类型是 Long
• score.math_score → math_score#2#L 为 score.math_score 生成 id 为 2, 类型为 Long
• people.id → id#3#L 为 people.id 生成 id 为 3, 类型为 Long
• people.age → age#4#L 为 people.age 生成 id 为 4, 类型为 Long

Step 3 : 对曾经退出元数据的 AST, 输出优化器, 进行优化, 从两种常见的优化开始, 简略介绍:

谓词下推 Predicate Pushdown, 将 Filter 这种能够减小数据集的操作下推, 放在 Scan 的地位, 这样能够缩小操作时候的数据量。

• 列值裁剪 Column Pruning, 在谓词下推后, people 表之上的操作只用到了 id 列, 所以能够把其它列裁剪掉, 这样能够缩小解决的数据量, 从而优化处理速度
• 还有其余很多优化点, 大略一共有一二百种, 随着 SparkSQL 的倒退, 还会越来越多, 感兴趣的同学能够持续通过源码理解, 源码在 org.apache.spark.sql.catalyst.optimizer.Optimizer

Step 4 : 下面的过程生成的 AST 其实最终还没方法间接运行, 这个 AST 叫做 逻辑打算, 完结后, 须要生成 物理打算, 从而生成 RDD 来运行。
在生成物理打算的时候, 会通过老本模型对整棵树再次执行优化, 抉择一个更好的打算。
在生成物理打算当前, 因为思考到性能, 所以会应用代码生成, 在机器中运行。
能够应用 queryExecution 办法查看逻辑执行打算, 应用 explain 办法查看物理执行打算。

也能够应用 Spark WebUI 进行查看:

SparkSQL 和 RDD 不同的次要点是在于其所操作的数据是结构化的, 提供了对数据更强的感知和剖析能力, 可能对代码进行更深层的优化, 而这种能力是由一个叫做 Catalyst 的优化器所提供的。
Catalyst 的次要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑打算, 后对逻辑打算进行优化, 再生成物理打算, 最初生成代码到集群中以 RDD 的模式运行。

正文完
 0