关于mapreduce:MapReduce-示例减少-Hadoop-MapReduce-中的侧连接

摘要:在排序和reducer 阶段,reduce 侧连贯过程会产生微小的网络I/O 流量,在这个阶段,雷同键的值被汇集在一起。本文分享自华为云社区《MapReduce 示例:缩小 Hadoop MapReduce 中的侧连贯》,作者:Donglian Lin。 在这篇博客中,将应用 MapReduce 示例向您解释如何在 Hadoop MapReduce 中执行缩减侧连贯。在这里,我假如您曾经相熟 MapReduce 框架并晓得如何编写根本的 MapReduce 程序。本博客中探讨的主题如下: • 什么是退出?• MapReduce 中的连贯• 什么是 Reduce 侧连贯?• 缩小侧连贯的 MapReduce 示例• 论断 什么是联接?join操作用于基于外键将两个或多个数据库表合并。通常,公司在其数据库中为客户和交易 记录保护独自的表 。而且,很多时候这些公司须要应用这些独自表格中的数据生成剖析报告。因而,他们应用公共列(外键)(如客户 ID 等)对这些独自的表执行连贯操作,以生成组合表。而后,他们剖析这个组合表以取得所需的剖析报告。 MapReduce 中的连贯就像 SQL join 一样,咱们也能够在 MapReduce 中对不同的数据集进行 join 操作。MapReduce 中有两种类型的连贯操作: • Map Side Join:顾名思义,join操作是在map阶段自身进行的。因而,在 map side join 中,mapper 执行 join 并且每个 map 的输出都必须依据键进行分区和排序。• 缩小副退出:顾名思义,在缩小侧退出,加速是 负责执行连贯操作。因为排序和改选阶段将具备雷同键的值发送到同一个 reducer,因而它比 map side join 绝对简略和容易实现,因而,默认状况下,数据是为咱们组织的。 当初,让咱们具体理解reduce side join。 什么是缩小侧连贯? ...

September 17, 2021 · 4 min · jiezi

关于mapreduce:云小课-MRS基础入门之HDFS组件介绍

