一. 背景
在理论的数据库利用中,咱们常常须要从多个数据表中读取数据,这时咱们就能够应用 SQL 语句中的连贯(JOIN),在两个或多个数据表中查问数据。
在应用 MapReduce 框架进行数据处理的过程中,也会波及到从多个数据集读取数据,进行 join 关联的操作,只不过此时须要应用 java 代码并且依据 MapReduce 的编程标准进行业务的实现。
然而因为 MapReduce 的分布式设计理念的特殊性,因而对于 MapReduce 实现 join 操作具备了肯定的特殊性。非凡次要体现在:到底在 MapReduce 中的什么阶段进行数据集的关联操作,是 mapper 阶段还是 reducer 阶段,之间的区别又是什么?
整个 MapReduce 的 join 分为两类:map side join、reduce side join。
二. reduce side join
1. 概述
reduce side join,顾名思义,在 reduce 阶段执行 join 关联操作。这也是最容易想到和实现的 join 形式。因为通过shuffle 过程就能够将相干的数据分到雷同的分组中,这将为前面的 join 操作提供了便捷。
基本上,reduce side join 大抵步骤如下:
- mapper 别离读取不同的数据集;
- mapper 的输入中,通常以 join 的字段作为输入的 key;
- 不同数据集的数据通过 shuffle,key 一样的会被分到同一分组解决;
- 在 reduce 中依据业务需要把数据进行关联整合汇总,最终输入。
2. 弊病
reduce 端 join 最大的问题是整个 join 的工作是在 reduce 阶段实现的,然而通常状况下 MapReduce 中 reduce 的并行度是极小的(默认是 1 个),这就使得 所有的数据都挤压到 reduce 阶段解决,压力颇大 。尽管能够设置 reduce 的并行度,然而又会导致最终后果被扩散到多个不同文件中。
并且在数据从 mapper 到 reducer 的过程中,shuffle 阶段非常繁琐,数据集大时老本极高。
三. MapReduce 分布式缓存
DistributedCache 是 hadoop 框架提供的一种机制, 能够 将 job 指定的文件, 在 job 执行前, 后行散发到 task 执行的机器上, 并有相干机制对 cache 文件进行治理。
DistributedCache 可能缓存应用程序所需的文件(包含文本,档案文件,jar 文件等)。
Map-Redcue 框架在作业所有工作执行之前会把必要的文件拷贝到 slave 节点上。它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的 slave 节点缓存文档。
1. 应用形式
1. 增加缓存文件
能够应用 MapReduce 的 API 增加须要缓存的文件。
// 增加归档文件到分布式缓存中
job.addCacheArchive(URI uri);
// 增加一般文件到分布式缓存中
job.addCacheFile(URI uri);
留神:须要散发的文件, 必须提前放到 hdfs 上. 默认的门路前缀是 hdfs://。
2. 程序中读取缓存文件
在 Mapper 类或者 Reducer 类的 setup 办法中,用输出流获取分布式缓存中的文件。
protected void setup(Context context) throw IOException,InterruptedException{FileReader reader = new FileReader("myfile");
BufferReader br = new BufferedReader(reader);
......
}
四. map side join
1. 概述
map side join,其精华就是 在 map 阶段执行 join 关联操作, 并且程序也没有了 reduce 阶段,防止了 shuffle 时候的繁琐。实现的要害是应用 MapReduce 的分布式缓存。
尤其是波及到一大一小数据集的解决场景时,map 端的 join 将会施展出得天独厚的劣势。
map side join 的大抵思路如下:
- 首先剖析 join 解决的数据集,应用分布式缓存技术将小的数据集进行分布式缓存
- MapReduce 框架在执行的时候会主动将缓存的数据散发到各个 maptask 运行的机器上
- 程序只运行 mapper,在 mapper 初始化的时候从分布式缓存中读取小数据集数据,而后和本人读取的大数据集进行 join 关联,输入最终的后果。
- 整个 join 的过程没有 shuffle,没有 reducer。
2. 劣势
map 端 join 最大的劣势缩小 shuffle 时候的数据传输老本。并且 mapper 的并行度能够依据输出数据量主动调整,充分发挥分布式计算的劣势。
五. MapReduce join 案例:订单商品解决
1. 需要
有两份结构化的数据文件:itheima_goods(商品信息表)、itheima_order_goods(订单信息表), 具体字段内容如下。
要求 应用 MapReduce 统计出每笔订单中对应的具体的商品名称信息。比方 107860 商品对应着:AMAZFIT 彩色硅胶腕带
数据结构:
- itheima_goods
字段:goodsId(商品 id)、goodsSn(商品编号)、goodsName(商品名称) - itheima_order_goods
字段:orderId(订单 ID)、goodsId(商品 ID)、payPrice(理论领取价格)
2. Reduce Side 实现
1. 剖析
应用 mapper 解决订单数据和商品数据,输入的时候以 goodsId 商品编号作为 key。雷同 goodsId 的商品和订单会到同一个 reduce 的同一个分组,在分组中进行订单和商品信息的关联合并。在 MapReduce 程序中能够通过 context 获取到以后解决的切片所属的文件名称。依据文件名来判断以后解决的是订单数据还是商品数据,以此来进行不同逻辑的输入。
join 解决完之后,最初能够再通过 MapReduce 程序排序功能,将属于同一笔订单的所有商品信息汇聚在一起。
2. 代码实现
-
Mapper
package com.uuicon.sentiment_upload.join; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {Text outKey = new Text(); Text outValue = new Text(); StringBuffer sb = new StringBuffer(); String fileName = null; @Override protected void setup(Context context) throws IOException, InterruptedException {super.setup(context); FileSplit split = (FileSplit) context.getInputSplit(); fileName = split.getPath().getName(); System.out.println("以后文件 ----" + fileName); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {sb.setLength(0); String[] fields = value.toString().split("\\|"); if (fileName.contains("itheima_goods.txt")) { // 100101|155083444927602| 四川果冻橙 6 个约 180g/ 个 outKey.set(fields[0]); sb.append(fields[1] + "\t" + fields[2]); outValue.set(sb.insert(0, "goods#").toString()); context.write(outKey, outValue); } else { // 1|107860|7191 outKey.set(fields[1]); StringBuffer append = sb.append(fields[0]).append("\t").append(fields[1]).append("\t").append(fields[2]); outValue.set(sb.insert(0, "orders#").toString()); context.write(outKey, outValue); } } }
- Reduce 类
package com.uuicon.sentiment_upload.join;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {Text outKey = new Text();
Text outValue = new Text();
StringBuffer sb = new StringBuffer();
String fileName = null;
@Override
protected void setup(Context context) throws IOException, InterruptedException {super.setup(context);
FileSplit split = (FileSplit) context.getInputSplit();
fileName = split.getPath().getName();
System.out.println("以后文件 ----" + fileName);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {sb.setLength(0);
String[] fields = value.toString().split("\\|");
if (fileName.contains("itheima_goods.txt")) {
// 100101|155083444927602| 四川果冻橙 6 个约 180g/ 个
outKey.set(fields[0]);
sb.append(fields[1] + "\t" + fields[2]);
outValue.set(sb.insert(0, "goods#").toString());
context.write(outKey, outValue);
} else {
// 1|107860|7191
outKey.set(fields[1]);
StringBuffer append = sb.append(fields[0]).append("\t").append(fields[1]).append("\t").append(fields[2]);
outValue.set(sb.insert(0, "orders#").toString());
context.write(outKey, outValue);
}
}
}
-
驱动类
package com.uuicon.sentiment_upload.join; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ReduceJoinDriver {public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, ReduceJoinDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(ReduceJoinDriver.class); // 设置作业 Mapper reduce 类 job.setMapperClass(ReduceJoinMapper.class); job.setReducerClass(ReduceJoinReducer.class); // 设置作业 mapper 阶段输入 key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在, 如果存在, 删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); } }
- 后果排序
package com.uuicon.sentiment_upload.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReduceJoinSort {
public static class ReduceJoinSortMapper extends Mapper<LongWritable, Text, Text, Text> {Text outKey = new Text();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//2278 100101 38 155083444927602 四川果冻橙 6 个约 180g/ 个
String[] fields = value.toString().split("\t");
outKey.set(fields[0]);
outValue.set(fields[0] + "\t" + fields[1] + "\t" + fields[3] + "\t" + fields[4] + "\t" + fields[2]);
context.write(outKey, outValue);
}
}
public static class ReduceJoinSortReducer extends Reducer<Text, Text, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(value, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
// 配置文件对象
Configuration conf = new Configuration();
// 创立作业实例
Job job = Job.getInstance(conf, ReduceJoinSort.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(ReduceJoinSort.class);
// 设置作业 Mapper reduce 类
job.setMapperClass(ReduceJoinSortMapper.class);
job.setReducerClass(ReduceJoinSortReducer.class);
// 设置作业 mapper 阶段输入 key value 数据类型,
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 配置作业的输出数据门路
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输入数据门路
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 判断输入门路是否存在, 如果存在, 删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true);
}
boolean resultFlag = job.waitForCompletion(true);
System.exit(resultFlag ? 0 : 1);
}
}
-
运行后果
- reduce join 的后果
- 从新排序之后的后果
3. Map Side 实现
1. 剖析
Map-side Join 是指在 Mapper 工作中加载特定数据集,此案例中把商品数据进行分布式缓存,应用 Mapper 读取订单数据和缓存的商品数据进行连贯。
通常为了方便使用,会在 mapper 的初始化办法 setup 中读取分布式缓存文件加载的程序的内存中,便于后续 mapper 解决数据。
因为在 mapper 阶段曾经实现了数据的关联操作,因而程序不须要进行 reduce。须要在 job 中将 reducetask 的个数设置为 0, 也就是 mapper 的输入就是程序最终的输入。
2. 代码实现
- Mapper 类
package com.uuicon.sentiment_upload.cache;
import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
public class ReduceCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Map<String, String> goodsMap = new HashedMap();
Text outKey = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 加载缓存文件
BufferedReader br = new BufferedReader(new FileReader("itheima_goods.txt"));
String line = null;
while ((line = br.readLine()) != null) {String[] fields = line.split("\\|");
goodsMap.put(fields[0], fields[1] + "\t" + fields[2]);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 56982|100917|1192
String[] fields = value.toString().split("\\|");
outKey.set(value.toString() + "\t" + goodsMap.get(fields[1]));
context.write(outKey, NullWritable.get());
}
}
- 程序主类
package com.uuicon.sentiment_upload.cache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class ReduceCacheDriver {public static void main(String[] args) throws Exception {
// 配置文件对象
Configuration conf = new Configuration();
// 创立作业实例
Job job = Job.getInstance(conf, ReduceCacheDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(ReduceCacheDriver.class);
// 设置作业 Mapper reduce 类
job.setMapperClass(ReduceCacheMapper.class);
// 设置作业 mapper 阶段输入 key value 数据类型,
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.addCacheFile(new URI("/data/cache/itheima_goods.txt"));
// 配置作业的输出数据门路
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输入数据门路
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 判断输入门路是否存在, 如果存在, 删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true);
}
boolean resultFlag = job.waitForCompletion(true);
System.exit(resultFlag ? 0 : 1);
}
}
-
提交运行
- 在工程的 pom.xml 文件中指定程序运行的主类全门路;
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.uuicon.sentiment_upload.cache.ReduceCacheDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
- 执行 mvn package 命令生成 jar 包;
- 将 jar 包上传到 hadoop 集群(任意节点上);
- 执行命令(任意节点上):hadoop jar xxxx.jar。留神保障 yarn 集群提前启动胜利。