弹性分布式数据集(Resilient Distributed Dataset,RDD)是 Spark 中的外围概念。简略的说,就是在Spark中创立一个RDD,而后通过RDD对数据进行各种各样的操作。

RDD创立

RDD的创立包含:

  1. 从汇合中创立
  2. 从其余存储(比方hdfs、本地文件等)创立
  3. 从其余RDD创立

上面通过这几种形式来创立RDD,在创立RDD之前须要创立SparkContext,从上面的例子能够看出SparkContext须要一个conf,这个conf的master属性,在后面两张曾经提过了。
从汇合中创立有两个办法,parallelize以及makeRDD,makeRDD实际上调用的是parallelize办法,这两个办法都容许除了汇合外,还能够再传递一个分区的参数。
从其余存储包含本地文件系统、HDFS、Cassandra、HBase、Amazon S3等,这里的文件能够用通配符,而不限定某一个文件,比方textFile("/my/directory/*.txt"),就是/my/directory下所有的txt文件。

object CreateRDD {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("CreateRDD").setMaster("local")    val sc = new SparkContext(conf)    // parallelize    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    println(rdd1.count)    val rdd2: RDD[Int] = sc.makeRDD(data)    // makeRDD    println(rdd2.count)    val rdd3: RDD[String] = sc.textFile("D:\\data\\test.txt")    // 从其余存储    println(rdd3.first)    // 从其余RDD    val rdd4: RDD[String] = rdd3.map(_ + "bbb")    println(rdd4.first)    // 敞开    sc.stop  }}

RDD的操作

RDD反对两种操作:transformations和action。
transformations从字面上了解,就是转换,能够说从一个数据集转换成另外一个数据集,这里须要留神的是,所有的transformations都是lazy的,也就是说他只管执行了咱们那行语句,比方map映射,他并不会进行map映射,只有触发到action的时候,才会进行计算。
action,会触发RDD的计算并把最终后果返回给驱动程序。

Transformations

map

map就是映射,也就是说,传入一个值,把他变成另外一个值。
比方上面的例子中,把1,2,3,4,5变成2,4,6,8,10,传入1把他变成2,传入2把他变成4。

object MapRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("MapRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val mapRdd = rdd1.map(_ * 2)    mapRdd.foreach(println)    // 传入1把他变成2,传入2把他变成4    val mapRdd2 = rdd1.map(x => x * 2)    mapRdd2.foreach(println)  }}

filter

filter就是过滤,你给我一个值,我计算后返回true或false,如果返回的是false,则不会在新的汇合中。
比方上面的例子中,偶数的才返回true,所以最初打印的是2,4

object FilterRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("FilterRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val rdd2: RDD[Int] = rdd1.filter(x => x % 2 == 0)    rdd2.foreach(println)  }}

flatMap

flatMap就是压平,你给我一个值,我给你零个或者多个值。
比方上面的例子中,你给我"a b",我通过split后,给你a和b两个值。

object FlatMapRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("FlatMapRDD ").setMaster("local"))    val data = Array("a b", "c d")    val rdd1: RDD[String] = sc.parallelize(data)    val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))    rdd2.foreach(println)  }}

mapPartitions

相似于map,在RDD的每个分区(块)上独自运行。当因而运行在类型为T的RDD上,返回值,func的类型必须是Iterator<T> => Iterator<U>。
比方上面的例子中,你给我x,我返回的是x.map(_ * 2)。

object MapPartitionsRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val mapRdd = rdd1.mapPartitions(x => x.map(_ * 2))    mapRdd.foreach(println)  }}

mapPartitionsWithIndex

相似于mapPartitions,为func提供了一个示意分区索引的整数值。
也就是说,咱们能够通过传入的参数index,获取到对应的分区。

object MapPartitionsWithIndexRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsWithIndexRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data, 2)    val mapRdd = rdd1.mapPartitionsWithIndex((index, x) => x.map(index + "," + _))    mapRdd.foreach(println)  }}

sample

sample就是采样,比方100个随机拿出30个。
sample参数有三个:

  • withReplacement:元素能够被屡次采样(采样时替换)
  • fraction:样本的冀望大小作为RDD大小的一部分。当withReplacement为true时,fraction为每个元素的冀望次数,数值必须大于0;当withReplacement为false时,fraction为每个元素的冀望概率,数值为0到1。
  • seed:随机数生成器的种子
object SampleRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("SampleRDD").setMaster("local"))    val data = 1 to 100    val rdd1: RDD[Int] = sc.parallelize(data)    val rdd2: RDD[Int] = rdd1.sample(true, 0.5, 1)    println(rdd2.count)  }}

