1. The Shuffle mechanism
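Shuffle is the data processing that happens after the map() method and before the reduce() method.
It takes the output of each MapTask and distributes it to the ReduceTasks according to the rules defined by the Partitioner. While the data is being distributed, it is also partitioned and sorted.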
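The sort-and-group part of the shuffle can be pictured with a small in-memory sketch. It is plain Java, not Hadoop code. The class SortGroupSketch and its method sortAndGroup are hypothetical. A TreeMap stands in for the framework's sort, and a list per key stands in for the Iterable that reduce() receives. Hadoop itself sorts on the map side, spills to disk and merges the spills, and it does not guarantee the order of values within a key. This sketch simply keeps insertion order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SortGroupSketch {
    // Groups (key, value) records by key. The TreeMap keeps keys sorted,
    // mimicking the sorted, grouped input that a single ReduceTask receives.
    public static TreeMap<String, List<Integer>> sortAndGroup(List<String[]> records) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (String[] r : records) {
            grouped.computeIfAbsent(r[0], k -> new ArrayList<>()).add(Integer.parseInt(r[1]));
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Unsorted map output, as (key, value) pairs
        List<String[]> mapOutput = Arrays.asList(
                new String[]{"class2", "100"},
                new String[]{"class1", "50"},
                new String[]{"class1", "10"});
        for (Map.Entry<String, List<Integer>> e : sortAndGroup(mapOutput).entrySet()) {
            // Each entry corresponds to one reduce(key, values) call
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Running main prints `class1 -> [50, 10]` and then `class2 -> [100]`. The keys come out sorted, and all values of a key are gathered into one group, which corresponds to one reduce() call per key.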
2. Partition
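In a MapReduce computation, you sometimes need the final output split across different files, and partitioning is how you do that. Each ReduceTask writes its own output file, so the partition a record is assigned to determines which file it ends up in.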
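If no partitioner is configured, Hadoop falls back to its default HashPartitioner. That partitioner sends a key to partition `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The plain-Java sketch below applies the same formula to String keys. The names HashPartitionSketch and defaultPartition are hypothetical. Hadoop's Text computes its hash code differently from String, so a real job may produce different partition numbers than this sketch does.

```java
public class HashPartitionSketch {
    // Same formula as Hadoop's default HashPartitioner. The mask clears the
    // sign bit of hashCode(), and the result is taken modulo the number of reduce tasks.
    public static int defaultPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"class1", "class2", "class3"}) {
            System.out.println(key + " -> partition " + defaultPartition(key, 3));
        }
    }
}
```

With String keys and three partitions, this prints class1 -> partition 2, class2 -> partition 0 and class3 -> partition 1. The keys are spread across the partitions, but the assignment follows the hash rather than any order you choose. That is why the example in this section uses a custom Partitioner.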
For example, take the following data:
class1_aaa 50
class2_bbb 100
class3_ccc 80
class1_ddd 10
class2_eee 100
class3_fff 70
class1_hhh 150
class2_lll 100
class3_www 80
Suppose we want to sum the values in each group (class1, class2, class3) and write each group's result to its own file. That calls for partitioning.
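Adding up each group by hand gives the result the job should produce, with one group per output file:

$$
\begin{aligned}
\text{class1} &: 50 + 10 + 150 = 210 \\
\text{class2} &: 100 + 100 + 100 = 300 \\
\text{class3} &: 80 + 70 + 80 = 230
\end{aligned}
$$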
Map code example
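import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ClMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Output key and value, reused across map() calls
    private final Text outk = new Text();
    private final IntWritable outv = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // e.g. "class1_aaa 50" -> ["class1_aaa", "50"]
        String line = value.toString();
        String[] contents = line.split(" ");
        // Keep only the group prefix: "class1_aaa" -> "class1"
        String outkey = contents[0].split("_")[0];
        outk.set(outkey);
        outv.set(Integer.parseInt(contents[1]));
        context.write(outk, outv);
    }
}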
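The `split(" ")` call in map() assumes exactly one space between the two fields. A tab-separated line such as `class1_aaa\t50` produces a one-element array, and so does a blank line. In both cases `contents[1]` then throws ArrayIndexOutOfBoundsException. If the input might not be that clean, a more tolerant parse is one option. The sketch below is plain Java. The helper parse() is hypothetical and returns null for lines that should be skipped. Inside ClMap.map() you would return early when it gives null.

```java
public class MapParseSketch {
    // Hypothetical helper: turns "class1_aaa 50" into {"class1", "50"}.
    // trim() plus split("\\s+") tolerates tabs and repeated spaces.
    // A null result means the line is blank or malformed and should be skipped.
    public static String[] parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length != 2) {
            return null;
        }
        String group = fields[0].split("_")[0]; // "class1_aaa" -> "class1"
        return new String[]{group, fields[1]};
    }

    public static void main(String[] args) {
        String[] r = parse("class1_aaa\t50");
        System.out.println(r[0] + " " + r[1]); // class1 50
        System.out.println(parse("") == null);  // true
    }
}
```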
Partitioner code example
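import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ClPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        String strStarts = text.toString().split("_")[0];
        if ("class1".equals(strStarts)) return 0; // -> part-r-00000
        if ("class2".equals(strStarts)) return 1; // -> part-r-00001
        return 2;                                 // everything else -> part-r-00002
    }
}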
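The if/else chain works for three fixed groups. If more groups are added, a lookup table keeps the rule in one place. The sketch below is plain Java. The names TablePartitionSketch and partitionFor are hypothetical, and in a real job the same body would go inside ClPartitioner.getPartition(). Whatever rule you use, the returned number must lie in [0, numPartitions), because Hadoop rejects out-of-range values at runtime. This sketch therefore clamps the result to the last partition. That is a design choice: when fewer ReduceTasks are configured, several groups end up in the same file.

```java
import java.util.HashMap;
import java.util.Map;

public class TablePartitionSketch {
    // Hypothetical lookup table: group prefix -> partition number
    private static final Map<String, Integer> PARTITIONS = new HashMap<>();

    static {
        PARTITIONS.put("class1", 0);
        PARTITIONS.put("class2", 1);
        PARTITIONS.put("class3", 2);
    }

    // Same contract as Partitioner.getPartition(): the result must be in [0, numPartitions)
    public static int partitionFor(String key, int numPartitions) {
        String prefix = key.split("_")[0];
        // Unknown groups go to the last partition, like the original else branch
        int p = PARTITIONS.getOrDefault(prefix, numPartitions - 1);
        // Clamp so a job with fewer ReduceTasks never gets an out-of-range number
        return Math.min(p, numPartitions - 1);
    }

    public static void main(String[] args) {
        for (String key : new String[]{"class1", "class2", "class3", "class4"}) {
            System.out.println(key + " -> " + partitionFor(key, 3));
        }
    }
}
```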
Reduce code example
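import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ClReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all values of one group, e.g. class1 -> 50 + 10 + 150
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outv.set(sum);
        context.write(key, outv);
    }
}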
Driver code example
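import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ClDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ClDriver.class);
        job.setMapperClass(ClMap.class);
        job.setReducerClass(ClReduce.class);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Number of ReduceTasks: one per partition, so three output files
        job.setNumReduceTasks(3);
        // Use the custom partitioner
        job.setPartitionerClass(ClPartitioner.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}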
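With three ReduceTasks and ClPartitioner, the job should write three files, one per group. Hadoop names reducer output files part-r-00000, part-r-00001 and so on, and by default TextOutputFormat separates the key and value with a tab. The plain-Java sketch below replays the whole job in memory, so you can check the expected file contents without a cluster. LocalJobSketch is a hypothetical name. The sketch folds reduce() into the grouping step by summing values as they arrive. That shortcut is only valid here because summation is associative and commutative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalJobSketch {
    // The sample input from this section
    public static final List<String> DATA = Arrays.asList(
            "class1_aaa 50", "class2_bbb 100", "class3_ccc 80",
            "class1_ddd 10", "class2_eee 100", "class3_fff 70",
            "class1_hhh 150", "class2_lll 100", "class3_www 80");

    static final int NUM_REDUCE_TASKS = 3; // matches job.setNumReduceTasks(3)

    // Same rule as ClPartitioner
    static int partition(String key) {
        if ("class1".equals(key)) return 0;
        if ("class2".equals(key)) return 1;
        return 2;
    }

    // Returns a map from output file name to the "key<TAB>sum" lines written to it
    public static Map<String, List<String>> run(List<String> lines) {
        List<TreeMap<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < NUM_REDUCE_TASKS; i++) {
            partitions.add(new TreeMap<>()); // sorted keys, as after the shuffle
        }
        for (String line : lines) {
            String[] contents = line.split(" ");          // map: same parsing as ClMap
            String key = contents[0].split("_")[0];
            int value = Integer.parseInt(contents[1]);
            // partition, then reduce (sum) within that partition
            partitions.get(partition(key)).merge(key, value, Integer::sum);
        }
        Map<String, List<String>> files = new TreeMap<>();
        for (int i = 0; i < NUM_REDUCE_TASKS; i++) {
            List<String> out = new ArrayList<>();
            for (Map.Entry<String, Integer> e : partitions.get(i).entrySet()) {
                out.add(e.getKey() + "\t" + e.getValue());
            }
            files.put(String.format("part-r-%05d", i), out);
        }
        return files;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, List<String>> f : run(DATA).entrySet()) {
            System.out.println(f.getKey() + ": " + f.getValue());
        }
    }
}
```

The expected contents match the hand-computed sums: class1 210 in part-r-00000, class2 300 in part-r-00001 and class3 230 in part-r-00002, with a tab between key and value.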