什么是MapReduce

MapReduce是hadoop进行多节点计算时采纳的计算模型,说白了就是hadoop拆分工作的一套方法论,刚接触MapReduce这个概念时,一时很难了解,也查了很多材料,因为每个人了解不一样,反而看的越多越糊涂,其实实质是很简略的货色,这里举一个例子帮忙了解,因为网上大部分是hadoop官网计算单词(wordcount)的例子,这里就换一个场景举例。

假如有以下一份成绩单

1,张三,78,87,692,李四,56,76,913,王五,65,46,844,赵六,89,56,98...

各列别离是编号,学生姓名,语文问题,数学问题,英语问题,当初要求统计各科问题最高分,假如这份成绩单十分十分的长,有上千万行,在没有任何计算机系统的帮忙下,要怎么靠人工解决这个问题?

  • 单人统计

专门派一个人进行统计工作,长处是简略,毛病也很显著,须要十分长的工夫,甚至数据量达到肯定水平,一个人一辈子可能也统计不完

  • 多人统计

如果有足够的人能够进行统计工作,要怎么去协调这些人?假如成绩单有100w行并且有1000人能够进行统计

    1. 设一个管理员,管理员把成绩单均匀拆分成1000份给1000集体,每个人须要统计1000行数据
    1. 管理员制作一个表格,要求每个人把本人统计的后果填入该表格,表格格局如下
科目人员1后果人员2后果...人员1000后果
语文
数学
英语
    1. 管理员最终失去了如下数据

    科目| 人员1后果|人员2后果|...|人员1000后果

语文8085...76
数学8990...88
英语9485...90
    1. 各科各有1000个后果,管理员又把这个表格拆成了100个小表格分给100集体进行统计,这样每个小表格各有10个数据,小表格格局如下

第一个人领到的小表格

科目人员1后果人员2后果...人员10后果
语文8085...76
数学8990...88
英语9485...90

第二个领到的小表格

科目人员11后果人员12后果...人员20后果
语文8375...88
数学7995...58
英语9485...90
    1. 管理员再次把每个人的后果收集上来,又失去了100份数据,如果管理员违心又能够把这个数据进行拆分交给多集体进行统计,如此重复最终失去一个最大值后果,管理员也能够本人实现最初的统计,因为数据量不大。

那么在这个过程中,咱们看到了,一份宏大的成绩单通过以下几个步骤后,最终咱们取得了咱们想要的后果

  • 成绩单拆分多份
  • 每一份进行独自统计
  • 对后果进行注销
  • 对统计的后果能够再次进行拆分,也能够间接进行统计
  • 如此重复之后最终失去了后果

那么把这个过程用MapReduce语言进行形容就能够是以下过程:

  • 成绩单拆分多份- 分片(split)
  • 每一份进行独自统计 - map
  • 并且对后果进行注销 - shuffle
  • 对统计的后果能够再次进行拆分- combiner
  • 也能够间接进行统计 - reduce

另外在管理员的表格中,三个科目前面记录

开发

咱们用理论java代码解决下面的问题,假如你曾经依照上一篇教程装置好了hadoop集群环境

  • 创立工程

你能够用你相熟的ide创立一个一般java工程,倡议用maven进行包治理,并退出以下包依赖

<dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-client</artifactId>    <version>2.5.1</version></dependency>
  • 创立Mapper

Mapper对应是MapReduce中的map过程,在以下mapper代码:

StudentMapper.java

public class StudentMapper extends MapReduceBase implements        Mapper<LongWritable, Text, Text, IntWritable> {    public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {        String[] ss = text.toString().split(",");        outputCollector.collect(new Text("语文"), new IntWritable(Integer.parseInt(ss[2])));        outputCollector.collect(new Text("数学"), new IntWritable(Integer.parseInt(ss[3])));        outputCollector.collect(new Text("英语"), new IntWritable(Integer.parseInt(ss[4])));    }}

StudentMapper实现了 Mapper<LongWritable, Text, Text, IntWritable>接口,这里有四个参数,四个参数含意如下

  • LongWritable:hadoop会把txt文件按行进行宰割,这个示意该行在文件中的地位,个别不必
  • Text:行内容,比方第一行就是1,张三,78,87,69
  • Text:后面提到,最终咱们要依照科目进行汇总而后计算最高分,那么科目名称就是key,每次计算的后果就是前面的value,所以这里用text示意key,因为咱们要存储科目名称
  • IntWritable:存储计算结果,这里指的是基于本次统计所失去的科目最高分

办法map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)的几个参数和下面含意一样,留神到outputCollector是一个数组,阐明这里能够写入多个后果,reporter能够向hadoop汇报工作进度。在这个mapper外面,咱们并没有做什么计算,咱们只是把文本外面的问题解析进去,并且按科目放到outputCollector中,相当于大家第一次都没干活,只是把数据整顿好。通过mapper后,数据从

1,张三,78,87,692,李四,56,76,913,王五,65,46,844,赵六,89,56,98...

变成了

---
语文78566589...
数学87764656...
英语69918498...

StudentReducer.java

public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {    public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {        StringBuffer str = new StringBuffer();        Integer max = -1;        while (iterator.hasNext()) {            Integer score = iterator.next().get();            if (score > max) {                max = score;            }        }        outputCollector.collect(new Text(text.toString()), new IntWritable(max));    }}

