乐趣区

关于spark:第三篇Spark-SQL编程指南

在《第二篇 |Spark Core 编程指南》一文中,对 Spark 的外围模块进行了解说。本文将探讨 Spark 的另外一个重要模块 –Spark SQL,Spark SQL 是在 Shark 的根底之上构建的,于 2014 年 5 月公布。从名称上能够看出,该模块是 Spark 提供的关系型操作 API,实现了 SQL-on-Spark 的性能。对于一些相熟 SQL 的用户,能够间接应用 SQL 在 Spark 上进行简单的数据处理。通过本文,你能够理解到:

  • Spark SQL 简介
  • DataFrame API&DataSet API
  • Catalyst Optimizer 优化器
  • Spark SQL 基本操作
  • Spark SQL 的数据源
  • RDD 与 DataFrame 互相转换
  • Thrift server 与 Spark SQL CLI

Spark SQL 简介

Spark SQL 是 Spark 的其中一个模块,用于结构化数据处理。与根本的 Spark RDD API 不同,Spark SQL 提供的接口为 Spark 提供了无关数据结构和正在执行的计算的更多信息,Spark SQL 会应用这些额定的信息来执行额定的优化。应用 SparkSQL 的形式有很多种,包含 SQL、DataFrame API 以及 Dataset API。值得注意的是,无论应用何种形式何种语言,其执行引擎都是雷同的。实现这种对立,意味着开发人员能够轻松地在不同的 API 之间来回切换,从而使数据处理更加地灵便。

DataFrame API&DataSet API

DataFrame API

DataFrame 代表一个不可变的分布式数据汇合,其外围目标是让开发者面对数据处理时,只关怀要做什么,而不必关怀怎么去做,将一些优化的工作交由 Spark 框架自身去解决。DataFrame 是具备 Schema 信息的,也就是说能够被看做具备字段名称和类型的数据,相似于关系型数据库中的表,然而底层做了很多的优化。创立了 DataFrame 之后,就能够应用 SQL 进行数据处理。

用户能够从多种数据源中结构 DataFrame,例如:结构化数据文件,Hive 中的表,内部数据库或现有 RDD。DataFrame API 反对 Scala,Java,Python 和 R,在 Scala 和 Java 中,row 类型的 DataSet 代表 DataFrame,即 Dataset[Row] 等同于 DataFrame。

DataSet API

DataSet 是 Spark 1.6 中增加的新接口,是 DataFrame 的扩大,它具备 RDD 的长处(强类型输出,反对弱小的 lambda 函数)以及 Spark SQL 的优化执行引擎的长处。能够通过 JVM 对象构建 DataSet,而后应用函数转换(mapflatMapfilter)。值得注意的是,Dataset API 在 Scala 和 Java 中可用,Python 不反对 Dataset API。

另外,DataSet API 能够缩小内存的应用,因为 Spark 框架晓得 DataSet 的数据结构,因而在长久化 DataSet 时能够节俭很多的内存空间。

Catalyst Optimizer 优化器

在 Catalyst 中,存在两种类型的打算:

  • 逻辑打算(Logical Plan):定义数据集上的计算,尚未定义如何去执行计算。每个逻辑打算定义了一系列的用户代码所须要的属性 (查问字段) 和束缚(where 条件),然而不定义该如何执行。具体如下图所示:

  • 物理打算(Physical Plan): 物理打算是从逻辑打算生成的,定义了如何执行计算,是可执行的。举个栗子:逻辑打算中的 JOIN 会被转换为物理打算中的 sort merge JOIN。须要留神,Spark 会生成多个物理打算,而后抉择老本最低的物理打算。具体如下图所示:

在 Spark SQL 中,所有的算子操作会被转换成 AST(abstract syntax tree, 形象语法树),而后将其传递给 Catalyst 优化器。该优化器是在 Scala 的函数式编程根底会上构建的,Catalyst 反对基于规定的 (rule-based) 和基于老本的 (cost-based) 优化策略。

Spark SQL 的查问打算包含 4 个阶段(见下图):

  • 1. 剖析
  • 2. 逻辑优化
  • 3. 物理打算
  • 4. 生成代码,将查问局部编译成 Java 字节码

