hadoop概述
Hadoop 历史
- Hadoop最早起源于Nutch。Nutch的设计指标是一个网络爬虫引擎,但随着抓取网页数据量的增大,Nutch遇到了重大的性能扩大问题。2003年,2004年谷歌公布两篇论文为该问题提供了一个解决方案一个是HDFS的前身GFS,用于海量网页的存储;另一个是分布式计算框架MAPERDUCE。Nutch的创始人依据论文的领导用了·两年的工夫实现了HDFS和MapReduce代码。并将其从Nutch剥离进去,成为独立我的项目Hadoop。2008年Hadoop成为Apache顶级我的项目。
- 晚期的Hadoop并非当初大家所相熟的Hadoop分布式开源软件,而是指代大数据的一个生态圈,包含很多其余的软件。在2010前后,Hbase,HIVE,Zookeeper等顺次脱离Hadoop我的项目成为Apache的顶级我的项目。2011年,Hadoop公布2.0,在架构上进行了重大更新,引入Yarn框架专一于资源的治理精简了MapReduce的职责。同时Yarn框架作为一个通用的资源调度和治理模块,同时反对多种其余的编程模型,比方最闻名的Spark。
- 因为HADOOP的版本治理简单,简单集群的部署装置配置等须要编写大量的配置文件而后散发到每一台节点上,容易出错效率低下。所以很多公司会在根底的Hadoop进行商业化后进行再发行。目前Hadoop发行版十分多,有华为发行版、Intel发行版、Cloudera发行版(CDH)等。
等。
Hadoop(2.0)的组成
HDFS
HDFS的组成
NameNode:是整个HDFS集群的管理者,
- 治理HDFS的名称空间
- 治理正本策略
- 治理数据块在DataNode的地位映射信息
- 与客户端交互解决来自于客户端的读写操作。
DataNode:理论文件的存储者。
- 存储理论的数据块
- 执行数据块的读、写操作
- DataNode启动后向NameNode注册并每6小时向NameNode上报所有的块信息
- DataNode的心跳是每3秒一次,心跳返回后果带有NameNode给该DataNode的命令,例如:复制数据到另一台机器;删除某个数据块。
- 若NameNode超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
客户端
- HDFS提供的工具包,面向开发者,封装了对HDFS的操作调用。
- 负责文件切分:文件上传到HDFS时,Client负责与NameNode和DataNode交互,将文件切分为一个一个的block进行上传。
Secondary NameNode:辅助NameNode,分担其工作量。
- 定期合并Fsimage和Edits,并推送给NameNode。
- 辅助复原NameNode。
- 并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务
HDFS读写流程
HDFS上传文件
- client向NameNode申请上传文件,NameNode进行合规性检测,并创立相应的目录元数据。并返回是否能够上传。
- client将文件切分,再次询问NameNode第一个Block须要上传到哪几个DataNode服务端上。NameNode 返回 3 个 DataNode 节点,别离为 dn1、dn2、dn3。
- client将第一个block数据上传给dn1,dn1 收到申请会持续调用dn2,而后 dn2 调用 dn3,将这个通信管道建设实现。
- client开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3。
- 当一个Block传输实现之后,客户端再次申请NameNode上传第二个Block,而后反复1~4步骤。
NameNode节点抉择策略-节点间隔计算
- 如果Client与HADOOP在同一个集群,则NameNode会抉择间隔待上传数据最近间隔的DataNode接收数据。节点间隔:两个节点达到最近的独特先人的间隔总和。
- 如果Client与HADOOP不在同一个集群,则NameNode随机选一个机架上的一个节点。第二个正本在另一个机架的随机的一个节点,第三个正本在第二个正本所在的机架的随机节点。
HDFS读流程
- client向NameNode申请下载文件,NameNode返回文件块所在的DataNode地址。
- client依据节点间隔筛选一台最近的DataNode服务器,而后开始读取数据。DataNode以Packet来传输数据。
- client接管到一个packet数据后,并进行校验。校验通过后,将packet写入指标文件,再申请第二个packet。整个过程为串行的过程。(因为IO自身就是速度最慢的流程)
- 读取数据的过程中,如果client端与dn数据结点通信时呈现谬误,则尝试连贯蕴含此数据块的下一个dn数据结点。失败的dn数据结点将被client记录,并且当前不再连贯。
SecondaryNameNode
NameNode是机器的文件治理容易造成单点读写性能问题与数据存储平安问题。
SecondaryNameNode与Name帮助解决读写性能问题:NameNode的数据既存储在内存中又存储在磁盘中
- Fsimage文件:HDFS文件系统元数据的一个永久性的检查点,其中蕴含HDFS文件系统的所有目录和文件inode的序列化信息。
- Edits文件:寄存HDFS文件系统的所有更新操作的门路,文件系统客户端执行的所有写操作首先会被记录到Edits文件中。Edits文件只进行追加操作,效率很高。每当元数据有更新或者增加元数据时,批改内存中的元数据并追加到Edits中。
- NameNode启动的时候都会将Fsimage文件读入内存,加载Edits外面的更新操作,保障内存中的元数据信息是最新的、同步的。
- 长时间增加数据到 Edits 中,会导致该文件数据过大,效率升高,而且一旦断电,复原元数据须要的工夫过长。因而2NN,专门用于FsImage和Edits的合并。
SecondaryNameNode 工作机制
NameNode启动后,会创立FsImage和Edits文件。如果不是第一次启动,则间接加载FsImage和Edits文件。
- fsimage_0000000000000000002 文件最新的FsImage。
- edits_inprogress_0000000000000000003 正在进行中的edits。
- seen_txid 是一个txt的文本文件,记录是最新的edits_inprogress文件开端的数字。
Secondary NameNode 工作
- 2NN询问NN是须要CheckPoint。
- NN将当初进行中的edits文件和最新的fsimage文件拷贝到2NN并更新seen_txid里的数字,而后从新生成edits文件。
- 2NN加载编辑日志和镜像文件到内存,并合并生成新的镜像文件fsimage.chkpoint。而后拷贝回NN。NN将fsimage.chkpoint重新命名为fsImage实现一次滚写。
HDFS总结
长处
1. 高容错性:数据主动保留多个正本,通过减少正本的模式,进步容错性。某一个正本失落后,能够主动复原。2. 适宜解决大数据:可能解决的数据规模达到GB,TB甚至PB级别,文件数量能够达到百万规模以上。3. 可构建在便宜的机器上:通过多正本机制,进步可靠性。
毛病
1. 不适宜低时延的数据拜访 2. 无奈高效的对大量小文件进行存储,大量的小文件会占用NameNode大量的内存来存储文件目录和块信息,同时小文件的寻址工夫会超过读取工夫,它违反了HDFS的设计指标 3. 不反对并发写入,文件随机批改。仅反对数据追加的。
DataNode与NameNode源代码导读
代码浏览前的筹备工作:Hadoop Rpc框架指南
Rpc 协定
public interface MyInterface { Object versionID = 1; boolean demo();}
Rpc provider
public class MyHadoopServer implements MyInterface { @Override public boolean demo() { return false; } public static void main(String[] args) { Server server = new RPC.Builder(new Configuration()) .setBindAddress("localhost") .setPort(8888) .setProtocol(MyInterface.class) .setInstance(new MyHadoopServer()) .build(); server.start(); }}
Rpc consuemr
public class MyHadoopClient { public static void main(String[] args) throws Exception { MyInterface client = RPC.getProxy( MyInterface.class, MyInterface.versionID, new InetSocketAddress("localhost", 8888), new Configuration()); client.demo(); } }
NameNode 启动源码
- 启动9870端口服务
- 加载镜像文件和编辑日志
- 初始化NN的RPC服务端:用于接管DataNode的RPC申请
- NN启动资源检测
NN对心跳超时判断(启动一个线程去判断DataNode是否超时)
- HDFS默认DataNode掉线容忍工夫未 timeout = 2 heartbeat.recheck.interval + 10 dfs.heartbeat.interval(2*5+30)超过这个工夫会被认为DataNode超时
DataNode 启动源码
工作流程
源码图示
MapReduce
MapReduce示例
- 需要:有一个大小为300M的存文本文件。统计其每一个字母呈现的总次数。要求:[a-p]一个后果文件,[q-z]一个后果文件。
实现:
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text outK = new Text(); private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 循环写出 for (String word : words) { // 封装outk outK.set(word); // 写出 context.write(outK, outV); } }}
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; // 累加 for (IntWritable value : values) { sum += value.get(); } outV.set(sum); // 写出 context.write(key,outV); }}
切片规定决定MapTask的数量。
- MapReduce将数据的读取形象为一个InputFormat。罕用的FileInputFormat是针对文件读取的一个具体实现。
- FileInputFormat的默认分片规定为:将待处理的数据文件进行逻辑分片,每128M为一个数据切片。一个数据切片交给一个MapTask进行并行处理。
分片数量过大,开启过多的MapTask浪费资源。分片数量过小,MapTask阶段解决慢。
ReduceTask的数量须要手工指定。
- 在Map阶段须要对每一个输入数据会通过分区算法计算分区。
- 若ReduceTask=0,则示意没有Reduce阶段,输入文件个数和Map个数统一。
- 若ReduceTask=1,所有输入文件为一个。
- ReduceTask数量要大于分区的后果不同值的数,否则数据无奈被生产会产生异样。
- 若ReduceTask的数量大于分区数量,则会有局部reduceTask处于闲置状态。
MapReduce具体的工作流程。
Map阶段
- read阶段:通过RecordReader从InputFormat分片中将读取数据并将数据解析成一个个key/value。
- map阶段:用户自定义的Mapper.map办法执行。将输出的key/value转为输入的key/value。
collect阶段:接管输入Key/Value 并调用分区算法,将输入数据写入对应的分区的环形内存缓存区中。
- spill阶段:当内存缓存区的使用率超过肯定的阈值后,将触发溢写线程。该线程当初内存中进行疾速排序,而后将数据溢写到磁盘上。
- Combine阶段:当所有数据处理实现后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
Reduce阶段
- Copy阶段:ReduceTask从所有的MapTask中拷贝同一分区的数据,每一个ReduceTask负责解决一个分区,互不影响。如果文件大小超过肯定的阈值,则溢写到磁盘上,否则存储在内存中。
- Merge阶段:ReduceTask将所有雷同分区的材料合并成一个大文件,
- sort阶段:将合并后的大文件进行归并排序。因为mapTask自身保障了分区内区内有序,因而ReduceTask只须要对所有数据进行一次归并排序即可。
- reduce阶段:执行用户的reduce办法,并将后果写到HDFS。
MapReduce优缺点
长处
- 实现简略,封装度高。
- 扩展性强:能够疾速减少机器来扩大它的计算能力。
- 高容错性:当某个节点挂掉,它会主动将工作转移到领一个节点运行,两头不须要人工参加。
- 适宜PB级别以上海量数据的离线解决。
毛病
- 不善于实时计算。
- 不善于流式计算:MapReduce的输出数据集是动态的。
- 重IO:每个 MapReduce 作业的输入后果都会写入到磁盘,会造成大量的磁盘IO。
MapTask源代码导读
MapTask.run办法是MapTask的入口.
- 读取配置,初始化MapTask生成jobId
- 判断应用的api抉择应用runNewMapper或者runOldMapper,并执行。
- MapTask执行完结,做一些清理工作。
runNewMapper
- 实例化默认的inputFormat,计算切片。依据设置的reduceTask数量实例化输入对象。实例化输出对象。
mapper.run(mapperContext)执行
- 循环确认每组kv,执行用户的map逻辑。
- map办法中调用collector.collect办法。将数据写入环形缓冲区。
output.close(mapperContext)执行最终调用MapTask的close办法
调用MapTask的flush办法。
- sortAndSpill 内存排序,并溢写到文件。每一个分区一个临时文件。区内有序
- mergeParts 归并排序,将多个临时文件合并成一个文件。
- 调用MapTask的close办法,该办法是一个空办法。
ReduceTask源代码导读:其入口为RecuceTask的run办法。
- 首先初始化copy,sort,reduce的状态器。
- initialize初始化outputformat为TextOutputFormat。
- shuffleConsumerPlugin.init(shuffleContext)办法执行。初始化inMemoryMerger和onDiskMerger。
shuffleConsumerPlugin.run();
- 创立Fetcher抓取数据,数据抓取实现后,将状态切为sort
- merger.close();里的finanlMerge执行,将内存中的数据和磁盘内的数据合并。
将状态切为reduce。runNewReducer。用户自定义的Reduce办法执行。
用户自定义的reduce办法执行,并调用context.write办法写数据。最终调用TextOutputFormat的write办法。
- 先写key
- 再写value
- 最初写入一个换行
Yarn
Yarn组成
ResourceManager(RM):全局资源的管理者
- 由两局部组成:一个是可插拔式的调度Scheduler,一个是ApplicationManager
- Scheduler :一个存粹的调度器,不负责应用程序的监控
- ApplicationManager:次要负责接管job的提交申请,为利用调配第一个Container来运行ApplicationMaster,还有就是负责监控ApplicationMaster,在遇到失败时重启ApplicationMaster运行的Container
- NodeManager(NM)
- 接管ResourceManager的申请,调配Container给利用的某个工作
- 和ResourceManager替换信息以确保整个集群安稳运行。ResourceManager就是通过收集每个NodeManager的报告信息来追踪整个集群衰弱状态的,而NodeManager负责监控本身的衰弱状态。
- 治理每个Container的生命周期
- 治理每个节点上的日志
- 执行Yarn下面利用的一些额定的服务,比方MapReduce的shuffle过程
Container
- 是Yarn框架的计算单元,是具体执行利用task
- 是一组调配的系统资源内存,cpu,磁盘,网络等
- 每一个应用程序从ApplicationMaster开始,它自身就是一个container(第0个),一旦启动,ApplicationMaster就会依据工作需要与Resourcemanager协商更多的container,在运行过程中,能够动静开释和申请container。
ApplicationMaster(AM)
- ApplicationMaster负责与scheduler协商适合的container,跟踪应用程序的状态,以及监控它们的进度
- 每个应用程序都有本人的ApplicationMaster,负责与ResourceManager协商资源(container)和NodeManager协同工作来执行和监控工作
- 当一个ApplicationMaster启动后,会周期性的向resourcemanager发送心跳报告来确认其衰弱和所需的资源状况
Yarn执行过程
- 客户端程序向ResourceManager提交利用并申请一个ApplicationMaster实例,ResourceManager在应答中给出一个applicationID
- ResourceManager找到能够运行一个Container的NodeManager,并在这个Container中启动ApplicationMaster实例
- ApplicationMaster向ResourceManager进行注册,注册之后客户端就能够查问ResourceManager取得本人ApplicationMaster的详细信息
- 在平时的操作过程中,ApplicationMaster依据resource-request协定向ResourceManager发送resource-request申请,ResourceManager会依据调度策略尽可能最优的为ApplicationMaster调配container资源,作为资源申请的应答发给ApplicationMaster
- ApplicationMaster通过向NodeManager发送container-launch-specification信息来启动Container
- 应用程序的代码在启动的Container中运行,并把运行的进度、状态等信息通过application-specific协定发送给ApplicationMaster,随着作业的执行,ApplicationMaster将心跳和进度信息发给ResourceManager,在这些心跳信息中,ApplicationMaster还能够申请和开释一些container。
- 在利用程序运行期间,提交利用的客户端被动和ApplicationMaster交换取得利用的运行状态、进度更新等信息,交换的协定也是application-specific协定
Yarn调度器与调度算法
Yarn Scheduler 策略
FIFO:将所有的Applications放到队列中,先依照作业的优先级高下、再依照达到工夫的先后,为每个app分配资源
- 长处:简略,不须要配置
- 毛病:不适宜共享集群
Capacity Scheduler:用于一个集群中运行多个Application的状况,指标是最大化吞吐量和集群利用率
- CapacityScheduler容许将整个集群的资源分成多个局部,每个组织应用其中的一部分,即每个组织有一个专门的队列,每个组织的队列还能够进一步划分成层次结构(Hierarchical Queues),从而容许组织外部的不同用户组的应用。每一个队列指定能够应用的资源范畴.
- 每一个队列外部,依照FIFO的形式调度Application.当某个队列的资源闲暇时,能够将它的残余资源共享给其余队列.
Yarn源码
参考
- https://www.cnblogs.com/dan2/...
- https://www.cnblogs.com/dan2/...
- https://www.cnblogs.com/dan2/...
- https://www.bilibili.com/vide...)
Hadoop(2.0)的组成
HDFS
HDFS的组成
NameNode:是整个HDFS集群的管理者,
- 治理HDFS的名称空间
- 治理正本策略
- 治理数据块在DataNode的地位映射信息
- 与客户端交互解决来自于客户端的读写操作。
DataNode:理论文件的存储者。
- 存储理论的数据块
- 执行数据块的读、写操作
- DataNode启动后向NameNode注册并每6小时向NameNode上报所有的块信息
- DataNode的心跳是每3秒一次,心跳返回后果带有NameNode给该DataNode的命令,例如:复制数据到另一台机器;删除某个数据块。
- 若NameNode超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
客户端
- HDFS提供的工具包,面向开发者,封装了对HDFS的操作调用。
- 负责文件切分:文件上传到HDFS时,Client负责与NameNode和DataNode交互,将文件切分为一个一个的block进行上传。
Secondary NameNode:辅助NameNode,分担其工作量。
- 定期合并Fsimage和Edits,并推送给NameNode。
- 辅助复原NameNode。
- 并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务
HDFS读写流程
HDFS上传文件
- client向NameNode申请上传文件,NameNode进行合规性检测,并创立相应的目录元数据。并返回是否能够上传。
- client将文件切分,再次询问NameNode第一个Block须要上传到哪几个DataNode服务端上。NameNode 返回 3 个 DataNode 节点,别离为 dn1、dn2、dn3。
- client将第一个block数据上传给dn1,dn1 收到申请会持续调用dn2,而后 dn2 调用 dn3,将这个通信管道建设实现。
- client开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3。
- 当一个Block传输实现之后,客户端再次申请NameNode上传第二个Block,而后反复1~4步骤。
NameNode节点抉择策略-节点间隔计算
- 如果Client与HADOOP在同一个集群,则NameNode会抉择间隔待上传数据最近间隔的DataNode接收数据。节点间隔:两个节点达到最近的独特先人的间隔总和。
- 如果Client与HADOOP不在同一个集群,则NameNode随机选一个机架上的一个节点。第二个正本在另一个机架的随机的一个节点,第三个正本在第二个正本所在的机架的随机节点。
HDFS读流程
- client向NameNode申请下载文件,NameNode返回文件块所在的DataNode地址。
- client依据节点间隔筛选一台最近的DataNode服务器,而后开始读取数据。DataNode以Packet来传输数据。
- client接管到一个packet数据后,并进行校验。校验通过后,将packet写入指标文件,再申请第二个packet。整个过程为串行的过程。(因为IO自身就是速度最慢的流程)
- 读取数据的过程中,如果client端与dn数据结点通信时呈现谬误,则尝试连贯蕴含此数据块的下一个dn数据结点。失败的dn数据结点将被client记录,并且当前不再连贯。
SecondaryNameNode
NameNode是机器的文件治理容易造成单点读写性能问题与数据存储平安问题。
SecondaryNameNode与Name帮助解决读写性能问题:NameNode的数据既存储在内存中又存储在磁盘中
- Fsimage文件:HDFS文件系统元数据的一个永久性的检查点,其中蕴含HDFS文件系统的所有目录和文件inode的序列化信息。
- Edits文件:寄存HDFS文件系统的所有更新操作的门路,文件系统客户端执行的所有写操作首先会被记录到Edits文件中。Edits文件只进行追加操作,效率很高。每当元数据有更新或者增加元数据时,批改内存中的元数据并追加到Edits中。
- NameNode启动的时候都会将Fsimage文件读入内存,加载Edits外面的更新操作,保障内存中的元数据信息是最新的、同步的。
- 长时间增加数据到 Edits 中,会导致该文件数据过大,效率升高,而且一旦断电,复原元数据须要的工夫过长。因而2NN,专门用于FsImage和Edits的合并。
SecondaryNameNode 工作机制
NameNode启动后,会创立FsImage和Edits文件。如果不是第一次启动,则间接加载FsImage和Edits文件。
- fsimage_0000000000000000002 文件最新的FsImage。
- edits_inprogress_0000000000000000003 正在进行中的edits。
- seen_txid 是一个txt的文本文件,记录是最新的edits_inprogress文件开端的数字。
Secondary NameNode 工作
- 2NN询问NN是须要CheckPoint。
- NN将当初进行中的edits文件和最新的fsimage文件拷贝到2NN并更新seen_txid里的数字,而后从新生成edits文件。
- 2NN加载编辑日志和镜像文件到内存,并合并生成新的镜像文件fsimage.chkpoint。而后拷贝回NN。NN将fsimage.chkpoint重新命名为fsImage实现一次滚写。
HDFS总结
长处
1. 高容错性:数据主动保留多个正本,通过减少正本的模式,进步容错性。某一个正本失落后,能够主动复原。2. 适宜解决大数据:可能解决的数据规模达到GB,TB甚至PB级别,文件数量能够达到百万规模以上。3. 可构建在便宜的机器上:通过多正本机制,进步可靠性。
毛病
1. 不适宜低时延的数据拜访 2. 无奈高效的对大量小文件进行存储,大量的小文件会占用NameNode大量的内存来存储文件目录和块信息,同时小文件的寻址工夫会超过读取工夫,它违反了HDFS的设计指标 3. 不反对并发写入,文件随机批改。仅反对数据追加的。
DataNode与NameNode源代码导读
代码浏览前的筹备工作:Hadoop Rpc框架指南
Rpc 协定
public interface MyInterface { Object versionID = 1; boolean demo();}
Rpc provider
public class MyHadoopServer implements MyInterface { @Override public boolean demo() { return false; } public static void main(String[] args) { Server server = new RPC.Builder(new Configuration()) .setBindAddress("localhost") .setPort(8888) .setProtocol(MyInterface.class) .setInstance(new MyHadoopServer()) .build(); server.start(); }}
Rpc consuemr
public class MyHadoopClient { public static void main(String[] args) throws Exception { MyInterface client = RPC.getProxy( MyInterface.class, MyInterface.versionID, new InetSocketAddress("localhost", 8888), new Configuration()); client.demo(); } }
NameNode 启动源码
- 启动9870端口服务
- 加载镜像文件和编辑日志
- 初始化NN的RPC服务端:用于接管DataNode的RPC申请
- NN启动资源检测
NN对心跳超时判断(启动一个线程去判断DataNode是否超时)
- HDFS默认DataNode掉线容忍工夫未 timeout = 2 heartbeat.recheck.interval + 10 dfs.heartbeat.interval(2*5+30)超过这个工夫会被认为DataNode超时
DataNode 启动源码
工作流程
源码图示
MapReduce
MapReduce示例
- 需要:有一个大小为300M的存文本文件。统计其每一个字母呈现的总次数。要求:[a-p]一个后果文件,[q-z]一个后果文件。
实现:
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text outK = new Text(); private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 循环写出 for (String word : words) { // 封装outk outK.set(word); // 写出 context.write(outK, outV); } }}
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; // 累加 for (IntWritable value : values) { sum += value.get(); } outV.set(sum); // 写出 context.write(key,outV); }}
切片规定决定MapTask的数量。
- MapReduce将数据的读取形象为一个InputFormat。罕用的FileInputFormat是针对文件读取的一个具体实现。
- FileInputFormat的默认分片规定为:将待处理的数据文件进行逻辑分片,每128M为一个数据切片。一个数据切片交给一个MapTask进行并行处理。
分片数量过大,开启过多的MapTask浪费资源。分片数量过小,MapTask阶段解决慢。
ReduceTask的数量须要手工指定。
- 在Map阶段须要对每一个输入数据会通过分区算法计算分区。
- 若ReduceTask=0,则示意没有Reduce阶段,输入文件个数和Map个数统一。
- 若ReduceTask=1,所有输入文件为一个。
- ReduceTask数量要大于分区的后果不同值的数,否则数据无奈被生产会产生异样。
- 若ReduceTask的数量大于分区数量,则会有局部reduceTask处于闲置状态。
MapReduce具体的工作流程。
Map阶段
- read阶段:通过RecordReader从InputFormat分片中将读取数据并将数据解析成一个个key/value。
- map阶段:用户自定义的Mapper.map办法执行。将输出的key/value转为输入的key/value。
collect阶段:接管输入Key/Value 并调用分区算法,将输入数据写入对应的分区的环形内存缓存区中。
- spill阶段:当内存缓存区的使用率超过肯定的阈值后,将触发溢写线程。该线程当初内存中进行疾速排序,而后将数据溢写到磁盘上。
- Combine阶段:当所有数据处理实现后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
Reduce阶段
- Copy阶段:ReduceTask从所有的MapTask中拷贝同一分区的数据,每一个ReduceTask负责解决一个分区,互不影响。如果文件大小超过肯定的阈值,则溢写到磁盘上,否则存储在内存中。
- Merge阶段:ReduceTask将所有雷同分区的材料合并成一个大文件,
- sort阶段:将合并后的大文件进行归并排序。因为mapTask自身保障了分区内区内有序,因而ReduceTask只须要对所有数据进行一次归并排序即可。
- reduce阶段:执行用户的reduce办法,并将后果写到HDFS。
MapReduce优缺点
长处
- 实现简略,封装度高。
- 扩展性强:能够疾速减少机器来扩大它的计算能力。
- 高容错性:当某个节点挂掉,它会主动将工作转移到领一个节点运行,两头不须要人工参加。
- 适宜PB级别以上海量数据的离线解决。
毛病
- 不善于实时计算。
- 不善于流式计算:MapReduce的输出数据集是动态的。
- 重IO:每个 MapReduce 作业的输入后果都会写入到磁盘,会造成大量的磁盘IO。
MapTask源代码导读
MapTask.run办法是MapTask的入口.
- 读取配置,初始化MapTask生成jobId
- 判断应用的api抉择应用runNewMapper或者runOldMapper,并执行。
- MapTask执行完结,做一些清理工作。
runNewMapper
- 实例化默认的inputFormat,计算切片。依据设置的reduceTask数量实例化输入对象。实例化输出对象。
mapper.run(mapperContext)执行
- 循环确认每组kv,执行用户的map逻辑。
- map办法中调用collector.collect办法。将数据写入环形缓冲区。
output.close(mapperContext)执行最终调用MapTask的close办法
调用MapTask的flush办法。
- sortAndSpill 内存排序,并溢写到文件。每一个分区一个临时文件。区内有序
- mergeParts 归并排序,将多个临时文件合并成一个文件。
- 调用MapTask的close办法,该办法是一个空办法。
ReduceTask源代码导读:其入口为RecuceTask的run办法。
- 首先初始化copy,sort,reduce的状态器。
- initialize初始化outputformat为TextOutputFormat。
- shuffleConsumerPlugin.init(shuffleContext)办法执行。初始化inMemoryMerger和onDiskMerger。
shuffleConsumerPlugin.run();
- 创立Fetcher抓取数据,数据抓取实现后,将状态切为sort
- merger.close();里的finanlMerge执行,将内存中的数据和磁盘内的数据合并。
将状态切为reduce。runNewReducer。用户自定义的Reduce办法执行。
用户自定义的reduce办法执行,并调用context.write办法写数据。最终调用TextOutputFormat的write办法。
- 先写key
- 再写value
- 最初写入一个换行
Yarn
Yarn组成
ResourceManager(RM):全局资源的管理者
- 由两局部组成:一个是可插拔式的调度Scheduler,一个是ApplicationManager
- Scheduler :一个存粹的调度器,不负责应用程序的监控
- ApplicationManager:次要负责接管job的提交申请,为利用调配第一个Container来运行ApplicationMaster,还有就是负责监控ApplicationMaster,在遇到失败时重启ApplicationMaster运行的Container
- NodeManager(NM)
- 接管ResourceManager的申请,调配Container给利用的某个工作
- 和ResourceManager替换信息以确保整个集群安稳运行。ResourceManager就是通过收集每个NodeManager的报告信息来追踪整个集群衰弱状态的,而NodeManager负责监控本身的衰弱状态。
- 治理每个Container的生命周期
- 治理每个节点上的日志
- 执行Yarn下面利用的一些额定的服务,比方MapReduce的shuffle过程
Container
- 是Yarn框架的计算单元,是具体执行利用task
- 是一组调配的系统资源内存,cpu,磁盘,网络等
- 每一个应用程序从ApplicationMaster开始,它自身就是一个container(第0个),一旦启动,ApplicationMaster就会依据工作需要与Resourcemanager协商更多的container,在运行过程中,能够动静开释和申请container。
ApplicationMaster(AM)
- ApplicationMaster负责与scheduler协商适合的container,跟踪应用程序的状态,以及监控它们的进度
- 每个应用程序都有本人的ApplicationMaster,负责与ResourceManager协商资源(container)和NodeManager协同工作来执行和监控工作
- 当一个ApplicationMaster启动后,会周期性的向resourcemanager发送心跳报告来确认其衰弱和所需的资源状况
Yarn执行过程
- 客户端程序向ResourceManager提交利用并申请一个ApplicationMaster实例,ResourceManager在应答中给出一个applicationID
- ResourceManager找到能够运行一个Container的NodeManager,并在这个Container中启动ApplicationMaster实例
- ApplicationMaster向ResourceManager进行注册,注册之后客户端就能够查问ResourceManager取得本人ApplicationMaster的详细信息
- 在平时的操作过程中,ApplicationMaster依据resource-request协定向ResourceManager发送resource-request申请,ResourceManager会依据调度策略尽可能最优的为ApplicationMaster调配container资源,作为资源申请的应答发给ApplicationMaster
- ApplicationMaster通过向NodeManager发送container-launch-specification信息来启动Container
- 应用程序的代码在启动的Container中运行,并把运行的进度、状态等信息通过application-specific协定发送给ApplicationMaster,随着作业的执行,ApplicationMaster将心跳和进度信息发给ResourceManager,在这些心跳信息中,ApplicationMaster还能够申请和开释一些container。
- 在利用程序运行期间,提交利用的客户端被动和ApplicationMaster交换取得利用的运行状态、进度更新等信息,交换的协定也是application-specific协定
Yarn调度器与调度算法
Yarn Scheduler 策略
FIFO:将所有的Applications放到队列中,先依照作业的优先级高下、再依照达到工夫的先后,为每个app分配资源
- 长处:简略,不须要配置
- 毛病:不适宜共享集群
Capacity Scheduler:用于一个集群中运行多个Application的状况,指标是最大化吞吐量和集群利用率
- CapacityScheduler容许将整个集群的资源分成多个局部,每个组织应用其中的一部分,即每个组织有一个专门的队列,每个组织的队列还能够进一步划分成层次结构(Hierarchical Queues),从而容许组织外部的不同用户组的应用。每一个队列指定能够应用的资源范畴.
- 每一个队列外部,依照FIFO的形式调度Application.当某个队列的资源闲暇时,能够将它的残余资源共享给其余队列.
Yarn源码
参考
- https://www.cnblogs.com/dan2/...
- https://www.cnblogs.com/dan2/...
- https://www.cnblogs.com/dan2/...
- https://www.bilibili.com/vide...