1. The Shuffle Mechanism
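Shuffle is the data processing that happens after the map() method and before the reduce() method.
It takes the result data output by each MapTask and distributes it to the ReduceTasks according to the rules defined by the Partitioner. Along the way, the data is partitioned and sorted.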
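The logical effect of shuffle can be pictured with a small in-memory sketch: each (key, value) pair produced by the map side is routed to a partition, and inside each partition the keys are sorted and the values for the same key are grouped. This is only an illustration under simplifying assumptions. The class ShuffleSketch and the partition rule passed in are hypothetical. Real Hadoop also spills to disk, merges, and copies data over the network, and none of that is modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// In-memory illustration of shuffle's logical effect only:
// partition -> sort by key -> group values. No spilling, merging, or network copy.
public class ShuffleSketch {

    // Routes each (key, value) pair to the partition chosen by `partitioner`,
    // keeping keys sorted and values grouped inside each partition.
    static List<TreeMap<String, List<Integer>>> shuffle(
            List<Map.Entry<String, Integer>> mapOutput,
            ToIntFunction<String> partitioner,
            int numPartitions) {
        List<TreeMap<String, List<Integer>>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new TreeMap<>()); // TreeMap keeps keys in sorted order
        }
        for (Map.Entry<String, Integer> kv : mapOutput) {
            int p = partitioner.applyAsInt(kv.getKey());
            partitions.get(p)
                      .computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                      .add(kv.getValue());
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical map output, in the order the map produced it
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("b", 1), Map.entry("a", 2), Map.entry("b", 3), Map.entry("c", 4));
        // Hypothetical rule standing in for a Partitioner: "b" -> 0, everything else -> 1
        List<TreeMap<String, List<Integer>>> parts =
                shuffle(mapOutput, key -> key.equals("b") ? 0 : 1, 2);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("partition " + i + ": " + parts.get(i));
        }
        // partition 0: {b=[1, 3]}
        // partition 1: {a=[2], c=[4]}
    }
}
```

Each partition in the result corresponds to the input of one ReduceTask. Note that in real Hadoop the order of values within a single key is not guaranteed unless you set up a secondary sort. This sketch simply keeps arrival order.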
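2. Partitioning
When running a MapReduce job, you sometimes need to write the final output to several different files. That is what partitioning is for: each partition is processed by its own ReduceTask, and each ReduceTask writes its own output file.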
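If no partitioner is configured, Hadoop uses its default HashPartitioner. It chooses the partition as (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The sketch below applies that formula to plain String keys for illustration only. The class name HashPartitionSketch is hypothetical. Hadoop's Text computes its hash code differently from java.lang.String, so the partition numbers printed here will not match those of a real job with Text keys.

```java
// Illustration of the hash-partitioning formula used by Hadoop's default
// HashPartitioner, applied to java.lang.String keys (not Hadoop's Text).
public class HashPartitionSketch {

    // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
    // hash code still yields a partition in [0, numReduceTasks).
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String key : new String[] {"class1", "class2", "class3"}) {
            int p = getPartition(key, numReduceTasks);
            // Each ReduceTask writes one file: part-r-00000, part-r-00001, ...
            System.out.printf("%s -> partition %d (part-r-%05d)%n", key, p, p);
        }
    }
}
```

With hash partitioning you decide how many output files there are, but not which key lands in which file, and several keys may end up sharing one file. Pinning a key to a specific file is what a custom Partitioner is for.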
For example, take the following data:
class1_aaa 50
class2_bbb 100
class3_ccc 80
class1_ddd 10
class2_eee 100
class3_fff 70
class1_hhh 150
class2_lll 100
class3_www 80
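Suppose the requirement is to group these records by their class prefix (class1, class2, class3), sum the values in each group, and write each group's result to a separate file. This is where a Partitioner is needed.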
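Before running the job, it helps to work out what each output file should contain. The plain-Java sketch below has no Hadoop dependency, and its class and method names are hypothetical. It applies the same parsing as ClMap, the same routing rule as ClPartitioner, and the same summing as ClReduce to the sample records above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Local, Hadoop-free check of the expected job output for the sample data.
public class ExpectedOutputSketch {

    // Same routing rule as ClPartitioner: class1 -> 0, class2 -> 1, anything else -> 2.
    static int partitionOf(String prefix) {
        if ("class1".equals(prefix)) return 0;
        if ("class2".equals(prefix)) return 1;
        return 2;
    }

    // Returns one sorted key -> sum map per partition (one per output file).
    static List<TreeMap<String, Integer>> compute(String[] lines, int numPartitions) {
        List<TreeMap<String, Integer>> files = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) files.add(new TreeMap<>());
        for (String line : lines) {
            String[] fields = line.split(" ");
            String prefix = fields[0].split("_")[0]; // "class1_aaa" -> "class1"
            int value = Integer.parseInt(fields[1]);
            files.get(partitionOf(prefix)).merge(prefix, value, Integer::sum);
        }
        return files;
    }

    public static void main(String[] args) {
        String[] lines = {
            "class1_aaa 50", "class2_bbb 100", "class3_ccc 80",
            "class1_ddd 10", "class2_eee 100", "class3_fff 70",
            "class1_hhh 150", "class2_lll 100", "class3_www 80"
        };
        List<TreeMap<String, Integer>> files = compute(lines, 3);
        for (int i = 0; i < files.size(); i++) {
            System.out.printf("part-r-%05d: %s%n", i, files.get(i));
        }
        // part-r-00000: {class1=210}
        // part-r-00001: {class2=300}
        // part-r-00002: {class3=230}
    }
}
```

The sums are 50 + 10 + 150 = 210 for class1, 100 + 100 + 100 = 300 for class2, and 80 + 70 + 80 = 230 for class3. In the real job, Hadoop's default TextOutputFormat writes each result as a tab-separated line, for example "class1", a tab, then "210".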
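Map code example
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ClMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Output key and value, reused across map() calls
    Text outk = new Text();
    IntWritable outv = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like "class1_aaa 50"
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // skip blank lines instead of failing on them
        }
        String[] contents = line.split("\\s+");
        // Keep only the class prefix ("class1") as the output key
        String outkey = contents[0].split("_")[0];
        outk.set(outkey);
        outv.set(Integer.parseInt(contents[1]));
        context.write(outk, outv);
    }
}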
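Partitioner code example
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ClPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        // The map output key is already the class prefix, e.g. "class1"
        String strStarts = text.toString().split("_")[0];
        // Route each class to its own ReduceTask, and therefore to its own output file
        if ("class1".equals(strStarts)) {
            return 0;
        } else if ("class2".equals(strStarts)) {
            return 1;
        } else {
            return 2;
        }
    }
}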
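The value returned by getPartition must lie in the range [0, numPartitions), where numPartitions equals the number of ReduceTasks set in the driver. With three ReduceTasks, ClPartitioner's 0, 1 and 2 are all valid. If the driver sets more than one but fewer than three ReduceTasks, returning 2 makes the map task fail with an "Illegal partition" error. With exactly one ReduceTask, Hadoop does not use the custom partitioner at all and sends everything to partition 0. The Hadoop-free sketch below mirrors ClPartitioner's rule. It adds a hypothetical checkedPartition helper that makes that range contract explicit.

```java
// Mirrors ClPartitioner's rule on String keys and checks the
// [0, numPartitions) contract that Hadoop enforces at runtime.
public class PartitionRangeCheck {

    // Same logic as ClPartitioner.getPartition, but on a String key.
    static int getPartition(String key, int numPartitions) {
        String prefix = key.split("_")[0];
        if ("class1".equals(prefix)) {
            return 0;
        } else if ("class2".equals(prefix)) {
            return 1;
        } else {
            return 2;
        }
    }

    // Hypothetical helper: fails fast when the rule yields an out-of-range partition.
    static int checkedPartition(String key, int numPartitions) {
        int p = getPartition(key, numPartitions);
        if (p < 0 || p >= numPartitions) {
            throw new IllegalStateException(
                    "partition " + p + " out of range for " + numPartitions + " reduce tasks");
        }
        return p;
    }

    public static void main(String[] args) {
        System.out.println(checkedPartition("class3", 3)); // 2: valid with 3 ReduceTasks
        try {
            checkedPartition("class3", 2); // 2 is out of range with only 2 ReduceTasks
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

Conversely, setting more ReduceTasks than the rule ever uses (say 5) does not fail. The extra ReduceTasks simply receive no data and write empty output files.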
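Reduce code example
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ClReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all values that arrived for this class
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outv.set(sum);
        context.write(key, outv);
    }
}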
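Driver code example
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ClDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClDriver.class);
        job.setMapperClass(ClMap.class);
        job.setReducerClass(ClReduce.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Number of ReduceTasks: one per partition that ClPartitioner can return
        job.setNumReduceTasks(3);
        // Use the custom partitioner
        job.setPartitionerClass(ClPartitioner.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}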