关于spark:Spark-Task的执行过程三-SortShuffleWriter

34次阅读

共计 2341 个字符,预计需要花费 6 分钟才能阅读完成。

后面曾经介绍了 BypassMergeSortShuffleWriter 和 UnsafeShuffleWriter 两种 ShuffleWriter 实现,这里开始 SortShuffleWriter 的解说。
SortShuffleWriter 应用 ExternalSorter 作为排序器,ExternalSorter 又蕴含了具备聚合性能的 PartitionedAppendOnlyMap 和没有聚合性能的 PartitionedPairBuffer 这两种缓存,所以 SortShuffleWriter 既有反对排序的性能,也反对聚合的性能。

聚合

聚合是通过 PartitionedAppendOnlyMap 来解决的,所以记录会迭代输出给 PartitionedAppendOnlyMap,上面聚合的例子均为 reduce 为算法。

PartitionedAppendOnlyMap 反对 key 是 null 值的,haveNullValue 是用来判断是否曾经有了 key 为 null 的值,nullValue 是用来存储 key 为 null 的值。所以没有传入 key 为 null 的时候,haveNullValue 为 false,nullValue 为 null。

此时 records 迭代的记录为 (null,1),因为 haveNullValue 为 false,间接赋值 nullValue 为 1,并把 haveNullValue 就改为 true。

records 新迭代的记录为(null,10),因为 haveNullValue 为 true,则把 nullValue 的值 1 拿进去,并跟新值 10 进行相加,失去的 11 从新赋值给 nullValue。

如果 key 不为 null,PartitionedAppendOnlyMap 是有一个数组 data 来存储的,格局就是 key0,value0,key1,value1 的模式。key 的格局是 (partitionId, key)。

records 新迭代的记录为(a,1),咱们假如 a 对应的分区为 0,且 a 在 hash 及与 mask 进行与运算后,失去的 pos 为 1,所以数组的 2 *1= 2 的地位就是 key,因为此时 key 是空的,key 的值为(0,a),2+ 1 的地位,就是(0,a) 对应的值 1。
2*pos 的 2,是因为每个值都占有 2 个地位。

records 新迭代的记录为 (a,10),此时这个 key 对应的地位是有值的,所以把旧值 1 拿进去,和新值 10 进行相加,失去的 11 存入 key 前面的地位。

records 新迭代的记录为(b,2),咱们假如 b 对应的分区为 1,且 b 在 hash 及与 mask 进行与运算后,失去的 pos 为 2,所以数组的 2 *2= 4 的地位就是 key,因为此时 key 是空的,key 的值为(1,b),4+ 1 的地位,就是(1,b) 对应的值 2。每次往空的 key 插入数据的时候,都会测验是否扩容。

records 新迭代的记录为 (b,10),此时这个 key 对应的地位是有值的,所以把旧值 2 拿进去,和新值 10 进行相加,失去的 12 存入 key 前面的地位。

records 新迭代的记录为(c,3),咱们假如 c 对应的分区为 0,且 b 在 hash 及与 mask 进行与运算后,失去的 pos 为 1,所以数组的 2 *1= 2 的地位就是 key,因为此时 key 不为空,且 key 是(0,a),并不是(0,c),所以 pos 会进行加 1 并且与 mask 进行与运算后,从新获取 pos,如果拿到的 pos 地位还是有其余的 key,则 pos 再加 1 从新计算,直至 pos 地位并无其余 key 或者 key 为(0,c)。咱们假如最初的地位是 6。

每次迭代 records 后,会看以后的内存是否超过了内存阈值,如果超过了,就会依据公式 2 * currentMemory - myMemoryThreshold 尝试获取内存,如果获取内存失败,阐明曾经没有多余的内存能够调配了,这个时候就会进行溢出。
溢出之前,先对 data 中的数据向前整顿排列,就是往左边的空值进行迁徙,依照分区 ID 的程序进行从新排序。此时程序就是 (0,a),(0,c),(1,b)。

而后再依据整顿后的 data 以及 haveNullValue、nullValue 创立迭代器。这个迭代器会先拜访 nullValue,而后再迭代 data。

每写入 1 万次,就会有一个 flush 操作,把输入流中的数据真正写入到磁盘,并记录每个分区的元素个数以及每次写入磁盘的数据大小。
当迭代器迭代完结,就会开释占用的内存,并从新创立一个 PartitionedAppendOnlyMap,每次溢出都会记录在 spills 数组中。

当 records 的数据迭代结束,就会依据内存的数据和溢出到磁盘的文件进行归并排序,最终合并到一个文件中,并记录每个分区的长度。最初依据分区的长度,生成索引文件。

非聚合

聚合是通过 PartitionedPairBuffer 来解决的,所以记录会迭代输出给 PartitionedPairBuffer,上面聚合的例子均为 reduce 为算法。
PartitionedPairBuffer 里的构造就相似于一个汇合,数据依照程序进行插入,插入的时候也是每次插入 2 个,一个是 kye 一个是 value,这个和 PartitionedAppendOnlyMap 一样。

records 新迭代的记录为 (a,1),咱们假如 a 对应的分区为 0,所以间接在数组里插入(0,a) 和 1。

records 新迭代的记录为 (b,2),咱们假如 b 对应的分区为 1,所以间接在数组里插入(1,b) 和 2。

records 新迭代的记录为 (c,3),咱们假如 c 对应的分区为 0,所以间接在数组里插入(0,c) 和 3。

records 新迭代的记录为 (a,4),咱们假如 a 对应的分区为 0,所以间接在数组里插入(0,a) 和 4,这里并没有聚合。

溢出、写文件、写索引前面的流程同聚合。

正文完
 0