1. 残缺的 MapReduce 程序
一个残缺的 MapReduce 程序在分布式运行中蕴含两类实例过程:
1)MrAppMaster:负责整个程序运行过程的调度及状态协调
2)YarnChild:负责 Map 阶段的整个数据处理流程
3)YarnChild:负责 Reduce 阶段的整个数据处理流程。
MapTask 和 ReduceTask 都是跑在 YarnChild 的过程中,当然是各自的 YarnChild 过程。
2.MapReduce 的数据流示意图
1)一个 mr 程序已启动,首先启动 MrAppMaster,它负责整个程序的过程调度及状态协调。
2)MrAppMaster 依据 job 的形容信息,计算出须要 MapTask 的数量 n,而后申请启动 n 个 MapTask 过程。
3)MapTask 启动后,从 InputFormat 取得 RecordReader,(RecordReader 封装了输出的数据),从中读取数据,以 kv 对的模式传递到 map()办法中,再将 map()输入的 kv 对加到缓存中保留,之后将缓存中 kv 对依照 k 进行分区排序,溢写到磁盘文件。
4)MrAppMaster 监控所有 MapTask 过程,当发现所有 MapTask 过程实现工作,去启动指定数量的 ReduceTask 过程,调配 ReduceTask 过程要解决的数据分区。
5)ReduceTask 过程启动后,从若干个 MapTask 获取若干个输入后果文件,在本地进行归并排序,将雷同的 key 的 kv 分为一个组,调用 reduce()办法进行逻辑解决,最初出后果 kv,调用 OutputFormat 将后果输入到指定地位。
当然理论当中,并不是间接应用 InputFormat,而是应用 FileInputFormat 的实现类,常见的实现类 TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。
OutputFormat 同样,应用的是它实现类,像 TextOutputFormat 或者自定义实现。
3.InputFormat
InputFormat 负责 Map 端数据的输出
public abstract class InputFormat<K, V> {public InputFormat() { }
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
源码中两个办法
getSplits(): 生成切片信息。
createRecordReader(): 负责输出数据的读取解决
这是一个抽象类,不能间接应用,InputFormat 有一个子抽象类 FileInputFormat,它继承了 InputFormat,具体实现了 getSplits()办法,也是 Hadoop 的默认切片实现规定。
FileInputFormat的切片机制
1)简略的依照文件的内容长度进行切片
2)切片大小默认等于 Block 大小,默认 Block 在 2.x 和 3.x 版本为 128M
3)切片时不思考数据集整体,而是一一针对每一个文件独自切片
外围实现过程就在 getSplits()办法中。
举个示例:
20201120.log 200M
20201121.log 100M
切片实现后:
20201120.log.split1 0-128
20201120.log split2 128-200
20201121.log.split1 100M
FileInputFormat实现类
FileInputFormat 常见的接口实现类包含:TextInputFormat、KeyValueTextInputFormat 等。
- TextInputFormat:默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable 类型。值是这行的内容,不包含任何行终止符(换行符和回车符),Text 类型。
- 2.KeyValueTextInputFormat 每一行均为一条记录,被分隔符宰割为 key,value。能够通过在驱动类中设置 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, “\t”); 来设定分隔符。默认分隔符是 tab
当然还有其余实现类 NLineInputFormat、CombineTextInputFormat 等,咱们也能够本人实现。