留神:在物理打算阶段,Catalyst 会生成多个打算,并且会计算每个打算的老本,而后比拟这些打算的老本的大小,即基于老本的策略。在其余阶段,都是基于规定的的优化策略。

剖析

Unresolved Logical plan –> Logical plan。Spark SQL 的查问打算首先起始于由 SQL 解析器返回的 AST,或者是由 API 构建的 DataFrame 对象。在这两种状况下,都会存在未解决的属性援用 (某个查问字段可能不存在,或者数据类型谬误),比方查问语句:SELECT col FROM sales, 对于字段col 的类型,或者该字段是否是一个无效的字段,只有等到查看该 sales 表时才会分明。当不能确定一个属性字段的类型或者没可能与输出表进行匹配时,称之为 未解决的 。Spark SQL 应用 Catalyst 的规定以及 Catalog 对象(可能拜访数据源的表信息) 来解决这些属性。首先会构建一个 Unresolved Logical Plan 树,而后作用一系列的规定,最初生成 Logical Plan。

逻辑优化

Logical plan –> Optimized Logical Plan。逻辑优化阶段应用基于规定的优化策略,比方谓词下推、投影裁剪等。通过一些列优化过后,生成优化的逻辑打算 Optimized Logical Plan。

物理打算

Optimized Logical Plan –>physical Plan。在物理打算阶段,Spark SQL 会将优化的逻辑打算生成多个物理执行打算,而后应用 Cost Model 计算每个物理打算的老本,最终抉择一个物理打算。在这个阶段,如果确定一张表很小(能够长久化到内存),Spark SQL 会应用 broadcast join。

须要留神的是,物理打算器也会应用基于规定的优化策略,比方将投影、过滤操作管道化一个 Spark 的 map 算子。此外,还会将逻辑打算阶段的操作推到数据源端(反对谓词下推、投影下推)。

代码生成

查问优化的最终阶段是生成 Java 字节码,应用 Quasi quotes 来实现这项工作的。

通过下面的剖析,对 Catalyst Optimizer 有了初步的理解。对于 Spark 的其余组件是如何与 Catalyst Optimizer 交互的呢?具体如下图所示:

如上图所示:ML Pipelines, Structured streaming 以及 GraphFrames 都应用了 DataFrame/Dataset
APIs,并且都得益于 Catalyst optimiser。

Quick Start

创立 SparkSession

SparkSession 是 Dataset 与 DataFrame API 的编程入口,从 Spark2.0 开始反对。用于对立原来的 HiveContext 和 SQLContext,为了兼容两者,依然保留这两个入口。通过一个 SparkSession 入口,进步了 Spark 的易用性。上面的代码展现了如何创立一个 SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
// 导入隐式转换,比方将 RDD 转为 DataFrame
import spark.implicits._

创立 DataFrame

创立完 SparkSession 之后,能够应用 SparkSession 从曾经存在的 RDD、Hive 表或者其余数据源中创立 DataFrame。上面的示例应用的是从一个 JSON 文件数据源中创立 DataFrame:

/**
* {"name":"Michael"}
* {"name":"Andy", "age":30}
* {"name":"Justin", "age":19}
*/
val df = spark.read.json("E://people.json")
// 输入 DataFrame 的内容
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

DataFrame 基本操作

创立完 DataFrame 之后,能够对其进行一些列的操作,具体如上面代码所示:

// 打印该 DataFrame 的信息
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 查问 name 字段
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// 将每个人的 age + 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 查找 age 大于 21 的人员信息
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 依照 age 分组,统计每种 age 的个数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

在程序中应用 SQL 查问

下面的操作应用的是 DSL(domain-specific language) 形式,还能够间接应用 SQL 对 DataFrame 进行操作,具体如下所示:

// 将 DataFrame 注册为 SQL 的长期视图
// 该办法创立的是一个本地的长期视图,生命周期与其绑定的 SparkSession 会话相干
// 即如果创立该 view 的 session 完结了,该 view 也就隐没了
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View

下面应用的是 Temporary views 的形式,该形式是 Spark Session 范畴的。如果将创立的 view 能够在所有 session 之间共享,能够应用 Global Temporary View 的形式创立 view,具体如下:

