关于mapreduce:MapReduce-示例减少-Hadoop-MapReduce-中的侧连接

35次阅读

共计 6790 个字符,预计需要花费 17 分钟才能阅读完成。

摘要: 在排序和 reducer 阶段,reduce 侧连贯过程会产生微小的网络 I /O 流量,在这个阶段,雷同键的值被汇集在一起。

本文分享自华为云社区《MapReduce 示例:缩小 Hadoop MapReduce 中的侧连贯》,作者:Donglian Lin。

在这篇博客中,将应用 MapReduce 示例向您解释如何在 Hadoop MapReduce 中执行缩减侧连贯。在这里,我假如您曾经相熟 MapReduce 框架并晓得如何编写根本的 MapReduce 程序。本博客中探讨的主题如下:

• 什么是退出?
• MapReduce 中的连贯
• 什么是 Reduce 侧连贯?
• 缩小侧连贯的 MapReduce 示例
• 论断

什么是联接?

join 操作用于基于外键将两个或多个数据库表合并。通常,公司在其数据库中为客户和交易 记录保护独自的表。而且,很多时候这些公司须要应用这些独自表格中的数据生成剖析报告。因而,他们应用公共列(外键)(如客户 ID 等)对这些独自的表执行连贯操作,以生成组合表。而后,他们剖析这个组合表以取得所需的剖析报告。

MapReduce 中的连贯

就像 SQL join 一样,咱们也能够在 MapReduce 中对不同的数据集进行 join 操作。MapReduce 中有两种类型的连贯操作:

Map Side Join: 顾名思义,join 操作是在 map 阶段自身进行的。因而,在 map side join 中,mapper 执行 join 并且每个 map 的输出都必须依据键进行分区和排序。
缩小副退出: 顾名思义,在缩小侧退出,加速是 负责执行连贯操作。因为排序和改选阶段将具备雷同键的值发送到同一个 reducer,因而它比 map side join 绝对简略和容易实现,因而,默认状况下,数据是为咱们组织的。

当初,让咱们具体理解 reduce side join。

什么是缩小侧连贯?

如前所述,reduce side join 是在 reducer 阶段执行 join 操作的过程。基本上,reduce side join 以下列形式产生:

• Mapper 依据公共列或连贯键读取要组合的输出数据。
• 映射器解决输出并向输出增加标签以辨别属于不同起源或数据集或数据库的输出。
• 映射器输入两头键值对,其中键只是连贯键。
• 在排序和改选阶段之后,会为减速器生成一个键和值列表。
• 当初,reducer 将列表中存在的值与键连接起来,以给出最终的聚合输入。

缩小边连贯的 MapReduce 示例

假如我有两个独自的运动场数据集:

cust_details: 它蕴含客户的详细信息。
transaction_details: 蕴含客户的交易记录。

应用这两个数据集,我想晓得每个客户的生命周期价值。在 这样做时,我将须要以下货色:

• 此人的姓名以及该人拜访的频率。
• 他 / 她购买设施所破费的总金额。

上图只是向您展现了咱们将对其执行 reduce side join 操作的两个数据集的 schema。单击上面的按钮下载蕴含此 MapReduce 示例的源代码和输出文件的整个我的项目:

在将下面的 MapReduce 示例我的项目在 reduce 端退出 Eclipse 时,请记住以下几点:

• 输出文件位于我的项目的 input_files 目录中。将这些加载到您的 HDFS 中。
• 不要遗记依据您的零碎或 VM 构建 Hadoop Reference Jars 的门路(存在于 reduce side join 我的项目 lib 目录中)。

当初,让咱们理解在这个 MapReduce 示例中的 map 和 reduce 阶段外部产生了什么对于 reduce side join:

1. 地图阶段:

我将为两个数据集中的每一个设置一个独自的映射器,即一个映射器用于 cust_details 输出,另一个用于 transaction_details 输出。

cust_details 的映射器:

