共计 5049 个字符,预计需要花费 13 分钟才能阅读完成。
本文源码:GitHub·点这里 || GitEE·点这里
一、MapReduce 概述
1、基本概念
Hadoop 外围组件之一:分布式计算的计划 MapReduce,是一种编程模型,用于大规模数据集的并行运算,其中 Map(映射)和 Reduce(归约)。
MapReduce 既是一个编程模型,也是一个计算组件,解决的过程分为两个阶段,Map 阶段:负责把工作合成为多个小工作,Reduce 负责把多个小工作的处理结果进行汇总。其中 Map 阶段次要输出是一对 Key-Value,通过 map 计算后输入一对 Key-Value 值;而后将雷同 Key 合并,造成 Key-Value 汇合;再将这个 Key-Value 汇合转入 Reduce 阶段,通过计算输入最终 Key-Value 后果集。
2、特点形容
MapReduce 能够实现基于上千台服务器并发工作,提供很弱小的数据处理能力,如果其中单台服务挂掉,计算工作会主动本义到另外节点执行,保障高容错性;然而 MapReduce 不适应于实时计算与流式计算,计算的数据是动态的。
二、操作案例
1、流程形容
数据文件个别以 CSV 格局居多,数据行通常以空格分隔,这里须要思考数据内容特点;
文件通过切片调配在不同的 MapTask 工作中并发执行;
MapTask 工作执行结束之后,执行 ReduceTask 工作,依赖 Map 阶段的数据;
ReduceTask 工作执行结束后,输入文件后果。
2、根底配置
hadoop:
# 读取的文件源
inputPath: hdfs://hop01:9000/hopdir/javaNew.txt
# 该门路必须是程序运行前不存在的
outputPath: /wordOut
3、Mapper 程序
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {Text mapKey = new Text();
IntWritable mapValue = new IntWritable(1);
@Override
protected void map (LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1、读取行
String line = value.toString();
// 2、行内容切割,依据文件中分隔符
String[] words = line.split(" ");
// 3、存储
for (String word : words) {mapKey.set(word);
context.write(mapKey, mapValue);
}
}
}
4、Reducer 程序
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum ;
IntWritable value = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
// 1、累加求和统计
sum = 0;
for (IntWritable count : values) {sum += count.get();
}
// 2、输入后果
value.set(sum);
context.write(key,value);
}
}
5、执行程序
@RestController
public class WordWeb {
@Resource
private MapReduceConfig mapReduceConfig ;
@GetMapping("/getWord")
public String getWord () throws IOException, ClassNotFoundException, InterruptedException {
// 申明配置
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("fs.hdfs.impl",
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
hadoopConfig.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
Job job = Job.getInstance(hadoopConfig);
// Job 执行作业 输出门路
FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath()));
// Job 执行作业 输入门路
FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath()));
// 自定义 Mapper 和 Reducer 两个阶段的工作解决类
job.setMapperClass(WordMapper.class);
job.setReducerClass(WordReducer.class);
// 设置输入后果的 Key 和 Value 的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 执行 Job 直到实现
job.waitForCompletion(true);
return "success" ;
}
}
6、执行后果查看
将应用程序打包放到 hop01 服务上执行;
java -jar map-reduce-case01.jar
三、案例剖析
1、数据类型
Java 数据类型与对应的 Hadoop 数据序列化类型;
Java 类型 | Writable 类型 | Java 类型 | Writable 类型 |
---|---|---|---|
String | Text | float | FloatWritable |
int | IntWritable | long | LongWritable |
boolean | BooleanWritable | double | DoubleWritable |
byte | ByteWritable | array | DoubleWritable |
map | MapWritable |
2、外围模块
Mapper 模块 :解决输出的数据,业务逻辑在 map() 办法中实现,输入的数据也是 KV 格局;
Reducer 模块 :解决 Map 程序输入的 KV 数据,业务逻辑在 reduce() 办法中;
Driver 模块:将程序提交到 yarn 进行调度,提交封装了运行参数的 job 对象;
四、序列化操作
1、序列化简介
序列化:将内存中对象转换为二进制的字节序列,能够通过输入流长久化存储或者网络传输;
反序列化:接管输出字节流或者读取磁盘长久化的数据,加载到内存的对象过程;
Hadoop 序列化相干接口:Writable 实现的序列化机制、Comparable 治理 Key 的排序问题;
2、案例实现
案例形容:读取文件,并对文件雷同的行做数据累加计算,输入计算结果;该案例演示在本地执行,不把 Jar 包上传的 hadoop 服务器,驱动配置统一。
实体对象属性
public class AddEntity implements Writable {
private long addNum01;
private long addNum02;
private long resNum;
// 构造方法
public AddEntity() {super();
}
public AddEntity(long addNum01, long addNum02) {super();
this.addNum01 = addNum01;
this.addNum02 = addNum02;
this.resNum = addNum01 + addNum02;
}
// 序列化
@Override
public void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(addNum01);
dataOutput.writeLong(addNum02);
dataOutput.writeLong(resNum);
}
// 反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
// 留神:反序列化程序和写序列化程序统一
this.addNum01 = dataInput.readLong();
this.addNum02 = dataInput.readLong();
this.resNum = dataInput.readLong();}
// 省略 Get 和 Set 办法
}
Mapper 机制
public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> {Text myKey = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 读取行
String line = value.toString();
// 行内容切割
String[] lineArr = line.split(",");
// 内容格局解决
String lineNum = lineArr[0];
long addNum01 = Long.parseLong(lineArr[1]);
long addNum02 = Long.parseLong(lineArr[2]);
myKey.set(lineNum);
AddEntity myValue = new AddEntity(addNum01,addNum02);
// 输入
context.write(myKey, myValue);
}
}
Reducer 机制
public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> {
@Override
protected void reduce(Text key, Iterable<AddEntity> values, Context context)
throws IOException, InterruptedException {
long addNum01Sum = 0;
long addNum02Sum = 0;
// 解决 Key 雷同
for (AddEntity addEntity : values) {addNum01Sum += addEntity.getAddNum01();
addNum02Sum += addEntity.getAddNum02();}
// 最终输入
AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum);
context.write(key, addRes);
}
}
案例最终后果:
五、源代码地址
GitHub·地址
https://github.com/cicadasmile/big-data-parent
GitEE·地址
https://gitee.com/cicadasmile/big-data-parent
举荐浏览:编程体系整顿
序号 | 项目名称 | GitHub 地址 | GitEE 地址 | 举荐指数 |
---|---|---|---|---|
01 | Java 形容设计模式, 算法, 数据结构 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
02 | Java 根底、并发、面向对象、Web 开发 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆ |
03 | SpringCloud 微服务根底组件案例详解 | GitHub·点这里 | GitEE·点这里 | ☆☆☆ |
04 | SpringCloud 微服务架构实战综合案例 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
05 | SpringBoot 框架根底利用入门到进阶 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆ |
06 | SpringBoot 框架整合开发罕用中间件 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
07 | 数据管理、分布式、架构设计根底案例 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
08 | 大数据系列、存储、组件、计算等框架 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |