关于spark:Spark-宽依赖和窄依赖

40次阅读

共计 3771 个字符,预计需要花费 10 分钟才能阅读完成。

后面 Standalone 模式下的 Master 对资源的调度,是第一层调度。为了前面第二层调度解说,这里先用一个例子作为铺垫,也顺便讲一下宽依赖和窄依赖。所波及的 RDD 应用之前曾经讲过了,这里不做赘述。

join

这里有两个 RDD,分区都是 2,通过 join 转换为 rdd3,为了更直观的看出每个数据在哪个分区,上面都通过 p+ 数字 的模式打印进去。

val sc = new SparkContext(new SparkConf().setAppName("MixRDD").setMaster("local"))
sc.setLogLevel("ERROR")
val rdd1: RDD[(String, Int)] = sc.parallelize(List(("a", 1), ("b", 1), ("c", 1), ("d", 1)), 2)
val rdd2: RDD[(String, Int)] = sc.parallelize(List(("a", 2), ("d", 2), ("e", 2), ("f", 2)), 2)
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
// rdd1:ArrayBuffer((a,1), (b,1), (c,1), (d,1))
println("rdd1:" + rdd1.collect().toBuffer)
// rdd2:ArrayBuffer((a,2), (d,2), (e,2), (f,2))
println("rdd2:" + rdd2.collect().toBuffer)
// rdd3:ArrayBuffer((d,(1,2)), (a,(1,2)))
println("rdd3:" + rdd3.collect().toBuffer)
val p1: RDD[String] = rdd1.mapPartitionsWithIndex(f1)
val p2: RDD[String] = rdd2.mapPartitionsWithIndex(f1)
val p3: RDD[String] = rdd3.mapPartitionsWithIndex(f2)
// p1:ArrayBuffer(a:0, b:0, c:1, d:1)
println("p1:" + p1.collect().toBuffer)
// p2:ArrayBuffer(a:0, d:0, e:1, f:1)
println("p2:" + p2.collect().toBuffer)
// p3:ArrayBuffer(d:0, a:1)
println("p3:" + p3.collect().toBuffer)

下面的代码的流程图如下,能够看到 RDD1 的分区 0 连着 RDD3 的分区 1,RDD1 的分区 1 连着 RDD2 的分区 0(图例没有分区数字,从上到下递增,0 开始),RDD2 同理。
RDD1 的每个分区都只有一个“儿子”,也就是说 RDD1 的每个分区仅被一个 RDD3 的一个分区依赖,这样咱们能够称为窄依赖。

map

这里有一个 RDD4,通过 map 失去 RDD5,比方 RDD4 的一个元素 ”a”,变成(“a”,4)。

val rdd4: RDD[String] = sc.parallelize(List("a", "b", "c", "d"), 2)
val rdd5: RDD[(String, Int)] = rdd4.map((_, 4))
// rdd4:ArrayBuffer(a, b, c, d)
println("rdd4:" + rdd4.collect().toBuffer)
// rdd5:ArrayBuffer((a,3), (b,3), (c,3), (d,3))
println("rdd5:" + rdd5.collect().toBuffer)
val p4: RDD[String] = rdd4.mapPartitionsWithIndex(f3)
val p5: RDD[String] = rdd5.mapPartitionsWithIndex(f1)
// p4:ArrayBuffer(a:0, b:0, c:1, d:1)
println("p4:" + p4.collect().toBuffer)
// p5:ArrayBuffer(a:0, b:0, c:1, d:1)
println("p5:" + p5.collect().toBuffer)

下面的代码的流程图如下,能够看到 RDD4 的分区 0 连着 RDD5 的分区 0,RDD4 的分区 1 连着 RDD5 的分区 1,因为 RDD4 的每个分区都只有一个“儿子”,也就是说 RDD4 的每个分区仅被一个 RDD5 的一个分区依赖,这样咱们也能够称为窄依赖。

union