摘要:HDFS是MapReduce服务中的根底文件系统,全称为Hadoop的分布式文件系统(Hadoop Distributed File System),可反对实现大规模数据牢靠的分布式读写。本文分享自华为云社区《【云小课】EI第21课 MRS根底入门之HDFS组件介绍》,原文作者:Hi,EI 。 HDFS针对的应用场景是数据读写具备“一次写,屡次读”的特色,而数据“写”操作是程序写,也就是在文件创建时的写入或者在现有文件之后的增加操作。HDFS保障一个文件在一个时刻只被一个调用者执行写操作,而能够被多个调用者执行读操作。 HDFS构造HDFS是一个Master/Slave的架构,次要蕴含主、备NameNode和多个DataNode角色。在Master上运行NameNode,而在每一个Slave上运行DataNode,ZKFC须要和NameNode一起运行。 NameNode和DataNode之间的通信都是建设在TCP/IP的根底之上的。NameNode、DataNode、ZKFC和JournalNode能部署在运行Linux的服务器上。 图1-1中各模块的性能阐明如表1-1所示。 HA即为High Availability,用于解决NameNode单点故障问题,该个性通过主备的形式为主NameNode提供一个备用者,一旦主NameNode呈现故障,能够迅速切换至备NameNode,从而不间断对外提供服务。 在一个典型HDFS HA场景中,通常由两个NameNode组成,一个处于Active状态,另一个处于Standby状态。 为了能实现Active和Standby两个NameNode的元数据信息同步,需提供一个共享存储系统。本版本提供基于QJM(Quorum Journal Manager)的HA解决方案,如图1-2所示。主备NameNode之间通过一组JournalNode同步元数据信息。 通常配置奇数个(2N+1个)JournalNode,且起码要运行3个JournalNode。这样,一条元数据更新音讯只有有N+1个JournalNode写入胜利就认为数据写入胜利,此时最多容忍N个JournalNode写入失败。比方,3个JournalNode时,最多容许1个JournalNode写入失败,5个JournalNode时,最多容许2个JournalNode写入失败。 因为JournalNode是一个轻量级的守护过程,能够与Hadoop其它服务共用机器。倡议将JournalNode部署在管制节点上,以防止数据节点在进行大数据量传输时引起JournalNode写入失败。 HDFS原理MRS应用HDFS的正本机制来保证数据的可靠性,HDFS中每保留一个文件则主动生成1个备份文件,即共2个正本。HDFS正本数可通过“dfs.replication”参数查问。 当MRS集群中Core节点规格抉择为非本地盘(hdd)时,若集群中只有一个Core节点,则HDFS默认正本数为1。若集群中Core节点数大于等于2,则HDFS默认正本数为2。当MRS集群中Core节点规格抉择为本地盘(hdd)时,若集群中只有一个Core节点,则HDFS默认正本数为1。若集群中有两个Core节点,则HDFS默认正本数为2。若集群中Core节点数大于等于3,则HDFS默认正本数为3。MRS服务的HDFS组件反对以下局部个性: HDFS组件反对纠删码,使得数据冗余缩小到50%,且可靠性更高,并引入条带化的块存储构造,最大化的利用现有集群单节点多磁盘的能力,使得数据写入性能在引入编码过程后,仍和原来多正本冗余的性能靠近。反对HDFS组件上节点平衡调度和单节点内的磁盘平衡调度,有助于扩容节点或扩容磁盘后的HDFS存储性能晋升。更多对于Hadoop的架构和具体原理介绍,请参见:http://hadoop.apache.org/。 HDFS文件根底操作在MRS集群中,您能够通过治理控制台、客户端命令以及API接口等多种形式进行HDFS文件的操作。 MRS集群的创立您可参考创立集群。 1、通过MRS治理控制台查看HDFS文件信息在MRS治理控制台,点击集群名称进入到MRS集群详情页面,单击“文件治理”。 在文件治理页面,即可查看HDFS文件列表,并能够执行文件删除、文件夹增删以及与OBS服务数据的导入导入。 2、通过集群客户端查看HDFS文件信息a. 登录MRS集群的FusionInsight Manager页面(如果没有弹性IP,需提前购买弹性IP),新建一个用户hdfstest,绑定用户组supergroup,绑定角色System_administrator(集群未开启Kerberos认证可跳过)。 b. 下载并装置集群全量客户端,例如客户端装置目录为“/opt/client”,相干操作可参考装置客户端。 c. 为客户端节点绑定一个弹性IP,而后应用root用户登录主Master节点,并进入客户端所在目录并认证用户。 cd /opt/client source bigdata_env kinit hbasetest(集群未开启Kerberos认证可跳过) d. 应用hdfs命令进行HDFS文件相干操作。 例如: 创立文件夹:hdfs dfs -mkdir /tmp/testdir 查看文件夹:hdfs dfs -ls /tmp Found 11 items drwx------ - hdfs hadoop 0 2021-05-20 11:20 /tmp/.testHDFS drwxrwxrwx - mapred hadoop 0 2021-05-10 10:33 /tmp/hadoop-yarn drwxrwxrwx - hive hadoop 0 2021-05-10 10:43 /tmp/hive drwxrwx--- - hive hive 0 2021-05-18 16:21 /tmp/hive-scratch drwxrwxrwt - yarn hadoop 0 2021-05-17 11:30 /tmp/logs drwx------ - hive hadoop 0 2021-05-20 11:20 /tmp/monitor drwxrwxrwx - spark2x hadoop 0 2021-05-10 10:45 /tmp/spark2x drwxrwxrwx - spark2x hadoop 0 2021-05-10 10:44 /tmp/sparkhive-scratch drwxr-xr-x - hetuserver hadoop 0 2021-05-17 11:32 /tmp/state-store-launcher drwxr-xr-x - hdfstest hadoop 0 2021-05-20 11:20 /tmp/testdir drwxrwxrwx - hive hadoop 0 2021-05-10 10:43 /tmp/tmp-hive-insert-flag上传本地文件至HDFS:hdfs dfs -put /tmp/test.txt /tmp/testdir (/tmp/test.txt提前准备) ...

June 25, 2021 · 1 min · jiezi

关于mapreduce:四Hadoop之MapReduce实战小例子

