动手写的第一个MapReduce程序wordcount

36次阅读

共计 6445 个字符,预计需要花费 17 分钟才能阅读完成。

引语:

    之前运行过了 hadoop 官方自带的第一个例子 wordcount, 这次我们自己手写一个, 这个相当于是编程语言中的 helloworld 一样.
首先我们了解一下我们要写的 MapReduce 是处理的哪个部分, 我们知道 hadoop 处理文件是先将要处理的文件拆分成很多个部分, 分别处理完成, 最后再将结果给汇聚起来,
形成最终的处理结果.(也就是分治法的思想)我们接下来举个单词统计的例子, 看看我们写的代码是整个 MapReduce 过程中的哪些部分.

具体 MapReduce 的过程例子

首先咱们有这么一个文件, 文件内容如下:
hello world hello java
hello hadoop
很简单的一个文件就两行. 那么 hadoop 是怎么做单词统计的呢? 我们用步骤来描述下:
第一步: 读取这个文件, 按行来将这个文件每一行的单词给拆分出来, 然后形成很多 key/value 的结果, 处理完就是这样
<hello,1>
<world,1>
<hello,1>
<java,1>
<hello,1>
<hadoop,1>
第二步: 排序
排序完会变成这样的结果
<hadoop,1>
<hello,1>
<hello,1>
<hello,1>
<java,1>
<world,1>
第三步: 合并
合并后的结果如下
<hadoop,1>
<hello,1,1,1>
<java,1>
<world,1>
第四步: 汇聚结果
<hadoop,1>
<hello,3>
<java,1>
<world,1>

到第四步完成, 单词统计其实也就完成了. 看完这个具体的实例, 想必大家对 mapreduce 的处理过程有了一个比较清晰的理解.
然后我们要知道第二步和第三步是 hadoop 框架帮助我们完成的, 我们实际上需要写代码的地方是第一步和第四步.
第一步对应的就是 Map 的过程, 第四步对应的是 Reduce 的过程.

编写 mapreduce 代码

现在我们要做的就是完成第一步和第四步的代码
1. 创建项目


创建一个普通的 java 项目就行, 然后一路 next 点过去, 项目名自己取一个.
2. 引入到时用到的 hadoop 的包, 我这里用的是 hadoop-3.2.0 的版本, 需要引入哪些包呢?
要引入的包:
(1).hadoop 目录下 share/hadoop/common 下的包 (除了那个 test 的包, 官方的测试例子, 可以不需要引入)
(2). 和上一条一样的 common 下 lib 中的包
(3).hadoop 目录下 share/hadoop/mapreduce 下的包
(4). 和上一条一样 mapreduce 下的 lib 中的包
然后在 idea 中引入这些包, 点击 File->Project Structure->Modules
点击右边的小加号来引入刚才说的那些 jar 包

3. 引入包完成以后, 我们创建一个叫 WordCount 的 java 文件, 然后开始敲代码
这里直接贴一下代码,__要注意 import 的部分, 是不是和我一样?__因为好些个名字一样的类, 来自于不同的 jar, 容易弄错.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * @author wxwwt
 * @since 2019-09-15
 */
public class WordCount {

    /**
     * Object      : 输入文件的内容
     * Text        : 输入的每一行的数据
     * Text        : 输出的 key 的类型
     * IntWritable : 输出 value 的类型
     */
    private static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {context.write(new Text(itr.nextToken()), new IntWritable(1));
            }
        }
    }

    /**
     * Text         :  Mapper 输入的 key
     * IntWritable  :  Mapper 输入的 value
     * Text         :  Reducer 输出的 key
     * IntWritable  :  Reducer 输出的 value
     */
    private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable item : values) {count += item.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 创建配置
        Configuration configuration = new Configuration();
        // 设置 hadoop 的作业  jobName 是 WordCount
        Job job = Job.getInstance(configuration, "WordCount");
        // 设置 jar
        job.setJarByClass(WordCount.class);
        // 设置 Mapper 的 class
        job.setMapperClass(WordCountMapper.class);
        // 设置 Reducer 的 class
        job.setReducerClass(WordCountReducer.class);
        // 设置输出的 key 和 value 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 待 job 执行完  程序退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Mapper 程序:

/**
 * Object      : 输入文件的内容
 * Text        : 输入的每一行的数据
 * Text        : 输出的 key 的类型
 * IntWritable : 输出 value 的类型
 */
private static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {context.write(new Text(itr.nextToken()), new IntWritable(1));
        }
    }
}

