本文源码:GitHub·点这里 || GitEE·点这里
一、MapReduce概述
1、基本概念
Hadoop外围组件之一:分布式计算的计划MapReduce,是一种编程模型,用于大规模数据集的并行运算,其中Map(映射)和Reduce(归约)。
MapReduce既是一个编程模型,也是一个计算组件,解决的过程分为两个阶段,Map阶段:负责把工作合成为多个小工作,Reduce负责把多个小工作的处理结果进行汇总。其中Map阶段次要输出是一对Key-Value,通过map计算后输入一对Key-Value值;而后将雷同Key合并,造成Key-Value汇合;再将这个Key-Value汇合转入Reduce阶段,通过计算输入最终Key-Value后果集。
2、特点形容
MapReduce能够实现基于上千台服务器并发工作,提供很弱小的数据处理能力,如果其中单台服务挂掉,计算工作会主动本义到另外节点执行,保障高容错性;然而MapReduce不适应于实时计算与流式计算,计算的数据是动态的。
二、操作案例
1、流程形容
数据文件个别以CSV格局居多,数据行通常以空格分隔,这里须要思考数据内容特点;
文件通过切片调配在不同的MapTask工作中并发执行;
MapTask工作执行结束之后,执行ReduceTask工作,依赖Map阶段的数据;
ReduceTask工作执行结束后,输入文件后果。
2、根底配置
hadoop: # 读取的文件源 inputPath: hdfs://hop01:9000/hopdir/javaNew.txt # 该门路必须是程序运行前不存在的 outputPath: /wordOut
3、Mapper程序
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text mapKey = new Text(); IntWritable mapValue = new IntWritable(1); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1、读取行 String line = value.toString(); // 2、行内容切割,依据文件中分隔符 String[] words = line.split(" "); // 3、存储 for (String word : words) { mapKey.set(word); context.write(mapKey, mapValue); } }}
4、Reducer程序
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int sum ; IntWritable value = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1、累加求和统计 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2、输入后果 value.set(sum); context.write(key,value); }}
5、执行程序
@RestControllerpublic class WordWeb { @Resource private MapReduceConfig mapReduceConfig ; @GetMapping("/getWord") public String getWord () throws IOException, ClassNotFoundException, InterruptedException { // 申明配置 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() ); Job job = Job.getInstance(hadoopConfig); // Job执行作业 输出门路 FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath())); // Job执行作业 输入门路 FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath())); // 自定义 Mapper和Reducer 两个阶段的工作解决类 job.setMapperClass(WordMapper.class); job.setReducerClass(WordReducer.class); // 设置输入后果的Key和Value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //执行Job直到实现 job.waitForCompletion(true); return "success" ; }}
6、执行后果查看
将应用程序打包放到hop01服务上执行;
java -jar map-reduce-case01.jar
三、案例剖析
1、数据类型
Java数据类型与对应的Hadoop数据序列化类型;
Java类型 | Writable类型 | Java类型 | Writable类型 |
---|---|---|---|
String | Text | float | FloatWritable |
int | IntWritable | long | LongWritable |
boolean | BooleanWritable | double | DoubleWritable |
byte | ByteWritable | array | DoubleWritable |
map | MapWritable |
2、外围模块
Mapper模块:解决输出的数据,业务逻辑在map()办法中实现,输入的数据也是KV格局;
Reducer模块:解决Map程序输入的KV数据,业务逻辑在reduce()办法中;
Driver模块:将程序提交到yarn进行调度,提交封装了运行参数的job对象;
四、序列化操作
1、序列化简介
序列化:将内存中对象转换为二进制的字节序列,能够通过输入流长久化存储或者网络传输;
反序列化:接管输出字节流或者读取磁盘长久化的数据,加载到内存的对象过程;
Hadoop序列化相干接口:Writable实现的序列化机制、Comparable治理Key的排序问题;
2、案例实现
案例形容:读取文件,并对文件雷同的行做数据累加计算,输入计算结果;该案例演示在本地执行,不把Jar包上传的hadoop服务器,驱动配置统一。
实体对象属性
public class AddEntity implements Writable { private long addNum01; private long addNum02; private long resNum; // 构造方法 public AddEntity() { super(); } public AddEntity(long addNum01, long addNum02) { super(); this.addNum01 = addNum01; this.addNum02 = addNum02; this.resNum = addNum01 + addNum02; } // 序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(addNum01); dataOutput.writeLong(addNum02); dataOutput.writeLong(resNum); } // 反序列化 @Override public void readFields(DataInput dataInput) throws IOException { // 留神:反序列化程序和写序列化程序统一 this.addNum01 = dataInput.readLong(); this.addNum02 = dataInput.readLong(); this.resNum = dataInput.readLong(); } // 省略Get和Set办法}
Mapper机制
public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> { Text myKey = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 读取行 String line = value.toString(); // 行内容切割 String[] lineArr = line.split(","); // 内容格局解决 String lineNum = lineArr[0]; long addNum01 = Long.parseLong(lineArr[1]); long addNum02 = Long.parseLong(lineArr[2]); myKey.set(lineNum); AddEntity myValue = new AddEntity(addNum01,addNum02); // 输入 context.write(myKey, myValue); }}
Reducer机制
public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> { @Override protected void reduce(Text key, Iterable<AddEntity> values, Context context) throws IOException, InterruptedException { long addNum01Sum = 0; long addNum02Sum = 0; // 解决Key雷同 for (AddEntity addEntity : values) { addNum01Sum += addEntity.getAddNum01(); addNum02Sum += addEntity.getAddNum02(); } // 最终输入 AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum); context.write(key, addRes); }}
案例最终后果:
五、源代码地址
GitHub·地址https://github.com/cicadasmile/big-data-parentGitEE·地址https://gitee.com/cicadasmile/big-data-parent
举荐浏览:编程体系整顿
序号 | 项目名称 | GitHub地址 | GitEE地址 | 举荐指数 |
---|---|---|---|---|
01 | Java形容设计模式,算法,数据结构 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
02 | Java根底、并发、面向对象、Web开发 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆ |
03 | SpringCloud微服务根底组件案例详解 | GitHub·点这里 | GitEE·点这里 | ☆☆☆ |
04 | SpringCloud微服务架构实战综合案例 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
05 | SpringBoot框架根底利用入门到进阶 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆ |
06 | SpringBoot框架整合开发罕用中间件 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
07 | 数据管理、分布式、架构设计根底案例 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |
08 | 大数据系列、存储、组件、计算等框架 | GitHub·点这里 | GitEE·点这里 | ☆☆☆☆☆ |