1.MapReduce 介绍
MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析利用”的外围框架。
MapReduce 外围性能是将用户编写的业务逻辑代码和自带默认组件整合成一个残缺的分布式运算程序,并发运行在一个 Hadoop 集群上。
MapReduce 的作用就是大数据处理。
2.MapReduce 的核心思想
举一个例子,咱们有 1000 个球,外面有红色、蓝色、红色各种色彩的球,咱们想晓得外面有多少红球,咱们把这个找出红球数量的工作交给了 MapReduce 来做。
1)MapReduce 会将这 1000 个球交分成若干份,交给若干人
2)每人数本人手中红球的数量,而后上报下来
3)汇总所有数,求和,失去红球数量。
在大数据处理上,MapReduce 设计思维的外围就是 ” 分而治之 ”,它将简单的、运行在大规模集群上的并行计算过程高度形象成两个 Map 和 Reduce 两个函数。
函数名 | 输出 | 解决 | 输入 |
---|---|---|---|
Map | 键值对 \<k1,v1\> | Map 函数将解决这些键值对,并以另一种键值对模式输入两头后果 List(\<K2,V2\>) | List(\<K2,V2\> |
Reduce | List(\<K2,V2\> | 对传入的两头后果列表数据进行某种整顿或进一步的解决,并产生最终的输入后果 List(\<K3,V3\> | List(\<K3,V3\> |
简略来说,Map 阶段负责拆分解决,Reduce 阶段将 Map 阶段的处理结果进行汇总
3. 官放 wordcount 示例
这里有一个 words.txt 文件
[v2admin@hadoop10 demo]$ cat words.txt
zhangsan hello tianjin
beijing shanghai shandong
hebei guangzhou tangshan
上传至 hadoop 上
[v2admin@hadoop10 demo]$ hadoop fs -put words.txt /home/input
运行 wordcount
// 进入 $HADOOP_HOME/share/hadoop/mapreduce
[v2admin@hadoop10 mapreduce]$ hadoop jar hadoop-mapreduce-examples-3.1.3.jar wordcount /home/input /home/output1
而后咱们看下过程
2020-12-30 14:31:30,989 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1647)) - map 0% reduce 0%
2020-12-30 14:31:36,035 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1647)) - map 100% reduce 0%
2020-12-30 14:31:41,134 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1647)) - map 100% reduce 100%
跟上述所说一样,工作被分成两个阶段 map 和 reduce,咱们看下最终后果
[v2admin@hadoop10 mapreduce]$ hadoop fs -cat /home/output1/part-r-00000
2020-12-30 14:34:52,772 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2020-12-30 14:34:53,299 INFO [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
beijing 1
guangzhou 1
hebei 1
hello 1
shandong 1
shanghai 1
tangshan 1
tianjin 1
zhangsan 1
4 本人去实现一个 wordcount
先看下官网的 wordcount,通过反编译工具能够去看源码,要实现一个 wordcount,须要编写三局部内容:Mapper、Reducer 和 Driver。
同时也看到 hadoop 中并没有应用 java 中的数据类型,而是有本人实现的。
Java 类型 | Hadoop Writable 类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
- Mapper 的示例代码:
/**
* Map 编写步骤
* 1)自定义的 Mapper 类要继承 Mapper 类
* 2)Mapper 的输出数据为 k - v 对模式,类型自定义
* 3)实现父类 map()办法,也是业务逻辑
* 4)输入后果也是 kv- 对, 类型自定义
*
*/
public class WordCountMap extends Mapper<LongWritable, Text,Text, IntWritable>{
// 输入的 k
Text k = new Text();
// 输入的 v
IntWritable v = new IntWritable(1);
/**
* @param key 输出的 k,* @param value 输出的 v,这个理论就是每次进来的数据
* @param context mapper 运行时的调度
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将输出的数据转化成 String 类型,因为咱们要对数据切片
String value_content = value.toString();
// 应用空格切分数据
String[] words = value_content.split(" ");
// 迭代数组
for (String word : words) {k.set(word);
context.write(k,v);
}
// 至此 Mapper 实现
}
}
- Reducer 代码示例
/**
* 1)继承 Reducer 类
* 2)Reducer 的输出类型对应 Mapper 的输入数据类型
* 3)实现 reduce()办法,也就是编写业务逻辑
* 4)ReduceTask 过程每一组雷同 k 的 k - v 组调用一次 reduce 办法
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {sum += value.get();
}
v.set(sum);
context.write(key,v);
}
}
- Driver 示例代码
public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 定义配置对象,配置相干参数
Configuration conf = new Configuration();
/*
示例
// namenode 的拜访地址
conf.set("fs.defaultFS","hdfs://hadoop10:9820");
// yarn 配置
conf.set("mapreduce.framework.name","yarn");
// resourceManager 配置,我的 resourceManger 在 hadoop12 机器上
conf.set("yarn.resourcemanager.hostname","hadoop12");
// 容许 mr 近程运行在集群上
conf.set("mapreduce.app-submission.cross-platform","true");
*/
// 1. 创立 job 工作
Job job = Job.getInstance(conf);
// 2. 设置 jar 包
job.setJarByClass(WordCountDriver.class);
// 3. 关联 Mapper 和 Reducer
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReducer.class);
// 4. 设置 Map 和 Reduce 的输入类型
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//5. 设置输入输出文件
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//6. 提交工作
boolean b = job.waitForCompletion(true);
System.exit(b ? 0:1);
}
}
能够打成 jar 包 Hadoop_Demo1-1.0-SNAPSHOT.jar,上传到集群中运行咱们 wordcount 程序
示例
[v2admin@hadoop10 ~]$ hadoop jar Hadoop_Demo1-1.0-SNAPSHOT.jar cn.leaf.demo04.WordCountDriver /home/input /home/output2