Spark SQL 反对多种数据源,如 JDBC、HDFS、HBase。它的外部组件,如 SQL 的语法解析器、分析器等反对重定义进行扩大,能更好的满足不同的业务场景。与 Spark Core 无缝集成,大数据培训提供了 DataSet/DataFrame 的可编程形象数据模型,并且可被视为一个分布式的 SQL 查问引擎。
DataSet/DataFrame
DataSet/DataFrame 都是 Spark SQL 提供的分布式数据集,绝对于 RDD 而言,除了记录数据以外,还记录表的 schema 信息。
DataFrame 是 DataSet 以命名列形式组织的分布式数据集,相似于 RDBMS 中的表,或者 R 和 Python 中的 data frame。DataFrame API 反对 Scala、Java、Python、R。在 Scala API 中,DataFrame 变成类型为 Row 的 Dataset:
type DataFrame = Dataset[Row]。
DataFrame 在编译期不进行数据中字段的类型查看,在运行期进行查看。但 DataSet 则与之相同,因为它是强类型的。此外,二者都是应用 catalyst 进行 sql 的解析和优化。为了不便,以下对立应用 DataSet 统称。
DataSet 创立
DataSet 通常通过加载内部数据或通过 RDD 转化创立。
1. 加载内部数据
以加载 json 和 mysql 为例:
val ds = sparkSession.read.json(“/ 门路 /people.json”)
val ds = sparkSession.read.format(“jdbc”)
.options(Map(“url” -> “jdbc:mysql://ip:port/db”,
“driver” -> “com.mysql.jdbc.Driver”,
“dbtable” -> “tableName”, “user” -> “root”, “root” -> “123”)).load()
2.RDD 转换为 DataSet
通过 RDD 转化创立 DataSet,关键在于为 RDD 指定 schema,通常有两种形式(伪代码):
1. 定义一个 case class,利用反射机制来推断
1) 从 HDFS 中加载文件为一般 RDD
val lineRDD = sparkContext.textFile(“hdfs://ip:port/person.txt”).map(_.split(” “))
2) 定义 case class(相当于表的 schema)
case class Person(id:Int, name:String, age:Int)
3) 将 RDD 和 case class 关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
4) 将 RDD 转换成 DataFrame
val ds= personRDD.toDF
2. 手动定义一个 schema StructType,间接指定在 RDD 上
val schemaString =”name age”
val schema = StructType(schemaString.split(” “).map(fieldName => StructField(fieldName, StringType, true)))
val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))
val ds = sparkSession.createDataFrame(rowRdd,schema)操作 DataSet 的两种格调语法
DSL 语法
1. 查问 DataSet 局部列中的内容
personDS.select(col(“name”))
personDS.select(col(“name”), col(“age”))
2. 查问所有的 name 和 age 和 salary,并将 salary 加 1000
personDS.select(col(“name”), col(“age”), col(“salary”) + 1000)
personDS.select(personDS(“name”), personDS(“age”), personDS(“salary”) + 1000)
3. 过滤 age 大于 18 的
personDS.filter(col(“age”) > 18)
4. 按年龄进行分组并统计雷同年龄的人数
personDS.groupBy(“age”).count()
留神:间接应用 col 办法须要 import org.apache.spark.sql.functions._
SQL 语法
如果想应用 SQL 格调的语法,须要将 DataSet 注册成表
personDS.registerTempTable(“person”)
// 查问年龄最大的前两名
val result = sparkSession.sql(“select * from person order by age desc limit 2”)
// 保留后果为 json 文件。留神:如果不指定存储格局,则默认存储为 parquet
result.write.format(“json”).save(“hdfs://ip:port/res2”)
Spark SQL 的几种应用形式
1.sparksql-shell 交互式查问
就是利用 Spark 提供的 shell 命令行执行 SQL
2. 编程
首先要获取 Spark SQL 编程 ” 入口 ”:SparkSession(当然在晚期版本中大家可能更相熟的是 SQLContext,如果是操作 hive 则为 HiveContext)。这里以读取 parquet 为例:
val spark = SparkSession.builder()
.appName(“example”).master(“local[*]”).getOrCreate();
val df = sparkSession.read.format(“parquet”).load(“/ 门路 /parquet 文件 ”)
而后就能够针对 df 进行业务解决了。
3.Thriftserver
beeline 客户端连贯操作
启动 spark-sql 的 thrift 服务,sbin/start-thriftserver.sh,启动脚本中配置好 Spark 集群服务资源、地址等信息。而后通过 beeline 连贯 thrift 服务进行数据处理。
hive-jdbc 驱动包来拜访 spark-sql 的 thrift 服务
在我的项目 pom 文件中引入相关驱动包,跟拜访 mysql 等 jdbc 数据源相似。示例:
Class.forName(“org.apache.hive.jdbc.HiveDriver”)
val conn = DriverManager.getConnection(“jdbc:hive2://ip:port”, “root”, “123”);
try {
val stat = conn.createStatement()
val res = stat.executeQuery(“select * from people limit 1”)
while (res.next()) {
println(res.getString("name"))
}
} catch {
case e: Exception => e.printStackTrace()
} finally{
if(conn!=null) conn.close()
}
Spark SQL 获取 Hive 数据
Spark SQL 读取 hive 数据的关键在于将 hive 的元数据作为服务裸露给 Spark。除了通过下面 thriftserver jdbc 连贯 hive 的形式,也能够通过上面这种形式:
首先,配置 $HIVE_HOME/conf/hive-site.xml,减少如下内容:
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:port</value>
</property>
而后,启动 hive metastore
最初,将 hive-site.xml 复制或者软链到 $SPARK_HOME/conf/。如果 hive 的元数据存储在 mysql 中,那么须要将 mysql 的连贯驱动 jar 包如 mysql-connector-java-5.1.12.jar 放到 $SPARK_HOME/lib/ 下,启动 spark-sql 即可操作 hive 中的库和表。而此时应用 hive 元数据获取 SparkSession 的形式为:
val spark = SparkSession.builder()
.config(sparkConf).enableHiveSupport().getOrCreate()
UDF、UDAF、Aggregator
UDF
UDF 是最根底的用户自定义函数,以自定义一个求字符串长度的 udf 为例:
val udf_str_length = udf{(str:String) => str.length}
spark.udf.register(“str_length”,udf_str_length)
val ds =sparkSession.read.json(“ 门路 /people.json”)
ds.createOrReplaceTempView(“people”)
sparkSession.sql(“select str_length(address) from people”)UDAF
定义 UDAF,须要继承抽象类 UserDefinedAggregateFunction,它是弱类型的,上面的 aggregator 是强类型的。以求平均数为例:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverage extends UserDefinedAggregateFunction {
// Data types of input arguments of this aggregate function
def inputSchema: StructType = StructType(StructField(“inputColumn”, LongType) :: Nil)
// Data types of values in the aggregation buffer
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// The data type of the returned value
def dataType: DataType = DoubleType
// Whether this function always returns the same output on the identical input
def deterministic: Boolean = true
// Initializes the given aggregation buffer. The buffer itself is a Row
that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// Updates the given aggregation buffer buffer
with new input data from input
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// Merges two aggregation buffers and stores the updated buffer values back to buffer1
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Calculates the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// Register the function to access it
spark.udf.register(“myAverage”, MyAverage)
val df = spark.read.json(“examples/src/main/resources/employees.json”)
df.createOrReplaceTempView(“employees”)
df.show()
val result = spark.sql(“SELECT myAverage(salary) as average_salary FROM employees”)
result.show()Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify buffer
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json(“examples/src/main/resources/employees.json”).as[Employee]
ds.show()
// Convert the function to a TypedColumn
and give it a name
val averageSalary = MyAverage.toColumn.name(“average_salary”)
val result = ds.select(averageSalary)
result.show()