共计 4190 个字符,预计需要花费 11 分钟才能阅读完成。
1. 对 MapReduce 的了解
是什么:Hadoop 默认自带的 分布式计算 框架
做什么:提供一系列接口(外围类:InputFormat、OutputFormat、Mapper、Reducer、Driver),让用户可能实现自定义业务性能的分布式计算工作
【长处】:
- 高扩展性:计算资源不够,间接减少节点数量即可。品质可能不够,数量肯定管够
- 高容错性:一个节点工作失败,能主动转移到其余闲暇节点
- 适宜大数据处理:得益于其扩展性,只有数量足够,可能计算 TB 级别的数据
【毛病】:
- 无奈进行实时计算:太慢了!!!太慢了!!!
- 不善于流式计算:MR 的输出数据个别都是动态的,无奈解决动静的输出格局
- 不善于迭代计算:一个残缺的 MR 计算过程个别为 Mapper – Reducer – 写出后果,频繁的迭代计算将要启动多组 MR 并串联,而每次 MR 的后果都是会以文件的模式写出,给下一个 MR 组输出的话又得从新读取,太过繁琐
【集体总结】:
集体认为,MR 程序的毛病总结的话就是一点,IO 过程太多了。首先 map 阶段输出须要从 HDFS 中读取数据(这个过程曾经是比较慢的了),而后 map 阶段业务实现后写出数据(一次 IO),而 reduce 阶段的输出首先得从 map 节点读取曾经写出的后果文件(可能是网络 IO),读到 reduce 节点后如果后果集太大又会写到磁盘(又一次 IO),最初才是次要的 reduce 过程,最初后果依然是写回磁盘 …… 一组 MR 的执行,可能要波及到同节点、异节点的屡次 IO,如果用来做机器学习之类的简单迭代计算,可能 IO 工夫比外围业务工夫更长,因而 MR 适宜做一些一次性、单步骤、大量级的计算。
2. Mapper & Reducer 编程须要留神的点
- map() 办法是对每个 <K, V> 键值对调用一次
- reduce() 办法是对每个 key 调用一次,每个 key 可能对应多个 value,其中 value 封装为迭代器对象
- 不要导错包!!!不要导错包!!!不要导错包!!! 血泪的踩坑之旅 — mapred(老版本,不必)— mapreduce(用它!)
3. MR 的切片
【什么是切片?】
MR 要进行计算,就必须要有输出,而 MR 是分布式计算模式,因而对于数据量较大的计算,就会启动多个 Mapper 和 Reducer(Reducer 数目个别都会远小于 Mapper),而作为第一道环节,多个 Mapper 都承受残缺的数据集,那分布式齐全就没有意义了,因而在数据进入 Mapper 前,会首先进行 逻辑切分 ,将整个数据集分成多份,每份送给一个 Mapper 进行解决。因而, 切片数 = Mapper(MapTask) 的个数
【怎么切?】
第一种:依据文件大小切(默认切片形式) — 附源码解析
- 依据参数失去 SplitSize,即满足多大了就切一块,见源码解析
- 文件优先级 > 切片,即如果一个文件不够 SplitSize,则间接作为一个切片,不是把所有文件当整体
- 肯定要明确一点:这里的切片只是逻辑切片!!相当于隔一段给文件 ” 打一个标记 ”,并不是真正物理上将数据文件划分为几份。实际上 Client 端做的分片仅仅是提交 job.split 信息,行将这些标记发送给 MR,Map 阶段每个 Mapper 将会依据这些标记去读取各自被调配的那局部数据
protected long getFormatMinSplitSize() {return 1L;}
// 获取最小切片大小 -- 默认为 1
public static long getMinSplitSize(JobContext job) {return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
}
// 获取最大切片大小 -- 默认为 Long.MAX
public static long getMaxSplitSize(JobContext context) {return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
}
// ......(此处省略一万行代码)
if (this.isSplitable(job, path)) {long blockSize = file.getBlockSize();
// 理论切片的大小 -- computeSplitSize 函数见下
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining;
int blkIndex;
// 如果剩下的文件大于 1.1 * SplitSize,那么持续切分;否则间接作为一个分片;防止出现过小文件
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}
// ......(此处省略一万行代码)
// 外围公式
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
// 默认状况下就是块文件的大小 -- 128M
return Math.max(minSize, Math.min(maxSize, blockSize));
}
第二种:依据文件行数切
- 相当于将评判规范从文件大小换成了行数,设置多少行为一个切片
- 同样不会跨文件切
第三种:毁灭小文件切法
当有大量小文件时,如果用下面两种机制,则会导致切片很多,启用十分多的 Mapper,资源利用率极低,因而设置了一种切片形式专门用于小文件过多的场景
重要参数:CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 默认 4m
- Step1:虚拟存储,机制见伪代码
if (文件 A.size() <= MaxInputSplitSize){A 文件不做解决,间接为一个虚构文件;}else if(文件 A.size() <= MaxInputSplitSize * 2){A 均匀切分为两个等大小的虚构文件[A1,A2];
}else{A 切掉大小为 MaxInputSplitSize 的一块,剩下的局部持续判断;}
- Step2:从新切片
if(文件 B1 >= MaxInputSplitSize){B1 作为一个切片;}else{
B1 期待和下一个虚构文件合并;
if(合并后的大小 >= MaxInputSplitSize){合并文件成为一个切片;}else{持续期待合并;}
}
4. MR 罕用的输出类型
全部都是继承了 FileInputFormat 抽象类,如果自定义输出类型,须要继承该抽象类并重写 RecordReader 办法(实现具体自定义读的逻辑)
InputFormat | Split_Type | Key — Type | Value — Type |
---|---|---|---|
TextInputFormat | 按文件大小切(默认) | 每行偏移量 offset <LongWritable> | 该行文本 <Text> |
NLineInputFormat | 按固定行数切 | 每行偏移量 offset <LongWritable> | 该行文本 <Text> |
CombineTextInputFormat | 毁灭小文件切法 | 每行偏移量 offset <LongWritable> | 该行文本 <Text> |
KeyValueInputFormat | 按文件大小切(默认) | 每行 Split 后的 field[0] <Text> | field[0] <Text> |
这四种次要的输出类型都是按行读取数据,每一行会造成一个 KV 对,调用一次 map 办法
【千万别忘了!!!】
除了默认输出格局,其余三种输出格局都须要在 Driver 类中设置相干参数:
// 设置 Job 的输出类型
job.setInputFormatClass(......)
// 对于 NLineInputFormat,要设置多少行一个切片
NLineInputFormat.setNumLinesPerSplit(job, 3);
// 对于 CombineTextInputFormat 要设置最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 默认 4m
// 对于 KeyValueInputFormat 要指定对应的分隔符 -- 每行的 Key 和 Value 用什么宰割的
// 特地留神:这个参数是在 Configuration 中进行设置的!!conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"); // 以制表符宰割
5. MR 罕用的输入类型
同理,MR 的输入类型 (Reducer 写出的文件类型) 也都继承了 FileOutputFormat 类,同样反对自定义输入格局,须要继承 FileOutputFormat 类并重写 RecordWriter 办法
OutputFormat | Description |
---|---|
TextOutputFormat | Reducer 的后果间接以字符串格局按行写出 |
SequenceFileOutputFormat | Reducer 的后果依照 KV 键值对格局写出的 序列化 文件 |
SequenceFileOutputFormat 中写入的是 Reducer 输入的键值对,且进行了序列化,因而更不便进行压缩;此外,一个 MR 流程失去的后果如果是 SequenceFileOutputFormat 格局写出,那么该后果可能很便当地作为下一个 MR 流程的输出,因为曾经序列化封装好了 KV 对,作为输出时只须要进行反序列化读入就能够了
PS
思否什么时候可能反对自定义字体色彩!!!好想记出五光十色而又骚气的笔记 ……o(╥﹏╥)o