1.Spark 计算依赖内存,如果目前只有 10g 内存,然而须要将 500G 的文件排序并输入,须要如何操作?
①、把磁盘上的 500G 数据宰割为 100 块(chunks),每份 5GB。(留神,要留一些零碎空间!)
②、程序将每份 5GB 数据读入内存,应用 quick sort 算法排序。
③、把排序好的数据(也是 5GB)寄存回磁盘。
④、循环 100 次,当初,所有的 100 个块都曾经各自排序了。(剩下的工作就是如何把它们合并排序!)
⑤、从 100 个块中别离读取 5G/100=0.05 G 入内存(100input buffers)。
⑥、执行 100 路合并,并将合并后果长期存储于 5g 基于内存的输入缓冲区中。当缓冲区写满 5GB 时,写入硬盘上最终文件,并清空输入缓冲区;当 100 个输出缓冲区中任何一个处理完毕时,写入该缓冲区所对应的块中的下一个 0.05 GB,直到全副解决实现。
2.countByValue 和 countByKey 的区别
首先从源码角度来看:
// PairRDDFunctions.scala
def countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}
// RDD.scala
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()}
countByValue(RDD.scala)
- 作用在一般的
RDD
上 - 其实现过程调用了
countByKey
countByKey(PairRDDFunctions.scala)
- 作用在 PairRDD 上
- 对 key 进行计数
- 数据要收到 Driver 端,后果集大时,不实用
问题:
countByKey
能够作用在 一般的RDD
上吗countByValue
能够作用在PairRDD
上吗
val rdd1: RDD[Int] = sc.makeRDD(1 to 10)
val rdd2: RDD[(Int, Int)] = sc.makeRDD((1 to 10).toList.zipWithIndex)
val result1 = rdd1.countByValue() // 能够
val result2 = rdd1.countByKey() // 语法错误
val result3 = rdd2.countByValue() // 能够
val result4 = rdd2.countByKey() // 能够
3. 两个 rdd join 什么时候有 shuffle 什么时候没有 shuffle
其中 join 操作是考验所有数据库性能的一项重要指标,对于 Spark 来说,考验 join 的性能就是 Shuffle,Shuffle 须要通过磁盘和网络传输,Shuffle 数据越少性能越好,有时候能够尽量避免程序进行 Shuffle , 那么什么状况下有 Shuffle,什么状况下没有 Shuffle 呢
3.1 Broadcast join
broadcast join 比拟好了解,除了本人实现外,Spark SQL
曾经帮咱们默认来实现了,其实就是小表散发到所有Executors
,控制参数是:spark.sql.autoBroadcastJoinThreshold
默认大小是 10m, 即小于这个阈值即主动应用broadcast join
.
3.2 Bucket join
其实 rdd 形式和 table 相似,不同的是后者要写入 Bucket 表,这里次要讲 rdd 的形式,原理就是,当两个 rdd 依据雷同分区形式,事后做好分区,分区后果是统一的,这样就能够进行 Bucket join, 另外这种 join 没有事后的算子,须要在写程序时候本人来开发,对于表的这种 join 能够看一下 字节跳动在 Spark SQL 上的外围优化实际。能够看下上面的例子
rdd1、rdd2 都是 Pair RDD
rdd1、rdd2 的数据完全相同
肯定有 shuffle
rdd1 => 5 个分区
rdd2 => 6 个分区
rdd1 => 5 个分区 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
rdd2 => 5 个分区 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
肯定没有 shuffle
rdd1 => 5 个分区 =>(1,0),(1,0),(1,0),(1,0),(1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
rdd2 => 5 个分区 =>(1,0),(1,0),(1,0),(1,0),(1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
这样所有 Shuffle
的算子,如果数据提前做好了分区(partitionBy
),很多状况下没有Shuffle
.
除下面两种形式外,个别就是有 Shuffle
的join
, 对于 spark 的 join 原理能够查看:大数据开发 -Spark Join 原理详解
4..transform 是不是肯定不触发 action
有个算子例外,那就是 sortByKey, 其底层有个抽样算法,水塘抽样,最初须要依据抽样的后果,进行 RangePartition 的, 所以从 job 角度来说会看到两个 job,除了触发 action 的自身算子之外,记住上面的
sortByKey → 水塘抽样→ collect
5. 播送变量是怎么设计的
咱们都晓得,播送变量是把数据放到每个 excutor 上,也都晓得播送变量的数据肯定是从 driver 开始进来的,什么意思呢,如果播送表放在 hive 表中,那么它的存储就是在各个 block 块上,也对应多个 excutor (不一样的叫法),首先将数据拉到 driver 上,而后再进行播送,播送时候不是全副播送,是依据 excutor 事后用到数据的,首先拿数据,而后通过 bt 协定进行传输,什么是 bt 协定呢,就是数据在分布式点对点网络上,依据网络间隔来去拉对应的数据,下载者也是上传者,这样就不同每个 task(excutor)都从 driver 上来拉数据,这样就缩小了压力,另外在 spark1. 几的时候还是 task 级别,当初是独特的一个锁,整个 excutor 上的 task 共享这份数据。
参考
https://juejin.cn/post/6844903989557854216
https://www.jianshu.com/p/6bf887bf52b2
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注