// 将 DataFrame 注册为全局长期视图(global temporary view)
// 该办法创立的是一个全局的长期视图,生命周期与其绑定的 Spark 应用程序相干,// 即如果应用程序完结,会主动被删除
// 全局长期视图是能够跨 Spark Session 的,零碎保留的数据库名为 `global_temp`
// 当查问时,必须要加上全限定名,如 `SELECT * FROM global_temp.view1`
df.createGlobalTempView("people")

// 全局长期视图默认的保留数据库为:`global_temp` 
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// 全局长期视图反对跨 Spark Session 会话
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

创立 DataSet

DataSet 与 RDD 很相似,然而,RDD 应用的 Java 的序列化器或者 Kyro 序列化,而 DataSet 应用的是 Encoder 对在网络间传输的对象进行序列化的。创立 DataSet 的示例如下:

case class Person(name: String, age: Long)
// 创立 DataSet
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// 通过导入 Spark 的隐式转换 spark.implicits._
// 能够自动识别数据类型
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 通过调用 as 办法,DataFrame 能够转为 DataSet,val path = "E://people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

RDD 与 DataFrame 互相转换

Spark SQL 反对两种不同的形式将 RDD 转换为 DataFrame。第一种是应用反射来推断蕴含特定类型对象的 RDD 的模式,这种基于反射的形式能够提供更简洁的代码,如果在编写 Spark 应用程序时,曾经明确了 schema,能够应用这种形式。第二种形式是通过可编程接口来构建 schema,而后将其利用于现有的 RDD。此形式编写的代码更简短,此种形式创立的 DataFrame,直到运行时才晓得该 DataFrame 的列及其类型。

上面案例的数据集如下 people.txt:

Tom, 29
Bob, 30
Jack, 19

通过反射的形式

Spark SQL 的 Scala 接口反对主动将蕴含样例类的 RDD 转换为 DataFrame。样例类定义表的 schema。通过反射读取样例类的参数名称,并映射成 column 的名称。