union

union就是并集,把两个rdd合并起来。
比方上面的例子中,把rdd1和rdd2合并了。

object UnionRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("UnionRDD").setMaster("local"))    val data1 = Array(1, 2, 3, 4, 5)    val data2 = Array(6, 7, 8, 9, 10)    val rdd1: RDD[Int] = sc.parallelize(data1)    val rdd2: RDD[Int] = sc.parallelize(data2)    val rdd3 = rdd1.union(rdd2)    rdd3.foreach(println)  }}

intersection

intersection就是交加,取2个rdd雷同的局部。
比方上面的例子中,rdd1和rdd2的交加是3,4,5。

object IntersectionRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("IntersectionRDD").setMaster("local"))    val data1 = Array(1, 2, 3, 4, 5)    val data2 = Array(3, 4, 5, 6, 7)    val rdd1: RDD[Int] = sc.parallelize(data1)    val rdd2: RDD[Int] = sc.parallelize(data2)    val rdd3 = rdd1.intersection(rdd2)    rdd3.foreach(println)  }}

distinct

distinct就是过滤掉反复的局部。
比方上面的例子中,1,2,3有反复,最初打印的是1,2,3,4,5。

object DistinctRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("DistinctRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5, 1, 2, 3)    val rdd1: RDD[Int] = sc.parallelize(data)    val rdd2: RDD[Int] = rdd1.distinct    rdd2.foreach(println)  }}

groupByKey

groupByKey就是通过key进行分组,如果没有key的话,须要keyBy办法指定key。
比方上面的例子中,把模2的后果当做key,并返回一个新的rdd。
在foreach中,能够看到参数包含key以及iterator。

object GroupByKeyRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("GroupByKeyRDD").setMaster("local"))    val data1 = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data1)    val rdd2: RDD[(Int, Int)] = rdd1.keyBy(_ % 2)    rdd2.groupByKey().foreach(x => {      val iterator = x._2.iterator      var num = ""      while (iterator.hasNext) num = num + iterator.next + " "      println(x._1 + ":" + num)    })    val data2 = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))    val rdd3: RDD[(String, Int)] = sc.parallelize(data2)    rdd3.groupByKey().foreach(x => {      val iterator = x._2.iterator      var num = ""      while (iterator.hasNext) num = num + iterator.next + " "      println(x._1 + ":" + num)    })  }}

reduceByKey

reduceByKey会兑雷同的key进行计算。
比方上面的例子中,最初的后果是(a,9)、(b,6)

object ReduceByKeyRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeyRDD").setMaster("local"))    val data = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))    val rdd1: RDD[(String, Int)] = sc.parallelize(data)    val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _)    rdd2.foreach(println)  }}

aggregateByKey

先按分区聚合,最初再进行聚合。
代码如下:

object AggregateByKeyRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("AggregateByKeyRDD").setMaster("local"))    val data = Array(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5), ("a", 6), ("b", 7), ("b", 8))    val rdd1: RDD[(String, Int)] = sc.parallelize(data, 2)    val rdd2: RDD[(String, Int)] = rdd1.aggregateByKey(3)(math.max(_, _), _ + _)    rdd2.foreach(println)  }}

这个代码咱们用下图来阐明一下。

  1. 把汇合进行分区
  2. 依据key进行分组
  3. 分组后进行max计算,也就是下面aggregateByKey后的第一个函数,这里的max有两个值,一个是aggregateByKey前面的3,另外一个就是key对应的value,比方a对应的值是1,2,这两个值和3 求最大值,失去的3,所以后果就是a,3。
  4. 两个分区计算后的后果进行相加,也就是下面aggregateByKey后的第二个函数。所以最初的后果就是(b,12)、(a,9)

sortByKey

sortByKey就是依据key进行排序,第一个参数如果是true就是升序,也是默认值,false就是降序。

object SortByKeyRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("SortByKeyRDD").setMaster("local"))    val data = Array(("a", 1), ("c", 3), ("d", 4), ("b", 2))    val rdd1: RDD[(String,Int)] = sc.parallelize(data)    val rdd2: RDD[(String, Int)] = rdd1.sortByKey()    rdd2.foreach(println)  }}

join

相似mysql的join

object JoinRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("JoinRDD").setMaster("local"))    val data1 = Array(("a", 1), ("c", 3), ("d", 4), ("b", 2))    val data2 = Array(("a", 11), ("d", 14), ("e", 25))    val rdd1: RDD[(String, Int)] = sc.parallelize(data1)    val rdd2: RDD[(String, Int)] = sc.parallelize(data2)    val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)    val rdd4: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)    val rdd5: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)    val rdd6: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)    // 相当于mysql的INNER JOIN    // 后果:    // (d,(4,14))    // (a,(1,11))    println("rdd3:join")    rdd3.foreach(println)    println("rdd4:fullOuterJoin")    // 返回左右数据集的全副数据,左右有一边不存在的数据以None填充    // 后果:    // (d,(Some(4),Some(14)))    //  (e,(None,Some(25)))    // (a,(Some(1),Some(11)))    //  (b,(Some(2),None))    // (c,(Some(3),None))    rdd4.foreach(println)    // 相当于mysql的LEFT JOIN    // 后果:    // (d,(4,Some(14)))    // (a,(1,Some(11)))    // (b,(2,None))    // (c,(3,None))    println("rdd5:leftOuterJoin")    rdd5.foreach(println)    // 相当于mysql的RIGHT JOIN    // 后果:    // (d,(Some(4),14))    // (e,(None,25))    // (a,(Some(1),11))    println("rdd6:rightOuterJoin")    rdd6.foreach(println)  }}

cogroup

对两个RDD中的KV元素,每个RDD中雷同key中的元素别离聚合成一个汇合。
比方上面的例子中,对rdd1的a,拿到的后果是(1, 3),对于rdd2的a,拿到的后果是(23),所以最终的后果就是(a,(CompactBuffer(1, 3),CompactBuffer(23)))。

object CogroupRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("CogroupRDD").setMaster("local"))    val data1 = Array(("a", 1), ("a", 3),("b", 2))    val data2 = Array(("b", 24), ("a", 23),("b", 22))    val rdd1: RDD[(String,Int)] = sc.parallelize(data1)    val rdd2: RDD[(String,Int)] = sc.parallelize(data2)    val rdd3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)    // 后果:    // (a,(CompactBuffer(1, 3),CompactBuffer(23)))    // (b,(CompactBuffer(2),CompactBuffer(24, 22)))    rdd3.foreach(println)  }}

cartesian

就是笛卡尔。

object CartesianRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("CartesianRDD").setMaster("local"))    val data1 = Array(1, 2)    val data2 = Array(3, 4)    val rdd1: RDD[Int] = sc.parallelize(data1)    val rdd2: RDD[Int] = sc.parallelize(data2)    val rdd3: RDD[(Int, Int)] = rdd1.cartesian(rdd2)    // (1,3)    // (1,4)    // (2,3)    // (2,4)    rdd3.foreach(println)  }}

action

reduce

依据传入的函数,对每个值进行计算

object ReduceRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("ReduceRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val result: Int = rdd1.reduce(_ + _)    println(result)  }}

collect

拿到rdd的汇合给driver端,通常确认后果比拟少的时候用。

object CollectRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("CollectRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val result: Array[Int] = rdd1.collect()    println(result.mkString(","))  }}

count

拿到rdd的汇合数量。

object CountRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("CountRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val result: Long = rdd1.count()    println(result)  }}

first

拿到rdd汇合的第一个元素。

object FirstRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("FirstRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val result: Int = rdd1.first()    println(result)  }}

take

拿到后面n个元素,比方上面例子中,传的是3,所以拿到是1,2,3。

object TakeRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("TakeRDD").setMaster("local"))    val data = Array(1, 2, 3, 4, 5)    val rdd1: RDD[Int] = sc.parallelize(data)    val result: Array[Int] = rdd1.take(3)    println(result.mkString(","))  }}

takeSample

随机抽取n个元素。

object TakeSampleRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("TakeSampleRDD").setMaster("local"))    val data = 1 to 100    val rdd1: RDD[Int] = sc.parallelize(data)    val result: Array[Int] = rdd1.takeSample(true, 5, 1)    println(result.mkString(","))  }}

takeOrdered

排序后取N个元素。

object TakeOrderedRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("TakeOrderedRDD").setMaster("local"))    val data = 1 to 100    val rdd1: RDD[Int] = sc.parallelize(data)    val result: Array[Int] = rdd1.takeOrdered(5)    println(result.mkString(","))  }}

countByKey

计算每个key呈现的次数

object CountByKeyRDD {  def main(args: Array[String]): Unit = {    val sc = new SparkContext(new SparkConf().setAppName("CountByKeyRDD").setMaster("local"))    val data = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))    val rdd1: RDD[(String, Int)] = sc.parallelize(data)    val result: collection.Map[String, Long] = rdd1.countByKey()    result.foreach(item => println(item._1 + ":" + item._2))  }}

foreach

遍历,比方下面很多例子的打印。