关于spark:第三篇Spark-SQL编程指南

在《第二篇|Spark Core编程指南》一文中,对Spark的外围模块进行了解说。本文将探讨Spark的另外一个重要模块–Spark SQL,Spark SQL是在Shark的根底之上构建的,于2014年5月公布。从名称上能够看出,该模块是Spark提供的关系型操作API,实现了SQL-on-Spark的性能。对于一些相熟SQL的用户,能够间接应用SQL在Spark上进行简单的数据处理。通过本文,你能够理解到:

  • Spark SQL简介
  • DataFrame API&DataSet API
  • Catalyst Optimizer优化器
  • Spark SQL基本操作
  • Spark SQL的数据源
  • RDD与DataFrame互相转换
  • Thrift server与Spark SQL CLI

Spark SQL简介

Spark SQL是Spark的其中一个模块,用于结构化数据处理。与根本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了无关数据结构和正在执行的计算的更多信息,Spark SQL会应用这些额定的信息来执行额定的优化。应用SparkSQL的形式有很多种,包含SQL、DataFrame API以及Dataset API。值得注意的是,无论应用何种形式何种语言,其执行引擎都是雷同的。实现这种对立,意味着开发人员能够轻松地在不同的API之间来回切换,从而使数据处理更加地灵便。

DataFrame API&DataSet API

DataFrame API

DataFrame代表一个不可变的分布式数据汇合,其外围目标是让开发者面对数据处理时,只关怀要做什么,而不必关怀怎么去做,将一些优化的工作交由Spark框架自身去解决。DataFrame是具备Schema信息的,也就是说能够被看做具备字段名称和类型的数据,相似于关系型数据库中的表,然而底层做了很多的优化。创立了DataFrame之后,就能够应用SQL进行数据处理。

用户能够从多种数据源中结构DataFrame,例如:结构化数据文件,Hive中的表,内部数据库或现有RDD。DataFrame API反对Scala,Java,Python和R,在Scala和Java中,row类型的DataSet代表DataFrame,即Dataset[Row]等同于DataFrame。

DataSet API

DataSet是Spark 1.6中增加的新接口,是DataFrame的扩大,它具备RDD的长处(强类型输出,反对弱小的lambda函数)以及Spark SQL的优化执行引擎的长处。能够通过JVM对象构建DataSet,而后应用函数转换(mapflatMapfilter)。值得注意的是,Dataset API在Scala和 Java中可用,Python不反对Dataset API。

另外,DataSet API能够缩小内存的应用,因为Spark框架晓得DataSet的数据结构,因而在长久化DataSet时能够节俭很多的内存空间。

Catalyst Optimizer优化器

在Catalyst中,存在两种类型的打算:

  • 逻辑打算(Logical Plan):定义数据集上的计算,尚未定义如何去执行计算。每个逻辑打算定义了一系列的用户代码所须要的属性(查问字段)和束缚(where条件),然而不定义该如何执行。具体如下图所示:

  • 物理打算(Physical Plan):物理打算是从逻辑打算生成的,定义了如何执行计算,是可执行的。举个栗子:逻辑打算中的JOIN会被转换为物理打算中的sort merge JOIN。须要留神,Spark会生成多个物理打算,而后抉择老本最低的物理打算。具体如下图所示:

在Spark SQL中,所有的算子操作会被转换成AST(abstract syntax tree,形象语法树),而后将其传递给Catalyst优化器。该优化器是在Scala的函数式编程根底会上构建的,Catalyst反对基于规定的(rule-based)和基于老本的(cost-based)优化策略。

Spark SQL的查问打算包含4个阶段(见下图):

  • 1.剖析
  • 2.逻辑优化
  • 3.物理打算
  • 4.生成代码,将查问局部编译成Java字节码

留神:在物理打算阶段,Catalyst会生成多个打算,并且会计算每个打算的老本,而后比拟这些打算的老本的大小,即基于老本的策略。在其余阶段,都是基于规定的的优化策略。