object RDD2DF_m1 {
  // 创立样例类
  case class  Person(name: String, age: Int)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    // 导入隐式转换, 用于 RDD 转为 DataFrame
    import spark.implicits._
    // 从文本文件中创立 RDD,并将其转换为 DataFrame
    val peopleDF = spark.sparkContext
      .textFile("file:///E:/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // 将 DataFrame 注册成长期视图
    peopleDF.createOrReplaceTempView("people")
    // 运行 SQL 语句
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // 应用字段索引拜访列
    teenagersDF.map(teenager => "Name:" + teenager(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // |Name: Jack|
    // +----------+

    // 通过字段名拜访列
    teenagersDF.map(teenager => "Name:" + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Jack|
    // +------------+
  }
}

通过构建 schema 的形式

通过构建 schema 的形式创立 DataFrame 次要包含三步:

  • 1. 从原始 RDD 创立 Row 类型的 RDD
  • 2. 应用 StructType,创立 schema
  • 3. 通过 createDataFrame 办法将 schema 利用于 Row 类型的 RDD
object RDD2DF_m2 {def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    // 导入隐式转换, 用于 RDD 转为 DataFrame
    import spark.implicits._
    // 创立原始 RDD
    val peopleRDD = spark.sparkContext.textFile("E:/people.txt")
    //step 1 将原始 RDD 转换为 ROW 类型的 RDD
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    //step 2 创立 schema
    val schema = StructType(Array(StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    //step 3 创立 DF
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    // 将 DataFrame 注册成长期视图
    peopleDF.createOrReplaceTempView("people")
    // 运行 SQL 语句
    val results = spark.sql("SELECT name FROM people")
    // 应用字段索引拜访列
    results.map(attributes => "Name:" + attributes(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // | Name: Tom|
    // | Name: Bob|
    // | Name: Jack|
    // +----------+
  }
}

Spark SQL 的数据源

Spark SQL 反对通过 DataFrame 接口对各种数据源进行操作,能够应用关系转换以及长期视图对 DataFrame 进行操作。常见的数据源包含以下几种:

文件数据源

  • Parquet 文件
  • JSON 文件
  • CSV 文件
  • ORC 文件
private def runBasicDataSourceExample(spark: SparkSession): Unit = {
    /**
      * 读取 parquet 文件数据源, 并将后果写入到 parquet 文件
      */

    val usersDF = spark
      .read
      .load("E://users.parquet")
    usersDF.show()
    // 将 DF 保留到 parquet 文件
    usersDF
      .select("name", "favorite_color")
      .write
      .mode(SaveMode.Overwrite)
      .save("E://namesAndFavColors.parquet")
    /**
      * 读取 json 文件数据源, 并将后果写入到 parquet 文件
      */

    val peopleDF = spark
      .read
      .format("json")
      .load("E://people.json")
    peopleDF.show()
    // 将 DF 保留到 parquet 文件
    peopleDF
      .select("name", "age")
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save("E://namesAndAges.parquet")

    /**
      * 读取 CSV 文件数据源
      */
    val peopleDFCsv = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("E://people.csv")

    /**
      * 将 usersDF 写入到 ORC 文件
      */
    usersDF.write.format("orc")
      .option("orc.bloom.filter.columns", "favorite_color")
      .option("orc.dictionary.key.threshold", "1.0")
      .option("orc.column.encoding.direct", "name")
      .mode(SaveMode.Overwrite)
      .save("E://users_with_options.orc")

    /**
      * 将 peopleDF 保留为长久化表,个别保留为 Hive 中
      */
    peopleDF
      .write
      .option("path","E://warehouse/people_bucketed") // 保留门路
      .bucketBy(42, "name")           // 依照 name 字段分桶
      .sortBy("age")                  // 依照 age 字段排序
      .saveAsTable("people_bucketed")

    /**
      * 将 userDF 保留为分区文件,相似于 Hive 分区表
      */
    usersDF
      .write
      .partitionBy("favorite_color")  // 分区字段
      .format("parquet")        // 文件格式
      .mode(SaveMode.Overwrite) // 保留模式
      .save("E://namesPartByColor.parquet")

    /**
      *
      */
    usersDF
      .write
      .option("path","E://warehouse/users_partitioned_bucketed") // 保留门路
      .partitionBy("favorite_color")  // 分区
      .bucketBy(42, "name")           // 分桶
      .saveAsTable("users_partitioned_bucketed")

    spark.sql("DROP TABLE IF EXISTS people_bucketed")
    spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")
  }

保留模式

Scala/Java Meaning
SaveMode.ErrorIfExists(default) 如果指标文件曾经存在,则报异样
SaveMode.Append 如果指标文件或表曾经存在,则将后果追加进去
SaveMode.Overwrite 如果指标文件或表曾经存在,则笼罩原有的内容
SaveMode.Ignore 相似于 SQL 中的 CREATE TABLE IF NOT EXISTS,如果指标文件或表曾经存在,则不做任何操作

保留为长久化表

DataFrame 能够被保留为 Hive 的长久化表,值得注意的是,这种形式并不依赖与 Hive 的部署,也就是说 Spark 会应用 Derby 创立一个默认的本地 Hive metastore,与 createOrReplaceTempView 不同,该形式会间接将后果物化。

对于基于文件的数据源 (text, parquet, json 等),在保留的时候能够指定一个具体的门路,比方 df.write.option(“path”, “/some/path”).saveAsTable(“t”)(存储在指定门路下的文件格式为 parquet) 当表被删除时,自定义的表的门路和表数据不会被移除。如果没有指定具体的门路,spark 默认的是 warehouse 的目录(/user/hive/warehouse), 当表被删除时,默认的表门路也会被删除。

Hive 数据源

见上面大节:Spark SQL 集成 Hive

JDBC 数据源

Spark SQL 还包含一个能够应用 JDBC 从其余数据库读取数据的数据源。与应用 JdbcRDD 相比,应优先应用此性能。这是因为后果作为 DataFrame 返回,它们能够在 Spark SQL 中轻松解决或与其余数据源连贯。JDBC 数据源也更易于应用 Java 或 Python,因为它不须要用户提供 ClassTag。

能够应用 Data Sources API 将近程数据库中的表加载为 DataFrame 或 Spark SQL 长期视图。用户能够在数据源选项中指定 JDBC 连贯属性。user 并且 password 通常作为用于登录数据源的连贯属性提供。除连贯属性外,Spark 还反对以下不辨别大小写的选项:

属性名称 解释
url 要连贯的 JDBC URL
dbtable 读取或写入的 JDBC 表
query 指定查问语句
driver 用于连贯到该 URL 的 JDBC 驱动类名
partitionColumn, lowerBound, upperBound 如果指定了这些选项,则必须全副指定。另外,numPartitions必须指定
numPartitions 表读写中可用于并行处理的最大分区数。这也确定了并发 JDBC 连贯的最大数量。如果要写入的分区数超过此限度,咱们能够通过 coalesce(numPartitions) 在写入之前进行调用将其升高到此限度
queryTimeout 默认为0,查问超时工夫
fetchsize JDBC 的获取大小,它确定每次要获取多少行。这能够帮忙进步 JDBC 驱动程序的性能
batchsize 默认为 1000,JDBC 批处理大小,这能够帮忙进步 JDBC 驱动程序的性能。
isolationLevel 事务隔离级别,实用于以后连贯。它能够是一个NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或SERIALIZABLE,对应于由 JDBC 的连贯对象定义,缺省值为规范事务隔离级别READ_UNCOMMITTED。此选项仅实用于写作。
sessionInitStatement 在向近程数据库关上每个数据库会话之后,在开始读取数据之前,此选项将执行自定义 SQL 语句,应用它来实现会话初始化代码。
truncate 这是与 JDBC writer 相干的选项。当 SaveMode.Overwrite 启用时,就会清空指标表的内容,而不是删除和重建其现有的表。默认为false
pushDownPredicate 用于启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种状况下,Spark 将尽可能将过滤器下推到 JDBC 数据源。
object JdbcDatasetExample {def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("JdbcDatasetExample")
      .master("local") // 设置为本地运行
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runJdbcDatasetExample(spark)
  }

  private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    // 留神:从 JDBC 源加载数据
    val jdbcPersonDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "person")
      .option("user", "root")
      .option("password", "123qwe")
      .load()
    // 打印 jdbcDF 的 schema
    jdbcPersonDF.printSchema()
    // 打印数据
    jdbcPersonDF.show()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "123qwe")
    // 通过.jdbc 的形式加载数据
    val jdbcStudentDF = spark
      .read
      .jdbc("jdbc:mysql://localhost/mydb", "student", connectionProperties)
    // 打印 jdbcDF 的 schema
    jdbcStudentDF.printSchema()
    // 打印数据
    jdbcStudentDF.show()
    // 保留数据到 JDBC 源
    jdbcStudentDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "student2")
      .option("user", "root")
      .option("password", "123qwe")
      .mode(SaveMode.Append)
      .save()

    jdbcStudentDF
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost/mydb", "student2", connectionProperties)

  }
}

Spark SQL 集成 Hive

Spark SQL 还反对读取和写入存储在 Apache Hive 中的数据。然而,因为 Hive 具备大量依赖项,因而这些依赖项不蕴含在默认的 Spark 公布包中。如果能够在类门路上找到 Hive 依赖项,Spark 将主动加载它们。请留神,这些 Hive 依赖项也必须存在于所有工作节点 (worker nodes) 上,因为它们须要拜访 Hive 序列化和反序列化库(SerDes)能力拜访存储在 Hive 中的数据。

将 hive-site.xml,core-site.xml 以及 hdfs-site.xml 文件放在 conf/ 下

在应用 Hive 时,必须实例化一个反对 Hive 的 SparkSession,包含连贯到持久性 Hive Metastore,反对 Hive 的序列化、反序列化(serdes)和 Hive 用户定义函数。没有部署 Hive 的用户仍能够启用 Hive 反对。如果未配置 hive-site.xml,则上下文 (context) 会在当前目录中主动创立 metastore_db,并且会创立一个由 spark.sql.warehouse.dir 配置的目录,其默认目录为 spark-warehouse,位于启动 Spark 应用程序的当前目录中。请留神,自 Spark 2.0.0 以来,该在 hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被标记过期(deprecated)。应用 spark.sql.warehouse.dir 用于指定 warehouse 中的默认地位。可能须要向启动 Spark 应用程序的用户授予写入的权限。

​ 上面的案例为在本地运行(为了不便查看打印的后果),运行完结之后会发现在我的项目的目录下 E:IdeaProjectsmyspark 创立了 spark-warehouse 和 metastore_db 的文件夹。能够看出没有部署 Hive 的用户仍能够启用 Hive 反对,同时也能够将代码打包,放在集群上运行。

object SparkHiveExample {case class Record(key: Int, value: String)

