弹性分布式数据集(Resilient Distributed Dataset,RDD)是 Spark 中的外围概念。简略的说,就是在 Spark 中创立一个 RDD,而后通过 RDD 对数据进行各种各样的操作。
RDD 创立
RDD 的创立包含:
- 从汇合中创立
- 从其余存储(比方 hdfs、本地文件等)创立
- 从其余 RDD 创立
上面通过这几种形式来创立 RDD,在创立 RDD 之前须要创立 SparkContext,从上面的例子能够看出 SparkContext 须要一个 conf,这个 conf 的 master 属性,在后面两张曾经提过了。
从汇合中创立有两个办法,parallelize 以及 makeRDD,makeRDD 实际上调用的是 parallelize 办法,这两个办法都容许除了汇合外,还能够再传递一个分区的参数。
从其余存储包含本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等,这里的文件能够用通配符,而不限定某一个文件,比方 textFile(“/my/directory/*.txt”),就是 /my/directory 下所有的 txt 文件。
object CreateRDD {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CreateRDD").setMaster("local")
val sc = new SparkContext(conf)
// parallelize
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
println(rdd1.count)
val rdd2: RDD[Int] = sc.makeRDD(data)
// makeRDD
println(rdd2.count)
val rdd3: RDD[String] = sc.textFile("D:\\data\\test.txt")
// 从其余存储
println(rdd3.first)
// 从其余 RDD
val rdd4: RDD[String] = rdd3.map(_ + "bbb")
println(rdd4.first)
// 敞开
sc.stop
}
}
RDD 的操作
RDD 反对两种操作:transformations 和 action。
transformations 从字面上了解,就是转换,能够说从一个数据集转换成另外一个数据集,这里须要留神的是,所有的 transformations 都是 lazy 的,也就是说他只管执行了咱们那行语句,比方 map 映射,他并不会进行 map 映射,只有触发到 action 的时候,才会进行计算。
action,会触发 RDD 的计算并把最终后果返回给驱动程序。
Transformations
map
map 就是映射,也就是说,传入一个值,把他变成另外一个值。
比方上面的例子中,把 1,2,3,4,5 变成 2,4,6,8,10,传入 1 把他变成 2,传入 2 把他变成 4。
object MapRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("MapRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val mapRdd = rdd1.map(_ * 2)
mapRdd.foreach(println)
// 传入 1 把他变成 2,传入 2 把他变成 4
val mapRdd2 = rdd1.map(x => x * 2)
mapRdd2.foreach(println)
}
}
filter
filter 就是过滤,你给我一个值,我计算后返回 true 或 false,如果返回的是 false,则不会在新的汇合中。
比方上面的例子中,偶数的才返回 true,所以最初打印的是 2,4
object FilterRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("FilterRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val rdd2: RDD[Int] = rdd1.filter(x => x % 2 == 0)
rdd2.foreach(println)
}
}
flatMap
flatMap 就是压平,你给我一个值,我给你零个或者多个值。
比方上面的例子中,你给我 ”a b”,我通过 split 后,给你 a 和 b 两个值。
object FlatMapRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("FlatMapRDD").setMaster("local"))
val data = Array("a b", "c d")
val rdd1: RDD[String] = sc.parallelize(data)
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))
rdd2.foreach(println)
}
}
mapPartitions
相似于 map,在 RDD 的每个分区 (块) 上独自运行。当因而运行在类型为 T 的 RDD 上,返回值,func 的类型必须是 Iterator<T> => Iterator<U>。
比方上面的例子中,你给我 x,我返回的是 x.map(_ * 2)。
object MapPartitionsRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val mapRdd = rdd1.mapPartitions(x => x.map(_ * 2))
mapRdd.foreach(println)
}
}
mapPartitionsWithIndex
相似于 mapPartitions,为 func 提供了一个示意分区索引的整数值。
也就是说,咱们能够通过传入的参数 index,获取到对应的分区。
object MapPartitionsWithIndexRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsWithIndexRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data, 2)
val mapRdd = rdd1.mapPartitionsWithIndex((index, x) => x.map(index + "," + _))
mapRdd.foreach(println)
}
}
sample
sample 就是采样,比方 100 个随机拿出 30 个。
sample 参数有三个:
- withReplacement:元素能够被屡次采样(采样时替换)
- fraction:样本的冀望大小作为 RDD 大小的一部分。当 withReplacement 为 true 时,fraction 为每个元素的冀望次数,数值必须大于 0;当 withReplacement 为 false 时,fraction 为每个元素的冀望概率,数值为 0 到 1。
- seed:随机数生成器的种子
object SampleRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("SampleRDD").setMaster("local"))
val data = 1 to 100
val rdd1: RDD[Int] = sc.parallelize(data)
val rdd2: RDD[Int] = rdd1.sample(true, 0.5, 1)
println(rdd2.count)
}
}
union
union 就是并集,把两个 rdd 合并起来。
比方上面的例子中,把 rdd1 和 rdd2 合并了。
object UnionRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("UnionRDD").setMaster("local"))
val data1 = Array(1, 2, 3, 4, 5)
val data2 = Array(6, 7, 8, 9, 10)
val rdd1: RDD[Int] = sc.parallelize(data1)
val rdd2: RDD[Int] = sc.parallelize(data2)
val rdd3 = rdd1.union(rdd2)
rdd3.foreach(println)
}
}
intersection
intersection 就是交加,取 2 个 rdd 雷同的局部。
比方上面的例子中,rdd1 和 rdd2 的交加是 3,4,5。
object IntersectionRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("IntersectionRDD").setMaster("local"))
val data1 = Array(1, 2, 3, 4, 5)
val data2 = Array(3, 4, 5, 6, 7)
val rdd1: RDD[Int] = sc.parallelize(data1)
val rdd2: RDD[Int] = sc.parallelize(data2)
val rdd3 = rdd1.intersection(rdd2)
rdd3.foreach(println)
}
}
distinct
distinct 就是过滤掉反复的局部。
比方上面的例子中,1,2,3 有反复,最初打印的是 1,2,3,4,5。
object DistinctRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("DistinctRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5, 1, 2, 3)
val rdd1: RDD[Int] = sc.parallelize(data)
val rdd2: RDD[Int] = rdd1.distinct
rdd2.foreach(println)
}
}
groupByKey
groupByKey 就是通过 key 进行分组,如果没有 key 的话,须要 keyBy 办法指定 key。
比方上面的例子中,把模 2 的后果当做 key,并返回一个新的 rdd。
在 foreach 中,能够看到参数包含 key 以及 iterator。
object GroupByKeyRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("GroupByKeyRDD").setMaster("local"))
val data1 = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data1)
val rdd2: RDD[(Int, Int)] = rdd1.keyBy(_ % 2)
rdd2.groupByKey().foreach(x => {
val iterator = x._2.iterator
var num = ""while (iterator.hasNext) num = num + iterator.next +" "println(x._1 +":" + num)
})
val data2 = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
val rdd3: RDD[(String, Int)] = sc.parallelize(data2)
rdd3.groupByKey().foreach(x => {
val iterator = x._2.iterator
var num = ""while (iterator.hasNext) num = num + iterator.next +" "println(x._1 +":" + num)
})
}
}
reduceByKey
reduceByKey 会兑雷同的 key 进行计算。
比方上面的例子中,最初的后果是(a,9)、(b,6)
object ReduceByKeyRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeyRDD").setMaster("local"))
val data = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
val rdd1: RDD[(String, Int)] = sc.parallelize(data)
val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _)
rdd2.foreach(println)
}
}
aggregateByKey
先按分区聚合,最初再进行聚合。
代码如下:
object AggregateByKeyRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("AggregateByKeyRDD").setMaster("local"))
val data = Array(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5), ("a", 6), ("b", 7), ("b", 8))
val rdd1: RDD[(String, Int)] = sc.parallelize(data, 2)
val rdd2: RDD[(String, Int)] = rdd1.aggregateByKey(3)(math.max(_, _), _ + _)
rdd2.foreach(println)
}
}
这个代码咱们用下图来阐明一下。
- 把汇合进行分区
- 依据 key 进行分组
- 分组后进行 max 计算,也就是下面 aggregateByKey 后的第一个函数,这里的 max 有两个值,一个是 aggregateByKey 前面的 3,另外一个就是 key 对应的 value,比方 a 对应的值是 1,2,这两个值和 3 求最大值,失去的 3,所以后果就是 a,3。
- 两个分区计算后的后果进行相加,也就是下面 aggregateByKey 后的第二个函数。所以最初的后果就是(b,12)、(a,9)
sortByKey
sortByKey 就是依据 key 进行排序,第一个参数如果是 true 就是升序,也是默认值,false 就是降序。
object SortByKeyRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("SortByKeyRDD").setMaster("local"))
val data = Array(("a", 1), ("c", 3), ("d", 4), ("b", 2))
val rdd1: RDD[(String,Int)] = sc.parallelize(data)
val rdd2: RDD[(String, Int)] = rdd1.sortByKey()
rdd2.foreach(println)
}
}
join
相似 mysql 的 join
object JoinRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("JoinRDD").setMaster("local"))
val data1 = Array(("a", 1), ("c", 3), ("d", 4), ("b", 2))
val data2 = Array(("a", 11), ("d", 14), ("e", 25))
val rdd1: RDD[(String, Int)] = sc.parallelize(data1)
val rdd2: RDD[(String, Int)] = sc.parallelize(data2)
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
val rdd4: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
val rdd5: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
val rdd6: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
// 相当于 mysql 的 INNER JOIN
// 后果:// (d,(4,14))
// (a,(1,11))
println("rdd3:join")
rdd3.foreach(println)
println("rdd4:fullOuterJoin")
// 返回左右数据集的全副数据,左右有一边不存在的数据以 None 填充
// 后果:// (d,(Some(4),Some(14)))
// (e,(None,Some(25)))
// (a,(Some(1),Some(11)))
// (b,(Some(2),None))
// (c,(Some(3),None))
rdd4.foreach(println)
// 相当于 mysql 的 LEFT JOIN
// 后果:// (d,(4,Some(14)))
// (a,(1,Some(11)))
// (b,(2,None))
// (c,(3,None))
println("rdd5:leftOuterJoin")
rdd5.foreach(println)
// 相当于 mysql 的 RIGHT JOIN
// 后果:// (d,(Some(4),14))
// (e,(None,25))
// (a,(Some(1),11))
println("rdd6:rightOuterJoin")
rdd6.foreach(println)
}
}
cogroup
对两个 RDD 中的 KV 元素,每个 RDD 中雷同 key 中的元素别离聚合成一个汇合。
比方上面的例子中,对 rdd1 的 a,拿到的后果是(1, 3),对于 rdd2 的 a,拿到的后果是(23),所以最终的后果就是(a,(CompactBuffer(1, 3),CompactBuffer(23)))。
object CogroupRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("CogroupRDD").setMaster("local"))
val data1 = Array(("a", 1), ("a", 3),("b", 2))
val data2 = Array(("b", 24), ("a", 23),("b", 22))
val rdd1: RDD[(String,Int)] = sc.parallelize(data1)
val rdd2: RDD[(String,Int)] = sc.parallelize(data2)
val rdd3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
// 后果:// (a,(CompactBuffer(1, 3),CompactBuffer(23)))
// (b,(CompactBuffer(2),CompactBuffer(24, 22)))
rdd3.foreach(println)
}
}
cartesian
就是笛卡尔。
object CartesianRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("CartesianRDD").setMaster("local"))
val data1 = Array(1, 2)
val data2 = Array(3, 4)
val rdd1: RDD[Int] = sc.parallelize(data1)
val rdd2: RDD[Int] = sc.parallelize(data2)
val rdd3: RDD[(Int, Int)] = rdd1.cartesian(rdd2)
// (1,3)
// (1,4)
// (2,3)
// (2,4)
rdd3.foreach(println)
}
}
action
reduce
依据传入的函数,对每个值进行计算
object ReduceRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("ReduceRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val result: Int = rdd1.reduce(_ + _)
println(result)
}
}
collect
拿到 rdd 的汇合给 driver 端,通常确认后果比拟少的时候用。
object CollectRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("CollectRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val result: Array[Int] = rdd1.collect()
println(result.mkString(","))
}
}
count
拿到 rdd 的汇合数量。
object CountRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("CountRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val result: Long = rdd1.count()
println(result)
}
}
first
拿到 rdd 汇合的第一个元素。
object FirstRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("FirstRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val result: Int = rdd1.first()
println(result)
}
}
take
拿到后面 n 个元素,比方上面例子中,传的是 3,所以拿到是 1,2,3。
object TakeRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("TakeRDD").setMaster("local"))
val data = Array(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = sc.parallelize(data)
val result: Array[Int] = rdd1.take(3)
println(result.mkString(","))
}
}
takeSample
随机抽取 n 个元素。
object TakeSampleRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("TakeSampleRDD").setMaster("local"))
val data = 1 to 100
val rdd1: RDD[Int] = sc.parallelize(data)
val result: Array[Int] = rdd1.takeSample(true, 5, 1)
println(result.mkString(","))
}
}
takeOrdered
排序后取 N 个元素。
object TakeOrderedRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("TakeOrderedRDD").setMaster("local"))
val data = 1 to 100
val rdd1: RDD[Int] = sc.parallelize(data)
val result: Array[Int] = rdd1.takeOrdered(5)
println(result.mkString(","))
}
}
countByKey
计算每个 key 呈现的次数
object CountByKeyRDD {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("CountByKeyRDD").setMaster("local"))
val data = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
val rdd1: RDD[(String, Int)] = sc.parallelize(data)
val result: collection.Map[String, Long] = rdd1.countByKey()
result.foreach(item => println(item._1 + ":" + item._2))
}
}
foreach
遍历,比方下面很多例子的打印。