关于mapreduce:MapReduce-示例减少-Hadoop-MapReduce-中的侧连接

摘要:在排序和reducer 阶段,reduce 侧连贯过程会产生微小的网络I/O 流量,在这个阶段,雷同键的值被汇集在一起。本文分享自华为云社区《MapReduce 示例:缩小 Hadoop MapReduce 中的侧连贯》,作者:Donglian Lin。 在这篇博客中,将应用 MapReduce 示例向您解释如何在 Hadoop MapReduce 中执行缩减侧连贯。在这里,我假如您曾经相熟 MapReduce 框架并晓得如何编写根本的 MapReduce 程序。本博客中探讨的主题如下: • 什么是退出?• MapReduce 中的连贯• 什么是 Reduce 侧连贯?• 缩小侧连贯的 MapReduce 示例• 论断 什么是联接?join操作用于基于外键将两个或多个数据库表合并。通常,公司在其数据库中为客户和交易 记录保护独自的表 。而且,很多时候这些公司须要应用这些独自表格中的数据生成剖析报告。因而,他们应用公共列(外键)(如客户 ID 等)对这些独自的表执行连贯操作,以生成组合表。而后,他们剖析这个组合表以取得所需的剖析报告。 MapReduce 中的连贯就像 SQL join 一样,咱们也能够在 MapReduce 中对不同的数据集进行 join 操作。MapReduce 中有两种类型的连贯操作: • Map Side Join:顾名思义,join操作是在map阶段自身进行的。因而,在 map side join 中,mapper 执行 join 并且每个 map 的输出都必须依据键进行分区和排序。• 缩小副退出:顾名思义,在缩小侧退出,加速是 负责执行连贯操作。因为排序和改选阶段将具备雷同键的值发送到同一个 reducer,因而它比 map side join 绝对简略和容易实现,因而,默认状况下,数据是为咱们组织的。 当初,让咱们具体理解reduce side join。 什么是缩小侧连贯? ...

September 17, 2021 · 4 min · jiezi

关于mapreduce:云小课-MRS基础入门之HDFS组件介绍