  def main(args: Array[String]) {


    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", "e://warehouseLocation")
      .master("local")// 设置为本地运行
      .enableHiveSupport()
      .getOrCreate()


    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    import spark.implicits._
    import spark.sql
    // 应用 Spark SQL 的语法创立 Hive 中的表
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH'file:///e:/kv1.txt'INTO TABLE src")

    // 应用 HiveQL 查问
    sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // 反对应用聚合函数
    sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // SQL 查问的后果是一个 DataFrame,反对应用所有的惯例的函数
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 AND key > 0 ORDER BY key")

    // DataFrames 是 Row 类型的, 容许你按程序拜访列.
    val stringsDS = sqlDF.map {case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    // 能够通过 SparkSession 应用 DataFrame 创立一个长期视图
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")

    // 能够用 DataFrame 与 Hive 中的表进行 join 查问
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // |  5| val_5|  5| val_5|
    // ...

    // 创立一个 Parquet 格局的 hive 托管表,应用的是 HQL 语法,没有应用 Spark SQL 的语法("USING hive")
    sql("CREATE TABLE IF NOT EXISTS hive_records(key int, value string) STORED AS PARQUET")

    // 读取 Hive 中的表,转换成了 DataFrame
    val df = spark.table("src")
    // 将该 DataFrame 保留为 Hive 中的表,应用的模式 (mode) 为复写模式(Overwrite)
    // 即如果保留的表曾经存在,则会笼罩掉原来表中的内容
    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
    // 查问表中的数据
    sql("SELECT * FROM hive_records").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // 设置 Parquet 数据文件门路
    val dataDir = "/tmp/parquet_data"
    //spark.range(10)返回的是 DataSet[Long]
    // 将该 DataSet 间接写入 parquet 文件
    spark.range(10).write.parquet(dataDir)
    // 在 Hive 中创立一个 Parquet 格局的内部表
    sql(s"CREATE EXTERNAL TABLE IF NOT EXISTS hive_ints(key int) STORED AS PARQUET LOCATION'$dataDir'")
    // 查问下面创立的表
    sql("SELECT * FROM hive_ints").show()
    // +---+
    // |key|
    // +---+
    // |  0|
    // |  1|
    // |  2|
    // ...

    // 开启 Hive 动静分区
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    // 应用 DataFrame API 创立 Hive 的分区表
    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")

    // 分区键‘key’将会在最终的 schema 中被移除
    sql("SELECT * FROM hive_part_tbl").show()
    // +-------+---+
    // |  value|key|
    // +-------+---+
    // |val_238|238|
    // | val_86| 86|
    // |val_311|311|
    // ...

    spark.stop()}
}

Thrift server 与 Spark SQL CLI

能够应用 JDBC/ODBC 或者命令行拜访 Spark SQL,通过这种形式,用户能够间接应用 SQL 运行查问,而不必编写代码。

Thrift JDBC/ODBC server

Thrift JDBC/ODBC server 与 Hive 的 HiveServer2 向对应,能够应用 Beeline 拜访 JDBC 服务器。在 Spark 的 sbin 目录下存在 start-thriftserver.sh 脚本,应用此脚本启动 JDBC/ODBC 服务器:

./sbin/start-thriftserver.sh

应用 beeline 拜访 JDBC/ODBC 服务器,Beeline 会要求提供用户名和明码, 在非平安模式下,只需输出用户名和空白明码即可

beeline> !connect jdbc:hive2://localhost:10000

Spark SQL CLI

Spark SQL CLI 是在本地模式下运行 Hive Metastore 服务并执行从命令行输出的查问的便捷工具。请留神,Spark SQL CLI 无奈与 Thrift JDBC 服务器通信。

要启动 Spark SQL CLI,只须要在 Spark 的 bin 目录中运行以下命令:

./spark-sql 

总结

本文次要对 Spark SQL 进行了论述,次要包含 Spark SQL 的介绍、DataFrame&DataSet API 根本应用、Catalyst Optimizer 优化器的基本原理、Spark SQL 编程、Spark SQL 数据源以及与 Hive 集成、Thrift server 与 Spark SQL CLI。下一篇将分享 Spark Streaming 编程指南。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版