关于hadoop:hadoop教程MapReduce

38次阅读

共计 8081 个字符,预计需要花费 21 分钟才能阅读完成。

什么是 MapReduce

MapReduce 是 hadoop 进行多节点计算时采纳的计算模型,说白了就是 hadoop 拆分工作的一套方法论,刚接触 MapReduce 这个概念时,一时很难了解,也查了很多材料,因为每个人了解不一样,反而看的越多越糊涂,其实实质是很简略的货色,这里举一个例子帮忙了解,因为网上大部分是 hadoop 官网计算单词(wordcount)的例子,这里就换一个场景举例。

假如有以下一份成绩单

1, 张三,78,87,69
2, 李四,56,76,91
3, 王五,65,46,84
4, 赵六,89,56,98
...

各列别离是 编号, 学生姓名, 语文问题, 数学问题, 英语问题,当初要求统计各科问题最高分,假如这份成绩单十分十分的长,有上千万行,在没有任何计算机系统的帮忙下,要怎么靠人工解决这个问题?

  • 单人统计

专门派一个人进行统计工作,长处是简略,毛病也很显著,须要十分长的工夫,甚至数据量达到肯定水平,一个人一辈子可能也统计不完

  • 多人统计

如果有足够的人能够进行统计工作,要怎么去协调这些人?假如成绩单有 100w 行并且有 1000 人能够进行统计

    1. 设一个管理员,管理员把成绩单均匀拆分成 1000 份给 1000 集体,每个人须要统计 1000 行数据
    1. 管理员制作一个表格,要求每个人把本人统计的后果填入该表格,表格格局如下
科目 人员 1 后果 人员 2 后果 人员 1000 后果
语文
数学
英语
    1. 管理员最终失去了如下数据

    科目 | 人员 1 后果 | 人员 2 后果 |…| 人员 1000 后果

语文 80 85 76
数学 89 90 88
英语 94 85 90
    1. 各科各有 1000 个后果,管理员又把这个表格拆成了 100 个小表格分给 100 集体进行统计,这样每个小表格各有 10 个数据,小表格格局如下

第一个人领到的小表格

科目 人员 1 后果 人员 2 后果 人员 10 后果
语文 80 85 76
数学 89 90 88
英语 94 85 90

第二个领到的小表格

科目 人员 11 后果 人员 12 后果 人员 20 后果
语文 83 75 88
数学 79 95 58
英语 94 85 90
    1. 管理员再次把每个人的后果收集上来,又失去了 100 份数据,如果管理员违心又能够把这个数据进行拆分交给多集体进行统计,如此重复最终失去一个最大值后果,管理员也能够本人实现最初的统计,因为数据量不大。

那么在这个过程中,咱们看到了,一份宏大的成绩单通过以下几个步骤后,最终咱们取得了咱们想要的后果

  • 成绩单拆分多份
  • 每一份进行独自统计
  • 对后果进行注销
  • 对统计的后果能够再次进行拆分,也能够间接进行统计
  • 如此重复之后最终失去了后果

那么把这个过程用 MapReduce 语言进行形容就能够是以下过程:

  • 成绩单拆分多份 - 分片(split)
  • 每一份进行独自统计 – map
  • 并且对后果进行注销 – shuffle
  • 对统计的后果能够再次进行拆分 - combiner
  • 也能够间接进行统计 – reduce

另外在管理员的表格中,三个科目前面记录

开发

咱们用理论 java 代码解决下面的问题,假如你曾经依照上一篇教程装置好了 hadoop 集群环境

  • 创立工程

你能够用你相熟的 ide 创立一个一般 java 工程,倡议用 maven 进行包治理,并退出以下包依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.1</version>
</dependency>
  • 创立 Mapper

Mapper 对应是 MapReduce 中的 map 过程,在以下 mapper 代码:

StudentMapper.java

public class StudentMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {String[] ss = text.toString().split(",");
        outputCollector.collect(new Text("语文"), new IntWritable(Integer.parseInt(ss[2])));
        outputCollector.collect(new Text("数学"), new IntWritable(Integer.parseInt(ss[3])));
        outputCollector.collect(new Text("英语"), new IntWritable(Integer.parseInt(ss[4])));
    }
}

StudentMapper 实现了 Mapper<LongWritable, Text, Text, IntWritable> 接口,这里有四个参数,四个参数含意如下

  • LongWritable:hadoop 会把 txt 文件按行进行宰割,这个示意该行在文件中的地位,个别不必
  • Text:行内容,比方第一行就是1, 张三,78,87,69
  • Text:后面提到,最终咱们要依照科目进行汇总而后计算最高分,那么科目名称就是 key,每次计算的后果就是前面的 value,所以这里用 text 示意 key,因为咱们要存储科目名称
  • IntWritable:存储计算结果,这里指的是基于本次统计所失去的科目最高分

办法 map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) 的几个参数和下面含意一样,留神到 outputCollector 是一个数组,阐明这里能够写入多个后果,reporter 能够向 hadoop 汇报工作进度。在这个 mapper 外面,咱们并没有做什么计算,咱们只是把文本外面的问题解析进去,并且按科目放到 outputCollector 中,相当于大家第一次都没干活,只是把数据整顿好。通过 mapper 后,数据从