输出数据文件 AvgTemperature.txt: DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM1020160602,00,霾,1984,130,9,390,348,30020160802,01,霾,1163,81,8,393,368,30220160706,02,霾,1079,108,17,360,394,30620160706,03,霾,1116,79,6,339,387,30320160502,04,霾,1198,98,16,357,325,307 20160602,05,霾,1762,126,9,324,316,30120160408,06,霾,1996,131,3,349,344,30120160604,07,霾,1952,119,26,347,300,30920160105,08,霾,1410,81,8,350,395,30720160104,09,霾,1718,130,4,352,335,30820160501,10,霾,1714,119,27,310,336,30720160601,11,霾,1660,130,23,311,364,30220160606,12,霾,1598,96,12,369,346,30920160602,13,霾,1673,127,2,343,346,30320160706,14,霾,1578,122,8,360,323,30720160707,15,霾,1237,118,12,384,384,30120160205,16,霾,1231,78,9,361,357,30220160605,17,霾,1166,86,30,350,388,30720160506,18,霾,1426,94,2,378,372,30520160805,19,霾,1874,144,20,376,327,30220160405,20,霾,1778,94,22,360,335,30420160104,21,霾,1055,64,22,376,361,30520160304,22,霾,1349,78,15,367,384,30820160203,23,霾,2004,110,2,359,371,30420160603,24,霾,1375,115,19,308,301,30820160402,25,霾,1201,69,5,387,342,30520160707,26,霾,1272,112,23,348,333,30720160702,27,霾,1738,60,12,393,300,30320160301,28,霾,1752,107,12,364,331,30120160704,29,霾,1442,65,9,332,369,308第一题:编写月平均气温统计程序 import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class AvgTemperature { public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> { private IntWritable intValue = new IntWritable(); private Text dateKey = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] items = value.toString().split(","); String date = items[0]; String tmp = items[5]; if(!"DATE".equals(date) && !"N/A".equals(tmp)){//排除第一行阐明以及未取到数据的行 dateKey.set(date.substring(0, 6)); intValue.set(Integer.parseInt(tmp)); context.write(dateKey, intValue); } } } public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int tmp_sum = 0; int count = 0; for(IntWritable val : values){ tmp_sum += val.get(); count++; } int tmp_avg = tmp_sum/count; result.set(tmp_avg); context.write(key, result); } } public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf, "AvgTemperature"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(AvgTemperature.class); job.setMapperClass(StatMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(StatReducer.class); job.setPartitionerClass(HashPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); TextOutputFormat.setOutputPath(job, new Path(args[1])); TextInputFormat.setInputPaths(job, args[0]); job.setNumReduceTasks(Integer.parseInt(args[2])); System.exit(job.waitForCompletion(true) ? 0 : 1); }} ...

October 31, 2020 · 2 min · jiezi

关于mapreduce:Hadoop学习笔记二MapReduce的IO类型-文件切片

1. 对MapReduce的了解是什么:Hadoop默认自带的分布式计算框架 做什么:提供一系列接口(外围类:InputFormat、OutputFormat、Mapper、Reducer、Driver),让用户可能实现自定义业务性能的分布式计算工作 【长处】: 高扩展性:计算资源不够,间接减少节点数量即可。品质可能不够,数量肯定管够高容错性:一个节点工作失败,能主动转移到其余闲暇节点适宜大数据处理:得益于其扩展性,只有数量足够,可能计算TB级别的数据【毛病】: 无奈进行实时计算:太慢了!!!太慢了!!!不善于流式计算:MR的输出数据个别都是动态的,无奈解决动静的输出格局不善于迭代计算:一个残缺的MR计算过程个别为Mapper - Reducer - 写出后果,频繁的迭代计算将要启动多组MR并串联,而每次MR的后果都是会以文件的模式写出,给下一个MR组输出的话又得从新读取,太过繁琐【集体总结】: 集体认为,MR程序的毛病总结的话就是一点,IO过程太多了。首先map阶段输出须要从HDFS中读取数据(这个过程曾经是比较慢的了),而后map阶段业务实现后写出数据(一次IO),而reduce阶段的输出首先得从map节点读取曾经写出的后果文件(可能是网络IO),读到reduce节点后如果后果集太大又会写到磁盘(又一次IO),最初才是次要的reduce过程,最初后果依然是写回磁盘......一组MR的执行,可能要波及到同节点、异节点的屡次IO,如果用来做机器学习之类的简单迭代计算,可能IO工夫比外围业务工夫更长,因而MR适宜做一些一次性、单步骤、大量级的计算。 2. Mapper & Reducer编程须要留神的点map() 办法是对每个<K, V>键值对调用一次reduce() 办法是对每个key调用一次,每个key可能对应多个value,其中value封装为迭代器对象不要导错包!!!不要导错包!!!不要导错包!!! 血泪的踩坑之旅 -- mapred(老版本,不必) -- mapreduce(用它!)3. MR的切片【什么是切片?】 MR要进行计算,就必须要有输出,而MR是分布式计算模式,因而对于数据量较大的计算,就会启动多个Mapper和Reducer(Reducer数目个别都会远小于Mapper),而作为第一道环节,多个Mapper都承受残缺的数据集,那分布式齐全就没有意义了,因而在数据进入Mapper前,会首先进行逻辑切分,将整个数据集分成多份,每份送给一个Mapper进行解决。因而,切片数 = Mapper(MapTask) 的个数 【怎么切?】 第一种:依据文件大小切(默认切片形式) -- 附源码解析 依据参数失去SplitSize,即满足多大了就切一块,见源码解析文件优先级 > 切片,即如果一个文件不够SplitSize,则间接作为一个切片,不是把所有文件当整体肯定要明确一点:这里的切片只是逻辑切片!!相当于隔一段给文件"打一个标记",并不是真正物理上将数据文件划分为几份。实际上Client端做的分片仅仅是提交job.split信息,行将这些标记发送给MR,Map阶段每个Mapper将会依据这些标记去读取各自被调配的那局部数据protected long getFormatMinSplitSize() { return 1L; }// 获取最小切片大小 -- 默认为 1public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);}// 获取最大切片大小 -- 默认为Long.MAXpublic static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L); }// ......(此处省略一万行代码)if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 理论切片的大小 -- computeSplitSize函数见下 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; // 如果剩下的文件大于 1.1 * SplitSize,那么持续切分;否则间接作为一个分片; 防止出现过小文件 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } // ......(此处省略一万行代码)// 外围公式protected long computeSplitSize(long blockSize, long minSize, long maxSize) { // 默认状况下就是块文件的大小 -- 128M return Math.max(minSize, Math.min(maxSize, blockSize));}第二种:依据文件行数切 ...

August 10, 2020 · 1 min · jiezi

关于mapreduce:Hadoop学习笔记二MapReduce的IO类型-文件切片

1. 对MapReduce的了解是什么:Hadoop默认自带的分布式计算框架 做什么:提供一系列接口(外围类:InputFormat、OutputFormat、Mapper、Reducer、Driver),让用户可能实现自定义业务性能的分布式计算工作 【长处】: 高扩展性:计算资源不够,间接减少节点数量即可。品质可能不够,数量肯定管够高容错性:一个节点工作失败,能主动转移到其余闲暇节点适宜大数据处理:得益于其扩展性,只有数量足够,可能计算TB级别的数据【毛病】: 无奈进行实时计算:太慢了!!!太慢了!!!不善于流式计算:MR的输出数据个别都是动态的,无奈解决动静的输出格局不善于迭代计算:一个残缺的MR计算过程个别为Mapper - Reducer - 写出后果,频繁的迭代计算将要启动多组MR并串联,而每次MR的后果都是会以文件的模式写出,给下一个MR组输出的话又得从新读取,太过繁琐【集体总结】: 集体认为,MR程序的毛病总结的话就是一点,IO过程太多了。首先map阶段输出须要从HDFS中读取数据(这个过程曾经是比较慢的了),而后map阶段业务实现后写出数据(一次IO),而reduce阶段的输出首先得从map节点读取曾经写出的后果文件(可能是网络IO),读到reduce节点后如果后果集太大又会写到磁盘(又一次IO),最初才是次要的reduce过程,最初后果依然是写回磁盘......一组MR的执行,可能要波及到同节点、异节点的屡次IO,如果用来做机器学习之类的简单迭代计算,可能IO工夫比外围业务工夫更长,因而MR适宜做一些一次性、单步骤、大量级的计算。 2. Mapper & Reducer编程须要留神的点map() 办法是对每个<K, V>键值对调用一次reduce() 办法是对每个key调用一次,每个key可能对应多个value,其中value封装为迭代器对象不要导错包!!!不要导错包!!!不要导错包!!! 血泪的踩坑之旅 -- mapred(老版本,不必) -- mapreduce(用它!)3. MR的切片【什么是切片?】 MR要进行计算,就必须要有输出,而MR是分布式计算模式,因而对于数据量较大的计算,就会启动多个Mapper和Reducer(Reducer数目个别都会远小于Mapper),而作为第一道环节,多个Mapper都承受残缺的数据集,那分布式齐全就没有意义了,因而在数据进入Mapper前,会首先进行逻辑切分,将整个数据集分成多份,每份送给一个Mapper进行解决。因而,切片数 = Mapper(MapTask) 的个数 【怎么切?】 第一种:依据文件大小切(默认切片形式) -- 附源码解析 依据参数失去SplitSize,即满足多大了就切一块,见源码解析文件优先级 > 切片,即如果一个文件不够SplitSize,则间接作为一个切片,不是把所有文件当整体肯定要明确一点:这里的切片只是逻辑切片!!相当于隔一段给文件"打一个标记",并不是真正物理上将数据文件划分为几份。实际上Client端做的分片仅仅是提交job.split信息,行将这些标记发送给MR,Map阶段每个Mapper将会依据这些标记去读取各自被调配的那局部数据protected long getFormatMinSplitSize() { return 1L; }// 获取最小切片大小 -- 默认为 1public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);}// 获取最大切片大小 -- 默认为Long.MAXpublic static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L); }// ......(此处省略一万行代码)if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 理论切片的大小 -- computeSplitSize函数见下 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; // 如果剩下的文件大于 1.1 * SplitSize,那么持续切分;否则间接作为一个分片; 防止出现过小文件 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } // ......(此处省略一万行代码)// 外围公式protected long computeSplitSize(long blockSize, long minSize, long maxSize) { // 默认状况下就是块文件的大小 -- 128M return Math.max(minSize, Math.min(maxSize, blockSize));}第二种:依据文件行数切 ...

August 10, 2020 · 1 min · jiezi

Windows-下运行-Hadoop-并部署到-AWSqbit

本文环境Windows 10JDK 8IntelliJ IDEA 2019.3.4(Community Edition)Hadoop 2.8.5AWS EMR 5.3.0详细步骤新建 Maven 工程 修改 pom.xml 配置<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>mapreducedemo</groupId> <artifactId>mapreducedemo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <hadoop.version>2.8.5</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies></project>新建 Package 和 Java Class 从官方拷贝代码到 WordCount.javapackage mapreducedemo;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}将文本文件 demo.txt 放到 pom.xml 同级目录添加运行配置 ...

July 8, 2020 · 1 min · jiezi

MapReduce案例好友推荐

用过各种社交平台(如QQ、微博、朋友网等等)的小伙伴应该都知道有一个叫 "可能认识" 或者 "好友推荐" 的功能(如下图)。它的算法主要是根据你们之间的共同好友数进行推荐,当然也有其他如爱好、特长等等。共同好友的数量越多,表明你们可能认识,系统便会自动推荐。今天我将向大家介绍如何使用MapReduce计算共同好友 算法假设有以下好友列表,A的好友有B,C,D,F,E,O; B的好友有A,C,E,K 以此类推那我们要如何算出A-O用户每个用户之间的共同好友呢?A:B,C,D,F,E,OB:A,C,E,KC:F,A,D,ID:A,E,F,LE:B,C,D,M,LF:A,B,C,D,E,O,MG:A,C,D,E,FH:A,C,D,E,OI:A,OJ:B,OK:A,C,DL:D,E,FM:E,F,GO:A,H,I,J下面我们将演示分步计算,思路主要如下:先算出某个用户是哪些用户的共同好友,如A是B,C,D,E等的共同好友。遍历B,C,D,E两两配对如(B-C共同好友A,注意防止重复B-C,C-B)作为key放松给reduce端,这样reduce就会收到所有B-C的共同好友的集合。可能到这里你还不太清楚怎么回事,下面我给大家画一个图。 代码演示由上可知,此次计算由两步组成,因此需要两个MapReduce程序先后执行第一步:通过mapreduce得到 某个用户是哪些用户的共同好友。public class FriendsDemoStepOneDriver { static class FriendsDemoStepOneMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(":"); String user = split[0]; String[] friends = split[1].split(","); for (String friend : friends) {// 输出友人,人。 这样的就可以得到哪个人是哪些人的共同朋友 context.write(new Text(friend),new Text(user)); } } } static class FriendsDemoStepOneReducer extends Reducer<Text,Text,Text,Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder sb = new StringBuilder(); for (Text person : values) { sb.append(person+","); } context.write(key,new Text(sb.toString())); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("mapreduce.framework.name","local"); conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); job.setJarByClass(FriendsDemoStepOneDriver.class); job.setMapperClass(FriendsDemoStepOneMapper.class); job.setReducerClass(FriendsDemoStepOneReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/mapreduce/friends/friends.txt")); FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/mapreduce/friends/output")); boolean completion = job.waitForCompletion(true); System.out.println(completion); }}运行的到的结果如下: ...

October 14, 2019 · 2 min · jiezi

Hadoop-MapReduce-Spark-配置项

适用范围本文涉及到的配置项主要针对 Hadoop 2.x,Spark 2.x。 MapReduce官方文档https://hadoop.apache.org/doc...左下角: mapred-default.xml 配置项举例mapreduce.job.reduce.slowstart.completedmaps当 Map Task 完成的比例达到该值后才会为 Reduce Task 申请资源mapreduce.output.fileoutputformat.compressMapReduce Job 的结果输出需要使用压缩Spark官方文档https://spark.apache.org/docs...最后有提到想要直接配置 Hadoop 项,在 Hadoop 配置项前加 spark.hadoop 即可。Custom Hadoop/Hive Configuration 配置项示例spark.dynamicAllocation.enabled是否动态分配内存(Spark 原生配置项)spark.hadoop.mapreduce.output.fileoutputformat.compressJob 的结果输出是否使用压缩(Hadoop MR 配置项)本文出自: walker snapshot

September 19, 2019 · 1 min · jiezi

Hadoop的搭建和第一个Hadoop小项目单词计数

Hadoop的搭建我自己是在windows10上搭建的hadoop。 参考资料如下: 1.hadoop详细安装及配置 2.winutils下载 3.hadoop3.0.3下载 4hadoop启动报错java.lang.NoClassDefFoundError:/org/apache/hadoop/yarn/server/timelineCollectorManager 第一个Hadoop小项目:单词计数单词计数应该是很多人入门Hadoop的第一个小项目。我自己看的参考资料是《MapReduce设计模式》。运作这个小例子是不需要启动Hadoop的。 采坑总结:(1)Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir ar 我自己的解决方案是在系统变量添加HADOOP_HOME,在系统变量的PATH里添加bin,重启IDEA。之前在用户变量里添加过不知道为什么没生效,所以在系统变量里加。用以下代码验证: System.out.println(System.getenv("HADOOP_HOME"));System.out.println(System.getenv("PATH"));如果有些人报错说找不到winutils.exe,需要去下载winutils的包,把对应版本的bin文件夹替换hadoop的bin。我在【hadoop的搭建】部分的参考资料有给下载的github地址。 (2)Maven的依赖问题。 Exception in thread "main" java.lang.VerifyError: Bad return type'org/apache/hadoop/mapred/JobStatus' (current frame, stack[0]) is not assign 'org/apache/hadoop/mapreduce/JobStatus'这个我在网上没有找到解决方法,但是我的程序是参照《MapReduce设计模式》来的,确定应该不是程序的问题之后,应该只能是Maven依赖的问题。修改后,我的项目的依赖包括:hadoop-common、hadoop-hdfs、hadoop-mapreduce-client-core、hadoop-mapreduce-client-jobclient、hadoop-mapreduce-client-common。版本都是3.0.3,因为我搭建的Hadoop版本是3.0.3。 (3)也是Maven依赖问题。 java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.添加hadoop-mapreduce-client-jobclient、hadoop-mapreduce-client-common这两个依赖就好。参考资料:https://blog.csdn.net/qq_2012... 完整的代码import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.fs.Path;import java.io.IOException;import java.util.StringTokenizer;/** * @Author liuffei * @Date 2019/7/13 9:41 * @Description */public class CommentWordCount { //Mapper<Object, Text,Text, IntWritable>表示输入键,输入值,输出键,输出值 //mapper输入的键值是在作业配置的FileInputFormat中定义的。 public static class WordCountMapper extends Mapper<Object, Text,Text, IntWritable> { //设置计数为1 IntWritable one = new IntWritable(1); Text word = new Text(); //覆盖了Mapper类的map方法 public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ String txt = value.toString(); //将输入值中的非字母替换为空字符串 txt = txt.replaceAll("[^a-zA-Z]",""); StringTokenizer stringTokenizer = new StringTokenizer(txt); while(stringTokenizer.hasMoreTokens()) { word.set(stringTokenizer.nextToken()); //将每个单词计数为1,并保存。 context.write(word, one); } } } //Reducer<Text, IntWritable,Text, IntWritable>表示输入键,输入值,输出键,输出值 //Reducer的输入键输入值应该和Mapper的输出键输出值的类型保持一致 public static class IntSumReducer extends Reducer<Text, IntWritable,Text, IntWritable> { public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ int sum = 0; for (IntWritable val:values) { sum += val.get(); } context.write(key,new IntWritable(sum)); } } public static void main(String[] args){ try { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length != 2) { System.err.println("need enter input and output directory path"); System.exit(2); } Job job = Job.getInstance(conf, "Word Count"); //与自己定义的类名保持一致 job.setJarByClass(CommentWordCount.class); //与自己定义的Mapper类和Reducer类保持一致 job.setMapperClass(WordCountMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); //设置的输出键和输出值和mapper定义的需要保持一致。 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //输入输出路径 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } catch (Exception e) { e.printStackTrace(); } }}运行main方法之前,我在自己的项目的src同级目录建立了input文件夹,并建立了两个txt文件(注意文件的读取是按行的,所以每个单词单独一行)。运行main方法时,添加输入输出路径。文件夹的路径大家可以自己定义。output文件夹不需要自己建立,会自动建立,每次运行时需要把之前生成的output文件夹删除,不然会报output文件夹已经存在的错。 ...

July 13, 2019 · 2 min · jiezi

使用Python操作Hadoop,Python-MapReduce

环境环境使用:hadoop3.1,Python3.6,ubuntu18.04Hadoop是使用Java开发的,推荐使用Java操作HDFS。有时候也需要我们使用Python操作HDFS。本次我们来讨论如何使用Python操作HDFS,进行文件上传,下载,查看文件夹,以及如何使用Python进行MapReduce编程。使用Python操作HDFS首先需要安装和导入hdfs库,使用pip install hdfs。1. 连接并查看指定路径下的数据from hdfs import * client = Client(‘http://ip:port’) #2.X版本port 使用50070 3.x版本port 使用9870client.list(’/’) #查看hdfs /下的目录2. 创建目录client.makedirs(’/test’)client.makedirs(’/test’,permision = 777 ) # permision可以设置参数3. 重命名、删除client.rename(’/test’,‘123’) #将/test 目录改名为123client.delete(’/test’,True) #第二个参数表示递归删除 4.下载将/test/log.txt 文件下载至/home目录下。client.download(’/test/log.txt’,’/home’) 5. 读取with client.read("/test/[PPT]Google Protocol Buffers.pdf") as reader: print reader.read()其他参数:read(args, *kwds) hdfs_path:hdfs路径 offset:设置开始的字节位置 l- ength:读取的长度(字节为单位) buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。 encoding:指定编码 chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象 delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。 progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1作为第二个参数。6.上传数据将文件上传至hdfs的 /test下。client.upload(‘/test’,’/home/test/a.log’)Python-MapReduce编写mapper代码,map.py:import sysfor line in sys.stdin: fields = line.strip().split() for item in fields: print(item + ’ ’ + ‘1’)编写reducer代码,reduce.py:import sysresult = {}for line in sys.stdin: kvs = line.strip().split(’ ‘) k = kvs[0] v = kvs[1] if k in result: result[k]+=1 else: result[k] = 1for k,v in result.items(): print("%s\t%s" %(k,v))添加测试文本,test1.txt:tale as old as timetrue as it can bebeauty and the beast本地测试执行map代码:cat test1.txt | python map.py结果:tale 1as 1old 1as 1time 1true 1as 1it 1can 1be 1beauty 1and 1the 1beast 1本地测试执行reduce代码:cat test1.txt | python map.py | sort -k1,1 | python reduce.py执行结果:and 1be 1old 1beauty 1true 1it 1beast 1as 3can 1time 1the 1tale 1在Hadoop平台执行map-reduce程序本地测试完毕,编写脚本在HDFS中执行程序脚本:run.sh (请根据本机环境修改)HADOOP_CMD="/app/hadoop-3.1.2/bin/hadoop"STREAM_JAR_PATH="/app/hadoop-3.1.2/share/hadoop/tools/lib/hadoop-streaming-3.1.2.jar"INPUT_FILE_PATH_1="/py/input/“OUTPUT_PATH="/output”$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH# Step 1.$HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH_1 -output $OUTPUT_PATH -mapper “python map.py” -reducer “python reduce.py” -file ./map.py -file ./reduce.py \添加执行权限chmod a+x run.sh;执行测试:bash run.sh,查看结果:练习1. 文件合并去重输入文件file1的样例如下:20150101 x20150102 y20150103 x20150104 y20150105 z20150106 x输入文件file2的样例如下:20150101 y20150102 y20150103 x20150104 z20150105 y根据输入文件file1和file2合并得到的输出文件file3的样例如下:20150101 x20150101 y20150102 y20150103 x20150104 y20150104 z20150105 y20150105 z20150106 x对于两个输入文件,即文件file1和文件file2,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3。为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:第一列按学号排列;学号相同,按x,y,z排列。2. 挖掘父子关系输入文件内容如下:child parentSteven LucySteven JackJone LucyJone JackLucy MaryLucy FrankJack AliceJack JesseDavid AliceDavid JessePhilip DavidPhilip AlmaMark DavidMark Alma输出文件内容如下:grandchild grandparentSteven AliceSteven JesseJone AliceJone JesseSteven MarySteven FrankJone MaryJone FrankPhilip AlicePhilip JesseMark AliceMark Jesse你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下:孙子在前,祖父在后孙子相同,祖父的名字按照A-Z排列 ...

April 7, 2019 · 2 min · jiezi

windows调试hadoop-mapreduce任务踩坑记录(使用idea)

首先准备Hadoop连接驱动,放到任意一个文件夹中,并将其bin目录写入path环境环境变量,另取其中的hadoop.dll文件放入c盘System32文件夹中。创建空maven项目,这是我的全部依赖 <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> </dependencies>将hadoop四大配置文件放在resources根目录下,还有log4j.properties再准备你的mapreduce源码,在main函数中写入以下代码System.setProperty(“HADOOP_USER_NAME”, “hadoop”);Configuration conf = new Configuration();conf.set(“fs.defaultFS”, “hdfs://10.32.6.132:9000”);//ip依照自己的集群进行定义conf.set(“mapreduce.framework.name”, “yarn”);conf.set(“mapreduce.app-submission.cross-platform”, “true”);//允许跨平台提交conf.set(“mapred.jar”,“E:\hadooptest\target\hadooptest-1.0-SNAPSHOT-jar-with-dependencies.jar”);mapred.jar就写编译出来的jar包位置idea中运行配置如下其中输入路径一定要存在,输出路径一定要不存在,由Hadoop自行创建此处写hadoop驱动的根目录,之后点击运行即可。遇到的问题:1.一定要保证hadoop集群是可运行的,单机版也可以,但一定要保证是正常的。2.在本机运行过程中会调用hadoop历史服务器,采用sbin/mr-jobhistory-daemon.sh start historyserver命令启动不启动的异常为10020端口无法访问3.运行过程中出现的各种连接异常的警告可以忽视,不可忽视的是异常,端口默认访问地址是0.0.0.0是无法被其他主机访问的,所以任何连接被异常中断时请检查是否在配置文件中显式指定了套接字。

March 16, 2019 · 1 min · jiezi

MapReduce精髓

虽然Google的MapReduce论文很老了(十多年),但只要还没看,就值得一看。概要MapReduce是一种重视容错性的分布式并行计算模式,它把分布式并行计算分为map和reduce两个阶段:map: 把输入数据集切分成很多份(1份可包含很多records),传给map函数做转换处理(每次处理1条record,得出1条结果),结果集被输出到文件reduce: 读取map的结果集,传给reduce函数做归约处理(每次处理1条record,更新一条共享的结果),结果集被输出到文件每台机器是一个node。map和reduce都可以在很多worker node上运行。1个任务=1个函数+1组输入数据,任务被分配到worker node上运行。有一个中央的调度器,叫做master node,来进行全局调度,给worker node分配任务。示意图就不贴了,到处可见的WordCount例子也不写了,别人写过的我一般不写。请参考网络资料。参考好了吗?继续吧!优化论文指出它是为大量廉价机器组成的环境而设计的,环境特点是:机器特别多,机器性能参差不齐,有的机器会突然坏掉。论文的很多优化都是在解决这种环境所特有的问题,这在当时是开创性的,因为一般的分布式并行技术都还在研究一组性能均等的高性能机器,不能容忍某台机器变慢或故障。现在工业界都是用MapReduce的方式在搞,因为多数企业的环境都是论文里说的这种。Google推出MapReduce论文时已经考虑周密了,给出了很多优化点:Map阶段:一开始先把输入数据集切分成M片,每片一般16~64MBmap完成时通知master,master记录所有map的完成情况和文件位置map tasks要切得小而多,建议远大于机器数,容易负载均衡(3个task分给2个node,均衡度肯定不如13个task分给2个node)map tasks尽量分配到靠近数据的node,以节省带宽用combine函数做本地预合并,以减小map的输出结果集Reduce阶段:reducer从mapper node下载输入文件reduce先写临时文件,完成时用原子的文件重命名操作reduce用lazy iterator读取输入,以节省内存全阶段:map输出到本地文件,reduce输出到全局文件map和reduce都可有多个副本同时重复执行,谁快就用谁的结果(即使有个副本突然变慢,也有别的副本在跑)总有一些落后进程,增加百分之几的备用资源,就能加速扫除长尾,节省百分之几十的时间调度器尽量把任务分配给空闲的worker,因此速度快的worker自然会处理更多tasks遇到错误的record,写标记到master,再有task遇到时跳过它注释:这种“总有一些落后进程”的现象叫做尾部延迟放大(tail delay amplification),分布式数据库执行查询的scatter/gatter模式也有此问题。Google的解法相当于task rebalance,来个比喻:让先进员工帮助落后员工完成积压的任务,以保证全团队的项目按时完成。我们要知道,“计算易移动,数据难移动”,我们总是倾向于把计算移动到另一处,而不是把数据移过去。MapReduce就是移动了计算,并且把计算尽可能移动到数据附近。task rebalance只移动计算,无需移动数据,所以才管用。而分布式数据库在执行查询时,把计算分发到data node上,如果有的data node反应慢,就没有办法,因为数据不好移。

March 1, 2019 · 1 min · jiezi