后面曾经介绍了BypassMergeSortShuffleWriter和UnsafeShuffleWriter两种ShuffleWriter实现,这里开始SortShuffleWriter的解说。
SortShuffleWriter应用ExternalSorter作为排序器,ExternalSorter又蕴含了具备聚合性能的PartitionedAppendOnlyMap和没有聚合性能的PartitionedPairBuffer这两种缓存,所以SortShuffleWriter既有反对排序的性能,也反对聚合的性能。
聚合
聚合是通过PartitionedAppendOnlyMap来解决的,所以记录会迭代输出给PartitionedAppendOnlyMap,上面聚合的例子均为reduce为算法。
PartitionedAppendOnlyMap反对key是null值的,haveNullValue是用来判断是否曾经有了key为null的值,nullValue是用来存储key为null的值。所以没有传入key为null的时候,haveNullValue为false,nullValue为null。
此时records迭代的记录为(null,1),因为haveNullValue为false,间接赋值nullValue为1,并把haveNullValue就改为true。
records新迭代的记录为(null,10),因为haveNullValue为true,则把nullValue的值1拿进去,并跟新值10进行相加,失去的11从新赋值给nullValue。
如果key不为null,PartitionedAppendOnlyMap是有一个数组data来存储的,格局就是key0,value0,key1,value1的模式。key的格局是(partitionId, key)。
records新迭代的记录为(a,1),咱们假如a对应的分区为0,且a在hash及与mask进行与运算后,失去的pos为1,所以数组的2*1=2的地位就是key,因为此时key是空的,key的值为(0,a),2+1的地位,就是(0,a)对应的值1。
2*pos的2,是因为每个值都占有2个地位。
records新迭代的记录为(a,10),此时这个key对应的地位是有值的,所以把旧值1拿进去,和新值10进行相加,失去的11存入key前面的地位。
records新迭代的记录为(b,2),咱们假如b对应的分区为1,且b在hash及与mask进行与运算后,失去的pos为2,所以数组的2*2=4的地位就是key,因为此时key是空的,key的值为(1,b),4+1的地位,就是(1,b)对应的值2。每次往空的key插入数据的时候,都会测验是否扩容。
records新迭代的记录为(b,10),此时这个key对应的地位是有值的,所以把旧值2拿进去,和新值10进行相加,失去的12存入key前面的地位。
records新迭代的记录为(c,3),咱们假如c对应的分区为0,且b在hash及与mask进行与运算后,失去的pos为1,所以数组的2*1=2的地位就是key,因为此时key不为空,且key是(0,a),并不是(0,c),所以pos会进行加1并且与mask进行与运算后,从新获取pos,如果拿到的pos地位还是有其余的key,则pos再加1从新计算,直至pos地位并无其余key或者key为(0,c)。咱们假如最初的地位是6。
每次迭代records后,会看以后的内存是否超过了内存阈值,如果超过了,就会依据公式2 * currentMemory - myMemoryThreshold
尝试获取内存,如果获取内存失败,阐明曾经没有多余的内存能够调配了,这个时候就会进行溢出。
溢出之前,先对data中的数据向前整顿排列,就是往左边的空值进行迁徙,依照分区ID的程序进行从新排序。此时程序就是(0,a),(0,c),(1,b)。
而后再依据整顿后的data以及haveNullValue、nullValue创立迭代器。这个迭代器会先拜访nullValue,而后再迭代data。
每写入1万次,就会有一个flush操作,把输入流中的数据真正写入到磁盘,并记录每个分区的元素个数以及每次写入磁盘的数据大小。
当迭代器迭代完结,就会开释占用的内存,并从新创立一个PartitionedAppendOnlyMap,每次溢出都会记录在spills数组中。
当records的数据迭代结束,就会依据内存的数据和溢出到磁盘的文件进行归并排序,最终合并到一个文件中,并记录每个分区的长度。最初依据分区的长度,生成索引文件。
非聚合
聚合是通过PartitionedPairBuffer来解决的,所以记录会迭代输出给PartitionedPairBuffer,上面聚合的例子均为reduce为算法。
PartitionedPairBuffer里的构造就相似于一个汇合,数据依照程序进行插入,插入的时候也是每次插入2个,一个是kye一个是value,这个和PartitionedAppendOnlyMap一样。
records新迭代的记录为(a,1),咱们假如a对应的分区为0,所以间接在数组里插入(0,a)和1。
records新迭代的记录为(b,2),咱们假如b对应的分区为1,所以间接在数组里插入(1,b)和2。
records新迭代的记录为(c,3),咱们假如c对应的分区为0,所以间接在数组里插入(0,c)和3。
records新迭代的记录为(a,4),咱们假如a对应的分区为0,所以间接在数组里插入(0,a)和4,这里并没有聚合。
溢出、写文件、写索引前面的流程同聚合。