剖析

Unresolved Logical plan –> Logical plan。Spark SQL的查问打算首先起始于由SQL解析器返回的AST,或者是由API构建的DataFrame对象。在这两种状况下,都会存在未解决的属性援用(某个查问字段可能不存在,或者数据类型谬误),比方查问语句:SELECT col FROM sales,对于字段col的类型,或者该字段是否是一个无效的字段,只有等到查看该sales表时才会分明。当不能确定一个属性字段的类型或者没可能与输出表进行匹配时,称之为未解决的。Spark SQL应用Catalyst的规定以及Catalog对象(可能拜访数据源的表信息)来解决这些属性。首先会构建一个Unresolved Logical Plan树,而后作用一系列的规定,最初生成Logical Plan。

逻辑优化

Logical plan –> Optimized Logical Plan。逻辑优化阶段应用基于规定的优化策略,比方谓词下推、投影裁剪等。通过一些列优化过后,生成优化的逻辑打算Optimized Logical Plan。

物理打算

Optimized Logical Plan –>physical Plan。在物理打算阶段,Spark SQL会将优化的逻辑打算生成多个物理执行打算,而后应用Cost Model计算每个物理打算的老本,最终抉择一个物理打算。在这个阶段,如果确定一张表很小(能够长久化到内存),Spark SQL会应用broadcast join。

须要留神的是,物理打算器也会应用基于规定的优化策略,比方将投影、过滤操作管道化一个Spark的map算子。此外,还会将逻辑打算阶段的操作推到数据源端(反对谓词下推、投影下推)。

代码生成

查问优化的最终阶段是生成Java字节码,应用Quasi quotes来实现这项工作的。

通过下面的剖析,对Catalyst Optimizer有了初步的理解。对于Spark的其余组件是如何与Catalyst Optimizer交互的呢?具体如下图所示:

如上图所示:ML Pipelines, Structured streaming以及 GraphFrames都应用了DataFrame/Dataset
APIs,并且都得益于 Catalyst optimiser。

Quick Start

创立SparkSession

SparkSession是Dataset与DataFrame API的编程入口,从Spark2.0开始反对。用于对立原来的HiveContext和SQLContext,为了兼容两者,依然保留这两个入口。通过一个SparkSession入口,进步了Spark的易用性。上面的代码展现了如何创立一个SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
//导入隐式转换,比方将RDD转为DataFrame
import spark.implicits._

创立DataFrame

创立完SparkSession之后,能够应用SparkSession从曾经存在的RDD、Hive表或者其余数据源中创立DataFrame。上面的示例应用的是从一个JSON文件数据源中创立DataFrame:

/**
* {"name":"Michael"}
* {"name":"Andy", "age":30}
* {"name":"Justin", "age":19}
*/
val df = spark.read.json("E://people.json")
//输入DataFrame的内容
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

DataFrame基本操作

创立完DataFrame之后,能够对其进行一些列的操作,具体如上面代码所示:

// 打印该DataFrame的信息
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 查问name字段
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// 将每个人的age + 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 查找age大于21的人员信息
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 依照age分组,统计每种age的个数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

在程序中应用SQL查问

下面的操作应用的是DSL(domain-specific language)形式,还能够间接应用SQL对DataFrame进行操作,具体如下所示:

// 将DataFrame注册为SQL的长期视图
// 该办法创立的是一个本地的长期视图,生命周期与其绑定的SparkSession会话相干
// 即如果创立该view的session完结了,该view也就隐没了
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View

下面应用的是Temporary views的形式,该形式是Spark Session范畴的。如果将创立的view能够在所有session之间共享,能够应用Global Temporary View的形式创立view,具体如下:

