先晓得是什么,再去理解为什么
MapReduce入门概述
MapReduce定义
MapReduce是一个基于Hadoop的分布式运算程序的编程框架
它的外围性能是将用户编写的业务逻辑代码和自带的组件组合成为一个残缺的分布式运算程序,并发的运行在Hadoop集群上。
MapReduce长处
- MapReduce易于编程:简略的实现一些接口就能够实现分布式程序,并且这个分布式程序能够散布到大量便宜的PC机器上执行。
- 良好的扩展性:加机器就能够减少计算能力
- 高容错性:所谓容错就是当零碎中一台机器故障时候,有一种机制能够将任务分配到新机器上而后持续运行,这个过程是不须要人工干预的
- 适宜PB级上数据的离线解决:大数据的稳固解决
MapReduce毛病
- 不善于实时计算:MapReduce不能像Mysql,在毫秒级或秒级返回后果
- 不善于流式计算:流式计算输出数据是动静的,连续不断的,然而MR解决的数据肯定是动态的,这是由设计决定的
- 不善于DAG计算:多个工作具备依赖关系,后者输出依赖前者输入,这种活MR不善于,读写磁盘太多性能降落
MapReduce统计单词过程
默认是依照128M进行数据切块哦
在上图过程一共有三种:
- APPMaster:负责整个程序的过程调度和状态协调
- MapTask:负责Map阶段的整个数据处理流程
- ReduceTask:负责Reduce阶段的整个数据处理流程
MapReduce编程套路
咱们编写的局部根本分为三个:Mapper,Reducer和Driver
Map阶段
(1)用户自定义Mapper要继承的父类
(2)Mapper的输出数据格式是KV对
(3)Mapper中业务逻辑写在map()办法中【map()对每个KV对调用一次】
(4)Mapper的输入数据格式也是KV对
Reducer阶段
(1)用户自定义Reducer继承本人的父类
(2)Reducer的输出数据类型对应Mapper的输入数据类型,也是KV
(3)Reduce业务逻辑写在reduce()办法中【reduce()对每个KV对调用一次】
Driver阶段
相当于YARN集群的客户端,等等程序写完,须要通过它把整个程序提交到YARN集群上
HELLO WORLD案例
需要
统计文件中单词的呈现的词频
Mapper代码
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 输入 for (String word : words) { k.set(word); context.write(k, v); } }}
Reducer阶段
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{int sum;IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1 累加求和 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2 输入 v.set(sum); context.write(key,v); }}
Driver驱动类
public class WordcountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 输入输出门路须要依据本人电脑上理论的输入输出门路设置// 留神这里是在win下跑,如果放到集群上门路须要更改args = new String[] { "e:/input/inputword", "e:/output1" }; // 1 获取配置信息以及封装工作 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置jar加载门路 job.setJarByClass(WordcountDriver.class); // 3 设置map和reduce类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); // 4 设置map输入 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置Reduce输入 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置输出和输入门路 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}