乐趣区

关于mapreduce:Hadoop学习笔记二MapReduce的IO类型-文件切片

1. 对 MapReduce 的了解

是什么:Hadoop 默认自带的 分布式计算 框架

做什么:提供一系列接口(外围类:InputFormat、OutputFormat、Mapper、Reducer、Driver),让用户可能实现自定义业务性能的分布式计算工作

【长处】:

  • 高扩展性:计算资源不够,间接减少节点数量即可。品质可能不够,数量肯定管够
  • 高容错性:一个节点工作失败,能主动转移到其余闲暇节点
  • 适宜大数据处理:得益于其扩展性,只有数量足够,可能计算 TB 级别的数据

【毛病】:

  • 无奈进行实时计算:太慢了!!!太慢了!!!
  • 不善于流式计算:MR 的输出数据个别都是动态的,无奈解决动静的输出格局
  • 不善于迭代计算:一个残缺的 MR 计算过程个别为 Mapper – Reducer – 写出后果,频繁的迭代计算将要启动多组 MR 并串联,而每次 MR 的后果都是会以文件的模式写出,给下一个 MR 组输出的话又得从新读取,太过繁琐

【集体总结】:

集体认为,MR 程序的毛病总结的话就是一点,IO 过程太多了。首先 map 阶段输出须要从 HDFS 中读取数据(这个过程曾经是比较慢的了),而后 map 阶段业务实现后写出数据(一次 IO),而 reduce 阶段的输出首先得从 map 节点读取曾经写出的后果文件(可能是网络 IO),读到 reduce 节点后如果后果集太大又会写到磁盘(又一次 IO),最初才是次要的 reduce 过程,最初后果依然是写回磁盘 …… 一组 MR 的执行,可能要波及到同节点、异节点的屡次 IO,如果用来做机器学习之类的简单迭代计算,可能 IO 工夫比外围业务工夫更长,因而 MR 适宜做一些一次性、单步骤、大量级的计算。

2. Mapper & Reducer 编程须要留神的点

  • map() 办法是对每个 <K, V> 键值对调用一次
  • reduce() 办法是对每个 key 调用一次,每个 key 可能对应多个 value,其中 value 封装为迭代器对象
  • 不要导错包!!!不要导错包!!!不要导错包!!! 血泪的踩坑之旅 — mapred(老版本,不必)— mapreduce(用它!)

3. MR 的切片

【什么是切片?】

MR 要进行计算,就必须要有输出,而 MR 是分布式计算模式,因而对于数据量较大的计算,就会启动多个 Mapper 和 Reducer(Reducer 数目个别都会远小于 Mapper),而作为第一道环节,多个 Mapper 都承受残缺的数据集,那分布式齐全就没有意义了,因而在数据进入 Mapper 前,会首先进行 逻辑切分 ,将整个数据集分成多份,每份送给一个 Mapper 进行解决。因而, 切片数 = Mapper(MapTask) 的个数

【怎么切?】

第一种:依据文件大小切(默认切片形式) — 附源码解析

  • 依据参数失去 SplitSize,即满足多大了就切一块,见源码解析
  • 文件优先级 > 切片,即如果一个文件不够 SplitSize,则间接作为一个切片,不是把所有文件当整体
  • 肯定要明确一点:这里的切片只是逻辑切片!!相当于隔一段给文件 ” 打一个标记 ”,并不是真正物理上将数据文件划分为几份。实际上 Client 端做的分片仅仅是提交 job.split 信息,行将这些标记发送给 MR,Map 阶段每个 Mapper 将会依据这些标记去读取各自被调配的那局部数据
protected long getFormatMinSplitSize() {return 1L;}
// 获取最小切片大小 -- 默认为 1
public static long getMinSplitSize(JobContext job) {return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
}

// 获取最大切片大小 -- 默认为 Long.MAX
public static long getMaxSplitSize(JobContext context) {return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
    }

// ......(此处省略一万行代码)