context 是全局的上下文, 先使用了 StringTokenizer 将 value(也就是每行的数据)按照空格分成了很多份,StringTokenizer 如果没有传入指定的分割符的话, 默认会将

"\t\n\r\f" 空格制表符换行符等符号作为分隔符, 然后使用 nextToken()来遍历这个按照空格分割的字符串.context.write(new Text(itr.nextToken()), new IntWritable(1));

的意思就是将 key/value 写入到上下文中.
注: 在 hadoop 编程中 String 是 Text,Integer 是 IntWritable. 这是 hadoop 自己封装的类. 记住就好了, 使用起来和原来的类差不多
这里就是写入了 key 为 Text 的单词, 和 value 为 Writable 的 1(统计数量).

Reduce 程序:

/**
 * Text         :  Mapper 输入的 key
 * IntWritable  :  Mapper 输入的 value
 * Text         :  Reducer 输出的 key
 * IntWritable  :  Reducer 输出的 value
 */
private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable item : values) {count += item.get();
        }
        context.write(key, new IntWritable(count));
    }
}

reduce 完成的是第四步的内容, 我们看看上面的实例过程就回知道此时的输入参数大概是这样
<hello,1,1,1>
所以这里会有一个遍历 values 的过程, 就是将这三个 1 给累加起来了.

程序入口:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // 创建配置
    Configuration configuration = new Configuration();
    // 设置 hadoop 的作业  jobName 是 WordCount
    Job job = Job.getInstance(configuration, "WordCount");
    // 设置 jar
    job.setJarByClass(WordCount.class);
    // 设置 Mapper 的 class
    job.setMapperClass(WordCountMapper.class);
    // 设置 Reducer 的 class
    job.setReducerClass(WordCountReducer.class);
    // 设置输出的 key 和 value 类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // 设置输入输出路径
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 待 job 执行完  程序退出
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

程序入口这里其实看注释就已经比较清楚了, 都是设置一些 mapreduce 需要的参数和路径之类的,
照着写就行了. 这里稍微要注意一点的就是

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

我们回顾一下之前运行 hadoop 的第一个程序的时候, 命令大概是
hadoop jar WordCount.jar /input/wordcount/file1 /output/wcoutput
后面的两个参数就是文件的输入路径和输出路径, 如果咱们代码修改了参数的位置或者有其他参数的操作.
就要对应好 args 下标的位置.

4. 指定 jar 包运行的入口
代码完成后咱们就可以打包了
先选择 File -> Project Structure -> Artifacts -> + -> JAR -> From modules with dependencies

然后选择刚才 WordCount 的 main

再点击 Build -> Build Artifacts

然后会弹出一个框 选择 Build

之后会在项目中生成一个 out 的目录, 在里面找到我们需要的 WordCount.jar, 上传到 hadoop 所在的服务器.
到这里基本上就结束了, 因为后面运行的步骤和我之前的文章时一样的, 可以参考:hadoop 运行第一个实例 wordcount

### 注意事项:
有可能直接运行

  hadoop jar WordCount.jar /input/wordcount/file1  /output/wcoutput

会失败, 报一个异常:

Exception in thread "main" java.io.IOException: Mkdirs failed to create /XXX/XXX
  at org.apache.hadoop.util.RunJar.ensureDirectory(RunJar.java:106)
  at org.apache.hadoop.util.RunJar.main(RunJar.java:150)

类似上面这样的.

这时候需要删除掉 jar 包里面的 License 文件夹和里面的东西, 可以参考这个链接:[stackoverflow](https://stackoverflow.com/que…
)
查看下 jar 中 license 的文件和文件夹
jar tvf XXX.jar | grep -i license
然后删除掉 META-INF/LICENSE 里面的内容
zip -d XXX.jar META-INF/LICENSE

总结:

1. 了解了 mapReduce 的运行步骤, 这样知道了我们只需要写 map 和 reduce 的过程, 中间步骤 hadoop 框架已经做了处理, 以后其他的程序也可以参考这个步骤来写
2.hadoop 中 String 是 Text,Integer 是 IntWritable 这个要记住, 用错了会报异常的
3. 报 Mkdirs failed to create /XXX/XXX 异常的时候先检查是不是路径有问题, 没有的话就删除掉 jar 包中的 META-INF/LICENSE

参考资料:

1.https://hadoop.apache.org/doc…
2.https://stackoverflow.com/que…

正文完
 0