1.MapReduce介绍
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析利用”的外围框架。
MapReduce外围性能是将用户编写的业务逻辑代码和自带默认组件整合成一个残缺的分布式运算程序,并发运行在一个Hadoop集群上。
MapReduce的作用就是大数据处理。
2.MapReduce的核心思想
举一个例子,咱们有1000个球,外面有红色、蓝色、红色各种色彩的球,咱们想晓得外面有多少红球,咱们把这个找出红球数量的工作交给了MapReduce来做。
1)MapReduce会将这1000个球交分成若干份,交给若干人
2)每人数本人手中红球的数量,而后上报下来
3)汇总所有数,求和,失去红球数量。
在大数据处理上,MapReduce设计思维的外围就是"分而治之",它将简单的、运行在大规模集群上的并行计算过程高度形象成两个Map和Reduce两个函数。
函数名 | 输出 | 解决 | 输入 |
---|---|---|---|
Map | 键值对\<k1,v1\> | Map 函数将解决这些键值对,并以另一种键值对模式输入两头后果 List(\<K2,V2\>) | List(\<K2,V2\> |
Reduce | List(\<K2,V2\> | 对传入的两头后果列表数据进行某种整顿或进一步的解决,并产生最终的输入后果List(\<K3,V3\> | List(\<K3,V3\> |
简略来说,Map阶段负责拆分解决,Reduce阶段将Map阶段的处理结果进行汇总
3.官放wordcount示例
这里有一个words.txt文件
[v2admin@hadoop10 demo]$ cat words.txt zhangsan hello tianjinbeijing shanghai shandonghebei guangzhou tangshan
上传至hadoop上
[v2admin@hadoop10 demo]$ hadoop fs -put words.txt /home/input
运行wordcount
// 进入$HADOOP_HOME/share/hadoop/mapreduce[v2admin@hadoop10 mapreduce]$ hadoop jar hadoop-mapreduce-examples-3.1.3.jar wordcount /home/input /home/output1
而后咱们看下过程
2020-12-30 14:31:30,989 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1647)) - map 0% reduce 0%2020-12-30 14:31:36,035 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1647)) - map 100% reduce 0%2020-12-30 14:31:41,134 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1647)) - map 100% reduce 100%
跟上述所说一样,工作被分成两个阶段map和reduce,咱们看下最终后果
[v2admin@hadoop10 mapreduce]$ hadoop fs -cat /home/output1/part-r-000002020-12-30 14:34:52,772 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS2020-12-30 14:34:53,299 INFO [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = falsebeijing 1guangzhou 1hebei 1hello 1shandong 1shanghai 1tangshan 1tianjin 1zhangsan 1
4 本人去实现一个wordcount
先看下官网的wordcount,通过反编译工具能够去看源码,要实现一个wordcount,须要编写三局部内容:Mapper、Reducer和Driver。
同时也看到hadoop中并没有应用java中的数据类型,而是有本人实现的。
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
- Mapper的示例代码:
/** * Map编写步骤 * 1)自定义的Mapper类要继承Mapper类 * 2)Mapper的输出数据为k-v对模式,类型自定义 * 3)实现父类map()办法,也是业务逻辑 * 4)输入后果也是kv-对,类型自定义 * */public class WordCountMap extends Mapper<LongWritable, Text,Text, IntWritable>{ // 输入的k Text k = new Text(); // 输入的v IntWritable v = new IntWritable(1); /** * @param key 输出的k, * @param value 输出的v,这个理论就是每次进来的数据 * @param context mapper运行时的调度 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将输出的数据转化成String类型,因为咱们要对数据切片 String value_content = value.toString(); // 应用空格切分数据 String[] words = value_content.split(" "); // 迭代数组 for (String word : words) { k.set(word); context.write(k,v); } // 至此 Mapper实现 }}
- Reducer代码示例
/** * 1)继承Reducer类 * 2)Reducer的输出类型对应Mapper的输入数据类型 * 3)实现reduce()办法,也就是编写业务逻辑 * 4)ReduceTask过程每一组雷同k的k-v组调用一次reduce办法 */public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } v.set(sum); context.write(key,v); }}
- Driver示例代码
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 定义配置对象,配置相干参数 Configuration conf = new Configuration(); /* 示例 // namenode的拜访地址 conf.set("fs.defaultFS","hdfs://hadoop10:9820"); // yarn配置 conf.set("mapreduce.framework.name","yarn"); // resourceManager配置,我的resourceManger在hadoop12机器上 conf.set("yarn.resourcemanager.hostname","hadoop12"); // 容许mr近程运行在集群上 conf.set("mapreduce.app-submission.cross-platform","true"); */ // 1.创立job工作 Job job = Job.getInstance(conf); // 2.设置jar包 job.setJarByClass(WordCountDriver.class); // 3.关联Mapper和Reducer job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReducer.class); // 4.设置Map和Reduce的输入类型 job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //5. 设置输入输出文件 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //6.提交工作 boolean b = job.waitForCompletion(true); System.exit(b ? 0:1); }}
能够打成jar包 Hadoop_Demo1-1.0-SNAPSHOT.jar,上传到集群中运行咱们wordcount程序
示例
[v2admin@hadoop10 ~]$ hadoop jar Hadoop_Demo1-1.0-SNAPSHOT.jar cn.leaf.demo04.WordCountDriver /home/input /home/output2