一. MapReduce 并行度机制
1. MapTask 并行度机制
1. 概念
MapTask的并行度指的是map阶段有多少个并行的task独特解决工作。map阶段的工作解决并行度,势必影响到整个job的处理速度。那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?
2. 原理机制
一个MapReducejob的map阶段并行度由客户端在提交job时决定,即客户端提交job之前会看待解决数据进行逻辑切片。切片实现会造成切片布局文件(job.split),每个逻辑切片最终对应启动一个maptask。
逻辑切片机制由FileInputFormat实现类的getSplits() 办法实现。
FileInputFormat中默认的切片机制:
- 简略地依照文件的内容长度进行切片
- 切片大小,默认等于block大小
切片时不思考数据集整体,而是一一针对每一个文件独自切片
比方待处理数据有两个文件:
file1.txt 320M
file2.txt 10M
通过FileInputFormat的切片机制运算后,造成的切片信息如下:
file1.txt.split1—0M~128M
file1.txt.split2—128M~256M
file1.txt.split3—256M~320M
file2.txt.split1—0M~10M3. 相干参数 优化
在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));
切片次要由这几个值来运算决定:
minsize:默认值:1
配置参数: mapreduce.input.fileinputformat.split.minsize
maxsize:默认值:Long.MAXValue
配置参数:mapreduce.input.fileinputformat.split.maxsize
blocksize
因而,默认状况下,split size=block size,在hadoop 2.x中为128M。
然而,不论怎么调参数,都不能让多个小文件“划入”一个split。
还有个细节就是:
当bytesRemaining/splitSize > 1.1不满足的话,那么最初所有残余的会作为一个切片。从而不会造成例如129M文件布局成两个切片的场面。
2. ReduceTask 并行度机制
reducetask并行度同样影响整个job的执行并发度和执行效率,与maptask的并发数由切片数决定不同,Reducetask数量的决定是能够间接手动设置:
job.setNumReduceTasks(4);
如果数据分布不平均,就有可能在reduce阶段产生数据歪斜。
留神: reducetask数量并不是任意设置,还要思考业务逻辑需要,有些状况下,须要计算全局汇总后果,就只能有1个reducetask。
二. MapReduce工作流程详解
1. MapTask 工作机制
1. 流程图
2. 执行步骤
整个Map阶段流程大体如上图所示。简略概述:input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户本人实现的)进行解决,数据被map解决完结之后交给OutputCollector收集器,对其后果key进行分区(默认应用hash分区),而后写入buffer,每个map task都有一个内存缓冲区,存储着map的输入后果,当缓冲区快满的时候须要将缓冲区的数据以一个临时文件的形式寄存到磁盘,当整个map task完结后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输入文件,而后期待reduce task来拉数据。
具体步骤:
- 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits办法对输出目录中文件进行逻辑切片布局失去splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
- 将输出文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key示意每行首字符偏移值,value示意这一行文本内容。
- 读取split返回<key,value>,进入用户本人继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。
- map逻辑完之后,将map的每条后果通过context.write进行collect数据收集。在collect中,会先对其进行分区解决,默认应用HashPartitioner。MapReduce提供Partitioner接口,它的作用就是依据key或value及reduce的数量来决定以后的这对输入数据最终应该交由哪个reduce task解决。默认对key hash后再以reduce task数量取模。默认的取模形式只是为了均匀reduce的解决能力,如果用户本人对Partitioner有需要,能够订制并设置到job上。
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map后果,缩小磁盘IO的影响。咱们的key/value对以及Partition的后果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。环形缓冲区其实是一个数组,数组中寄存着key、value的序列化数据和key、value的元数据信息,包含partition、key的起始地位、value的起始地位以及value的长度。环形构造是一个抽象概念。缓冲区是有大小限度,默认是100MB。当map task的输入后果很多时,就可能会撑爆内存,所以须要在肯定条件下将缓冲区中的数据长期写入磁盘,而后从新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由独自线程来实现,不影响往缓冲区写map后果的线程。溢写线程启动时不应该阻止map的后果输入,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据曾经达到阈值(buffer size spill percent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输入后果还能够往剩下的20MB内存中写,互不影响。
- 当溢写线程启动后,须要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。如果job设置过Combiner,那么当初就是应用Combiner的时候了。将有雷同key的key/value对的value加起来,缩小溢写到磁盘的数据量。Combiner会优化MapReduce的两头后果,所以它在整个模型中会屡次应用。那哪些场景能力应用Combiner呢?从这里剖析,Combiner的输入是Reducer的输出,Combiner绝不能扭转最终的计算结果。Combiner只应该用于那种Reduce的输出key/value与输入key/value类型完全一致,且不影响最终后果的场景。比方累加,最大值等。Combiner的应用肯定得谨慎,如果用好,它对job执行效率有帮忙,反之会影响reduce的最终后果。
- 每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输入后果真的很大,有屡次这样的溢写产生,磁盘上相应的就会有多个临时文件存在。当整个数据处理完结之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
至此map整个阶段完结。
2. ReduceTask 工作机制
1. 流程图
2. 执行步骤
Reduce大抵分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段蕴含一个eventFetcher来获取已实现的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,别离为inMemoryMerger和onDiskMerger,别离将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy实现之后,copy阶段就实现了,开始进行sort阶段,sort阶段次要是执行finalMerge操作,纯正的sort阶段,实现之后就是reduce阶段,调用用户定义的reduce函数进行解决。
具体步骤:
- Copy阶段,简略地拉取数据。Reduce过程启动一些数据copy线程(Fetcher),通过HTTP形式申请maptask获取属于本人的文件。
- Merge阶段。这里的merge如map端的merge动作,只是数组中寄存的是不同map端copy来的数值。Copy过去的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵便。merge有三种模式:内存到内存;内存到磁盘;磁盘到磁盘。默认状况下第一种模式不启用。当内存中的数据量达到肯定阈值,就启动内存到磁盘的merge。与map 端相似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,而后在磁盘中生成了泛滥的溢写文件。第二种merge形式始终在运行,直到没有map端的数据时才完结,而后启动第三种磁盘到磁盘的merge形式生成最终的文件。
对排序后的键值对调用reduce办法,键相等的键值对调用一次reduce办法,每次调用会产生零个或者多个键值对,最初把这些输入的键值对写入到HDFS文件中。
二. MapReduce Shuffle 机制
Shuffle的本意是洗牌、混洗的意思,把一组有规定的数据尽量打乱成无规则的数据。
而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输入按指定的规定“打乱”成具备肯定规定的数据,以便reduce端接管解决。
shuffle是Mapreduce的外围,它散布在Mapreduce的map阶段和reduce阶段。个别把从Map产生输入开始到Reduce获得数据作为输出之前的过程称作shuffle。
- Partition阶段:将MapTask的后果输入到默认大小为100M的环形缓冲区,保留之前会对key进行分区的计算,默认Hash分区等。
- Spill阶段:当内存中的数据量达到肯定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前须要对数据进行一次排序的操作,如果配置了combiner,还会将有雷同分区号和key的数据进行排序。
- Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个两头数据文件。
- Copy阶段: ReduceTask启动Fetcher线程到曾经实现MapTask的节点上复制一份属于本人的数据,这些数据默认会保留在内存的缓冲区中,当内存的缓冲区达到肯定的阀值的时候,就会将数据写到磁盘之上。
- Merge阶段:在ReduceTask近程复制数据的同时,会在后盾开启两个线程对内存到本地的数据文件进行合并操作。
- Sort阶段:在对数据进行合并的同时,会进行排序操作,因为MapTask阶段曾经对数据进行了部分的排序,ReduceTask只需保障Copy的数据的最终整体有效性即可。