if (this.isSplitable(job, path)) {long blockSize = file.getBlockSize();
    // 理论切片的大小 -- computeSplitSize 函数见下
    long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

    long bytesRemaining;
    int blkIndex;
    // 如果剩下的文件大于 1.1 * SplitSize,那么持续切分;否则间接作为一个分片;防止出现过小文件
    for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
        splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
    }
    
// ......(此处省略一万行代码)
// 外围公式
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    // 默认状况下就是块文件的大小 -- 128M
        return Math.max(minSize, Math.min(maxSize, blockSize));
}

第二种:依据文件行数切

  • 相当于将评判规范从文件大小换成了行数,设置多少行为一个切片
  • 同样不会跨文件切

第三种:毁灭小文件切法

当有大量小文件时,如果用下面两种机制,则会导致切片很多,启用十分多的 Mapper,资源利用率极低,因而设置了一种切片形式专门用于小文件过多的场景

重要参数:CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 默认 4m

  • Step1:虚拟存储,机制见伪代码
if (文件 A.size() <= MaxInputSplitSize){A 文件不做解决,间接为一个虚构文件;}else if(文件 A.size() <= MaxInputSplitSize * 2){A 均匀切分为两个等大小的虚构文件[A1,A2];
}else{A 切掉大小为 MaxInputSplitSize 的一块,剩下的局部持续判断;}
  • Step2:从新切片
if(文件 B1 >= MaxInputSplitSize){B1 作为一个切片;}else{
    B1 期待和下一个虚构文件合并;
    if(合并后的大小 >= MaxInputSplitSize){合并文件成为一个切片;}else{持续期待合并;}
}

4. MR 罕用的输出类型

全部都是继承了 FileInputFormat 抽象类,如果自定义输出类型,须要继承该抽象类并重写 RecordReader 办法(实现具体自定义读的逻辑)

InputFormat Split_Type Key — Type Value — Type
TextInputFormat 按文件大小切(默认) 每行偏移量 offset <LongWritable> 该行文本 <Text>
NLineInputFormat 按固定行数切 每行偏移量 offset <LongWritable> 该行文本 <Text>
CombineTextInputFormat 毁灭小文件切法 每行偏移量 offset <LongWritable> 该行文本 <Text>
KeyValueInputFormat 按文件大小切(默认) 每行 Split 后的 field[0] <Text> field[0] <Text>

这四种次要的输出类型都是按行读取数据,每一行会造成一个 KV 对,调用一次 map 办法

【千万别忘了!!!】

除了默认输出格局,其余三种输出格局都须要在 Driver 类中设置相干参数:

// 设置 Job 的输出类型
job.setInputFormatClass(......)

// 对于 NLineInputFormat,要设置多少行一个切片
NLineInputFormat.setNumLinesPerSplit(job, 3);

// 对于 CombineTextInputFormat 要设置最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 默认 4m

// 对于 KeyValueInputFormat 要指定对应的分隔符 -- 每行的 Key 和 Value 用什么宰割的
// 特地留神:这个参数是在 Configuration 中进行设置的!!conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");  // 以制表符宰割

5. MR 罕用的输入类型

同理,MR 的输入类型 (Reducer 写出的文件类型) 也都继承了 FileOutputFormat 类,同样反对自定义输入格局,须要继承 FileOutputFormat 类并重写 RecordWriter 办法

OutputFormat Description
TextOutputFormat Reducer 的后果间接以字符串格局按行写出
SequenceFileOutputFormat Reducer 的后果依照 KV 键值对格局写出的 序列化 文件

SequenceFileOutputFormat 中写入的是 Reducer 输入的键值对,且进行了序列化,因而更不便进行压缩;此外,一个 MR 流程失去的后果如果是 SequenceFileOutputFormat 格局写出,那么该后果可能很便当地作为下一个 MR 流程的输出,因为曾经序列化封装好了 KV 对,作为输出时只须要进行反序列化读入就能够了

PS

思否什么时候可能反对自定义字体色彩!!!好想记出五光十色而又骚气的笔记 ……o(╥﹏╥)o

退出移动版