public static class CustsMapper extends Mapper <Object, Text, Text, Text>
{public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{String record = value.toString();
String[] parts = record.split(",");
context.write(new Text(parts[0]), new Text("cust" + parts[1]));
}
}

o 我将一次读取一个元组的输出。
o 而后,我将令牌化在元组的每个字并用的名字一起取卡斯特 ID 集体Ø ñ。
o Ť ħ È Ç乌斯 ID 将是我的键值对键,我的映射器将最终生成。
o 我还将增加一个标签“Ç乌斯”,以表明该输出元组是 cust_details 类型。
o 因而,我的 cust_details 映射器将生成以下两头键值对:

键 – 值对:[客户 ID,客户名称]

例如:[4000001,Ç乌斯 克里斯蒂娜],[4000002,卡斯特佩奇] 等

transaction_details 的映射器:

public static class TxnsMapper extends Mapper <Object, Text, Text, Text>
{public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{String record = value.toString();
String[] parts = record.split(",");
context.write(new Text(parts[2]), new Text("tnxn" + parts[3]));
}
}

• 就像 cust_details 的映射器一样,我将在这里遵循相似的步骤。然而,会有一些差别:
o 我将获取金额值而不是人名。
o 在这种状况下,咱们将应用“tnxn”作为标签。
• 因而,客户 ID 将是映射器最终生成的键值对的我的键。
• 最初,transaction_details 映射器的输入将采纳以下格局:

键值对:[客户 ID,tnxn 金额]

示例:[4000001, tnxn 40.33]、[4000002, tnxn 198.44] 等。

2. 排序和洗牌阶段

排序和改选阶段将生成与每个键对应的值的数组列表。换句话说,它将两头键值对中每个惟一键对应的所有值放在一起。排序和改选阶段的输入将采纳以下格局:

键 – 值列表:

• {cust ID1 – [(cust name1), (tnxn amount1), (tnxn amount2), (tnxn amount3),…..]}
• {客户 ID2 – [( 客户名称 2), (tnxn amount1), (tnxn amount2), (tnxn amount3),…..]}
• ……

例子:

• {4000001 – [(cust kristina), (tnxn 40.33), (tnxn 47.05),…]};
• {4000002 – [(cust paige), (tnxn 198.44), (tnxn 5.58),…]};
• ……

当初,框架将为每个惟一的连贯键(cust id)和相应的值列表调用 reduce() 办法(reduce(Text key, Iterable<Text> values, Context context))。而后,reducer 将对相应值列表中存在的值执行连贯操作,以最终计算所需的输入。因而,执行的 reducer 工作的数量将等于惟一客户 ID 的数量。

当初让咱们理解在这个 MapReduce 示例中,reducer 如何执行连贯操作。

3. 减速器阶段

如果您还记得,执行这种缩小侧连贯操作的次要指标是找出特定客户拜访综合体育馆的次数以及该客户在不同静止上破费的总金额。因而,我的最终输入应采纳以下格局:

Key – Value 对:[客户姓名] (Key) – [总金额,拜访频率] (Value)

减速机代码:

public static class ReduceJoinReducer extends Reducer <Text, Text, Text, Text>
{public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException 
{
String name = "";
double total = 0.0;
int count = 0;
for (Text t : values) 
{String parts[] = t.toString().split(" ");
if (parts[0].equals("tnxn")) 
{
count++;
total += Float.parseFloat(parts[1]);
} 
else if (parts[0].equals("cust")) 
{name = parts[1];
}
}
String str = String.format("%d %f", count, total);
context.write(new Text(name), new Text(str));
}
}

因而,将在每个减速器中采取以下步骤来实现所需的输入:

• 在每个减速器中,我都会有一个键和值列表,其中键只是客户 ID。值列表将具备来自两个数据集的输出,即来自 transaction_details 的金额和来自 cust_details 的名称。
• 当初,我将遍历 reducer 中的值列表中存在的值。
• 而后,我将拆分值列表并查看该值是 transaction_details 类型还是 cust_details 类型。
• 如果是 transaction_details 类型,我将执行以下步骤:
o 我将计数器值加一来计算这个人的拜访频率。
o 我将累积更新金额值以计算该人破费的总金额。
• 另一方面,如果值是 cust_details 类型,我会将它存储在一个字符串变量中。稍后,我会将名称指定为我的输入键值对中的键。
• 最初,我将在我的 HDFS 的输入文件夹中写入输入键值对。

因而,我的减速器将生成的最终输入如下:

克里斯蒂娜,651.05 8

佩奇,706.97 6

…..

而且,咱们下面所做的整个过程在 MapReduce 中称为 Reduce Side Join。

源代码:

下面的缩小侧连贯的 MapReduce 示例的源代码如下:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class ReduceJoin {
 public static class CustsMapper extends Mapper <Object, Text, Text, Text>
 {public void map(Object key, Text value, Context context)
 throws IOException, InterruptedException 
 {String record = value.toString();
 String[] parts = record.split(",");
 context.write(new Text(parts[0]), new Text("cust" + parts[1]));
 }
 }
 
 public static class TxnsMapper extends Mapper <Object, Text, Text, Text>
 {public void map(Object key, Text value, Context context) 
 throws IOException, InterruptedException 
 {String record = value.toString();
 String[] parts = record.split(",");
 context.write(new Text(parts[2]), new Text("tnxn" + parts[3]));
 }
 }
 
 public static class ReduceJoinReducer extends Reducer <Text, Text, Text, Text>
 {public void reduce(Text key, Iterable<Text> values, Context context)
 throws IOException, InterruptedException 
 {
 String name = "";
 double total = 0.0;
 int count = 0;
 for (Text t : values) 
 {String parts[] = t.toString().split(" ");
 if (parts[0].equals("tnxn")) 
 {
 count++;
 total += Float.parseFloat(parts[1]);
 } 
 else if (parts[0].equals("cust")) 
 {name = parts[1];
 }
 }
 String str = String.format("%d %f", count, total);
 context.write(new Text(name), new Text(str));
 }
 }
 
 public static void main(String[] args) throws Exception {Configuration conf = new Configuration();
 Job job = new Job(conf, "Reduce-side join");
 job.setJarByClass(ReduceJoin.class);
 job.setReducerClass(ReduceJoinReducer.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(Text.class);
 
 MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, CustsMapper.class);
 MultipleInputs.addInputPath(job, new Path(args[1]),TextInputFormat.class, TxnsMapper.class);
 Path outputPath = new Path(args[2]);
 
 FileOutputFormat.setOutputPath(job, outputPath);
 outputPath.getFileSystem(conf).delete(outputPath);
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
 }

运行这个程序

最初,在 reduce side join 上运行上述 MapReduce 示例程序的命令 如下:

hadoop jar reducejoin.jar ReduceJoin /sample/input/cust_details /sample/input/transaction_details /sample/output

论断:

在排序和 reducer 阶段,reduce 侧连贯过程会产生微小的网络 I /O 流量,在这个阶段,雷同键的值被汇集在一起。因而,如果您有大量具备数百万个值的不同数据集,您很可能会遇到 OutOfMemory 异样,即您的 RAM 已满,因而溢出。在我看来,应用 reduce side join 的长处是:

  • 这很容易实现,因为咱们利用 MapReduce 框架中的内置排序和改选算法,该算法组合雷同键的值并将其发送到同一个减速器。
  • 在 reduce side join 中,您的输出不须要遵循任何严格的格局,因而您也能够对非结构化数据执行连贯操作。

一般来说,人们更喜爱 Apache Hive,它是 Hadoop 生态系统的一部分,来执行连贯操作。因而,如果您来自 SQL 背景,则无需放心编写 MapReduce Java 代码来执行连贯操作。您能够应用 Hive 作为代替计划。

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0