FileOutputFormat 及其子类产生的文件放在输入目录下。一个 reducer 就会输入一个文件,文件名称的输入格局为 name-r-nnnnn,例如 part-r-00000

  • name 是由程序设定的任意名字,默认为 part
  • nnnnn 是一个指名块号的整数(从0开始)。块号保障从不同块(mapper 或者 reducer)写的输入在雷同名字状况下不会抵触。

有时可能要对输入的文件名进行管制或让每个 reducer 输入多个文件。MapReduce 为此提供了 MultipleOutputFormat 类。MultipleOutputFormat 类能够将数据写到多个文件,能够依据输入键和值或者任意字符串来重命名这些文件或者目录。

1. 重定义输入文件

咱们能够对输入的文件名进行管制。思考这样一个需要:咱们须要依照单词首字母输入来辨别 WordCount 程序的输入。这个需要能够应用 MultipleOutputs 来实现:

public class MultipleOutputsExample  extends Configured implements Tool {    // Mapper    public static class MultipleOutputsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {        private final static IntWritable one = new IntWritable(1);        private Text word = new Text();        @Override        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            String line = value.toString();            StringTokenizer itr = new StringTokenizer(line);            while (itr.hasMoreTokens()) {                word.set(itr.nextToken());                context.write(word, one);            }        }    }    // Reducer    public static class MultipleOutputsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {        private MultipleOutputs<Text, IntWritable> multipleOutputs;        @Override        protected void setup(Context context) throws IOException, InterruptedException {            // 初始化 MultipleOutputs            multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);        }        @Override        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {            int sum = 0;            for(IntWritable intWritable : values){                sum += intWritable.get();            }            // key 首字母作为根底输入门路            String baseOutput = StringUtils.substring(key.toString(), 0, 1);            // 应用 multipleOutputs            multipleOutputs.write(key, new IntWritable(sum), StringUtils.lowerCase(baseOutput));        }        @Override        protected void cleanup(Context context) throws IOException, InterruptedException {            // 敞开 MultipleOutputs            multipleOutputs.close();        }    }    public int run(String[] args) throws Exception {        if (args.length != 2) {            System.err.println("./xxxx <input> <output>");            System.exit(1);        }        String inputPaths = args[0];        String outputPath = args[1];        Configuration conf = this.getConf();        Job job = Job.getInstance(conf);        job.setJobName("MultipleOutputsExample");        job.setJarByClass(MultipleOutputsExample.class);        // Map 输入 Key 格局        job.setMapOutputKeyClass(Text.class);        // Map 输入 Value 格局        job.setMapOutputValueClass(IntWritable.class);        // Reduce 输入 Key 格局        job.setOutputKeyClass(Text.class);        // Reduce 输入 Value 格局        job.setOutputValueClass(IntWritable.class);        // Mapper 类        job.setMapperClass(MultipleOutputsMapper.class);        // Reducer 类        job.setReducerClass(MultipleOutputsReducer.class);        // 输出门路        FileInputFormat.setInputPaths(job, inputPaths);        // 输入门路        FileOutputFormat.setOutputPath(job, new Path(outputPath));        boolean success = job.waitForCompletion(true);        return success ? 0 : 1;    }    public static void main(String[] args) throws Exception {        int result = ToolRunner.run(new Configuration(), new MultipleOutputsExample(), args);        System.exit(result);    }}

首先在 setup() 办法中结构一个 MultipleOutputs 的实例。在 reduce() 办法中应用 MultipleOutputs 实例的 write(KEYOUT key, VALUEOUT value, String baseOutputPath) 办法代替 context.write() 办法输入。与 context.write 办法输入的最大不同就是提供了一个重写输入门路的参数 baseOutputPath,默认为 part,在这咱们应用输入单词的首字母:

localhost:target wy$ hadoop fs -ls /data/word-count/word-count-output/Found 7 items-rw-r--r--   1 wy supergroup          0 2022-07-23 15:31 /data/word-count/word-count-output/_SUCCESS-rw-r--r--   1 wy supergroup          9 2022-07-23 15:31 /data/word-count/word-count-output/a-r-00000-rw-r--r--   1 wy supergroup          8 2022-07-23 15:31 /data/word-count/word-count-output/f-r-00000-rw-r--r--   1 wy supergroup          9 2022-07-23 15:31 /data/word-count/word-count-output/h-r-00000-rw-r--r--   1 wy supergroup          4 2022-07-23 15:31 /data/word-count/word-count-output/i-r-00000-rw-r--r--   1 wy supergroup          0 2022-07-23 15:31 /data/word-count/word-count-output/part-r-00000-rw-r--r--   1 wy supergroup         21 2022-07-23 15:31 /data/word-count/word-count-output/s-r-00000

咱们能够看到在输入文件中不仅有咱们想要的输入文件类型,还有 part-r-nnnnn 模式的文件,然而文件内没有信息,这是程序默认的输入文件。所以咱们在指定输入文件名称时,最好不要指定 name 为part,因为它曾经被应用为默认值了。

2. 多目录输入

在 MultipleOutputs 的 write() 办法中指定的根本门路 baseOutputPath 能够蕴含文件门路分隔符(/),这样就能够创立任意深度的子目录。例如,咱们改变下面的需要:按单词的首字母来辨别数据,不同首字母的数据位于不同子目录(例如:a/part-r-00000):

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {    int sum = 0;    for(IntWritable intWritable : values){        sum += intWritable.get();    }    // key 首字母作为根底输入门路    String baseOutput = StringUtils.substring(key.toString(), 0, 1);    // 应用 multipleOutputs    // String basePath = StringUtils.lowerCase(baseOutput);    // 只须要批改此处 蕴含文件分隔符    String basePath = String.format("%s/part", StringUtils.lowerCase(baseOutput));    multipleOutputs.write(key, new IntWritable(sum), basePath);}

上述代码文件输入名称的模式为 {单词首字母}/part-r-nnnnn

localhost:target wy$ hadoop fs -ls /data/word-count/word-count-output/Found 7 items-rw-r--r--   1 wy supergroup          0 2022-07-23 17:42 /data/word-count/word-count-output/_SUCCESSdrwxr-xr-x   - wy supergroup          0 2022-07-23 17:42 /data/word-count/word-count-output/adrwxr-xr-x   - wy supergroup          0 2022-07-23 17:42 /data/word-count/word-count-output/fdrwxr-xr-x   - wy supergroup          0 2022-07-23 17:42 /data/word-count/word-count-output/hdrwxr-xr-x   - wy supergroup          0 2022-07-23 17:42 /data/word-count/word-count-output/i-rw-r--r--   1 wy supergroup          0 2022-07-23 17:42 /data/word-count/word-count-output/part-r-00000drwxr-xr-x   - wy supergroup          0 2022-07-23 17:42 /data/word-count/word-count-output/s

本文由mdnice多平台公布