摘要:HDFS是MapReduce服务中的根底文件系统,全称为Hadoop的分布式文件系统(Hadoop Distributed File System),可反对实现大规模数据牢靠的分布式读写。本文分享自华为云社区《【云小课】EI第21课 MRS根底入门之HDFS组件介绍》,原文作者:Hi,EI 。 HDFS针对的应用场景是数据读写具备“一次写,屡次读”的特色,而数据“写”操作是程序写,也就是在文件创建时的写入或者在现有文件之后的增加操作。HDFS保障一个文件在一个时刻只被一个调用者执行写操作,而能够被多个调用者执行读操作。 HDFS构造HDFS是一个Master/Slave的架构,次要蕴含主、备NameNode和多个DataNode角色。在Master上运行NameNode,而在每一个Slave上运行DataNode,ZKFC须要和NameNode一起运行。 NameNode和DataNode之间的通信都是建设在TCP/IP的根底之上的。NameNode、DataNode、ZKFC和JournalNode能部署在运行Linux的服务器上。 图1-1中各模块的性能阐明如表1-1所示。 HA即为High Availability,用于解决NameNode单点故障问题,该个性通过主备的形式为主NameNode提供一个备用者,一旦主NameNode呈现故障,能够迅速切换至备NameNode,从而不间断对外提供服务。 在一个典型HDFS HA场景中,通常由两个NameNode组成,一个处于Active状态,另一个处于Standby状态。 为了能实现Active和Standby两个NameNode的元数据信息同步,需提供一个共享存储系统。本版本提供基于QJM(Quorum Journal Manager)的HA解决方案,如图1-2所示。主备NameNode之间通过一组JournalNode同步元数据信息。 通常配置奇数个(2N+1个)JournalNode,且起码要运行3个JournalNode。这样,一条元数据更新音讯只有有N+1个JournalNode写入胜利就认为数据写入胜利,此时最多容忍N个JournalNode写入失败。比方,3个JournalNode时,最多容许1个JournalNode写入失败,5个JournalNode时,最多容许2个JournalNode写入失败。 因为JournalNode是一个轻量级的守护过程,能够与Hadoop其它服务共用机器。倡议将JournalNode部署在管制节点上,以防止数据节点在进行大数据量传输时引起JournalNode写入失败。 HDFS原理MRS应用HDFS的正本机制来保证数据的可靠性,HDFS中每保留一个文件则主动生成1个备份文件,即共2个正本。HDFS正本数可通过“dfs.replication”参数查问。 当MRS集群中Core节点规格抉择为非本地盘(hdd)时,若集群中只有一个Core节点,则HDFS默认正本数为1。若集群中Core节点数大于等于2,则HDFS默认正本数为2。当MRS集群中Core节点规格抉择为本地盘(hdd)时,若集群中只有一个Core节点,则HDFS默认正本数为1。若集群中有两个Core节点,则HDFS默认正本数为2。若集群中Core节点数大于等于3,则HDFS默认正本数为3。MRS服务的HDFS组件反对以下局部个性: HDFS组件反对纠删码,使得数据冗余缩小到50%,且可靠性更高,并引入条带化的块存储构造,最大化的利用现有集群单节点多磁盘的能力,使得数据写入性能在引入编码过程后,仍和原来多正本冗余的性能靠近。反对HDFS组件上节点平衡调度和单节点内的磁盘平衡调度,有助于扩容节点或扩容磁盘后的HDFS存储性能晋升。更多对于Hadoop的架构和具体原理介绍,请参见:http://hadoop.apache.org/。 HDFS文件根底操作在MRS集群中,您能够通过治理控制台、客户端命令以及API接口等多种形式进行HDFS文件的操作。 MRS集群的创立您可参考创立集群。 1、通过MRS治理控制台查看HDFS文件信息在MRS治理控制台,点击集群名称进入到MRS集群详情页面,单击“文件治理”。 在文件治理页面,即可查看HDFS文件列表,并能够执行文件删除、文件夹增删以及与OBS服务数据的导入导入。 2、通过集群客户端查看HDFS文件信息a. 登录MRS集群的FusionInsight Manager页面(如果没有弹性IP,需提前购买弹性IP),新建一个用户hdfstest,绑定用户组supergroup,绑定角色System_administrator(集群未开启Kerberos认证可跳过)。 b. 下载并装置集群全量客户端,例如客户端装置目录为“/opt/client”,相干操作可参考装置客户端。 c. 为客户端节点绑定一个弹性IP,而后应用root用户登录主Master节点,并进入客户端所在目录并认证用户。 cd /opt/client source bigdata_env kinit hbasetest(集群未开启Kerberos认证可跳过) d. 应用hdfs命令进行HDFS文件相干操作。 例如: 创立文件夹:hdfs dfs -mkdir /tmp/testdir 查看文件夹:hdfs dfs -ls /tmp Found 11 items drwx------ - hdfs hadoop 0 2021-05-20 11:20 /tmp/.testHDFS drwxrwxrwx - mapred hadoop 0 2021-05-10 10:33 /tmp/hadoop-yarn drwxrwxrwx - hive hadoop 0 2021-05-10 10:43 /tmp/hive drwxrwx--- - hive hive 0 2021-05-18 16:21 /tmp/hive-scratch drwxrwxrwt - yarn hadoop 0 2021-05-17 11:30 /tmp/logs drwx------ - hive hadoop 0 2021-05-20 11:20 /tmp/monitor drwxrwxrwx - spark2x hadoop 0 2021-05-10 10:45 /tmp/spark2x drwxrwxrwx - spark2x hadoop 0 2021-05-10 10:44 /tmp/sparkhive-scratch drwxr-xr-x - hetuserver hadoop 0 2021-05-17 11:32 /tmp/state-store-launcher drwxr-xr-x - hdfstest hadoop 0 2021-05-20 11:20 /tmp/testdir drwxrwxrwx - hive hadoop 0 2021-05-10 10:43 /tmp/tmp-hive-insert-flag上传本地文件至HDFS:hdfs dfs -put /tmp/test.txt /tmp/testdir (/tmp/test.txt提前准备) ...

