一. 背景
在理论的数据库利用中,咱们常常须要从多个数据表中读取数据,这时咱们就能够应用SQL语句中的连贯(JOIN),在两个或多个数据表中查问数据。
在应用MapReduce框架进行数据处理的过程中,也会波及到从多个数据集读取数据,进行join关联的操作,只不过此时须要应用java代码并且依据MapReduce的编程标准进行业务的实现。
然而因为MapReduce的分布式设计理念的特殊性,因而对于MapReduce实现join操作具备了肯定的特殊性。非凡次要体现在:到底在MapReduce中的什么阶段进行数据集的关联操作,是mapper阶段还是reducer阶段,之间的区别又是什么?
整个MapReduce的join分为两类:map side join、reduce side join。
二. reduce side join
1. 概述
reduce side join,顾名思义,在reduce阶段执行join关联操作。这也是最容易想到和实现的join形式。因为通过shuffle过程就能够将相干的数据分到雷同的分组中,这将为前面的join操作提供了便捷。
基本上,reduce side join大抵步骤如下:
- mapper别离读取不同的数据集;
- mapper的输入中,通常以join的字段作为输入的key;
- 不同数据集的数据通过shuffle,key一样的会被分到同一分组解决;
- 在reduce中依据业务需要把数据进行关联整合汇总,最终输入。
2. 弊病
reduce端join最大的问题是整个join的工作是在reduce阶段实现的,然而通常状况下MapReduce中reduce的并行度是极小的(默认是1个),这就使得所有的数据都挤压到reduce阶段解决,压力颇大。尽管能够设置reduce的并行度,然而又会导致最终后果被扩散到多个不同文件中。
并且在数据从mapper到reducer的过程中,shuffle阶段非常繁琐,数据集大时老本极高。
三. MapReduce 分布式缓存
DistributedCache是hadoop框架提供的一种机制,能够将job指定的文件,在job执行前,后行散发到task执行的机器上,并有相干机制对cache文件进行治理。
DistributedCache可能缓存应用程序所需的文件 (包含文本,档案文件,jar文件等)。
Map-Redcue框架在作业所有工作执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。
1. 应用形式
1. 增加缓存文件
能够应用MapReduce的API增加须要缓存的文件。
//增加归档文件到分布式缓存中job.addCacheArchive(URI uri); //增加一般文件到分布式缓存中job.addCacheFile(URI uri);
留神:须要散发的文件,必须提前放到hdfs上.默认的门路前缀是hdfs://。
2. 程序中读取缓存文件
在Mapper类或者Reducer类的setup办法中,用输出流获取分布式缓存中的文件。
protected void setup(Context context) throw IOException,InterruptedException{ FileReader reader = new FileReader("myfile"); BufferReader br = new BufferedReader(reader); ......}
四. map side join
1. 概述
map side join,其精华就是在map阶段执行join关联操作,并且程序也没有了reduce阶段,防止了shuffle时候的繁琐。实现的要害是应用MapReduce的分布式缓存。
尤其是波及到一大一小数据集的解决场景时,map端的join将会施展出得天独厚的劣势。
map side join的大抵思路如下:
- 首先剖析join解决的数据集,应用分布式缓存技术将小的数据集进行分布式缓存
- MapReduce框架在执行的时候会主动将缓存的数据散发到各个maptask运行的机器上
- 程序只运行mapper,在mapper初始化的时候从分布式缓存中读取小数据集数据,而后和本人读取的大数据集进行join关联,输入最终的后果。
- 整个join的过程没有shuffle,没有reducer。
2. 劣势
map端join最大的劣势缩小shuffle时候的数据传输老本。并且mapper的并行度能够依据输出数据量主动调整,充分发挥分布式计算的劣势。
五. MapReduce join案例:订单商品解决
1. 需要
有两份结构化的数据文件:itheima_goods(商品信息表)、itheima_order_goods(订单信息表),具体字段内容如下。
要求应用MapReduce统计出每笔订单中对应的具体的商品名称信息。比方107860商品对应着:AMAZFIT彩色硅胶腕带
数据结构:
- itheima_goods
字段:goodsId(商品id)、goodsSn(商品编号)、goodsName(商品名称) - itheima_order_goods
字段: orderId(订单ID)、goodsId(商品ID)、payPrice(理论领取价格)
2. Reduce Side 实现
1. 剖析
应用mapper解决订单数据和商品数据,输入的时候以goodsId商品编号作为key。雷同goodsId的商品和订单会到同一个reduce的同一个分组,在分组中进行订单和商品信息的关联合并。在MapReduce程序中能够通过context获取到以后解决的切片所属的文件名称。依据文件名来判断以后解决的是订单数据还是商品数据,以此来进行不同逻辑的输入。
join解决完之后,最初能够再通过MapReduce程序排序功能,将属于同一笔订单的所有商品信息汇聚在一起。
2. 代码实现
Mapper
package com.uuicon.sentiment_upload.join;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); StringBuffer sb = new StringBuffer(); String fileName = null; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); FileSplit split = (FileSplit) context.getInputSplit(); fileName = split.getPath().getName(); System.out.println("以后文件----" + fileName); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { sb.setLength(0); String[] fields = value.toString().split("\\|"); if (fileName.contains("itheima_goods.txt")) { // 100101|155083444927602|四川果冻橙6个约180g/个 outKey.set(fields[0]); sb.append(fields[1] + "\t" + fields[2]); outValue.set(sb.insert(0, "goods#").toString()); context.write(outKey, outValue); } else { // 1|107860|7191 outKey.set(fields[1]); StringBuffer append = sb.append(fields[0]).append("\t").append(fields[1]).append("\t").append(fields[2]); outValue.set(sb.insert(0, "orders#").toString()); context.write(outKey, outValue); } }}
- Reduce 类
package com.uuicon.sentiment_upload.join;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); StringBuffer sb = new StringBuffer(); String fileName = null; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); FileSplit split = (FileSplit) context.getInputSplit(); fileName = split.getPath().getName(); System.out.println("以后文件----" + fileName); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { sb.setLength(0); String[] fields = value.toString().split("\\|"); if (fileName.contains("itheima_goods.txt")) { // 100101|155083444927602|四川果冻橙6个约180g/个 outKey.set(fields[0]); sb.append(fields[1] + "\t" + fields[2]); outValue.set(sb.insert(0, "goods#").toString()); context.write(outKey, outValue); } else { // 1|107860|7191 outKey.set(fields[1]); StringBuffer append = sb.append(fields[0]).append("\t").append(fields[1]).append("\t").append(fields[2]); outValue.set(sb.insert(0, "orders#").toString()); context.write(outKey, outValue); } }}
驱动类
package com.uuicon.sentiment_upload.join;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class ReduceJoinDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, ReduceJoinDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(ReduceJoinDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(ReduceJoinMapper.class); job.setReducerClass(ReduceJoinReducer.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}
- 后果排序
package com.uuicon.sentiment_upload.join;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReduceJoinSort { public static class ReduceJoinSortMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //2278 100101 38 155083444927602 四川果冻橙6个约180g/个 String[] fields = value.toString().split("\t"); outKey.set(fields[0]); outValue.set(fields[0] + "\t" + fields[1] + "\t" + fields[3] + "\t" + fields[4] + "\t" + fields[2]); context.write(outKey, outValue); } } public static class ReduceJoinSortReducer extends Reducer<Text, Text, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(value, NullWritable.get()); } } } public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, ReduceJoinSort.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(ReduceJoinSort.class); // 设置作业Mapper reduce 类 job.setMapperClass(ReduceJoinSortMapper.class); job.setReducerClass(ReduceJoinSortReducer.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}
运行后果
- reduce join的后果
- 从新排序之后的后果
3. Map Side 实现
1. 剖析
Map-side Join是指在Mapper工作中加载特定数据集,此案例中把商品数据进行分布式缓存,应用Mapper读取订单数据和缓存的商品数据进行连贯。
通常为了方便使用,会在mapper的初始化办法setup中读取分布式缓存文件加载的程序的内存中,便于后续mapper解决数据。
因为在mapper阶段曾经实现了数据的关联操作,因而程序不须要进行reduce。须要在job中将reducetask的个数设置为0,也就是mapper的输入就是程序最终的输入。
2. 代码实现
- Mapper 类
package com.uuicon.sentiment_upload.cache;import org.apache.commons.collections.map.HashedMap;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;import java.io.FileReader;import java.io.IOException;import java.util.Map;public class ReduceCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> { Map<String, String> goodsMap = new HashedMap(); Text outKey = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { //加载缓存文件 BufferedReader br = new BufferedReader(new FileReader("itheima_goods.txt")); String line = null; while ((line = br.readLine()) != null) { String[] fields = line.split("\\|"); goodsMap.put(fields[0], fields[1] + "\t" + fields[2]); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 56982|100917|1192 String[] fields = value.toString().split("\\|"); outKey.set(value.toString() + "\t" + goodsMap.get(fields[1])); context.write(outKey, NullWritable.get()); }}
- 程序主类
package com.uuicon.sentiment_upload.cache;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.net.URI;public class ReduceCacheDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, ReduceCacheDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(ReduceCacheDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(ReduceCacheMapper.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); job.addCacheFile(new URI("/data/cache/itheima_goods.txt")); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}
提交运行
- 在工程的pom.xml文件中指定程序运行的主类全门路;
<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.uuicon.sentiment_upload.cache.ReduceCacheDriver</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins>
- 执行mvn package命令生成jar包;
- 将jar包上传到hadoop集群(任意节点上);
- 执行命令(任意节点上):hadoop jar xxxx.jar。留神保障yarn集群提前启动胜利。