1, 张三,78,87,69
2, 李四,56,76,91
3, 王五,65,46,84
4, 赵六,89,56,98
...

变成了

语文 78 56 65 89
数学 87 76 46 56
英语 69 91 84 98

StudentReducer.java

public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {StringBuffer str = new StringBuffer();
        Integer max = -1;
        while (iterator.hasNext()) {Integer score = iterator.next().get();
            if (score > max) {max = score;}
        }
        outputCollector.collect(new Text(text.toString()), new IntWritable(max));
    }
}

Reducer 就开始真正执行计算了,reducer 函数 reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) 参数含意如下:

  • text:就是 Mapper 的第三个参数
  • iterator:就是 Mapper 中写入 outputCollector 的数据,和第一参数组合起来就是 mapper 中的 outputCollector
  • outputCollector:reducer 计算后的后果须要写入到该参数中,这里咱们写入的内容是相似 key: 语文 value:90 构造的数据,所以类型为<Text, IntWritable>

后面提到过 mapper 会把数据整顿好,并且按科目将问题写入的 outputCollector 中,那么到了 reducer 这一步,hadoop 就会把 mapper 写入的数据依照 key 进行汇总(也就是科目),并且交付给 reducer,reducer 负责计算外面最高分,并且也将后果写入 outputCollector。

StudentProcessor

public class StudentProcessor {public static void main(String args[]) throws Exception {JobConf conf = new JobConf(StudentProcessor.class);
        conf.setJobName("max_scroe_poc1");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(StudentMapper.class);
        conf.setReducerClass(StudentReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

咱们还须要一个蕴含 main 函数的启动类,执行 mvn package 命令进行打包,咱们假如包名为hadoop-score-job.jar,将 jar 包通过 ftp 等工具上传到服务器目录下。

  • 上传数据

hadoop 借助 hdfs 分布式文件系统,可能将大文件存储在多个节点,通过 hdfs cli 工具,咱们感觉在操作本地文件一样,在下面的代码中 FileInputFormat.setInputPaths(conf, new Path(args[0])); 设置了 MapReduce 的数据起源,用户指定目录,该目录下文件作为数据起源,这里的目录就是 hdfs 中的目录,并且该目录必须存在,而且数据须要上传到该目录下,执行以下命令创立目录

hadoop fs -mkdir poc01_input

执行以下命令将数据导入到 hdfs 中

hadoop fs -put score.txt poc01_input

score.txt内容为

1, 张三,78,87,69
2, 李四,56,76,91
3, 王五,65,46,84
4, 赵六,89,56,98

通过 ls 命令能够查看文件是否上传胜利

$ hadoop fs -ls poc01_input
Found 1 items
-rw-r--r--   1 hadoop supergroup         72 2020-12-13 15:43 poc01_input/score.txt
  • 执行 job

执行以下命令开始运行 job

$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output

20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:34 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 1
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:2
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_0005
20/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found
20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_0005
20/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/
20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_0005
20/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false
20/12/13 16:01:43 INFO mapreduce.Job:  map 0% reduce 0%
20/12/13 16:01:51 INFO mapreduce.Job:  map 100% reduce 0%
20/12/13 16:01:57 INFO mapreduce.Job:  map 100% reduce 100%
20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully
20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=84
                FILE: Number of bytes written=625805
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=316
                HDFS: Number of bytes written=30
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=12036
                Total time spent by all reduces in occupied slots (ms)=3311
                Total time spent by all map tasks (ms)=12036
                Total time spent by all reduce tasks (ms)=3311
                Total vcore-milliseconds taken by all map tasks=12036
                Total vcore-milliseconds taken by all reduce tasks=3311
                Total megabyte-milliseconds taken by all map tasks=12324864
                Total megabyte-milliseconds taken by all reduce tasks=3390464
        Map-Reduce Framework
                Map input records=4
                Map output records=12
                Map output bytes=132
                Map output materialized bytes=90
                Input split bytes=208
                Combine input records=12
                Combine output records=6
                Reduce input groups=3
                Reduce shuffle bytes=90
                Reduce input records=6
                Reduce output records=3
                Spilled Records=12
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=395
                CPU time spent (ms)=1790
                Physical memory (bytes) snapshot=794595328
                Virtual memory (bytes) snapshot=5784080384
                Total committed heap usage (bytes)=533200896
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=108
        File Output Format Counters 
                Bytes Written=30
  • hadoop-score-job.jar 为下面打包的 jar 包,须要 cd 到 jar 包目录下执行命令
  • com.hadoop.poc.StudentProcessor 蕴含 main 函数的类
  • poc01_input 数据起源目录
  • poc01_output 数据输入目录

job 执行完后,后果会保留在 poc01_output 目录下

$ hadoop fs -ls poc01_output2
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2020-12-13 16:01 poc01_output2/_SUCCESS
-rw-r--r--   1 hadoop supergroup         30 2020-12-13 16:01 poc01_output2/part-00000

$ hadoop fs -cat poc01_output2/part-00000
数学    87
英语    98
语文    89

正文完
 0