哈喽~各位小伙伴们中秋快乐,好久没更新新的文章啦,今天分享如何使用 mapreduce 进行 join 操作。
在离线计算中,我们常常不只是会对单一一个文件进行操作,进行需要进行两个或多个文件关联出更多数据,类似与 sql 中的 join 操作。
今天就跟大家分享一下如何在 MapReduce 中实现 join 操作
需求
现有两张,一张是产品信息表,一张是订单表。订单表中只表存了产品 ID,如果想要查出订单以及产品的相关信息就必须使用关联。
实现
根据 MapReduce 特性,大家都知道在 reduce 端,相同 key 的 key,value 对会被放到同一个 reduce 方法中(不设置 partition 的话)。利用这个特点我们可以轻松实现 join 操作,请看下面示例。
产品表
ID | brand | model |
---|---|---|
p0001 | 苹果 | iphone11 pro max |
p0002 | 华为 | p30 |
p0003 | 小米 | mate10 |
订单表
id | name | address | produceID | num |
---|---|---|---|---|
00001 | kris | 深圳市福田区 | p0001 | 1 |
00002 | pony | 深圳市南山区 | p0001 | 2 |
00003 | jack | 深圳市坂田区 | p0001 | 3 |
假如数据量巨大,两表的数据是以文件的形式存储在 HDFS 中,需要用 mapreduce 程序来实现一下 SQL 查询运算:
select a.id,a.name,a.address,a.num from t_orders a join t_products on a.productID=b.ID
MapReduce 实现思路
通过将关联的条件 (prodcueID) 作为 map 输出的 key,将两表满足 join 条件的数据并携带数据所来源的文件信息,发往同一个
reduce task,在 reduce 中进行数据的串联
实现方式一 -reduce 端 join
定义一个 Bean
public class RJoinInfo implements Writable{
private String customerName="";
private String customerAddr="";
private String orderID="";
private int orderNum;
private String productID="";
private String productBrand="";
private String productModel="";
// 0 是产品,1 是订单
private int flag;
setter/getter
编写 Mapper
public class RJoinMapper extends Mapper<LongWritable,Text,Text,RJoinInfo> {private static Logger logger = LogManager.getLogger(RJoinMapper.class);
private RJoinInfo rJoinInfo = new RJoinInfo();
private Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 输入方式支持很多中包括数据库等等。这里用的是文件,因此可以直接强转为文件切片
FileSplit fileSplit = (FileSplit) context.getInputSplit();
// 获取文件名称
String name = fileSplit.getPath().getName();
logger.info("splitPathName:"+name);
String line = value.toString();
String[] split = line.split("\t");
String productID = "";
if(name.contains("product")){productID = split[0];
String setProductBrand = split[1];
String productModel = split[2];
rJoinInfo.setProductID(productID);
rJoinInfo.setProductBrand(setProductBrand);
rJoinInfo.setProductModel(productModel);
rJoinInfo.setFlag(0);
}else if(name.contains("orders")){String orderID = split[0];
String customerName = split[1];
String cutsomerAddr = split[2];
productID = split[3];
String orderNum = split[4];
rJoinInfo.setProductID(productID);
rJoinInfo.setCustomerName(customerName);
rJoinInfo.setCustomerAddr(cutsomerAddr);
rJoinInfo.setOrderID(orderID);
rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
rJoinInfo.setFlag(1);
}
k.set(productID);
context.write(k,rJoinInfo);
}
}
代码解释,这里根据 split 的文件名,判断是 products 还是 orders,
然后根据是 product 还是 orders 获取不同的数据,最用都以 productID 为 Key 发送给 Reduce 端
编写 Reducer
public class RJoinReducer extends Reducer<Text,RJoinInfo,RJoinInfo,NullWritable> {private static Logger logger = LogManager.getLogger(RJoinReducer.class);
@Override
protected void reduce(Text key, Iterable<RJoinInfo> values, Context context) throws IOException, InterruptedException {List<RJoinInfo> orders = new ArrayList<>();
String productID = key.toString();
logger.info("productID:"+productID);
RJoinInfo rJoinInfo = new RJoinInfo();
for (RJoinInfo value : values) {int flag = value.getFlag();
if (flag == 0) {
// 产品
try {BeanUtils.copyProperties(rJoinInfo,value);
} catch (IllegalAccessException e) {logger.error(e.getMessage());
} catch (InvocationTargetException e) {logger.error(e.getMessage());
}
}else {
// 订单
RJoinInfo orderInfo = new RJoinInfo();
try {BeanUtils.copyProperties(orderInfo,value);
} catch (IllegalAccessException e) {logger.error(e.getMessage());
} catch (InvocationTargetException e) {logger.error(e.getMessage());
}
orders.add(orderInfo);
}
}
for (RJoinInfo order : orders) {rJoinInfo.setOrderNum(order.getOrderNum());
rJoinInfo.setOrderID(order.getOrderID());
rJoinInfo.setCustomerName(order.getCustomerName());
rJoinInfo.setCustomerAddr(order.getCustomerAddr());
// 只输出 key 即可,value 可以使用 nullwritable
context.write(rJoinInfo,NullWritable.get());
}
}
}
代码解释: 根据 productID 会分为不同的组发到 reduce 端,reduce 端拿到后一组数据后,其中有一个产品对象和多个订单对象。
遍历每一个对象,根据 flag 区分产品和订单。保存产品对象,获取每个订单对象到一个集合中。当我们对每个对象都分好
类后,遍历订单集合将订单和产品信息集合,然后输出。
注意: 我们这里效率虽然不是最高的,主要是想说明 join 的思路。
编写 Driver
public class RJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();
// conf.set("mapreduce.framework.name","yarn");
// conf.set("yarn.resourcemanager.hostname","server1");
// conf.set("fs.defaultFS","hdfs://server1:9000");
conf.set("mapreduce.framework.name","local");
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
// 如果是本地运行,可以不用设置 jar 包的路径,因为不用拷贝 jar 到其他地方
job.setJarByClass(RJoinDriver.class);
// job.setJar("/Users/kris/IdeaProjects/bigdatahdfs/target/rjoin.jar");
job.setMapperClass(RJoinMapper.class);
job.setReducerClass(RJoinReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(RJoinInfo.class);
job.setOutputKeyClass(RJoinInfo.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output"));
boolean waitForCompletion = job.waitForCompletion(true);
System.out.println(waitForCompletion);
}
}
== 上面实现的这种方式有个缺点,就是 join 操作是在 reduce 阶段完成的,reduce 端的处理压力太大,map 节点的运算负载则很低,资源利用率不高,且在 reduce 阶段极易产生数据倾斜 ==
实现方式二 -map 端 join
这种方式适用于关联表中有小表的情形:
可以将小表分发到所有的 map 节点,这样,map 节点就可以在本地对自己所读到的大表数据进行 join 操作并输出结果,可以大大提高 join 操作的并发度,加快处理速度。
编写 Mapper
在 Mapper 端我们一次性加载数据或者用 Distributedbache 将文件拷贝到每一个运行的 maptask 的节点上加载
这里我们使用第二种,在 mapper 类中定义好小表进行 join
static class RjoinMapper extends Mapper<LongWritable,Text,RJoinInfo,NullWritable>{private static Map<String, RJoinInfo> productMap = new HashMap<>();
// 在循环调用 map 方法之前会先调用 setup 方法。因此我们可以在 setup 方法中,先对文件进行处理
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 通过这几句代码可以获取到 cache file 的本地绝对路径,测试验证用
URI[] cacheFiles = context.getCacheFiles();
System.out.println(Arrays.toString(new URI[]{cacheFiles[0]}));
// 直接指定名字,默认在工作文件夹的目录下查找 1⃣
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("products.txt")))){
String line;
while ((line = bufferedReader.readLine())!=null){String[] split = line.split("\t");
String productID = split[0];
String setProductBrand = split[1];
String productModel = split[2];
RJoinInfo rJoinInfo = new RJoinInfo();
rJoinInfo.setProductID(productID);
rJoinInfo.setProductBrand(setProductBrand);
rJoinInfo.setProductModel(productModel);
rJoinInfo.setFlag(0);
productMap.put(productID, rJoinInfo);
}
}
super.setup(context);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {FileSplit fileSplit = (FileSplit)context.getInputSplit();
String name = fileSplit.getPath().getName();
if (name.contains("orders")) {String line = value.toString();
String[] split = line.split("\t");
String orderID = split[0];
String customerName = split[1];
String cutsomerAddr = split[2];
String productID = split[3];
String orderNum = split[4];
RJoinInfo rJoinInfo = productMap.get(productID);
rJoinInfo.setProductID(productID);
rJoinInfo.setCustomerName(customerName);
rJoinInfo.setCustomerAddr(cutsomerAddr);
rJoinInfo.setOrderID(orderID);
rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
rJoinInfo.setFlag(1);
context.write(rJoinInfo, NullWritable.get());
}
}
}
代码解释: 这里我们又重写了一个 setup()方法,这个方法会在执行 map()方法前先执行,因此我们可以在这个方法中事先加载好数据。
在上述代码中,我们直接指定名字就拿到了 product.txt 文件,这个究竟这个文件是怎么复制在 maptask 的节点上的呢,还要看下面的 driver
编写 Driver
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {Configuration conf = new Configuration();
conf.set("mapreduce.framework.name","local");
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
job.setJarByClass(RJoinDemoInMapDriver.class);
job.setMapperClass(RjoinMapper.class);
job.setOutputKeyClass(RJoinInfo.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output2"));
// 指定需要缓存一个文件到所有的 maptask 运行节点工作目录
// job.addFileToClassPath(); 将普通文件缓存到 task 运行节点的 classpath 下
// job.addArchiveToClassPath(); 缓存 jar 包到 task 运行节点的 classpath 下
// job.addCacheArchive(); 缓存压缩包文件到 task 运行节点的工作目录
// job.addCacheFile(); 将普通文件 1⃣
job.addCacheFile(new URI("/Users/kris/Downloads/rjoin/products.txt"));
// 设置 reduce 的数量为 0
job.setNumReduceTasks(0);
boolean waitForCompletion = job.waitForCompletion(true);
System.out.println(waitForCompletion);
}
代码解释: 上述 Driver 中,我们通过 job.addCacheFile()指定了一个 URI 本地地址,运行时 mapreduce 就会将这个文件拷贝到 maptask 的运行工作目录中。
好啦~本期分享代码量偏多,主要是想分享如何使用 mapreduce 进行 join 操作的思路。下一篇我会再讲一下 计算共同好友的思路以及代码~
公众号搜索: 喜讯 XiCent 获取更多福利资源~~~~