// 将DataFrame注册为全局长期视图(global temporary view)
// 该办法创立的是一个全局的长期视图,生命周期与其绑定的Spark应用程序相干,
// 即如果应用程序完结,会主动被删除
// 全局长期视图是能够跨Spark Session的,零碎保留的数据库名为`global_temp`
// 当查问时,必须要加上全限定名,如`SELECT * FROM global_temp.view1`
df.createGlobalTempView("people")

// 全局长期视图默认的保留数据库为:`global_temp` 
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// 全局长期视图反对跨Spark Session会话
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

创立DataSet

DataSet与RDD很相似,然而,RDD应用的Java的序列化器或者Kyro序列化,而DataSet应用的是Encoder对在网络间传输的对象进行序列化的。创立DataSet的示例如下:

case class Person(name: String, age: Long)
// 创立DataSet
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// 通过导入Spark的隐式转换spark.implicits._
// 能够自动识别数据类型
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 通过调用as办法,DataFrame能够转为DataSet,
val path = "E://people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

RDD与DataFrame互相转换

Spark SQL反对两种不同的形式将RDD转换为DataFrame。第一种是应用反射来推断蕴含特定类型对象的RDD的模式,这种基于反射的形式能够提供更简洁的代码,如果在编写Spark应用程序时,曾经明确了schema,能够应用这种形式。第二种形式是通过可编程接口来构建schema,而后将其利用于现有的RDD。此形式编写的代码更简短,此种形式创立的DataFrame,直到运行时才晓得该DataFrame的列及其类型。

上面案例的数据集如下people.txt:

Tom, 29
Bob, 30
Jack, 19

通过反射的形式

Spark SQL的Scala接口反对主动将蕴含样例类的RDD转换为DataFrame。样例类定义表的schema。通过反射读取样例类的参数名称,并映射成column的名称。

object RDD2DF_m1 {
  //创立样例类
  case class  Person(name: String, age: Int)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    //导入隐式转换,用于RDD转为DataFrame
    import spark.implicits._
    //从文本文件中创立RDD,并将其转换为DataFrame
    val peopleDF = spark.sparkContext
      .textFile("file:///E:/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    //将DataFrame注册成长期视图
    peopleDF.createOrReplaceTempView("people")
    // 运行SQL语句
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // 应用字段索引拜访列
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // |Name: Jack|
    // +----------+

    // 通过字段名拜访列
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Jack|
    // +------------+
  }
}

通过构建schema的形式

通过构建schema的形式创立DataFrame次要包含三步:

  • 1.从原始RDD创立Row类型的RDD
  • 2.应用StructType,创立schema
  • 3.通过createDataFrame办法将schema利用于Row类型的RDD
object RDD2DF_m2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    //导入隐式转换,用于RDD转为DataFrame
    import spark.implicits._
    //创立原始RDD
    val peopleRDD = spark.sparkContext.textFile("E:/people.txt")
    //step 1 将原始RDD转换为ROW类型的RDD
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    //step 2 创立schema
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    //step 3 创立DF
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    // 将DataFrame注册成长期视图
    peopleDF.createOrReplaceTempView("people")
    // 运行SQL语句
    val results = spark.sql("SELECT name FROM people")
    // 应用字段索引拜访列
    results.map(attributes => "Name: " + attributes(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // | Name: Tom|
    // | Name: Bob|
    // | Name: Jack|
    // +----------+
  }
}

Spark SQL的数据源

Spark SQL反对通过DataFrame接口对各种数据源进行操作,能够应用关系转换以及长期视图对DataFrame进行操作。常见的数据源包含以下几种:

文件数据源

  • Parquet文件
  • JSON文件
  • CSV文件
  • ORC文件