Reducer就开始真正执行计算了,reducer函数reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)参数含意如下:

  • text:就是Mapper的第三个参数
  • iterator:就是Mapper中写入outputCollector的数据,和第一参数组合起来就是mapper中的outputCollector
  • outputCollector:reducer计算后的后果须要写入到该参数中,这里咱们写入的内容是相似key:语文 value:90构造的数据,所以类型为<Text, IntWritable>

后面提到过mapper会把数据整顿好,并且按科目将问题写入的outputCollector中,那么到了reducer这一步,hadoop就会把mapper写入的数据依照key进行汇总(也就是科目),并且交付给reducer,reducer负责计算外面最高分,并且也将后果写入outputCollector。

StudentProcessor

public class StudentProcessor {    public static void main(String args[]) throws Exception {        JobConf conf = new JobConf(StudentProcessor.class);        conf.setJobName("max_scroe_poc1");        conf.setOutputKeyClass(Text.class);        conf.setOutputValueClass(IntWritable.class);        conf.setMapperClass(StudentMapper.class);        conf.setReducerClass(StudentReducer.class);        conf.setInputFormat(TextInputFormat.class);        conf.setOutputFormat(TextOutputFormat.class);        FileInputFormat.setInputPaths(conf, new Path(args[0]));        FileOutputFormat.setOutputPath(conf, new Path(args[1]));        JobClient.runJob(conf);    }}

咱们还须要一个蕴含main函数的启动类,执行mvn package命令进行打包,咱们假如包名为hadoop-score-job.jar,将jar包通过ftp等工具上传到服务器目录下。

  • 上传数据

hadoop借助hdfs分布式文件系统,可能将大文件存储在多个节点,通过hdfs cli工具,咱们感觉在操作本地文件一样,在下面的代码中FileInputFormat.setInputPaths(conf, new Path(args[0]));设置了MapReduce的数据起源,用户指定目录,该目录下文件作为数据起源,这里的目录就是hdfs中的目录,并且该目录必须存在,而且数据须要上传到该目录下,执行以下命令创立目录

hadoop fs -mkdir poc01_input

执行以下命令将数据导入到hdfs中

hadoop fs -put score.txt poc01_input

score.txt内容为

1,张三,78,87,692,李四,56,76,913,王五,65,46,844,赵六,89,56,98

通过ls命令能够查看文件是否上传胜利

$ hadoop fs -ls poc01_inputFound 1 items-rw-r--r--   1 hadoop supergroup         72 2020-12-13 15:43 poc01_input/score.txt
  • 执行job

执行以下命令开始运行job

$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:1804020/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:1804020/12/13 16:01:34 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 120/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:220/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_000520/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_000520/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_000520/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false20/12/13 16:01:43 INFO mapreduce.Job:  map 0% reduce 0%20/12/13 16:01:51 INFO mapreduce.Job:  map 100% reduce 0%20/12/13 16:01:57 INFO mapreduce.Job:  map 100% reduce 100%20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49        File System Counters                FILE: Number of bytes read=84                FILE: Number of bytes written=625805                FILE: Number of read operations=0                FILE: Number of large read operations=0                FILE: Number of write operations=0                HDFS: Number of bytes read=316                HDFS: Number of bytes written=30                HDFS: Number of read operations=9                HDFS: Number of large read operations=0                HDFS: Number of write operations=2        Job Counters                 Launched map tasks=2                Launched reduce tasks=1                Data-local map tasks=2                Total time spent by all maps in occupied slots (ms)=12036                Total time spent by all reduces in occupied slots (ms)=3311                Total time spent by all map tasks (ms)=12036                Total time spent by all reduce tasks (ms)=3311                Total vcore-milliseconds taken by all map tasks=12036                Total vcore-milliseconds taken by all reduce tasks=3311                Total megabyte-milliseconds taken by all map tasks=12324864                Total megabyte-milliseconds taken by all reduce tasks=3390464        Map-Reduce Framework                Map input records=4                Map output records=12                Map output bytes=132                Map output materialized bytes=90                Input split bytes=208                Combine input records=12                Combine output records=6                Reduce input groups=3                Reduce shuffle bytes=90                Reduce input records=6                Reduce output records=3                Spilled Records=12                Shuffled Maps =2                Failed Shuffles=0                Merged Map outputs=2                GC time elapsed (ms)=395                CPU time spent (ms)=1790                Physical memory (bytes) snapshot=794595328                Virtual memory (bytes) snapshot=5784080384                Total committed heap usage (bytes)=533200896        Shuffle Errors                BAD_ID=0                CONNECTION=0                IO_ERROR=0                WRONG_LENGTH=0                WRONG_MAP=0                WRONG_REDUCE=0        File Input Format Counters                 Bytes Read=108        File Output Format Counters                 Bytes Written=30
  • hadoop-score-job.jar为下面打包的jar包,须要cd到jar包目录下执行命令
  • com.hadoop.poc.StudentProcessor 蕴含main函数的类
  • poc01_input 数据起源目录
  • poc01_output 数据输入目录

job执行完后,后果会保留在poc01_output目录下

$ hadoop fs -ls poc01_output2Found 2 items-rw-r--r--   1 hadoop supergroup          0 2020-12-13 16:01 poc01_output2/_SUCCESS-rw-r--r--   1 hadoop supergroup         30 2020-12-13 16:01 poc01_output2/part-00000$ hadoop fs -cat poc01_output2/part-00000数学    87英语    98语文    89