1. Shuffle mechanism

Shuffle refers to the data processing that happens after the map() method and before the reduce() method.

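Specifically, the output of each MapTask is distributed to the ReduceTasks according to the rules defined by the Partitioner, and the data is partitioned and sorted along the way.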
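The partition, sort, and group steps can be sketched in plain Java, without a cluster. This is a simplified illustration, not how Hadoop implements Shuffle internally (the real process spills to disk, sorts on the map side, and merges on the reduce side). The class ShuffleSketch and its methods are hypothetical names; the partition formula is the one used by Hadoop's default HashPartitioner. The sketch assumes Java 9 or later for List.of and Map.entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of Shuffle: partition, then sort and group by key.
class ShuffleSketch {

    // Same formula as Hadoop's default HashPartitioner.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Returns one map per partition; TreeMap keeps the keys sorted,
    // and each key holds the list of values a reduce() call would receive.
    static List<TreeMap<String, List<Integer>>> shuffle(
            List<Map.Entry<String, Integer>> mapOutput, int numPartitions) {
        List<TreeMap<String, List<Integer>>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> pair : mapOutput) {
            int p = partitionFor(pair.getKey(), numPartitions);
            partitions.get(p)
                    .computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                    .add(pair.getValue());
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Pretend these pairs were written by context.write() in several map() calls.
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("class2", 100),
                Map.entry("class1", 50),
                Map.entry("class1", 10));
        List<TreeMap<String, List<Integer>>> parts = shuffle(mapOutput, 2);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("partition " + i + ": " + parts.get(i));
        }
    }
}
```

All pairs with the same key always land in the same partition, which is what lets a single ReduceTask see every value for that key.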

2. Partition

In a MapReduce job, the final output sometimes needs to be split across different files. This is what partitioning is for.

For example, suppose we have the following data:

class1_aaa 50
class2_bbb 100
class3_ccc 80
class1_ddd 10
class2_eee 100
class3_fff 70
class1_hhh 150
class2_lll 100
class3_www 80

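The requirement is to sum the values per group (the class prefix before the underscore) and write each group's total to its own output file. This is where a custom Partition comes in.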
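To make the target concrete, the totals the job should produce can be worked out with a small plain-Java sketch that does not depend on Hadoop. The class ExpectedSums and the method sumByClass are hypothetical names used only for this illustration; the parsing mirrors the sample data format (a name, a space, then an integer).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: computes the per-class totals for the sample data.
class ExpectedSums {

    static Map<String, Integer> sumByClass(String[] lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(" ");
            // "class1_aaa" -> "class1"
            String group = fields[0].split("_")[0];
            sums.merge(group, Integer.parseInt(fields[1]), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] lines = {
            "class1_aaa 50", "class2_bbb 100", "class3_ccc 80",
            "class1_ddd 10", "class2_eee 100", "class3_fff 70",
            "class1_hhh 150", "class2_lll 100", "class3_www 80"
        };
        // prints {class1=210, class2=300, class3=230}
        System.out.println(sumByClass(lines));
    }
}
```

In the MapReduce version, each of these three totals should end up in a separate output file, one per ReduceTask.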

Map code example

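import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ClMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Output key and value, reused across map() calls
    Text outk = new Text();
    IntWritable outv = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like "class1_aaa 50"
        String line = value.toString();
        String[] contents = line.split(" ");
        // Keep only the class prefix ("class1") as the output key
        String outkey = contents[0].split("_")[0];
        outk.set(outkey);
        outv.set(Integer.parseInt(contents[1]));
        context.write(outk, outv);
    }
}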

Partition code example

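import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ClPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        // The key written by ClMap is already the class prefix, so split() leaves it unchanged
        String strStarts = text.toString().split("_")[0];
        // The returned index must be smaller than numPartitions,
        // so the job has to run with at least 3 ReduceTasks
        if ("class1".equals(strStarts)) {
            return 0;
        } else if ("class2".equals(strStarts)) {
            return 1;
        } else {
            return 2;
        }
    }
}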
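The partitioner above returns fixed indexes 0 to 2 and ignores numPartitions, so it relies on the job being configured with three ReduceTasks. Hadoop expects getPartition to return a value in the range 0 to numPartitions - 1, and to my knowledge a map task fails with an "Illegal partition" error otherwise. The sketch below reproduces the same branching on plain strings so it can be tried without Hadoop, and folds the index into range with a modulo. PartitionSketch and partitionFor are hypothetical names, and the modulo is only one possible design choice, not part of the original code.

```java
// Hypothetical stand-alone version of the partition rule, for experimenting without Hadoop.
class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        String prefix = key.split("_")[0];
        int wanted;
        if ("class1".equals(prefix)) {
            wanted = 0;
        } else if ("class2".equals(prefix)) {
            wanted = 1;
        } else {
            wanted = 2;
        }
        // Assumption: when fewer partitions exist than classes,
        // fold the index into range instead of returning an invalid one.
        return wanted % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"class1", "class2", "class3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3)
                    + " with 3 reducers, partition " + partitionFor(key, 2)
                    + " with 2 reducers");
        }
    }
}
```

With three partitions every class keeps its own index. With two, class3 is folded into partition 0 together with class1, so the job still runs but the two classes share an output file.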

Reduce code example

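import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ClReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Output value, reused across reduce() calls
    IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all values that share the same class key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outv.set(sum);
        context.write(key, outv);
    }
}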

Driver code example

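import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ClDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClDriver.class);
        job.setMapperClass(ClMap.class);
        job.setReducerClass(ClReduce.class);

        // Map output types (the key class is set explicitly for clarity)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Number of ReduceTasks; must cover every index ClPartitioner can return
        job.setNumReduceTasks(3);
        // Set the partitioner to use
        job.setPartitionerClass(ClPartitioner.class);

        // args[0] is the input path, args[1] is the output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}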