private def runBasicDataSourceExample(spark: SparkSession): Unit = {
    /**
      * 读取parquet文件数据源,并将后果写入到parquet文件
      */

    val usersDF = spark
      .read
      .load("E://users.parquet")
    usersDF.show()
    // 将DF保留到parquet文件
    usersDF
      .select("name", "favorite_color")
      .write
      .mode(SaveMode.Overwrite)
      .save("E://namesAndFavColors.parquet")
    /**
      * 读取json文件数据源,并将后果写入到parquet文件
      */

    val peopleDF = spark
      .read
      .format("json")
      .load("E://people.json")
    peopleDF.show()
    // 将DF保留到parquet文件
    peopleDF
      .select("name", "age")
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save("E://namesAndAges.parquet")

    /**
      * 读取CSV文件数据源
      */
    val peopleDFCsv = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("E://people.csv")

    /**
      * 将usersDF写入到ORC文件
      */
    usersDF.write.format("orc")
      .option("orc.bloom.filter.columns", "favorite_color")
      .option("orc.dictionary.key.threshold", "1.0")
      .option("orc.column.encoding.direct", "name")
      .mode(SaveMode.Overwrite)
      .save("E://users_with_options.orc")

    /**
      * 将peopleDF保留为长久化表,个别保留为Hive中
      */
    peopleDF
      .write
      .option("path","E://warehouse/people_bucketed") // 保留门路
      .bucketBy(42, "name")           // 依照name字段分桶
      .sortBy("age")                  // 依照age字段排序
      .saveAsTable("people_bucketed")

    /**
      * 将userDF保留为分区文件,相似于Hive分区表
      */
    usersDF
      .write
      .partitionBy("favorite_color")  // 分区字段
      .format("parquet")        // 文件格式
      .mode(SaveMode.Overwrite) // 保留模式
      .save("E://namesPartByColor.parquet")

    /**
      *
      */
    usersDF
      .write
      .option("path","E://warehouse/users_partitioned_bucketed") // 保留门路
      .partitionBy("favorite_color")  // 分区
      .bucketBy(42, "name")           // 分桶
      .saveAsTable("users_partitioned_bucketed")

    spark.sql("DROP TABLE IF EXISTS people_bucketed")
    spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")
  }

保留模式

Scala/Java Meaning
SaveMode.ErrorIfExists(default) 如果指标文件曾经存在,则报异样
SaveMode.Append 如果指标文件或表曾经存在,则将后果追加进去
SaveMode.Overwrite 如果指标文件或表曾经存在,则笼罩原有的内容
SaveMode.Ignore 相似于SQL中的CREATE TABLE IF NOT EXISTS,如果指标文件或表曾经存在,则不做任何操作

保留为长久化表

DataFrame能够被保留为Hive的长久化表,值得注意的是,这种形式并不依赖与Hive的部署,也就是说Spark会应用Derby创立一个默认的本地Hive metastore,与createOrReplaceTempView不同,该形式会间接将后果物化。

对于基于文件的数据源( text, parquet, json等),在保留的时候能够指定一个具体的门路,比方 df.write.option(“path”, “/some/path”).saveAsTable(“t”)(存储在指定门路下的文件格式为parquet)当表被删除时,自定义的表的门路和表数据不会被移除。如果没有指定具体的门路,spark默认的是warehouse的目录(/user/hive/warehouse),当表被删除时,默认的表门路也会被删除。

Hive数据源

见上面大节:Spark SQL集成Hive

JDBC数据源

Spark SQL还包含一个能够应用JDBC从其余数据库读取数据的数据源。与应用JdbcRDD相比,应优先应用此性能。这是因为后果作为DataFrame返回,它们能够在Spark SQL中轻松解决或与其余数据源连贯。JDBC数据源也更易于应用Java或Python,因为它不须要用户提供ClassTag。

能够应用Data Sources API将近程数据库中的表加载为DataFrame或Spark SQL长期视图。用户能够在数据源选项中指定JDBC连贯属性。 user并且password通常作为用于登录数据源的连贯属性提供。除连贯属性外,Spark还反对以下不辨别大小写的选项:

属性名称 解释
url 要连贯的JDBC URL
dbtable 读取或写入的JDBC表
query 指定查问语句
driver 用于连贯到该URL的JDBC驱动类名
partitionColumn, lowerBound, upperBound 如果指定了这些选项,则必须全副指定。另外, numPartitions必须指定
numPartitions 表读写中可用于并行处理的最大分区数。这也确定了并发JDBC连贯的最大数量。如果要写入的分区数超过此限度,咱们能够通过coalesce(numPartitions)在写入之前进行调用将其升高到此限度
queryTimeout 默认为0,查问超时工夫
fetchsize JDBC的获取大小,它确定每次要获取多少行。这能够帮忙进步JDBC驱动程序的性能
batchsize 默认为1000,JDBC批处理大小,这能够帮忙进步JDBC驱动程序的性能。
isolationLevel 事务隔离级别,实用于以后连贯。它能够是一个NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连贯对象定义,缺省值为规范事务隔离级别READ_UNCOMMITTED。此选项仅实用于写作。
sessionInitStatement 在向近程数据库关上每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句,应用它来实现会话初始化代码。
truncate 这是与JDBC writer相干的选项。当SaveMode.Overwrite启用时,就会清空指标表的内容,而不是删除和重建其现有的表。默认为false
pushDownPredicate 用于启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种状况下,Spark将尽可能将过滤器下推到JDBC数据源。
object JdbcDatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("JdbcDatasetExample")
      .master("local") //设置为本地运行
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runJdbcDatasetExample(spark)
  }

  private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    //留神:从JDBC源加载数据
    val jdbcPersonDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "person")
      .option("user", "root")
      .option("password", "123qwe")
      .load()
    //打印jdbcDF的schema
    jdbcPersonDF.printSchema()
    //打印数据
    jdbcPersonDF.show()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "123qwe")
    //通过.jdbc的形式加载数据
    val jdbcStudentDF = spark
      .read
      .jdbc("jdbc:mysql://localhost/mydb", "student", connectionProperties)
    //打印jdbcDF的schema
    jdbcStudentDF.printSchema()
    //打印数据
    jdbcStudentDF.show()
    // 保留数据到JDBC源
    jdbcStudentDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "student2")
      .option("user", "root")
      .option("password", "123qwe")
      .mode(SaveMode.Append)
      .save()

    jdbcStudentDF
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost/mydb", "student2", connectionProperties)

  }
}

Spark SQL集成Hive

Spark SQL还反对读取和写入存储在Apache Hive中的数据。然而,因为Hive具备大量依赖项,因而这些依赖项不蕴含在默认的Spark公布包中。如果能够在类门路上找到Hive依赖项,Spark将主动加载它们。请留神,这些Hive依赖项也必须存在于所有工作节点(worker nodes)上,因为它们须要拜访Hive序列化和反序列化库(SerDes)能力拜访存储在Hive中的数据。

将hive-site.xml,core-site.xml以及hdfs-site.xml文件放在conf/下

在应用Hive时,必须实例化一个反对Hive的SparkSession,包含连贯到持久性Hive Metastore,反对Hive 的序列化、反序列化(serdes)和Hive用户定义函数。没有部署Hive的用户仍能够启用Hive反对。如果未配置hive-site.xml,则上下文(context)会在当前目录中主动创立metastore_db,并且会创立一个由spark.sql.warehouse.dir配置的目录,其默认目录为spark-warehouse,位于启动Spark应用程序的当前目录中。请留神,自Spark 2.0.0以来,该在hive-site.xml中的hive.metastore.warehouse.dir属性已被标记过期(deprecated)。应用spark.sql.warehouse.dir用于指定warehouse中的默认地位。可能须要向启动Spark应用程序的用户授予写入的权限。

​ 上面的案例为在本地运行(为了不便查看打印的后果),运行完结之后会发现在我的项目的目录下E:IdeaProjectsmyspark创立了spark-warehouse和metastore_db的文件夹。能够看出没有部署Hive的用户仍能够启用Hive反对,同时也能够将代码打包,放在集群上运行。

