关于hadoop:hadoop教程MapReduce

什么是MapReduce

MapReduce是hadoop进行多节点计算时采纳的计算模型,说白了就是hadoop拆分工作的一套方法论,刚接触MapReduce这个概念时,一时很难了解,也查了很多材料,因为每个人了解不一样,反而看的越多越糊涂,其实实质是很简略的货色,这里举一个例子帮忙了解,因为网上大部分是hadoop官网计算单词(wordcount)的例子,这里就换一个场景举例。

假如有以下一份成绩单

1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98
...

各列别离是编号,学生姓名,语文问题,数学问题,英语问题,当初要求统计各科问题最高分,假如这份成绩单十分十分的长,有上千万行,在没有任何计算机系统的帮忙下,要怎么靠人工解决这个问题?

  • 单人统计

专门派一个人进行统计工作,长处是简略,毛病也很显著,须要十分长的工夫,甚至数据量达到肯定水平,一个人一辈子可能也统计不完

  • 多人统计

如果有足够的人能够进行统计工作,要怎么去协调这些人?假如成绩单有100w行并且有1000人能够进行统计

    1. 设一个管理员,管理员把成绩单均匀拆分成1000份给1000集体,每个人须要统计1000行数据
    1. 管理员制作一个表格,要求每个人把本人统计的后果填入该表格,表格格局如下
科目 人员1后果 人员2后果 人员1000后果
语文
数学
英语
    1. 管理员最终失去了如下数据

    科目| 人员1后果|人员2后果|…|人员1000后果

语文 80 85 76
数学 89 90 88
英语 94 85 90
    1. 各科各有1000个后果,管理员又把这个表格拆成了100个小表格分给100集体进行统计,这样每个小表格各有10个数据,小表格格局如下

第一个人领到的小表格

科目 人员1后果 人员2后果 人员10后果
语文 80 85 76
数学 89 90 88
英语 94 85 90

第二个领到的小表格

科目 人员11后果 人员12后果 人员20后果
语文 83 75 88
数学 79 95 58
英语 94 85 90
    1. 管理员再次把每个人的后果收集上来,又失去了100份数据,如果管理员违心又能够把这个数据进行拆分交给多集体进行统计,如此重复最终失去一个最大值后果,管理员也能够本人实现最初的统计,因为数据量不大。

那么在这个过程中,咱们看到了,一份宏大的成绩单通过以下几个步骤后,最终咱们取得了咱们想要的后果

  • 成绩单拆分多份
  • 每一份进行独自统计
  • 对后果进行注销
  • 对统计的后果能够再次进行拆分,也能够间接进行统计
  • 如此重复之后最终失去了后果

那么把这个过程用MapReduce语言进行形容就能够是以下过程:

  • 成绩单拆分多份- 分片(split)
  • 每一份进行独自统计 – map
  • 并且对后果进行注销 – shuffle
  • 对统计的后果能够再次进行拆分- combiner
  • 也能够间接进行统计 – reduce

另外在管理员的表格中,三个科目前面记录

开发

咱们用理论java代码解决下面的问题,假如你曾经依照上一篇教程装置好了hadoop集群环境

  • 创立工程

你能够用你相熟的ide创立一个一般java工程,倡议用maven进行包治理,并退出以下包依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.1</version>
</dependency>
  • 创立Mapper

Mapper对应是MapReduce中的map过程,在以下mapper代码:

StudentMapper.java

public class StudentMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        String[] ss = text.toString().split(",");
        outputCollector.collect(new Text("语文"), new IntWritable(Integer.parseInt(ss[2])));
        outputCollector.collect(new Text("数学"), new IntWritable(Integer.parseInt(ss[3])));
        outputCollector.collect(new Text("英语"), new IntWritable(Integer.parseInt(ss[4])));
    }
}

StudentMapper实现了 Mapper<LongWritable, Text, Text, IntWritable>接口,这里有四个参数,四个参数含意如下

  • LongWritable:hadoop会把txt文件按行进行宰割,这个示意该行在文件中的地位,个别不必
  • Text:行内容,比方第一行就是1,张三,78,87,69
  • Text:后面提到,最终咱们要依照科目进行汇总而后计算最高分,那么科目名称就是key,每次计算的后果就是前面的value,所以这里用text示意key,因为咱们要存储科目名称
  • IntWritable:存储计算结果,这里指的是基于本次统计所失去的科目最高分

办法map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)的几个参数和下面含意一样,留神到outputCollector是一个数组,阐明这里能够写入多个后果,reporter能够向hadoop汇报工作进度。在这个mapper外面,咱们并没有做什么计算,咱们只是把文本外面的问题解析进去,并且按科目放到outputCollector中,相当于大家第一次都没干活,只是把数据整顿好。通过mapper后,数据从

