什么是 MapReduce
MapReduce 是 hadoop 进行多节点计算时采纳的计算模型,说白了就是 hadoop 拆分工作的一套方法论,刚接触 MapReduce 这个概念时,一时很难了解,也查了很多材料,因为每个人了解不一样,反而看的越多越糊涂,其实实质是很简略的货色,这里举一个例子帮忙了解,因为网上大部分是 hadoop 官网计算单词(wordcount)的例子,这里就换一个场景举例。
假如有以下一份成绩单
1, 张三,78,87,69
2, 李四,56,76,91
3, 王五,65,46,84
4, 赵六,89,56,98
...
各列别离是 编号, 学生姓名, 语文问题, 数学问题, 英语问题
,当初要求统计各科问题最高分,假如这份成绩单十分十分的长,有上千万行,在没有任何计算机系统的帮忙下,要怎么靠人工解决这个问题?
- 单人统计
专门派一个人进行统计工作,长处是简略,毛病也很显著,须要十分长的工夫,甚至数据量达到肯定水平,一个人一辈子可能也统计不完
- 多人统计
如果有足够的人能够进行统计工作,要怎么去协调这些人?假如成绩单有 100w 行并且有 1000 人能够进行统计
-
- 设一个管理员,管理员把成绩单均匀拆分成 1000 份给 1000 集体,每个人须要统计 1000 行数据
-
- 管理员制作一个表格,要求每个人把本人统计的后果填入该表格,表格格局如下
科目 | 人员 1 后果 | 人员 2 后果 | … | 人员 1000 后果 |
---|---|---|---|---|
语文 | ||||
数学 | ||||
英语 |
-
- 管理员最终失去了如下数据
科目 | 人员 1 后果 | 人员 2 后果 |…| 人员 1000 后果
语文 | 80 | 85 | … | 76 | |
数学 | 89 | 90 | … | 88 | |
英语 | 94 | 85 | … | 90 |
-
- 各科各有 1000 个后果,管理员又把这个表格拆成了 100 个小表格分给 100 集体进行统计,这样每个小表格各有 10 个数据,小表格格局如下
第一个人领到的小表格
科目 | 人员 1 后果 | 人员 2 后果 | … | 人员 10 后果 | |
---|---|---|---|---|---|
语文 | 80 | 85 | … | 76 | |
数学 | 89 | 90 | … | 88 | |
英语 | 94 | 85 | … | 90 |
第二个领到的小表格
科目 | 人员 11 后果 | 人员 12 后果 | … | 人员 20 后果 | |
---|---|---|---|---|---|
语文 | 83 | 75 | … | 88 | |
数学 | 79 | 95 | … | 58 | |
英语 | 94 | 85 | … | 90 |
-
- 管理员再次把每个人的后果收集上来,又失去了 100 份数据,如果管理员违心又能够把这个数据进行拆分交给多集体进行统计,如此重复最终失去一个最大值后果,管理员也能够本人实现最初的统计,因为数据量不大。
那么在这个过程中,咱们看到了,一份宏大的成绩单通过以下几个步骤后,最终咱们取得了咱们想要的后果
- 成绩单拆分多份
- 每一份进行独自统计
- 对后果进行注销
- 对统计的后果能够再次进行拆分,也能够间接进行统计
- 如此重复之后最终失去了后果
那么把这个过程用 MapReduce 语言进行形容就能够是以下过程:
- 成绩单拆分多份 - 分片(split)
- 每一份进行独自统计 – map
- 并且对后果进行注销 – shuffle
- 对统计的后果能够再次进行拆分 - combiner
- 也能够间接进行统计 – reduce
另外在管理员的表格中,三个科目前面记录
开发
咱们用理论 java 代码解决下面的问题,假如你曾经依照上一篇教程装置好了 hadoop 集群环境
- 创立工程
你能够用你相熟的 ide 创立一个一般 java 工程,倡议用 maven 进行包治理,并退出以下包依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
- 创立 Mapper
Mapper 对应是 MapReduce 中的 map 过程,在以下 mapper 代码:
StudentMapper.java
public class StudentMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {String[] ss = text.toString().split(",");
outputCollector.collect(new Text("语文"), new IntWritable(Integer.parseInt(ss[2])));
outputCollector.collect(new Text("数学"), new IntWritable(Integer.parseInt(ss[3])));
outputCollector.collect(new Text("英语"), new IntWritable(Integer.parseInt(ss[4])));
}
}
StudentMapper 实现了 Mapper<LongWritable, Text, Text, IntWritable>
接口,这里有四个参数,四个参数含意如下
- LongWritable:hadoop 会把 txt 文件按行进行宰割,这个示意该行在文件中的地位,个别不必
- Text:行内容,比方第一行就是
1, 张三,78,87,69
- Text:后面提到,最终咱们要依照科目进行汇总而后计算最高分,那么科目名称就是 key,每次计算的后果就是前面的 value,所以这里用 text 示意 key,因为咱们要存储科目名称
- IntWritable:存储计算结果,这里指的是基于本次统计所失去的科目最高分
办法 map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
的几个参数和下面含意一样,留神到 outputCollector 是一个数组,阐明这里能够写入多个后果,reporter 能够向 hadoop 汇报工作进度。在这个 mapper 外面,咱们并没有做什么计算,咱们只是把文本外面的问题解析进去,并且按科目放到 outputCollector 中,相当于大家第一次都没干活,只是把数据整顿好。通过 mapper 后,数据从
1, 张三,78,87,69
2, 李四,56,76,91
3, 王五,65,46,84
4, 赵六,89,56,98
...
变成了
– | – | – | ||||
---|---|---|---|---|---|---|
语文 | 78 | 56 | 65 | 89 | … | |
数学 | 87 | 76 | 46 | 56 | … | |
英语 | 69 | 91 | 84 | 98 | … |
StudentReducer.java
public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {StringBuffer str = new StringBuffer();
Integer max = -1;
while (iterator.hasNext()) {Integer score = iterator.next().get();
if (score > max) {max = score;}
}
outputCollector.collect(new Text(text.toString()), new IntWritable(max));
}
}
Reducer 就开始真正执行计算了,reducer 函数 reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
参数含意如下:
- text:就是 Mapper 的第三个参数
- iterator:就是 Mapper 中写入 outputCollector 的数据,和第一参数组合起来就是 mapper 中的 outputCollector
- outputCollector:reducer 计算后的后果须要写入到该参数中,这里咱们写入的内容是相似
key: 语文 value:90
构造的数据,所以类型为<Text, IntWritable>
后面提到过 mapper 会把数据整顿好,并且按科目将问题写入的 outputCollector 中,那么到了 reducer 这一步,hadoop 就会把 mapper 写入的数据依照 key 进行汇总(也就是科目),并且交付给 reducer,reducer 负责计算外面最高分,并且也将后果写入 outputCollector。
StudentProcessor
public class StudentProcessor {public static void main(String args[]) throws Exception {JobConf conf = new JobConf(StudentProcessor.class);
conf.setJobName("max_scroe_poc1");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(StudentMapper.class);
conf.setReducerClass(StudentReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
咱们还须要一个蕴含 main 函数的启动类,执行 mvn package
命令进行打包,咱们假如包名为hadoop-score-job.jar
,将 jar 包通过 ftp 等工具上传到服务器目录下。
- 上传数据
hadoop 借助 hdfs 分布式文件系统,可能将大文件存储在多个节点,通过 hdfs cli 工具,咱们感觉在操作本地文件一样,在下面的代码中 FileInputFormat.setInputPaths(conf, new Path(args[0]));
设置了 MapReduce 的数据起源,用户指定目录,该目录下文件作为数据起源,这里的目录就是 hdfs 中的目录,并且该目录必须存在,而且数据须要上传到该目录下,执行以下命令创立目录
hadoop fs -mkdir poc01_input
执行以下命令将数据导入到 hdfs 中
hadoop fs -put score.txt poc01_input
score.txt
内容为
1, 张三,78,87,69
2, 李四,56,76,91
3, 王五,65,46,84
4, 赵六,89,56,98
通过 ls
命令能够查看文件是否上传胜利
$ hadoop fs -ls poc01_input
Found 1 items
-rw-r--r-- 1 hadoop supergroup 72 2020-12-13 15:43 poc01_input/score.txt
- 执行 job
执行以下命令开始运行 job
$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output
20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:34 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 1
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:2
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_0005
20/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found
20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_0005
20/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/
20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_0005
20/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false
20/12/13 16:01:43 INFO mapreduce.Job: map 0% reduce 0%
20/12/13 16:01:51 INFO mapreduce.Job: map 100% reduce 0%
20/12/13 16:01:57 INFO mapreduce.Job: map 100% reduce 100%
20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully
20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=84
FILE: Number of bytes written=625805
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=316
HDFS: Number of bytes written=30
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=12036
Total time spent by all reduces in occupied slots (ms)=3311
Total time spent by all map tasks (ms)=12036
Total time spent by all reduce tasks (ms)=3311
Total vcore-milliseconds taken by all map tasks=12036
Total vcore-milliseconds taken by all reduce tasks=3311
Total megabyte-milliseconds taken by all map tasks=12324864
Total megabyte-milliseconds taken by all reduce tasks=3390464
Map-Reduce Framework
Map input records=4
Map output records=12
Map output bytes=132
Map output materialized bytes=90
Input split bytes=208
Combine input records=12
Combine output records=6
Reduce input groups=3
Reduce shuffle bytes=90
Reduce input records=6
Reduce output records=3
Spilled Records=12
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=395
CPU time spent (ms)=1790
Physical memory (bytes) snapshot=794595328
Virtual memory (bytes) snapshot=5784080384
Total committed heap usage (bytes)=533200896
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=108
File Output Format Counters
Bytes Written=30
- hadoop-score-job.jar 为下面打包的 jar 包,须要 cd 到 jar 包目录下执行命令
- com.hadoop.poc.StudentProcessor 蕴含 main 函数的类
- poc01_input 数据起源目录
- poc01_output 数据输入目录
job 执行完后,后果会保留在 poc01_output
目录下
$ hadoop fs -ls poc01_output2
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2020-12-13 16:01 poc01_output2/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 30 2020-12-13 16:01 poc01_output2/part-00000
$ hadoop fs -cat poc01_output2/part-00000
数学 87
英语 98
语文 89