关于hadoop:Hadoop之MapReduce三Shuffle机制和Partition分区

11次阅读

共计 2088 个字符,预计需要花费 6 分钟才能阅读完成。

1.Shuffle 机制

Shuffle 指的 map() 办法之后,reduce() 之前的数据处理过程。

就是将 MapTask 输入的后果数据,依照 Partitioner 分区制订的规定分发给 ReduceTask 执行,并在散发的过程中,对数据进行分区和排序。

2.Partition 分区

在进行 MapReduce 计算时,有时候须要把最终的输入数据分到不同的文件中,这时候,就须要用到分区。

举个例子:
有这样一组数据

class1_aaa 50
class2_bbb 100
class3_ccc 80
class1_ddd 10
class2_eee 100
class3_fff 70
class1_hhh 150
class2_lll 100
class3_www 80

想实现这样一个需要,对外面的数据的值分组求和,并将后果别离输入一个文件,那这里就须要 Partition 分区了。

Map 代码示例

public class ClMap extends Mapper<LongWritable, Text,Text, IntWritable> {
    // 输入的 k 和 v
    Text outk = new Text();
    IntWritable outv = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();
        String[] contents = line.split(" ");
        String outkey = contents[0].split("_")[0];

        outk.set(outkey);
        outv.set(Integer.parseInt(contents[1]));

        context.write(outk,outv);
    }
}

Partition 代码示例

public class ClPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {String strStarts = text.toString().split("_")[0];

        if("class1".equals(strStarts)){return 0;}else if("class2".equals(strStarts)){return 1;}else{return 2;}
    }
}

Reduce 代码示例

public class ClReduce extends Reducer<Text, IntWritable,Text,IntWritable> {IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {sum += value.get();
        }
        outv.set(sum);
        context.write(key,outv);
    }
}

Driver 代码示例

public class ClDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(ClDriver.class);

        job.setMapperClass(ClMap.class);
        job.setReducerClass(ClReduce.class);

        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // ReduceTask 过程数
        job.setNumReduceTasks(3);
        // 设定应用的分区
        job.setPartitionerClass(ClPartitioner.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        boolean b = job.waitForCompletion(true);

        System.exit(b ? 0:1);

    }
}
正文完
 0