1.Spark计算依赖内存,如果目前只有10g内存,然而须要将500G的文件排序并输入,须要如何操作?
①、把磁盘上的500G数据宰割为100块(chunks),每份5GB。(留神,要留一些零碎空间!)
②、程序将每份5GB数据读入内存,应用quick sort算法排序。
③、把排序好的数据(也是5GB)寄存回磁盘。
④、循环100次,当初,所有的100个块都曾经各自排序了。(剩下的工作就是如何把它们合并排序!)
⑤、从100个块中别离读取5G/100=0.05 G入内存(100input buffers)。
⑥、执行100路合并,并将合并后果长期存储于5g基于内存的输入缓冲区中。当缓冲区写满5GB时,写入硬盘上最终文件,并清空输入缓冲区;当100个输出缓冲区中任何一个处理完毕时,写入该缓冲区所对应的块中的下一个0.05 GB,直到全副解决实现。
2.countByValue和countByKey的区别
首先从源码角度来看:
// PairRDDFunctions.scaladef countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}// RDD.scaladef countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope { map(value => (value, null)).countByKey()}
countByValue(RDD.scala)
- 作用在一般的
RDD
上 - 其实现过程调用了
countByKey
countByKey(PairRDDFunctions.scala)
- 作用在 PairRDD 上
- 对 key 进行计数
- 数据要收到Driver端,后果集大时,不实用
问题:
countByKey
能够作用在 一般的RDD
上吗countByValue
能够作用在PairRDD
上吗
val rdd1: RDD[Int] = sc.makeRDD(1 to 10)val rdd2: RDD[(Int, Int)] = sc.makeRDD((1 to 10).toList.zipWithIndex)val result1 = rdd1.countByValue() //能够val result2 = rdd1.countByKey() //语法错误val result3 = rdd2.countByValue() //能够val result4 = rdd2.countByKey() //能够
3.两个rdd join 什么时候有shuffle什么时候没有shuffle
其中join操作是考验所有数据库性能的一项重要指标,对于Spark来说,考验join的性能就是Shuffle,Shuffle 须要通过磁盘和网络传输,Shuffle数据越少性能越好,有时候能够尽量避免程序进行Shuffle ,那么什么状况下有Shuffle ,什么状况下没有Shuffle 呢
3.1 Broadcast join
broadcast join 比拟好了解,除了本人实现外,Spark SQL
曾经帮咱们默认来实现了,其实就是小表散发到所有Executors
,控制参数是:spark.sql.autoBroadcastJoinThreshold
默认大小是10m, 即小于这个阈值即主动应用broadcast join
.
3.2 Bucket join
其实rdd形式和table相似,不同的是后者要写入Bucket表,这里次要讲rdd的形式,原理就是,当两个rdd依据雷同分区形式,事后做好分区,分区后果是统一的,这样就能够进行Bucket join, 另外这种join没有事后的算子,须要在写程序时候本人来开发,对于表的这种join能够看一下 字节跳动在Spark SQL上的外围优化实际 。能够看下上面的例子
rdd1、rdd2都是Pair RDD
rdd1、rdd2的数据完全相同
肯定有shuffle
rdd1 => 5个分区
rdd2 => 6个分区
rdd1 => 5个分区 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
rdd2 => 5个分区 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
肯定没有shuffle
rdd1 => 5个分区 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
rdd2 => 5个分区 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
这样所有Shuffle
的算子,如果数据提前做好了分区(partitionBy
),很多状况下没有Shuffle
.
除下面两种形式外,个别就是有Shuffle
的join
, 对于spark的join原理能够查看:大数据开发-Spark Join原理详解
4..transform 是不是肯定不触发action
有个算子例外,那就是sortByKey,其底层有个抽样算法,水塘抽样,最初须要依据抽样的后果,进行RangePartition的,所以从job角度来说会看到两个job,除了触发action的自身算子之外,记住上面的
sortByKey → 水塘抽样→ collect
5.播送变量是怎么设计的
咱们都晓得,播送变量是把数据放到每个excutor上,也都晓得播送变量的数据肯定是从driver开始进来的,什么意思呢,如果播送表放在hive表中,那么它的存储就是在各个block块上,也对应多个excutor (不一样的叫法),首先将数据拉到driver上,而后再进行播送,播送时候不是全副播送,是依据excutor事后用到数据的,首先拿数据,而后通过bt协定进行传输,什么是bt协定呢,就是数据在分布式点对点网络上,依据网络间隔来去拉对应的数据,下载者也是上传者,这样就不同每个task (excutor)都从driver上来拉数据,这样就缩小了压力,另外在spark1.几的时候还是task级别,当初是独特的一个锁,整个excutor上的task共享这份数据。
参考
https://juejin.cn/post/6844903989557854216
https://www.jianshu.com/p/6bf887bf52b2
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注