object SparkHiveExample {


  case class Record(key: Int, value: String)

  def main(args: Array[String]) {


    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", "e://warehouseLocation")
      .master("local")//设置为本地运行
      .enableHiveSupport()
      .getOrCreate()


    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    import spark.implicits._
    import spark.sql
    //应用Spark SQL 的语法创立Hive中的表
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'file:///e:/kv1.txt' INTO TABLE src")

    // 应用HiveQL查问
    sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // 反对应用聚合函数
    sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // SQL查问的后果是一个DataFrame,反对应用所有的惯例的函数
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 AND key > 0 ORDER BY key")

    // DataFrames是Row类型的, 容许你按程序拜访列.
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    //能够通过SparkSession应用DataFrame创立一个长期视图
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")

    //能够用DataFrame与Hive中的表进行join查问
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // |  5| val_5|  5| val_5|
    // ...

    //创立一个Parquet格局的hive托管表,应用的是HQL语法,没有应用Spark SQL的语法("USING hive")
    sql("CREATE TABLE IF NOT EXISTS hive_records(key int, value string) STORED AS PARQUET")

    //读取Hive中的表,转换成了DataFrame
    val df = spark.table("src")
    //将该DataFrame保留为Hive中的表,应用的模式(mode)为复写模式(Overwrite)
    //即如果保留的表曾经存在,则会笼罩掉原来表中的内容
    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
    // 查问表中的数据
    sql("SELECT * FROM hive_records").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // 设置Parquet数据文件门路
    val dataDir = "/tmp/parquet_data"
    //spark.range(10)返回的是DataSet[Long]
    //将该DataSet间接写入parquet文件
    spark.range(10).write.parquet(dataDir)
    // 在Hive中创立一个Parquet格局的内部表
    sql(s"CREATE EXTERNAL TABLE IF NOT EXISTS hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
    // 查问下面创立的表
    sql("SELECT * FROM hive_ints").show()
    // +---+
    // |key|
    // +---+
    // |  0|
    // |  1|
    // |  2|
    // ...

    // 开启Hive动静分区
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    // 应用DataFrame API创立Hive的分区表
    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")

    //分区键‘key’将会在最终的schema中被移除
    sql("SELECT * FROM hive_part_tbl").show()
    // +-------+---+
    // |  value|key|
    // +-------+---+
    // |val_238|238|
    // | val_86| 86|
    // |val_311|311|
    // ...

    spark.stop()

  }
}

Thrift server与Spark SQL CLI

能够应用JDBC/ODBC或者命令行拜访Spark SQL,通过这种形式,用户能够间接应用SQL运行查问,而不必编写代码。

Thrift JDBC/ODBC server

Thrift JDBC/ODBC server与Hive的HiveServer2向对应,能够应用Beeline拜访JDBC服务器。在Spark的sbin目录下存在start-thriftserver.sh脚本,应用此脚本启动JDBC/ODBC服务器:

./sbin/start-thriftserver.sh

应用beeline拜访JDBC/ODBC服务器,Beeline会要求提供用户名和明码,在非平安模式下,只需输出用户名和空白明码即可

beeline> !connect jdbc:hive2://localhost:10000

Spark SQL CLI

Spark SQL CLI是在本地模式下运行Hive Metastore服务并执行从命令行输出的查问的便捷工具。请留神,Spark SQL CLI无奈与Thrift JDBC服务器通信。

要启动Spark SQL CLI,只须要在Spark的bin目录中运行以下命令:

./spark-sql 

总结

本文次要对Spark SQL进行了论述,次要包含Spark SQL的介绍、DataFrame&DataSet API根本应用、Catalyst Optimizer优化器的基本原理、Spark SQL编程、Spark SQL数据源以及与Hive集成、Thrift server与Spark SQL CLI。下一篇将分享Spark Streaming编程指南。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理