June 25, 2021 · 1 min · jiezi

关于mapreduce:四Hadoop之MapReduce实战小例子

输出数据文件 AvgTemperature.txt: DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM1020160602,00,霾,1984,130,9,390,348,30020160802,01,霾,1163,81,8,393,368,30220160706,02,霾,1079,108,17,360,394,30620160706,03,霾,1116,79,6,339,387,30320160502,04,霾,1198,98,16,357,325,307 20160602,05,霾,1762,126,9,324,316,30120160408,06,霾,1996,131,3,349,344,30120160604,07,霾,1952,119,26,347,300,30920160105,08,霾,1410,81,8,350,395,30720160104,09,霾,1718,130,4,352,335,30820160501,10,霾,1714,119,27,310,336,30720160601,11,霾,1660,130,23,311,364,30220160606,12,霾,1598,96,12,369,346,30920160602,13,霾,1673,127,2,343,346,30320160706,14,霾,1578,122,8,360,323,30720160707,15,霾,1237,118,12,384,384,30120160205,16,霾,1231,78,9,361,357,30220160605,17,霾,1166,86,30,350,388,30720160506,18,霾,1426,94,2,378,372,30520160805,19,霾,1874,144,20,376,327,30220160405,20,霾,1778,94,22,360,335,30420160104,21,霾,1055,64,22,376,361,30520160304,22,霾,1349,78,15,367,384,30820160203,23,霾,2004,110,2,359,371,30420160603,24,霾,1375,115,19,308,301,30820160402,25,霾,1201,69,5,387,342,30520160707,26,霾,1272,112,23,348,333,30720160702,27,霾,1738,60,12,393,300,30320160301,28,霾,1752,107,12,364,331,30120160704,29,霾,1442,65,9,332,369,308第一题:编写月平均气温统计程序 import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class AvgTemperature { public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> { private IntWritable intValue = new IntWritable(); private Text dateKey = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] items = value.toString().split(","); String date = items[0]; String tmp = items[5]; if(!"DATE".equals(date) && !"N/A".equals(tmp)){//排除第一行阐明以及未取到数据的行 dateKey.set(date.substring(0, 6)); intValue.set(Integer.parseInt(tmp)); context.write(dateKey, intValue); } } } public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int tmp_sum = 0; int count = 0; for(IntWritable val : values){ tmp_sum += val.get(); count++; } int tmp_avg = tmp_sum/count; result.set(tmp_avg); context.write(key, result); } } public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf, "AvgTemperature"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(AvgTemperature.class); job.setMapperClass(StatMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(StatReducer.class); job.setPartitionerClass(HashPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); TextOutputFormat.setOutputPath(job, new Path(args[1])); TextInputFormat.setInputPaths(job, args[0]); job.setNumReduceTasks(Integer.parseInt(args[2])); System.exit(job.waitForCompletion(true) ? 0 : 1); }} ...

October 31, 2020 · 2 min · jiezi

关于mapreduce:Hadoop学习笔记二MapReduce的IO类型-文件切片

1. 对MapReduce的了解是什么:Hadoop默认自带的分布式计算框架 做什么:提供一系列接口(外围类:InputFormat、OutputFormat、Mapper、Reducer、Driver),让用户可能实现自定义业务性能的分布式计算工作 【长处】: 高扩展性:计算资源不够,间接减少节点数量即可。品质可能不够,数量肯定管够高容错性:一个节点工作失败,能主动转移到其余闲暇节点适宜大数据处理:得益于其扩展性,只有数量足够,可能计算TB级别的数据【毛病】: 无奈进行实时计算:太慢了!!!太慢了!!!不善于流式计算:MR的输出数据个别都是动态的,无奈解决动静的输出格局不善于迭代计算:一个残缺的MR计算过程个别为Mapper - Reducer - 写出后果,频繁的迭代计算将要启动多组MR并串联,而每次MR的后果都是会以文件的模式写出,给下一个MR组输出的话又得从新读取,太过繁琐【集体总结】: 集体认为,MR程序的毛病总结的话就是一点,IO过程太多了。首先map阶段输出须要从HDFS中读取数据(这个过程曾经是比较慢的了),而后map阶段业务实现后写出数据(一次IO),而reduce阶段的输出首先得从map节点读取曾经写出的后果文件(可能是网络IO),读到reduce节点后如果后果集太大又会写到磁盘(又一次IO),最初才是次要的reduce过程,最初后果依然是写回磁盘......一组MR的执行,可能要波及到同节点、异节点的屡次IO,如果用来做机器学习之类的简单迭代计算,可能IO工夫比外围业务工夫更长,因而MR适宜做一些一次性、单步骤、大量级的计算。 2. Mapper & Reducer编程须要留神的点map() 办法是对每个<K, V>键值对调用一次reduce() 办法是对每个key调用一次,每个key可能对应多个value,其中value封装为迭代器对象不要导错包!!!不要导错包!!!不要导错包!!! 血泪的踩坑之旅 -- mapred(老版本,不必) -- mapreduce(用它!)3. MR的切片【什么是切片?】 MR要进行计算,就必须要有输出,而MR是分布式计算模式,因而对于数据量较大的计算,就会启动多个Mapper和Reducer(Reducer数目个别都会远小于Mapper),而作为第一道环节,多个Mapper都承受残缺的数据集,那分布式齐全就没有意义了,因而在数据进入Mapper前,会首先进行逻辑切分,将整个数据集分成多份,每份送给一个Mapper进行解决。因而,切片数 = Mapper(MapTask) 的个数 【怎么切?】 第一种:依据文件大小切(默认切片形式) -- 附源码解析 依据参数失去SplitSize,即满足多大了就切一块,见源码解析文件优先级 > 切片,即如果一个文件不够SplitSize,则间接作为一个切片,不是把所有文件当整体肯定要明确一点:这里的切片只是逻辑切片!!相当于隔一段给文件"打一个标记",并不是真正物理上将数据文件划分为几份。实际上Client端做的分片仅仅是提交job.split信息,行将这些标记发送给MR,Map阶段每个Mapper将会依据这些标记去读取各自被调配的那局部数据protected long getFormatMinSplitSize() { return 1L; }// 获取最小切片大小 -- 默认为 1public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);}// 获取最大切片大小 -- 默认为Long.MAXpublic static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L); }// ......(此处省略一万行代码)if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 理论切片的大小 -- computeSplitSize函数见下 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; // 如果剩下的文件大于 1.1 * SplitSize,那么持续切分;否则间接作为一个分片; 防止出现过小文件 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } // ......(此处省略一万行代码)// 外围公式protected long computeSplitSize(long blockSize, long minSize, long maxSize) { // 默认状况下就是块文件的大小 -- 128M return Math.max(minSize, Math.min(maxSize, blockSize));}第二种:依据文件行数切 ...

August 10, 2020 · 1 min · jiezi

关于mapreduce:Hadoop学习笔记二MapReduce的IO类型-文件切片

1. 对MapReduce的了解是什么:Hadoop默认自带的分布式计算框架 做什么:提供一系列接口(外围类:InputFormat、OutputFormat、Mapper、Reducer、Driver),让用户可能实现自定义业务性能的分布式计算工作 【长处】: 高扩展性:计算资源不够,间接减少节点数量即可。品质可能不够,数量肯定管够高容错性:一个节点工作失败,能主动转移到其余闲暇节点适宜大数据处理:得益于其扩展性,只有数量足够,可能计算TB级别的数据【毛病】: 无奈进行实时计算:太慢了!!!太慢了!!!不善于流式计算:MR的输出数据个别都是动态的,无奈解决动静的输出格局不善于迭代计算:一个残缺的MR计算过程个别为Mapper - Reducer - 写出后果,频繁的迭代计算将要启动多组MR并串联,而每次MR的后果都是会以文件的模式写出,给下一个MR组输出的话又得从新读取,太过繁琐【集体总结】: 集体认为,MR程序的毛病总结的话就是一点,IO过程太多了。首先map阶段输出须要从HDFS中读取数据(这个过程曾经是比较慢的了),而后map阶段业务实现后写出数据(一次IO),而reduce阶段的输出首先得从map节点读取曾经写出的后果文件(可能是网络IO),读到reduce节点后如果后果集太大又会写到磁盘(又一次IO),最初才是次要的reduce过程,最初后果依然是写回磁盘......一组MR的执行,可能要波及到同节点、异节点的屡次IO,如果用来做机器学习之类的简单迭代计算,可能IO工夫比外围业务工夫更长,因而MR适宜做一些一次性、单步骤、大量级的计算。 2. Mapper & Reducer编程须要留神的点map() 办法是对每个<K, V>键值对调用一次reduce() 办法是对每个key调用一次,每个key可能对应多个value,其中value封装为迭代器对象不要导错包!!!不要导错包!!!不要导错包!!! 血泪的踩坑之旅 -- mapred(老版本,不必) -- mapreduce(用它!)3. MR的切片【什么是切片?】 MR要进行计算,就必须要有输出,而MR是分布式计算模式,因而对于数据量较大的计算,就会启动多个Mapper和Reducer(Reducer数目个别都会远小于Mapper),而作为第一道环节,多个Mapper都承受残缺的数据集,那分布式齐全就没有意义了,因而在数据进入Mapper前,会首先进行逻辑切分,将整个数据集分成多份,每份送给一个Mapper进行解决。因而,切片数 = Mapper(MapTask) 的个数 【怎么切?】 第一种:依据文件大小切(默认切片形式) -- 附源码解析 依据参数失去SplitSize,即满足多大了就切一块,见源码解析文件优先级 > 切片,即如果一个文件不够SplitSize,则间接作为一个切片,不是把所有文件当整体肯定要明确一点:这里的切片只是逻辑切片!!相当于隔一段给文件"打一个标记",并不是真正物理上将数据文件划分为几份。实际上Client端做的分片仅仅是提交job.split信息,行将这些标记发送给MR,Map阶段每个Mapper将会依据这些标记去读取各自被调配的那局部数据protected long getFormatMinSplitSize() { return 1L; }// 获取最小切片大小 -- 默认为 1public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);}// 获取最大切片大小 -- 默认为Long.MAXpublic static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L); }// ......(此处省略一万行代码)if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 理论切片的大小 -- computeSplitSize函数见下 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; // 如果剩下的文件大于 1.1 * SplitSize,那么持续切分;否则间接作为一个分片; 防止出现过小文件 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } // ......(此处省略一万行代码)// 外围公式protected long computeSplitSize(long blockSize, long minSize, long maxSize) { // 默认状况下就是块文件的大小 -- 128M return Math.max(minSize, Math.min(maxSize, blockSize));}第二种:依据文件行数切 ...

August 10, 2020 · 1 min · jiezi

Windows-下运行-Hadoop-并部署到-AWSqbit

本文环境Windows 10JDK 8IntelliJ IDEA 2019.3.4(Community Edition)Hadoop 2.8.5AWS EMR 5.3.0详细步骤新建 Maven 工程 修改 pom.xml 配置<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>mapreducedemo</groupId> <artifactId>mapreducedemo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <hadoop.version>2.8.5</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies></project>新建 Package 和 Java Class 从官方拷贝代码到 WordCount.javapackage mapreducedemo;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}将文本文件 demo.txt 放到 pom.xml 同级目录添加运行配置 ...

July 8, 2020 · 1 min · jiezi