这里有一个 RDD6,通过 union 下面失去的 RDD5,失去一个新的 RDD7。

val rdd6: RDD[(String, Int)] = sc.parallelize(List(("a", 6), ("d", 6), ("e", 6), ("f", 6)), 2)
val rdd7: RDD[(String, Int)] = rdd5.union(rdd6)
// rdd6:ArrayBuffer((a,6), (d,6), (e,6), (f,6))
println("rdd6:" + rdd6.collect().toBuffer)
// rdd7:ArrayBuffer((a,4), (b,4), (c,4), (d,4), (a,6), (d,6), (e,6), (f,6))
println("rdd7:" + rdd7.collect().toBuffer)
val p6: RDD[String] = rdd6.mapPartitionsWithIndex(f1)
val p7: RDD[String] = rdd7.mapPartitionsWithIndex(f1)
// p6:ArrayBuffer(a:0, d:0, e:1, f:1)
println("p6:" + p6.collect().toBuffer)
// p7:ArrayBuffer(a:0, b:0, c:1, d:1, a:2, d:2, e:3, f:3)
println("p7:" + p7.collect().toBuffer)

下面的代码的流程图如下,能够看到 RDD5 的分区 0 连着 RDD7 的分区 0,RDD5 的分区 1 连着 RDD7 的分区 1,因为 RDD5 的每个分区都只有一个“儿子”,也就是说 RDD5 的每个分区仅被一个 RDD7 的一个分区依赖,RDD6 同理,这样咱们也能够称为窄依赖。

所以,只有有一个“儿子”,或者仅被一个分区依赖咱们称为窄依赖。

fullOuterJoin

通过 fullOuterJoin 把 join 的后果和 union 的后果连接起来,并且分区数设置为 4。

val rdd8: RDD[(String, (Option[Int], Option[(Int, Int)]))] = rdd7.fullOuterJoin(rdd3, 4)
// rdd8:ArrayBuffer((d,(Some(4),Some((1,2)))), (d,(Some(6),Some((1,2)))), (e,(Some(6),None)), (a,(Some(4),Some((1,2)))), (a,(Some(6),Some((1,2)))), (b,(Some(4),None)), (f,(Some(6),None)), (c,(Some(4),None)))
println("rdd8:" + rdd8.collect().toBuffer)
val p8: RDD[String] = rdd8.mapPartitionsWithIndex(f4)
// p8:ArrayBuffer(d:0, d:0, e:1, a:1, a:1, b:2, f:2, c:3)
println("p8:" + p8.collect().toBuffer)

下面的代码的流程图如下,能够看到 RDD7 的分区 0 连着 RDD8 的分区 1,RDD7 的分区 0 还连着 RDD8 的分区 2。RDD7 的分区 1 连着 RDD8 的分区 0,RDD7 的分区 1 还连着 RDD8 的分区 3。RDD7 的分区 2 连着 RDD8 的分区 0,RDD7 的分区 1 还连着 RDD8 的分区 1。
所以 RDD7 的分区 0、分区 1、分区 2 都不止一个“儿子”,也就是说 RDD7 的分区 0、分区 1、分区 2 被 RDD8 的多个分区依赖,所以这种叫做宽依赖。

其余代码

val f1: (Int, Iterator[(String, Int)]) => Iterator[String] = (x, y) => {(y.toList.map(z => z._1 + ":" + x)).iterator
}

val f2: (Int, Iterator[(String, (Int, Int))]) => Iterator[String] = (x, y) => {(y.toList.map(z => z._1 + ":" + x)).iterator
}

val f3: (Int, Iterator[String]) => Iterator[String] = (x, y) => {(y.toList.map(z => z + ":" + x)).iterator
}

val f4: (Int, Iterator[(String, (Option[Int], Option[(Int, Int)]))]) => Iterator[String] = (x, y) => {(y.toList.map(z => z._1 + ":" + x)).iterator
}

正文完
 0