1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98
...

变成了

语文 78 56 65 89
数学 87 76 46 56
英语 69 91 84 98

StudentReducer.java

public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        StringBuffer str = new StringBuffer();
        Integer max = -1;
        while (iterator.hasNext()) {
            Integer score = iterator.next().get();
            if (score > max) {
                max = score;
            }
        }
        outputCollector.collect(new Text(text.toString()), new IntWritable(max));
    }
}

Reducer就开始真正执行计算了,reducer函数reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)参数含意如下:

  • text:就是Mapper的第三个参数
  • iterator:就是Mapper中写入outputCollector的数据,和第一参数组合起来就是mapper中的outputCollector
  • outputCollector:reducer计算后的后果须要写入到该参数中,这里咱们写入的内容是相似key:语文 value:90构造的数据,所以类型为<Text, IntWritable>

后面提到过mapper会把数据整顿好,并且按科目将问题写入的outputCollector中,那么到了reducer这一步,hadoop就会把mapper写入的数据依照key进行汇总(也就是科目),并且交付给reducer,reducer负责计算外面最高分,并且也将后果写入outputCollector。

StudentProcessor

public class StudentProcessor {

    public static void main(String args[]) throws Exception {
        JobConf conf = new JobConf(StudentProcessor.class);
        conf.setJobName("max_scroe_poc1");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(StudentMapper.class);
        conf.setReducerClass(StudentReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

咱们还须要一个蕴含main函数的启动类,执行mvn package命令进行打包,咱们假如包名为hadoop-score-job.jar,将jar包通过ftp等工具上传到服务器目录下。

  • 上传数据

hadoop借助hdfs分布式文件系统,可能将大文件存储在多个节点,通过hdfs cli工具,咱们感觉在操作本地文件一样,在下面的代码中FileInputFormat.setInputPaths(conf, new Path(args[0]));设置了MapReduce的数据起源,用户指定目录,该目录下文件作为数据起源,这里的目录就是hdfs中的目录,并且该目录必须存在,而且数据须要上传到该目录下,执行以下命令创立目录

hadoop fs -mkdir poc01_input

执行以下命令将数据导入到hdfs中

hadoop fs -put score.txt poc01_input

score.txt内容为

1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98

通过ls命令能够查看文件是否上传胜利

$ hadoop fs -ls poc01_input
Found 1 items
-rw-r--r--   1 hadoop supergroup         72 2020-12-13 15:43 poc01_input/score.txt
  • 执行job

执行以下命令开始运行job

$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output

20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:34 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 1
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:2
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_0005
20/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found
20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_0005
20/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/
20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_0005
20/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false
20/12/13 16:01:43 INFO mapreduce.Job:  map 0% reduce 0%
20/12/13 16:01:51 INFO mapreduce.Job:  map 100% reduce 0%
20/12/13 16:01:57 INFO mapreduce.Job:  map 100% reduce 100%
20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully
20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=84
                FILE: Number of bytes written=625805
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=316
                HDFS: Number of bytes written=30
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=12036
                Total time spent by all reduces in occupied slots (ms)=3311
                Total time spent by all map tasks (ms)=12036
                Total time spent by all reduce tasks (ms)=3311
                Total vcore-milliseconds taken by all map tasks=12036
                Total vcore-milliseconds taken by all reduce tasks=3311
                Total megabyte-milliseconds taken by all map tasks=12324864
                Total megabyte-milliseconds taken by all reduce tasks=3390464
        Map-Reduce Framework
                Map input records=4
                Map output records=12
                Map output bytes=132
                Map output materialized bytes=90
                Input split bytes=208
                Combine input records=12
                Combine output records=6
                Reduce input groups=3
                Reduce shuffle bytes=90
                Reduce input records=6
                Reduce output records=3
                Spilled Records=12
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=395
                CPU time spent (ms)=1790
                Physical memory (bytes) snapshot=794595328
                Virtual memory (bytes) snapshot=5784080384
                Total committed heap usage (bytes)=533200896
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=108
        File Output Format Counters 
                Bytes Written=30
  • hadoop-score-job.jar为下面打包的jar包,须要cd到jar包目录下执行命令
  • com.hadoop.poc.StudentProcessor 蕴含main函数的类
  • poc01_input 数据起源目录
  • poc01_output 数据输入目录

job执行完后,后果会保留在poc01_output目录下

$ hadoop fs -ls poc01_output2
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2020-12-13 16:01 poc01_output2/_SUCCESS
-rw-r--r--   1 hadoop supergroup         30 2020-12-13 16:01 poc01_output2/part-00000

$ hadoop fs -cat poc01_output2/part-00000
数学    87
英语    98
语文    89

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理