关于spark:Spark-Task的执行过程二-UnsafeShuffleWriter

8次阅读

共计 1452 个字符,预计需要花费 4 分钟才能阅读完成。

上一篇讲了 BypassMergeSortShuffleWriter 实现形式,并且晓得抉择 BypassMergeSortShuffleWriter 的时候,分区数是不能超过 200 的,因为每次执行的时候,会依据分区数量,学生成临时文件,如果分区数很多的话,那就会很有很多的临时文件,磁盘性能就十分不好。
UnsafeShuffleWriter 也是不具备聚合性能的,然而他应用 Tungsten 的内存作为缓存,这样磁盘的性能就失去了大大的晋升。

流程

拿到 RDD 的后果后,迭代每条记录,把记录的键值对写入到 SerializationStream 的输入流中,SerializationStream 是包装了名为 serBuffer 的 MyByteArrayOutputStream 对象。

而后把字节数组交给 ShuffleExternalSorter,一个专门用于对 Shuffle 数据进行排序的内部排序器。内部排序器拿到字节数组,发现此时没有 page(即 MemoryBlock),于是就创立了一个 page,把字节数组放在 page 中。另外须要把创立的 page 退出到曾经调配的 Page 列表 allocatedPages,并把创立的 page 作为 currentPage。

同时还要记录元数据信息,寄存在 ShuffleInMemorySorter 的长整型数组中用于排序,其中高 24 位存储分区 ID,两头 13 位存储页号,低 27 位存储偏移量。

反复着下面的步骤,迭代 RDD 每条记录,直至 page 没有足够的空间。

此时就要申请调配新的 Page,把这个 page 退出到曾经调配的 Page 列表 allocatedPages,并把新创建的 page 作为 currentPage,新的字节数组就往新的 page 里放。

RDD 的后果持续迭代,ShuffleInMemorySorter 中的记录数超过设定的值,或者长整型数组曾经不够用然而无奈扩大的时候,就须要内存中的数据将被溢出到磁盘。
首先是对内存中的数据依据分区进行排序,咱们上面用 0 或 1 示意分区,XY 标识页号和偏移量。通过排序后,内存中的数据就依据分区有序的排列起来。

而后依据分区,把数据写入文件中,并把文件信息写在 SpillInfo 中,SpillInfo 记录了这个文件的信息、blockId 信息以及每个分区对应文件里的长度(比方 4,4)。这个 SpillInfo 最终寄存在元数据信息的列表 spills 中。

数据都保留在磁盘后,那内存中的数据就能够革除回收了,包含曾经调配的 Page 列表 allocatedPages、每个 page、currentPage、ShuffleInMemorySorter。

假如 RDD 的后果在写完 3 次 file 时完结了,那此时 spills 列表就有 3 个 SpillInfo,每个 SpillInfo 都指向着文件,并且记录每个分区对应的长度(假如为 (4,4),(3,2),(5,1))。

最终依据 spilss 列表中的 3 个 SpillInfo,依据每个分区对应的长度,把每个文件分区 0 的内容写入到新的文件,并记录分区 0 的长度为 12,再把每个文件分区 1 的内容写入到新的文件,并记录分区 1 的长度为 7,依据两个分区的长度,生成索引文件。

和 BypassMergeSortShuffleWriter 比起来,假如有 1 万个分区,BypassMergeSortShuffleWriter 是会学生成 1 万个临时文件,最初再进行合并,而 UnsafeShuffleWriter 是依据 RDD 后果集的数据大小,生成临时文件,大大减少了临时文件的个数。

正文完
 0