共计 5739 个字符,预计需要花费 15 分钟才能阅读完成。
1. Spark SQL 是什么?
- 处理 结构化 数据的一个 spark 的模块
- 它提供了一个编程抽象叫做 DataFrame 并且作为分布式 SQL 查询引擎的作用
2. Spark SQL 的特点
- 多语言的接口支持(java python scala)
- 统一的数据访问
- 完全兼容 hive
- 支持标准的连接
3. 为什么学习 SparkSQL?
我们已经学习了 Hive,它是将 Hive SQL 转换成 MapReduce 然后提交到集群上执行,大大简化了编写 MapReduce 的程序的复杂性,由于 MapReduce 这种计算模型执行效率比较慢。所有 Spark SQL 的应运而生,它是将 Spark SQL 转换成 RDD,然后提交到集群执行,执行效率非常快!
4. DataFrame(数据框)
- 与 RDD 类似,DataFrame 也是一个分布式数据容器
- 然而 DataFrame 更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即 schema
- DataFrame 其实就是带有 schema 信息的 RDD
5. SparkSQL1.x 的 API 编程
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
5.1 使用 sqlContext 创建 DataFrame(测试用)
object Ops3 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"),Person("admin2", 16, "man"),Person("admin3", 18, "man")))
val df1: DataFrame = sqlContext.createDataFrame(rdd1)
df1.show(1)
}
}
case class Person(name: String, age: Int, sex: String);
5.2 使用 sqlContxet 中提供的隐式转换函数(测试用)
import org.apache.spark
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()
5.3 使用 SqlContext 创建 DataFrame(常用)
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowRDD: RDD[Row] = linesRDD.map(line => {val lineSplit: Array[String] = line.split(",")
Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()
6. 使用新版本的 2.x 的 API
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// 数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
7. 操作 SparkSQL 的方式
7.1 使用 SQL 语句的方式对 DataFrame 进行操作
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//Spark2.x 新的 API 相当于 Spark1.x 的 SQLContext
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// 数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("p1")// 这是 Sprk2.x 新的 API 相当于 Spark1.x 的 registTempTable()
val df2 = sparkSession.sql("select * from p1")
df2.show()
7.2 使用 DSL 语句的方式对 DataFrame 进行操作
DSL(domain specific language ) 特定领域语言
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// 数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.show()
8. SparkSQL 的输出
8.1 写出到 JSON 文件
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// 数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.write.json("hdfs://uplooking02:8020/sparktest1")
8.2 写出到关系型数据库(mysql)
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// 数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
val url = "jdbc:mysql://localhost:3306/test"
// 表会自动创建
val tbName = "person1";
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
//SaveMode 默认为 ErrorIfExists
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)
9. 作业
9.1 大数据项目
1. 在线学习日志分析
xxx.log
2. 数据清洗, 转换成 RDD
3. 使用 sparksql 来做离线计算, 计算结果落地到 mysql 中
9.2 Web 项目做数据的可视化
SpringBoot+Echars
正文完