后面Standalone模式下的Master对资源的调度,是第一层调度。为了前面第二层调度解说,这里先用一个例子作为铺垫,也顺便讲一下宽依赖和窄依赖。所波及的RDD应用之前曾经讲过了,这里不做赘述。
join
这里有两个RDD,分区都是2,通过join转换为rdd3,为了更直观的看出每个数据在哪个分区,上面都通过p+数字
的模式打印进去。
val sc = new SparkContext(new SparkConf().setAppName("MixRDD").setMaster("local"))sc.setLogLevel("ERROR")val rdd1: RDD[(String, Int)] = sc.parallelize(List(("a", 1), ("b", 1), ("c", 1), ("d", 1)), 2)val rdd2: RDD[(String, Int)] = sc.parallelize(List(("a", 2), ("d", 2), ("e", 2), ("f", 2)), 2)val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)// rdd1:ArrayBuffer((a,1), (b,1), (c,1), (d,1))println("rdd1:" + rdd1.collect().toBuffer)// rdd2:ArrayBuffer((a,2), (d,2), (e,2), (f,2))println("rdd2:" + rdd2.collect().toBuffer)// rdd3:ArrayBuffer((d,(1,2)), (a,(1,2)))println("rdd3:" + rdd3.collect().toBuffer)val p1: RDD[String] = rdd1.mapPartitionsWithIndex(f1)val p2: RDD[String] = rdd2.mapPartitionsWithIndex(f1)val p3: RDD[String] = rdd3.mapPartitionsWithIndex(f2)// p1:ArrayBuffer(a:0, b:0, c:1, d:1)println("p1:" + p1.collect().toBuffer)// p2:ArrayBuffer(a:0, d:0, e:1, f:1)println("p2:" + p2.collect().toBuffer)// p3:ArrayBuffer(d:0, a:1)println("p3:" + p3.collect().toBuffer)
下面的代码的流程图如下,能够看到RDD1的分区0连着RDD3的分区1,RDD1的分区1连着RDD2的分区0(图例没有分区数字,从上到下递增,0开始),RDD2同理。
RDD1的每个分区都只有一个“儿子”,也就是说RDD1的每个分区仅被一个RDD3的一个分区依赖,这样咱们能够称为窄依赖。
map
这里有一个RDD4,通过map失去RDD5,比方RDD4的一个元素"a",变成("a",4)。
val rdd4: RDD[String] = sc.parallelize(List("a", "b", "c", "d"), 2)val rdd5: RDD[(String, Int)] = rdd4.map((_, 4))// rdd4:ArrayBuffer(a, b, c, d)println("rdd4:" + rdd4.collect().toBuffer)// rdd5:ArrayBuffer((a,3), (b,3), (c,3), (d,3))println("rdd5:" + rdd5.collect().toBuffer)val p4: RDD[String] = rdd4.mapPartitionsWithIndex(f3)val p5: RDD[String] = rdd5.mapPartitionsWithIndex(f1)// p4:ArrayBuffer(a:0, b:0, c:1, d:1)println("p4:" + p4.collect().toBuffer)// p5:ArrayBuffer(a:0, b:0, c:1, d:1)println("p5:" + p5.collect().toBuffer)
下面的代码的流程图如下,能够看到RDD4的分区0连着RDD5的分区0,RDD4的分区1连着RDD5的分区1,因为RDD4的每个分区都只有一个“儿子”,也就是说RDD4的每个分区仅被一个RDD5的一个分区依赖,这样咱们也能够称为窄依赖。
union
这里有一个RDD6,通过union下面失去的RDD5,失去一个新的RDD7。
val rdd6: RDD[(String, Int)] = sc.parallelize(List(("a", 6), ("d", 6), ("e", 6), ("f", 6)), 2)val rdd7: RDD[(String, Int)] = rdd5.union(rdd6)// rdd6:ArrayBuffer((a,6), (d,6), (e,6), (f,6))println("rdd6:" + rdd6.collect().toBuffer)// rdd7:ArrayBuffer((a,4), (b,4), (c,4), (d,4), (a,6), (d,6), (e,6), (f,6))println("rdd7:" + rdd7.collect().toBuffer)val p6: RDD[String] = rdd6.mapPartitionsWithIndex(f1)val p7: RDD[String] = rdd7.mapPartitionsWithIndex(f1)// p6:ArrayBuffer(a:0, d:0, e:1, f:1)println("p6:" + p6.collect().toBuffer)// p7:ArrayBuffer(a:0, b:0, c:1, d:1, a:2, d:2, e:3, f:3)println("p7:" + p7.collect().toBuffer)
下面的代码的流程图如下,能够看到RDD5的分区0连着RDD7的分区0,RDD5的分区1连着RDD7的分区1,因为RDD5的每个分区都只有一个“儿子”,也就是说RDD5的每个分区仅被一个RDD7的一个分区依赖,RDD6同理,这样咱们也能够称为窄依赖。
所以,只有有一个“儿子”,或者仅被一个分区依赖咱们称为窄依赖。
fullOuterJoin
通过fullOuterJoin把join的后果和union的后果连接起来,并且分区数设置为4。
val rdd8: RDD[(String, (Option[Int], Option[(Int, Int)]))] = rdd7.fullOuterJoin(rdd3, 4)// rdd8:ArrayBuffer((d,(Some(4),Some((1,2)))), (d,(Some(6),Some((1,2)))), (e,(Some(6),None)), (a,(Some(4),Some((1,2)))), (a,(Some(6),Some((1,2)))), (b,(Some(4),None)), (f,(Some(6),None)), (c,(Some(4),None)))println("rdd8:" + rdd8.collect().toBuffer)val p8: RDD[String] = rdd8.mapPartitionsWithIndex(f4)// p8:ArrayBuffer(d:0, d:0, e:1, a:1, a:1, b:2, f:2, c:3)println("p8:" + p8.collect().toBuffer)
下面的代码的流程图如下,能够看到RDD7的分区0连着RDD8的分区1,RDD7的分区0还连着RDD8的分区2。RDD7的分区1连着RDD8的分区0,RDD7的分区1还连着RDD8的分区3。RDD7的分区2连着RDD8的分区0,RDD7的分区1还连着RDD8的分区1。
所以RDD7的分区0、分区1、分区2都不止一个“儿子”,也就是说RDD7的分区0、分区1、分区2被RDD8的多个分区依赖,所以这种叫做宽依赖。
其余代码
val f1: (Int, Iterator[(String, Int)]) => Iterator[String] = (x, y) => { (y.toList.map(z => z._1 + ":" + x)).iterator}val f2: (Int, Iterator[(String, (Int, Int))]) => Iterator[String] = (x, y) => { (y.toList.map(z => z._1 + ":" + x)).iterator}val f3: (Int, Iterator[String]) => Iterator[String] = (x, y) => { (y.toList.map(z => z + ":" + x)).iterator}val f4: (Int, Iterator[(String, (Option[Int], Option[(Int, Int)]))]) => Iterator[String] = (x, y) => { (y.toList.map(z => z._1 + ":" + x)).iterator}