在《第二篇 |Spark Core 编程指南》一文中,对 Spark 的外围模块进行了解说。本文将探讨 Spark 的另外一个重要模块 –Spark SQL,Spark SQL 是在 Shark 的根底之上构建的,于 2014 年 5 月公布。从名称上能够看出,该模块是 Spark 提供的关系型操作 API,实现了 SQL-on-Spark 的性能。对于一些相熟 SQL 的用户,能够间接应用 SQL 在 Spark 上进行简单的数据处理。通过本文,你能够理解到:
- Spark SQL 简介
- DataFrame API&DataSet API
- Catalyst Optimizer 优化器
- Spark SQL 基本操作
- Spark SQL 的数据源
- RDD 与 DataFrame 互相转换
- Thrift server 与 Spark SQL CLI
Spark SQL 简介
Spark SQL 是 Spark 的其中一个模块,用于结构化数据处理。与根本的 Spark RDD API 不同,Spark SQL 提供的接口为 Spark 提供了无关数据结构和正在执行的计算的更多信息,Spark SQL 会应用这些额定的信息来执行额定的优化。应用 SparkSQL 的形式有很多种,包含 SQL、DataFrame API 以及 Dataset API。值得注意的是,无论应用何种形式何种语言,其执行引擎都是雷同的。实现这种对立,意味着开发人员能够轻松地在不同的 API 之间来回切换,从而使数据处理更加地灵便。
DataFrame API&DataSet API
DataFrame API
DataFrame 代表一个不可变的分布式数据汇合,其外围目标是让开发者面对数据处理时,只关怀要做什么,而不必关怀怎么去做,将一些优化的工作交由 Spark 框架自身去解决。DataFrame 是具备 Schema 信息的,也就是说能够被看做具备字段名称和类型的数据,相似于关系型数据库中的表,然而底层做了很多的优化。创立了 DataFrame 之后,就能够应用 SQL 进行数据处理。
用户能够从多种数据源中结构 DataFrame,例如:结构化数据文件,Hive 中的表,内部数据库或现有 RDD。DataFrame API 反对 Scala,Java,Python 和 R,在 Scala 和 Java 中,row 类型的 DataSet 代表 DataFrame,即 Dataset[Row]
等同于 DataFrame。
DataSet API
DataSet 是 Spark 1.6 中增加的新接口,是 DataFrame 的扩大,它具备 RDD 的长处(强类型输出,反对弱小的 lambda 函数)以及 Spark SQL 的优化执行引擎的长处。能够通过 JVM 对象构建 DataSet,而后应用函数转换(map,
flatMap,
filter)。值得注意的是,Dataset API 在 Scala 和 Java 中可用,Python 不反对 Dataset API。
另外,DataSet API 能够缩小内存的应用,因为 Spark 框架晓得 DataSet 的数据结构,因而在长久化 DataSet 时能够节俭很多的内存空间。
Catalyst Optimizer 优化器
在 Catalyst 中,存在两种类型的打算:
- 逻辑打算(Logical Plan):定义数据集上的计算,尚未定义如何去执行计算。每个逻辑打算定义了一系列的用户代码所须要的属性 (查问字段) 和束缚(where 条件),然而不定义该如何执行。具体如下图所示:
- 物理打算(Physical Plan): 物理打算是从逻辑打算生成的,定义了如何执行计算,是可执行的。举个栗子:逻辑打算中的 JOIN 会被转换为物理打算中的 sort merge JOIN。须要留神,Spark 会生成多个物理打算,而后抉择老本最低的物理打算。具体如下图所示:
在 Spark SQL 中,所有的算子操作会被转换成 AST(abstract syntax tree, 形象语法树),而后将其传递给 Catalyst 优化器。该优化器是在 Scala 的函数式编程根底会上构建的,Catalyst 反对基于规定的 (rule-based) 和基于老本的 (cost-based) 优化策略。
Spark SQL 的查问打算包含 4 个阶段(见下图):
- 1. 剖析
- 2. 逻辑优化
- 3. 物理打算
- 4. 生成代码,将查问局部编译成 Java 字节码
留神:在物理打算阶段,Catalyst 会生成多个打算,并且会计算每个打算的老本,而后比拟这些打算的老本的大小,即基于老本的策略。在其余阶段,都是基于规定的的优化策略。
剖析
Unresolved Logical plan –> Logical plan。Spark SQL 的查问打算首先起始于由 SQL 解析器返回的 AST,或者是由 API 构建的 DataFrame 对象。在这两种状况下,都会存在未解决的属性援用 (某个查问字段可能不存在,或者数据类型谬误),比方查问语句:SELECT col FROM sales
, 对于字段col
的类型,或者该字段是否是一个无效的字段,只有等到查看该 sales
表时才会分明。当不能确定一个属性字段的类型或者没可能与输出表进行匹配时,称之为 未解决的
。Spark SQL 应用 Catalyst 的规定以及 Catalog 对象(可能拜访数据源的表信息) 来解决这些属性。首先会构建一个 Unresolved Logical Plan 树,而后作用一系列的规定,最初生成 Logical Plan。
逻辑优化
Logical plan –> Optimized Logical Plan。逻辑优化阶段应用基于规定的优化策略,比方谓词下推、投影裁剪等。通过一些列优化过后,生成优化的逻辑打算 Optimized Logical Plan。
物理打算
Optimized Logical Plan –>physical Plan。在物理打算阶段,Spark SQL 会将优化的逻辑打算生成多个物理执行打算,而后应用 Cost Model 计算每个物理打算的老本,最终抉择一个物理打算。在这个阶段,如果确定一张表很小(能够长久化到内存),Spark SQL 会应用 broadcast join。
须要留神的是,物理打算器也会应用基于规定的优化策略,比方将投影、过滤操作管道化一个 Spark 的 map 算子。此外,还会将逻辑打算阶段的操作推到数据源端(反对谓词下推、投影下推)。
代码生成
查问优化的最终阶段是生成 Java 字节码,应用 Quasi quotes 来实现这项工作的。
通过下面的剖析,对 Catalyst Optimizer 有了初步的理解。对于 Spark 的其余组件是如何与 Catalyst Optimizer 交互的呢?具体如下图所示:
如上图所示:ML Pipelines, Structured streaming 以及 GraphFrames 都应用了 DataFrame/Dataset
APIs,并且都得益于 Catalyst optimiser。
Quick Start
创立 SparkSession
SparkSession 是 Dataset 与 DataFrame API 的编程入口,从 Spark2.0 开始反对。用于对立原来的 HiveContext 和 SQLContext,为了兼容两者,依然保留这两个入口。通过一个 SparkSession 入口,进步了 Spark 的易用性。上面的代码展现了如何创立一个 SparkSession:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 导入隐式转换,比方将 RDD 转为 DataFrame
import spark.implicits._
创立 DataFrame
创立完 SparkSession 之后,能够应用 SparkSession 从曾经存在的 RDD、Hive 表或者其余数据源中创立 DataFrame。上面的示例应用的是从一个 JSON 文件数据源中创立 DataFrame:
/**
* {"name":"Michael"}
* {"name":"Andy", "age":30}
* {"name":"Justin", "age":19}
*/
val df = spark.read.json("E://people.json")
// 输入 DataFrame 的内容
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
DataFrame 基本操作
创立完 DataFrame 之后,能够对其进行一些列的操作,具体如上面代码所示:
// 打印该 DataFrame 的信息
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 查问 name 字段
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// 将每个人的 age + 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// 查找 age 大于 21 的人员信息
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 依照 age 分组,统计每种 age 的个数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
在程序中应用 SQL 查问
下面的操作应用的是 DSL(domain-specific language) 形式,还能够间接应用 SQL 对 DataFrame 进行操作,具体如下所示:
// 将 DataFrame 注册为 SQL 的长期视图
// 该办法创立的是一个本地的长期视图,生命周期与其绑定的 SparkSession 会话相干
// 即如果创立该 view 的 session 完结了,该 view 也就隐没了
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Global Temporary View
下面应用的是 Temporary views 的形式,该形式是 Spark Session 范畴的。如果将创立的 view 能够在所有 session 之间共享,能够应用 Global Temporary View 的形式创立 view,具体如下:
// 将 DataFrame 注册为全局长期视图(global temporary view)
// 该办法创立的是一个全局的长期视图,生命周期与其绑定的 Spark 应用程序相干,// 即如果应用程序完结,会主动被删除
// 全局长期视图是能够跨 Spark Session 的,零碎保留的数据库名为 `global_temp`
// 当查问时,必须要加上全限定名,如 `SELECT * FROM global_temp.view1`
df.createGlobalTempView("people")
// 全局长期视图默认的保留数据库为:`global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 全局长期视图反对跨 Spark Session 会话
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
创立 DataSet
DataSet 与 RDD 很相似,然而,RDD 应用的 Java 的序列化器或者 Kyro 序列化,而 DataSet 应用的是 Encoder 对在网络间传输的对象进行序列化的。创立 DataSet 的示例如下:
case class Person(name: String, age: Long)
// 创立 DataSet
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// 通过导入 Spark 的隐式转换 spark.implicits._
// 能够自动识别数据类型
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // 返回: Array(2, 3, 4)
// 通过调用 as 办法,DataFrame 能够转为 DataSet,val path = "E://people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
RDD 与 DataFrame 互相转换
Spark SQL 反对两种不同的形式将 RDD 转换为 DataFrame。第一种是应用反射来推断蕴含特定类型对象的 RDD 的模式,这种基于反射的形式能够提供更简洁的代码,如果在编写 Spark 应用程序时,曾经明确了 schema,能够应用这种形式。第二种形式是通过可编程接口来构建 schema,而后将其利用于现有的 RDD。此形式编写的代码更简短,此种形式创立的 DataFrame,直到运行时才晓得该 DataFrame 的列及其类型。
上面案例的数据集如下 people.txt:
Tom, 29
Bob, 30
Jack, 19
通过反射的形式
Spark SQL 的 Scala 接口反对主动将蕴含样例类的 RDD 转换为 DataFrame。样例类定义表的 schema。通过反射读取样例类的参数名称,并映射成 column 的名称。
object RDD2DF_m1 {
// 创立样例类
case class Person(name: String, age: Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("RDD2DF_m1")
.master("local")
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
runRDD2DF(spark)
}
private def runRDD2DF(spark: SparkSession) = {
// 导入隐式转换, 用于 RDD 转为 DataFrame
import spark.implicits._
// 从文本文件中创立 RDD,并将其转换为 DataFrame
val peopleDF = spark.sparkContext
.textFile("file:///E:/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// 将 DataFrame 注册成长期视图
peopleDF.createOrReplaceTempView("people")
// 运行 SQL 语句
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// 应用字段索引拜访列
teenagersDF.map(teenager => "Name:" + teenager(0)).show()
// +----------+
// | value|
// +----------+
// |Name: Jack|
// +----------+
// 通过字段名拜访列
teenagersDF.map(teenager => "Name:" + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Jack|
// +------------+
}
}
通过构建 schema 的形式
通过构建 schema 的形式创立 DataFrame 次要包含三步:
- 1. 从原始 RDD 创立 Row 类型的 RDD
- 2. 应用 StructType,创立 schema
- 3. 通过 createDataFrame 办法将 schema 利用于 Row 类型的 RDD
object RDD2DF_m2 {def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("RDD2DF_m1")
.master("local")
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
runRDD2DF(spark)
}
private def runRDD2DF(spark: SparkSession) = {
// 导入隐式转换, 用于 RDD 转为 DataFrame
import spark.implicits._
// 创立原始 RDD
val peopleRDD = spark.sparkContext.textFile("E:/people.txt")
//step 1 将原始 RDD 转换为 ROW 类型的 RDD
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim.toInt))
//step 2 创立 schema
val schema = StructType(Array(StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
//step 3 创立 DF
val peopleDF = spark.createDataFrame(rowRDD, schema)
// 将 DataFrame 注册成长期视图
peopleDF.createOrReplaceTempView("people")
// 运行 SQL 语句
val results = spark.sql("SELECT name FROM people")
// 应用字段索引拜访列
results.map(attributes => "Name:" + attributes(0)).show()
// +----------+
// | value|
// +----------+
// | Name: Tom|
// | Name: Bob|
// | Name: Jack|
// +----------+
}
}
Spark SQL 的数据源
Spark SQL 反对通过 DataFrame 接口对各种数据源进行操作,能够应用关系转换以及长期视图对 DataFrame 进行操作。常见的数据源包含以下几种:
文件数据源
- Parquet 文件
- JSON 文件
- CSV 文件
- ORC 文件
private def runBasicDataSourceExample(spark: SparkSession): Unit = {
/**
* 读取 parquet 文件数据源, 并将后果写入到 parquet 文件
*/
val usersDF = spark
.read
.load("E://users.parquet")
usersDF.show()
// 将 DF 保留到 parquet 文件
usersDF
.select("name", "favorite_color")
.write
.mode(SaveMode.Overwrite)
.save("E://namesAndFavColors.parquet")
/**
* 读取 json 文件数据源, 并将后果写入到 parquet 文件
*/
val peopleDF = spark
.read
.format("json")
.load("E://people.json")
peopleDF.show()
// 将 DF 保留到 parquet 文件
peopleDF
.select("name", "age")
.write
.format("parquet")
.mode(SaveMode.Overwrite)
.save("E://namesAndAges.parquet")
/**
* 读取 CSV 文件数据源
*/
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("E://people.csv")
/**
* 将 usersDF 写入到 ORC 文件
*/
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.mode(SaveMode.Overwrite)
.save("E://users_with_options.orc")
/**
* 将 peopleDF 保留为长久化表,个别保留为 Hive 中
*/
peopleDF
.write
.option("path","E://warehouse/people_bucketed") // 保留门路
.bucketBy(42, "name") // 依照 name 字段分桶
.sortBy("age") // 依照 age 字段排序
.saveAsTable("people_bucketed")
/**
* 将 userDF 保留为分区文件,相似于 Hive 分区表
*/
usersDF
.write
.partitionBy("favorite_color") // 分区字段
.format("parquet") // 文件格式
.mode(SaveMode.Overwrite) // 保留模式
.save("E://namesPartByColor.parquet")
/**
*
*/
usersDF
.write
.option("path","E://warehouse/users_partitioned_bucketed") // 保留门路
.partitionBy("favorite_color") // 分区
.bucketBy(42, "name") // 分桶
.saveAsTable("users_partitioned_bucketed")
spark.sql("DROP TABLE IF EXISTS people_bucketed")
spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")
}
保留模式
Scala/Java | Meaning |
---|---|
SaveMode.ErrorIfExists (default) |
如果指标文件曾经存在,则报异样 |
SaveMode.Append |
如果指标文件或表曾经存在,则将后果追加进去 |
SaveMode.Overwrite |
如果指标文件或表曾经存在,则笼罩原有的内容 |
SaveMode.Ignore |
相似于 SQL 中的 CREATE TABLE IF NOT EXISTS,如果指标文件或表曾经存在,则不做任何操作 |
保留为长久化表
DataFrame 能够被保留为 Hive 的长久化表,值得注意的是,这种形式并不依赖与 Hive 的部署,也就是说 Spark 会应用 Derby 创立一个默认的本地 Hive metastore,与 createOrReplaceTempView 不同,该形式会间接将后果物化。
对于基于文件的数据源 (text, parquet, json 等),在保留的时候能够指定一个具体的门路,比方 df.write.option(“path”, “/some/path”).saveAsTable(“t”)(存储在指定门路下的文件格式为 parquet)。
当表被删除时,自定义的表的门路和表数据不会被移除。如果没有指定具体的门路,spark 默认的是 warehouse 的目录(/user/hive/warehouse), 当表被删除时,默认的表门路也会被删除。
Hive 数据源
见上面大节:Spark SQL 集成 Hive
JDBC 数据源
Spark SQL 还包含一个能够应用 JDBC 从其余数据库读取数据的数据源。与应用 JdbcRDD 相比,应优先应用此性能。这是因为后果作为 DataFrame 返回,它们能够在 Spark SQL 中轻松解决或与其余数据源连贯。JDBC 数据源也更易于应用 Java 或 Python,因为它不须要用户提供 ClassTag。
能够应用 Data Sources API 将近程数据库中的表加载为 DataFrame 或 Spark SQL 长期视图。用户能够在数据源选项中指定 JDBC 连贯属性。user 并且 password 通常作为用于登录数据源的连贯属性提供。除连贯属性外,Spark 还反对以下不辨别大小写的选项:
属性名称 | 解释 |
---|---|
url |
要连贯的 JDBC URL |
dbtable |
读取或写入的 JDBC 表 |
query |
指定查问语句 |
driver |
用于连贯到该 URL 的 JDBC 驱动类名 |
partitionColumn, lowerBound, upperBound |
如果指定了这些选项,则必须全副指定。另外,numPartitions 必须指定 |
numPartitions |
表读写中可用于并行处理的最大分区数。这也确定了并发 JDBC 连贯的最大数量。如果要写入的分区数超过此限度,咱们能够通过 coalesce(numPartitions) 在写入之前进行调用将其升高到此限度 |
queryTimeout |
默认为0 ,查问超时工夫 |
fetchsize |
JDBC 的获取大小,它确定每次要获取多少行。这能够帮忙进步 JDBC 驱动程序的性能 |
batchsize |
默认为 1000,JDBC 批处理大小,这能够帮忙进步 JDBC 驱动程序的性能。 |
isolationLevel |
事务隔离级别,实用于以后连贯。它能够是一个NONE ,READ_COMMITTED ,READ_UNCOMMITTED ,REPEATABLE_READ ,或SERIALIZABLE ,对应于由 JDBC 的连贯对象定义,缺省值为规范事务隔离级别READ_UNCOMMITTED 。此选项仅实用于写作。 |
sessionInitStatement |
在向近程数据库关上每个数据库会话之后,在开始读取数据之前,此选项将执行自定义 SQL 语句,应用它来实现会话初始化代码。 |
truncate |
这是与 JDBC writer 相干的选项。当 SaveMode.Overwrite 启用时,就会清空指标表的内容,而不是删除和重建其现有的表。默认为false |
pushDownPredicate |
用于启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种状况下,Spark 将尽可能将过滤器下推到 JDBC 数据源。 |
object JdbcDatasetExample {def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("JdbcDatasetExample")
.master("local") // 设置为本地运行
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
runJdbcDatasetExample(spark)
}
private def runJdbcDatasetExample(spark: SparkSession): Unit = {
// 留神:从 JDBC 源加载数据
val jdbcPersonDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost/mydb")
.option("dbtable", "person")
.option("user", "root")
.option("password", "123qwe")
.load()
// 打印 jdbcDF 的 schema
jdbcPersonDF.printSchema()
// 打印数据
jdbcPersonDF.show()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "123qwe")
// 通过.jdbc 的形式加载数据
val jdbcStudentDF = spark
.read
.jdbc("jdbc:mysql://localhost/mydb", "student", connectionProperties)
// 打印 jdbcDF 的 schema
jdbcStudentDF.printSchema()
// 打印数据
jdbcStudentDF.show()
// 保留数据到 JDBC 源
jdbcStudentDF.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost/mydb")
.option("dbtable", "student2")
.option("user", "root")
.option("password", "123qwe")
.mode(SaveMode.Append)
.save()
jdbcStudentDF
.write
.mode(SaveMode.Append)
.jdbc("jdbc:mysql://localhost/mydb", "student2", connectionProperties)
}
}
Spark SQL 集成 Hive
Spark SQL 还反对读取和写入存储在 Apache Hive 中的数据。然而,因为 Hive 具备大量依赖项,因而这些依赖项不蕴含在默认的 Spark 公布包中。如果能够在类门路上找到 Hive 依赖项,Spark 将主动加载它们。请留神,这些 Hive 依赖项也必须存在于所有工作节点 (worker nodes) 上,因为它们须要拜访 Hive 序列化和反序列化库(SerDes)能力拜访存储在 Hive 中的数据。
将 hive-site.xml,core-site.xml 以及 hdfs-site.xml 文件放在 conf/ 下。
在应用 Hive 时,必须实例化一个反对 Hive 的 SparkSession,包含连贯到持久性 Hive Metastore,反对 Hive 的序列化、反序列化(serdes)和 Hive 用户定义函数。没有部署 Hive 的用户仍能够启用 Hive 反对。如果未配置 hive-site.xml,则上下文 (context) 会在当前目录中主动创立 metastore_db,并且会创立一个由 spark.sql.warehouse.dir 配置的目录,其默认目录为 spark-warehouse,位于启动 Spark 应用程序的当前目录中。请留神,自 Spark 2.0.0 以来,该在 hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被标记过期(deprecated)。应用 spark.sql.warehouse.dir 用于指定 warehouse 中的默认地位。可能须要向启动 Spark 应用程序的用户授予写入的权限。
上面的案例为在本地运行(为了不便查看打印的后果),运行完结之后会发现在我的项目的目录下 E:IdeaProjectsmyspark 创立了 spark-warehouse 和 metastore_db 的文件夹。能够看出没有部署 Hive 的用户仍能够启用 Hive 反对,同时也能够将代码打包,放在集群上运行。
object SparkHiveExample {case class Record(key: Int, value: String)
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", "e://warehouseLocation")
.master("local")// 设置为本地运行
.enableHiveSupport()
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
import spark.implicits._
import spark.sql
// 应用 Spark SQL 的语法创立 Hive 中的表
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH'file:///e:/kv1.txt'INTO TABLE src")
// 应用 HiveQL 查问
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 反对应用聚合函数
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// SQL 查问的后果是一个 DataFrame,反对应用所有的惯例的函数
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 AND key > 0 ORDER BY key")
// DataFrames 是 Row 类型的, 容许你按程序拜访列.
val stringsDS = sqlDF.map {case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// 能够通过 SparkSession 应用 DataFrame 创立一个长期视图
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// 能够用 DataFrame 与 Hive 中的表进行 join 查问
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// 创立一个 Parquet 格局的 hive 托管表,应用的是 HQL 语法,没有应用 Spark SQL 的语法("USING hive")
sql("CREATE TABLE IF NOT EXISTS hive_records(key int, value string) STORED AS PARQUET")
// 读取 Hive 中的表,转换成了 DataFrame
val df = spark.table("src")
// 将该 DataFrame 保留为 Hive 中的表,应用的模式 (mode) 为复写模式(Overwrite)
// 即如果保留的表曾经存在,则会笼罩掉原来表中的内容
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// 查问表中的数据
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 设置 Parquet 数据文件门路
val dataDir = "/tmp/parquet_data"
//spark.range(10)返回的是 DataSet[Long]
// 将该 DataSet 间接写入 parquet 文件
spark.range(10).write.parquet(dataDir)
// 在 Hive 中创立一个 Parquet 格局的内部表
sql(s"CREATE EXTERNAL TABLE IF NOT EXISTS hive_ints(key int) STORED AS PARQUET LOCATION'$dataDir'")
// 查问下面创立的表
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// | 0|
// | 1|
// | 2|
// ...
// 开启 Hive 动静分区
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// 应用 DataFrame API 创立 Hive 的分区表
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// 分区键‘key’将会在最终的 schema 中被移除
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()}
}
Thrift server 与 Spark SQL CLI
能够应用 JDBC/ODBC 或者命令行拜访 Spark SQL,通过这种形式,用户能够间接应用 SQL 运行查问,而不必编写代码。
Thrift JDBC/ODBC server
Thrift JDBC/ODBC server 与 Hive 的 HiveServer2 向对应,能够应用 Beeline 拜访 JDBC 服务器。在 Spark 的 sbin 目录下存在 start-thriftserver.sh 脚本,应用此脚本启动 JDBC/ODBC 服务器:
./sbin/start-thriftserver.sh
应用 beeline 拜访 JDBC/ODBC 服务器,Beeline 会要求提供用户名和明码, 在非平安模式下,只需输出用户名和空白明码即可
beeline> !connect jdbc:hive2://localhost:10000
Spark SQL CLI
Spark SQL CLI 是在本地模式下运行 Hive Metastore 服务并执行从命令行输出的查问的便捷工具。请留神,Spark SQL CLI 无奈与 Thrift JDBC 服务器通信。
要启动 Spark SQL CLI,只须要在 Spark 的 bin 目录中运行以下命令:
./spark-sql
总结
本文次要对 Spark SQL 进行了论述,次要包含 Spark SQL 的介绍、DataFrame&DataSet API 根本应用、Catalyst Optimizer 优化器的基本原理、Spark SQL 编程、Spark SQL 数据源以及与 Hive 集成、Thrift server 与 Spark SQL CLI。下一篇将分享 Spark Streaming 编程指南。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包