关于大数据:Hadoop原理与源码

39次阅读

共计 18091 个字符,预计需要花费 46 分钟才能阅读完成。

hadoop 概述

Hadoop 历史

  1. Hadoop 最早起源于 Nutch。Nutch 的设计指标是一个网络爬虫引擎,但随着抓取网页数据量的增大,Nutch 遇到了重大的性能扩大问题。2003 年,2004 年谷歌公布两篇论文为该问题提供了一个解决方案一个是 HDFS 的前身 GFS,用于海量网页的存储;另一个是分布式计算框架 MAPERDUCE。Nutch 的创始人依据论文的领导用了·两年的工夫实现了 HDFS 和 MapReduce 代码。并将其从 Nutch 剥离进去,成为独立我的项目 Hadoop。2008 年 Hadoop 成为 Apache 顶级我的项目。
  2. 晚期的 Hadoop 并非当初大家所相熟的 Hadoop 分布式开源软件,而是指代大数据的一个生态圈,包含很多其余的软件。在 2010 前后,Hbase,HIVE,Zookeeper 等顺次脱离 Hadoop 我的项目成为 Apache 的顶级我的项目。2011 年,Hadoop 公布 2.0,在架构上进行了重大更新,引入 Yarn 框架专一于资源的治理精简了 MapReduce 的职责。同时 Yarn 框架作为一个通用的资源调度和治理模块,同时反对多种其余的编程模型,比方最闻名的 Spark。
  3. 因为 HADOOP 的版本治理简单,简单集群的部署装置配置等须要编写大量的配置文件而后散发到每一台节点上,容易出错效率低下。所以很多公司会在根底的 Hadoop 进行商业化后进行再发行。目前 Hadoop 发行版十分多,有华为发行版、Intel 发行版、Cloudera 发行版(CDH)等。

![image](# hadoop 概述

Hadoop 历史

  1. Hadoop 最早起源于 Nutch。Nutch 的设计指标是一个网络爬虫引擎,但随着抓取网页数据量的增大,Nutch 遇到了重大的性能扩大问题。2003 年,2004 年谷歌公布两篇论文为该问题提供了一个解决方案一个是 HDFS 的前身 GFS,用于海量网页的存储;另一个是分布式计算框架 MAPERDUCE。Nutch 的创始人依据论文的领导用了·两年的工夫实现了 HDFS 和 MapReduce 代码。并将其从 Nutch 剥离进去,成为独立我的项目 Hadoop。2008 年 Hadoop 成为 Apache 顶级我的项目。
  2. 晚期的 Hadoop 并非当初大家所相熟的 Hadoop 分布式开源软件,而是指代大数据的一个生态圈,包含很多其余的软件。在 2010 前后,Hbase,HIVE,Zookeeper 等顺次脱离 Hadoop 我的项目成为 Apache 的顶级我的项目。2011 年,Hadoop 公布 2.0,在架构上进行了重大更新,引入 Yarn 框架专一于资源的治理精简了 MapReduce 的职责。同时 Yarn 框架作为一个通用的资源调度和治理模块,同时反对多种其余的编程模型,比方最闻名的 Spark。
  3. 因为 HADOOP 的版本治理简单,简单集群的部署装置配置等须要编写大量的配置文件而后散发到每一台节点上,容易出错效率低下。所以很多公司会在根底的 Hadoop 进行商业化后进行再发行。目前 Hadoop 发行版十分多,有华为发行版、Intel 发行版、Cloudera 发行版(CDH)等。

Hadoop(2.0)的组成

HDFS

HDFS 的组成

NameNode:是整个 HDFS 集群的管理者,

  1. 治理 HDFS 的名称空间
  2. 治理正本策略
  3. 治理数据块在 DataNode 的地位映射信息
  4. 与客户端交互解决来自于客户端的读写操作。

DataNode:理论文件的存储者。

  1. 存储理论的数据块
  2. 执行数据块的读、写操作
  3. DataNode 启动后向 NameNode 注册并每 6 小时向 NameNode 上报所有的块信息
  4. DataNode 的心跳是每 3 秒一次,心跳返回后果带有 NameNode 给该 DataNode 的命令,例如:复制数据到另一台机器;删除某个数据块。
  5. 若 NameNode 超过 10 分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。

客户端

  1. HDFS 提供的工具包,面向开发者,封装了对 HDFS 的操作调用。
  2. 负责文件切分:文件上传到 HDFS 时,Client 负责与 NameNode 和 DataNode 交互,将文件切分为一个一个的 block 进行上传。

Secondary NameNode:辅助 NameNode,分担其工作量。

  1. 定期合并 Fsimage 和 Edits,并推送给 NameNode。
  2. 辅助复原 NameNode。
  3. 并非 NameNode 的热备。当 NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务

HDFS 读写流程

HDFS 上传文件

  1. client 向 NameNode 申请上传文件,NameNode 进行合规性检测,并创立相应的目录元数据。并返回是否能够上传。
  2. client 将文件切分,再次询问 NameNode 第一个 Block 须要上传到哪几个 DataNode 服务端上。NameNode 返回 3 个 DataNode 节点,别离为 dn1、dn2、dn3。
  3. client 将第一个 block 数据上传给 dn1,dn1 收到申请会持续调用 dn2,而后 dn2 调用 dn3,将这个通信管道建设实现。
  4. client 开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3。
  5. 当一个 Block 传输实现之后,客户端再次申请 NameNode 上传第二个 Block,而后反复 1~4 步骤。

NameNode 节点抉择策略 - 节点间隔计算

  1. 如果 Client 与 HADOOP 在同一个集群,则 NameNode 会抉择间隔待上传数据最近间隔的 DataNode 接收数据。节点间隔: 两个节点达到最近的独特先人的间隔总和。
  2. 如果 Client 与 HADOOP 不在同一个集群,则 NameNode 随机选一个机架上的一个节点。第二个正本在另一个机架的随机的一个节点,第三个正本在第二个正本所在的机架的随机节点。

HDFS 读流程

  1. client 向 NameNode 申请下载文件,NameNode 返回文件块所在的 DataNode 地址。
  2. client 依据节点间隔筛选一台最近的 DataNode 服务器,而后开始读取数据。DataNode 以 Packet 来传输数据。
  3. client 接管到一个 packet 数据后,并进行校验。校验通过后,将 packet 写入指标文件,再申请第二个 packet。整个过程为串行的过程。(因为 IO 自身就是速度最慢的流程)
  4. 读取数据的过程中,如果 client 端与 dn 数据结点通信时呈现谬误,则尝试连贯蕴含此数据块的下一个 dn 数据结点。失败的 dn 数据结点将被 client 记录,并且当前不再连贯。

SecondaryNameNode

NameNode 是机器的文件治理容易造成单点读写性能问题与数据存储平安问题。

SecondaryNameNode 与 Name 帮助解决读写性能问题:NameNode 的数据既存储在内存中又存储在磁盘中

  1. Fsimage 文件:HDFS 文件系统元数据的一个永久性的检查点,其中蕴含 HDFS 文件系统的所有目录和文件 inode 的序列化信息。
  2. Edits 文件: 寄存 HDFS 文件系统的所有更新操作的门路,文件系统客户端执行的所有写操作首先会被记录到 Edits 文件中。Edits 文件只进行追加操作,效率很高。每当元数据有更新或者增加元数据时,批改内存中的元数据并追加到 Edits 中。
  3. NameNode 启动的时候都会将 Fsimage 文件读入内存,加载 Edits 外面的更新操作,保障内存中的元数据信息是最新的、同步的。
  4. 长时间增加数据到 Edits 中,会导致该文件数据过大,效率升高,而且一旦断电,复原元数据须要的工夫过长。因而 2NN,专门用于 FsImage 和 Edits 的合并。

SecondaryNameNode 工作机制

  1. NameNode 启动后,会创立 FsImage 和 Edits 文件。如果不是第一次启动,则间接加载 FsImage 和 Edits 文件。

    1. fsimage_0000000000000000002 文件最新的 FsImage。
    2. edits_inprogress_0000000000000000003 正在进行中的 edits。
    3. seen_txid 是一个 txt 的文本文件,记录是最新的 edits_inprogress 文件开端的数字。
  2. Secondary NameNode 工作

    1. 2NN 询问 NN 是须要 CheckPoint。
    2. NN 将当初进行中的 edits 文件和最新的 fsimage 文件拷贝到 2NN 并更新 seen_txid 里的数字,而后从新生成 edits 文件。
    3. 2NN 加载编辑日志和镜像文件到内存,并合并生成新的镜像文件 fsimage.chkpoint。而后拷贝回 NN。NN 将 fsimage.chkpoint 重新命名为 fsImage 实现一次滚写。

HDFS 总结

  1. 长处

    1. 高容错性:数据主动保留多个正本,通过减少正本的模式,进步容错性。某一个正本失落后,能够主动复原。2. 适宜解决大数据:可能解决的数据规模达到 GB,TB 甚至 PB 级别,文件数量能够达到百万规模以上。3. 可构建在便宜的机器上:通过多正本机制,进步可靠性。
  1. 毛病

     1. 不适宜低时延的数据拜访
     2. 无奈高效的对大量小文件进行存储,大量的小文件会占用 NameNode 大量的内存来存储文件目录和块信息,同时小文件的寻址工夫会超过读取工夫,它违反了 HDFS 的设计指标
     3. 不反对并发写入,文件随机批改。仅反对数据追加的。

DataNode 与 NameNode 源代码导读

代码浏览前的筹备工作:Hadoop Rpc 框架指南

Rpc 协定


public interface MyInterface {
    Object versionID = 1;

    boolean demo();}

Rpc provider


public class MyHadoopServer implements MyInterface {
    @Override
    public boolean demo() {return false;}

    public static void main(String[] args) {Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(8888)
                .setProtocol(MyInterface.class)
                .setInstance(new MyHadoopServer())
                .build();

        server.start();}
}

Rpc consuemr

public class MyHadoopClient {public static void main(String[] args) throws Exception {
        MyInterface client = RPC.getProxy(
                MyInterface.class,
                MyInterface.versionID,
                new InetSocketAddress("localhost", 8888),
                new Configuration());
        
        client.demo();}
    
}

NameNode 启动源码

  1. 启动 9870 端口服务
  2. 加载镜像文件和编辑日志 
  3. 初始化 NN 的 RPC 服务端:用于接管 DataNode 的 RPC 申请
  4. NN 启动资源检测
  5. NN 对心跳超时判断(启动一个线程去判断 DataNode 是否超时)

    1. HDFS 默认 DataNode 掉线容忍工夫未 timeout = 2 heartbeat.recheck.interval + 10 dfs.heartbeat.interval(2*5+30)超过这个工夫会被认为 DataNode 超时

DataNode 启动源码

工作流程

源码图示

MapReduce

MapReduce 示例

  • 需要:有一个大小为 300M 的存文本文件。统计其每一个字母呈现的总次数。要求:[a-p] 一个后果文件,[q-z] 一个后果文件。
  • 实现:

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private Text outK = new Text();
      private IntWritable outV = new IntWritable(1);
    
      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
          // 1 获取一行
          String line = value.toString();
    
          // 2 切割
          String[] words = line.split(" ");
    
          // 3 循环写出
          for (String word : words) {
              // 封装 outk
              outK.set(word);
    
              // 写出
              context.write(outK, outV);
          }
      }
    }
    
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        
        // 累加
        for (IntWritable value : values) {sum += value.get();
        }

        outV.set(sum);

        // 写出
        context.write(key,outV);
    }
}

切片规定决定 MapTask 的数量。

  1. MapReduce 将数据的读取形象为一个 InputFormat。罕用的 FileInputFormat 是针对文件读取的一个具体实现。
  2. FileInputFormat 的默认分片规定为:将待处理的数据文件进行逻辑分片,每 128M 为一个数据切片。一个数据切片交给一个 MapTask 进行并行处理。
  3. 分片数量过大,开启过多的 MapTask 浪费资源。分片数量过小,MapTask 阶段解决慢。

    ReduceTask 的数量须要手工指定。

  4. 在 Map 阶段须要对每一个输入数据会通过分区算法计算分区。
  5. 若 ReduceTask=0,则示意没有 Reduce 阶段,输入文件个数和 Map 个数统一。
  6. 若 ReduceTask=1,所有输入文件为一个。
  7. ReduceTask 数量要大于分区的后果不同值的数,否则数据无奈被生产会产生异样。
  8. 若 ReduceTask 的数量大于分区数量,则会有局部 reduceTask 处于闲置状态。

MapReduce 具体的工作流程。

Map 阶段

  • read 阶段:通过 RecordReader 从 InputFormat 分片中将读取数据并将数据解析成一个个 key/value。
  • map 阶段:用户自定义的 Mapper.map 办法执行。将输出的 key/value 转为输入的 key/value。
  • collect 阶段:接管输入 Key/Value 并调用分区算法,将输入数据写入对应的分区的环形内存缓存区中。

    • spill 阶段:当内存缓存区的使用率超过肯定的阈值后,将触发溢写线程。该线程当初内存中进行疾速排序,而后将数据溢写到磁盘上。
  • Combine 阶段:当所有数据处理实现后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

Reduce 阶段

  • Copy 阶段:ReduceTask 从所有的 MapTask 中拷贝同一分区的数据,每一个 ReduceTask 负责解决一个分区,互不影响。如果文件大小超过肯定的阈值,则溢写到磁盘上,否则存储在内存中。
  • Merge 阶段:ReduceTask 将所有雷同分区的材料合并成一个大文件,
  • sort 阶段:将合并后的大文件进行归并排序。因为 mapTask 自身保障了分区内区内有序,因而 ReduceTask 只须要对所有数据进行一次归并排序即可。
  • reduce 阶段:执行用户的 reduce 办法,并将后果写到 HDFS。

MapReduce 优缺点

长处

  1. 实现简略,封装度高。
  2. 扩展性强:能够疾速减少机器来扩大它的计算能力。
  3. 高容错性:当某个节点挂掉,它会主动将工作转移到领一个节点运行,两头不须要人工参加。
  4. 适宜 PB 级别以上海量数据的离线解决。

毛病

  1. 不善于实时计算。
  2. 不善于流式计算:MapReduce 的输出数据集是动态的。
  3. 重 IO:每个 MapReduce 作业的输入后果都会写入到磁盘,会造成大量的磁盘 IO。

MapTask 源代码导读

  1. MapTask.run 办法是 MapTask 的入口.

    1. 读取配置,初始化 MapTask 生成 jobId
    2. 判断应用的 api 抉择应用 runNewMapper 或者 runOldMapper,并执行。
    3. MapTask 执行完结,做一些清理工作。
  2. runNewMapper

    1. 实例化默认的 inputFormat,计算切片。依据设置的 reduceTask 数量实例化输入对象。实例化输出对象。
    2. mapper.run(mapperContext) 执行

      1. 循环确认每组 kv,执行用户的 map 逻辑。
      2. map 办法中调用 collector.collect 办法。将数据写入环形缓冲区。
    3. output.close(mapperContext) 执行最终调用 MapTask 的 close 办法

      1. 调用 MapTask 的 flush 办法。

        1. sortAndSpill 内存排序,并溢写到文件。每一个分区一个临时文件。区内有序
        2. mergeParts 归并排序,将多个临时文件合并成一个文件。
      2. 调用 MapTask 的 close 办法,该办法是一个空办法。

ReduceTask 源代码导读: 其入口为 RecuceTask 的 run 办法。

  1. 首先初始化 copy,sort,reduce 的状态器。
  2. initialize 初始化 outputformat 为 TextOutputFormat。
  3. shuffleConsumerPlugin.init(shuffleContext) 办法执行。初始化 inMemoryMerger 和 onDiskMerger。
  4. shuffleConsumerPlugin.run();

    1. 创立 Fetcher 抓取数据,数据抓取实现后,将状态切为 sort
    2. merger.close(); 里的 finanlMerge 执行,将内存中的数据和磁盘内的数据合并。
  5. 将状态切为 reduce。runNewReducer。用户自定义的 Reduce 办法执行。

    1. 用户自定义的 reduce 办法执行,并调用 context.write 办法写数据。最终调用 TextOutputFormat 的 write 办法。

      1. 先写 key
      2. 再写 value
      3. 最初写入一个换行

Yarn

Yarn 组成

  1. ResourceManager(RM): 全局资源的管理者

    1. 由两局部组成: 一个是可插拔式的调度 Scheduler,一个是 ApplicationManager
    2. Scheduler : 一个存粹的调度器, 不负责应用程序的监控
    3. ApplicationManager: 次要负责接管 job 的提交申请,为利用调配第一个 Container 来运行 ApplicationMaster,还有就是负责监控 ApplicationMaster,在遇到失败时重启 ApplicationMaster 运行的 Container
    4. NodeManager(NM)
    5. 接管 ResourceManager 的申请,调配 Container 给利用的某个工作
    6. 和 ResourceManager 替换信息以确保整个集群安稳运行。ResourceManager 就是通过收集每个 NodeManager 的报告信息来追踪整个集群衰弱状态的,而 NodeManager 负责监控本身的衰弱状态。
    7. 治理每个 Container 的生命周期
    8. 治理每个节点上的日志
    9. 执行 Yarn 下面利用的一些额定的服务,比方 MapReduce 的 shuffle 过程
    10. Container

      1. 是 Yarn 框架的计算单元,是具体执行利用 task
      2. 是一组调配的系统资源内存,cpu, 磁盘, 网络等
      3. 每一个应用程序从 ApplicationMaster 开始,它自身就是一个 container(第 0 个),一旦启动,ApplicationMaster 就会依据工作需要与 Resourcemanager 协商更多的 container,在运行过程中,能够动静开释和申请 container。
    11. ApplicationMaster(AM)

      1. ApplicationMaster 负责与 scheduler 协商适合的 container,跟踪应用程序的状态,以及监控它们的进度
      2. 每个应用程序都有本人的 ApplicationMaster,负责与 ResourceManager 协商资源(container)和 NodeManager 协同工作来执行和监控工作
      3.  当一个 ApplicationMaster 启动后,会周期性的向 resourcemanager 发送心跳报告来确认其衰弱和所需的资源状况

Yarn 执行过程

  1. 客户端程序向 ResourceManager 提交利用并申请一个 ApplicationMaster 实例,ResourceManager 在应答中给出一个 applicationID
  2. ResourceManager 找到能够运行一个 Container 的 NodeManager,并在这个 Container 中启动 ApplicationMaster 实例
  3. ApplicationMaster 向 ResourceManager 进行注册,注册之后客户端就能够查问 ResourceManager 取得本人 ApplicationMaster 的详细信息
  4. 在平时的操作过程中,ApplicationMaster 依据 resource-request 协定向 ResourceManager 发送 resource-request 申请,ResourceManager 会依据调度策略尽可能最优的为 ApplicationMaster 调配 container 资源,作为资源申请的应答发给 ApplicationMaster
  5. ApplicationMaster 通过向 NodeManager 发送 container-launch-specification 信息来启动 Container
  6. 应用程序的代码在启动的 Container 中运行,并把运行的进度、状态等信息通过 application-specific 协定发送给 ApplicationMaster,随着作业的执行,ApplicationMaster 将心跳和进度信息发给 ResourceManager,在这些心跳信息中,ApplicationMaster 还能够申请和开释一些 container。
  7. 在利用程序运行期间,提交利用的客户端被动和 ApplicationMaster 交换取得利用的运行状态、进度更新等信息,交换的协定也是 application-specific 协定

Yarn 调度器与调度算法

Yarn Scheduler 策略

  1. FIFO: 将所有的 Applications 放到队列中,先依照作业的优先级高下、再依照达到工夫的先后,为每个 app 分配资源

    1. 长处: 简略, 不须要配置
    2. 毛病: 不适宜共享集群
  2. Capacity Scheduler: 用于一个集群中运行多个 Application 的状况, 指标是最大化吞吐量和集群利用率

    1. CapacityScheduler 容许将整个集群的资源分成多个局部,每个组织应用其中的一部分,即每个组织有一个专门的队列,每个组织的队列还能够进一步划分成层次结构(Hierarchical Queues),从而容许组织外部的不同用户组的应用。每一个队列指定能够应用的资源范畴.
    2. 每一个队列外部, 依照 FIFO 的形式调度 Application. 当某个队列的资源闲暇时, 能够将它的残余资源共享给其余队列.

Yarn 源码

参考

  • https://www.cnblogs.com/dan2/…
  • https://www.cnblogs.com/dan2/…
  • https://www.cnblogs.com/dan2/…
  • https://www.bilibili.com/vide…)

Hadoop(2.0)的组成

HDFS

HDFS 的组成

NameNode:是整个 HDFS 集群的管理者,

  1. 治理 HDFS 的名称空间
  2. 治理正本策略
  3. 治理数据块在 DataNode 的地位映射信息
  4. 与客户端交互解决来自于客户端的读写操作。

DataNode:理论文件的存储者。

  1. 存储理论的数据块
  2. 执行数据块的读、写操作
  3. DataNode 启动后向 NameNode 注册并每 6 小时向 NameNode 上报所有的块信息
  4. DataNode 的心跳是每 3 秒一次,心跳返回后果带有 NameNode 给该 DataNode 的命令,例如:复制数据到另一台机器;删除某个数据块。
  5. 若 NameNode 超过 10 分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。

客户端

  1. HDFS 提供的工具包,面向开发者,封装了对 HDFS 的操作调用。
  2. 负责文件切分:文件上传到 HDFS 时,Client 负责与 NameNode 和 DataNode 交互,将文件切分为一个一个的 block 进行上传。

Secondary NameNode:辅助 NameNode,分担其工作量。

  1. 定期合并 Fsimage 和 Edits,并推送给 NameNode。
  2. 辅助复原 NameNode。
  3. 并非 NameNode 的热备。当 NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务

HDFS 读写流程

HDFS 上传文件

  1. client 向 NameNode 申请上传文件,NameNode 进行合规性检测,并创立相应的目录元数据。并返回是否能够上传。
  2. client 将文件切分,再次询问 NameNode 第一个 Block 须要上传到哪几个 DataNode 服务端上。NameNode 返回 3 个 DataNode 节点,别离为 dn1、dn2、dn3。
  3. client 将第一个 block 数据上传给 dn1,dn1 收到申请会持续调用 dn2,而后 dn2 调用 dn3,将这个通信管道建设实现。
  4. client 开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3。
  5. 当一个 Block 传输实现之后,客户端再次申请 NameNode 上传第二个 Block,而后反复 1~4 步骤。

NameNode 节点抉择策略 - 节点间隔计算

  1. 如果 Client 与 HADOOP 在同一个集群,则 NameNode 会抉择间隔待上传数据最近间隔的 DataNode 接收数据。节点间隔: 两个节点达到最近的独特先人的间隔总和。
  2. 如果 Client 与 HADOOP 不在同一个集群,则 NameNode 随机选一个机架上的一个节点。第二个正本在另一个机架的随机的一个节点,第三个正本在第二个正本所在的机架的随机节点。

HDFS 读流程

  1. client 向 NameNode 申请下载文件,NameNode 返回文件块所在的 DataNode 地址。
  2. client 依据节点间隔筛选一台最近的 DataNode 服务器,而后开始读取数据。DataNode 以 Packet 来传输数据。
  3. client 接管到一个 packet 数据后,并进行校验。校验通过后,将 packet 写入指标文件,再申请第二个 packet。整个过程为串行的过程。(因为 IO 自身就是速度最慢的流程)
  4. 读取数据的过程中,如果 client 端与 dn 数据结点通信时呈现谬误,则尝试连贯蕴含此数据块的下一个 dn 数据结点。失败的 dn 数据结点将被 client 记录,并且当前不再连贯。

SecondaryNameNode

NameNode 是机器的文件治理容易造成单点读写性能问题与数据存储平安问题。

SecondaryNameNode 与 Name 帮助解决读写性能问题:NameNode 的数据既存储在内存中又存储在磁盘中

  1. Fsimage 文件:HDFS 文件系统元数据的一个永久性的检查点,其中蕴含 HDFS 文件系统的所有目录和文件 inode 的序列化信息。
  2. Edits 文件: 寄存 HDFS 文件系统的所有更新操作的门路,文件系统客户端执行的所有写操作首先会被记录到 Edits 文件中。Edits 文件只进行追加操作,效率很高。每当元数据有更新或者增加元数据时,批改内存中的元数据并追加到 Edits 中。
  3. NameNode 启动的时候都会将 Fsimage 文件读入内存,加载 Edits 外面的更新操作,保障内存中的元数据信息是最新的、同步的。
  4. 长时间增加数据到 Edits 中,会导致该文件数据过大,效率升高,而且一旦断电,复原元数据须要的工夫过长。因而 2NN,专门用于 FsImage 和 Edits 的合并。

SecondaryNameNode 工作机制

  1. NameNode 启动后,会创立 FsImage 和 Edits 文件。如果不是第一次启动,则间接加载 FsImage 和 Edits 文件。

    1. fsimage_0000000000000000002 文件最新的 FsImage。
    2. edits_inprogress_0000000000000000003 正在进行中的 edits。
    3. seen_txid 是一个 txt 的文本文件,记录是最新的 edits_inprogress 文件开端的数字。
  2. Secondary NameNode 工作

    1. 2NN 询问 NN 是须要 CheckPoint。
    2. NN 将当初进行中的 edits 文件和最新的 fsimage 文件拷贝到 2NN 并更新 seen_txid 里的数字,而后从新生成 edits 文件。
    3. 2NN 加载编辑日志和镜像文件到内存,并合并生成新的镜像文件 fsimage.chkpoint。而后拷贝回 NN。NN 将 fsimage.chkpoint 重新命名为 fsImage 实现一次滚写。

HDFS 总结

  1. 长处

    1. 高容错性:数据主动保留多个正本,通过减少正本的模式,进步容错性。某一个正本失落后,能够主动复原。2. 适宜解决大数据:可能解决的数据规模达到 GB,TB 甚至 PB 级别,文件数量能够达到百万规模以上。3. 可构建在便宜的机器上:通过多正本机制,进步可靠性。
  1. 毛病

     1. 不适宜低时延的数据拜访
     2. 无奈高效的对大量小文件进行存储,大量的小文件会占用 NameNode 大量的内存来存储文件目录和块信息,同时小文件的寻址工夫会超过读取工夫,它违反了 HDFS 的设计指标
     3. 不反对并发写入,文件随机批改。仅反对数据追加的。

DataNode 与 NameNode 源代码导读

代码浏览前的筹备工作:Hadoop Rpc 框架指南

Rpc 协定


public interface MyInterface {
    Object versionID = 1;

    boolean demo();}

Rpc provider


public class MyHadoopServer implements MyInterface {
    @Override
    public boolean demo() {return false;}

    public static void main(String[] args) {Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(8888)
                .setProtocol(MyInterface.class)
                .setInstance(new MyHadoopServer())
                .build();

        server.start();}
}

Rpc consuemr

public class MyHadoopClient {public static void main(String[] args) throws Exception {
        MyInterface client = RPC.getProxy(
                MyInterface.class,
                MyInterface.versionID,
                new InetSocketAddress("localhost", 8888),
                new Configuration());
        
        client.demo();}
    
}

NameNode 启动源码

  1. 启动 9870 端口服务
  2. 加载镜像文件和编辑日志 
  3. 初始化 NN 的 RPC 服务端:用于接管 DataNode 的 RPC 申请
  4. NN 启动资源检测
  5. NN 对心跳超时判断(启动一个线程去判断 DataNode 是否超时)

    1. HDFS 默认 DataNode 掉线容忍工夫未 timeout = 2 heartbeat.recheck.interval + 10 dfs.heartbeat.interval(2*5+30)超过这个工夫会被认为 DataNode 超时

DataNode 启动源码

工作流程

源码图示

MapReduce

MapReduce 示例

  • 需要:有一个大小为 300M 的存文本文件。统计其每一个字母呈现的总次数。要求:[a-p] 一个后果文件,[q-z] 一个后果文件。
  • 实现:

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private Text outK = new Text();
      private IntWritable outV = new IntWritable(1);
    
      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
          // 1 获取一行
          String line = value.toString();
    
          // 2 切割
          String[] words = line.split(" ");
    
          // 3 循环写出
          for (String word : words) {
              // 封装 outk
              outK.set(word);
    
              // 写出
              context.write(outK, outV);
          }
      }
    }
    
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        
        // 累加
        for (IntWritable value : values) {sum += value.get();
        }

        outV.set(sum);

        // 写出
        context.write(key,outV);
    }
}

切片规定决定 MapTask 的数量。

  1. MapReduce 将数据的读取形象为一个 InputFormat。罕用的 FileInputFormat 是针对文件读取的一个具体实现。
  2. FileInputFormat 的默认分片规定为:将待处理的数据文件进行逻辑分片,每 128M 为一个数据切片。一个数据切片交给一个 MapTask 进行并行处理。
  3. 分片数量过大,开启过多的 MapTask 浪费资源。分片数量过小,MapTask 阶段解决慢。

    ReduceTask 的数量须要手工指定。

  4. 在 Map 阶段须要对每一个输入数据会通过分区算法计算分区。
  5. 若 ReduceTask=0,则示意没有 Reduce 阶段,输入文件个数和 Map 个数统一。
  6. 若 ReduceTask=1,所有输入文件为一个。
  7. ReduceTask 数量要大于分区的后果不同值的数,否则数据无奈被生产会产生异样。
  8. 若 ReduceTask 的数量大于分区数量,则会有局部 reduceTask 处于闲置状态。

MapReduce 具体的工作流程。

Map 阶段

  • read 阶段:通过 RecordReader 从 InputFormat 分片中将读取数据并将数据解析成一个个 key/value。
  • map 阶段:用户自定义的 Mapper.map 办法执行。将输出的 key/value 转为输入的 key/value。
  • collect 阶段:接管输入 Key/Value 并调用分区算法,将输入数据写入对应的分区的环形内存缓存区中。

    • spill 阶段:当内存缓存区的使用率超过肯定的阈值后,将触发溢写线程。该线程当初内存中进行疾速排序,而后将数据溢写到磁盘上。
  • Combine 阶段:当所有数据处理实现后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

Reduce 阶段

  • Copy 阶段:ReduceTask 从所有的 MapTask 中拷贝同一分区的数据,每一个 ReduceTask 负责解决一个分区,互不影响。如果文件大小超过肯定的阈值,则溢写到磁盘上,否则存储在内存中。
  • Merge 阶段:ReduceTask 将所有雷同分区的材料合并成一个大文件,
  • sort 阶段:将合并后的大文件进行归并排序。因为 mapTask 自身保障了分区内区内有序,因而 ReduceTask 只须要对所有数据进行一次归并排序即可。
  • reduce 阶段:执行用户的 reduce 办法,并将后果写到 HDFS。

MapReduce 优缺点

长处

  1. 实现简略,封装度高。
  2. 扩展性强:能够疾速减少机器来扩大它的计算能力。
  3. 高容错性:当某个节点挂掉,它会主动将工作转移到领一个节点运行,两头不须要人工参加。
  4. 适宜 PB 级别以上海量数据的离线解决。

毛病

  1. 不善于实时计算。
  2. 不善于流式计算:MapReduce 的输出数据集是动态的。
  3. 重 IO:每个 MapReduce 作业的输入后果都会写入到磁盘,会造成大量的磁盘 IO。

MapTask 源代码导读

  1. MapTask.run 办法是 MapTask 的入口.

    1. 读取配置,初始化 MapTask 生成 jobId
    2. 判断应用的 api 抉择应用 runNewMapper 或者 runOldMapper,并执行。
    3. MapTask 执行完结,做一些清理工作。
  2. runNewMapper

    1. 实例化默认的 inputFormat,计算切片。依据设置的 reduceTask 数量实例化输入对象。实例化输出对象。
    2. mapper.run(mapperContext) 执行

      1. 循环确认每组 kv,执行用户的 map 逻辑。
      2. map 办法中调用 collector.collect 办法。将数据写入环形缓冲区。
    3. output.close(mapperContext) 执行最终调用 MapTask 的 close 办法

      1. 调用 MapTask 的 flush 办法。

        1. sortAndSpill 内存排序,并溢写到文件。每一个分区一个临时文件。区内有序
        2. mergeParts 归并排序,将多个临时文件合并成一个文件。
      2. 调用 MapTask 的 close 办法,该办法是一个空办法。

ReduceTask 源代码导读: 其入口为 RecuceTask 的 run 办法。

  1. 首先初始化 copy,sort,reduce 的状态器。
  2. initialize 初始化 outputformat 为 TextOutputFormat。
  3. shuffleConsumerPlugin.init(shuffleContext) 办法执行。初始化 inMemoryMerger 和 onDiskMerger。
  4. shuffleConsumerPlugin.run();

    1. 创立 Fetcher 抓取数据,数据抓取实现后,将状态切为 sort
    2. merger.close(); 里的 finanlMerge 执行,将内存中的数据和磁盘内的数据合并。
  5. 将状态切为 reduce。runNewReducer。用户自定义的 Reduce 办法执行。

    1. 用户自定义的 reduce 办法执行,并调用 context.write 办法写数据。最终调用 TextOutputFormat 的 write 办法。

      1. 先写 key
      2. 再写 value
      3. 最初写入一个换行

Yarn

Yarn 组成

  1. ResourceManager(RM): 全局资源的管理者

    1. 由两局部组成: 一个是可插拔式的调度 Scheduler,一个是 ApplicationManager
    2. Scheduler : 一个存粹的调度器, 不负责应用程序的监控
    3. ApplicationManager: 次要负责接管 job 的提交申请,为利用调配第一个 Container 来运行 ApplicationMaster,还有就是负责监控 ApplicationMaster,在遇到失败时重启 ApplicationMaster 运行的 Container
    4. NodeManager(NM)
    5. 接管 ResourceManager 的申请,调配 Container 给利用的某个工作
    6. 和 ResourceManager 替换信息以确保整个集群安稳运行。ResourceManager 就是通过收集每个 NodeManager 的报告信息来追踪整个集群衰弱状态的,而 NodeManager 负责监控本身的衰弱状态。
    7. 治理每个 Container 的生命周期
    8. 治理每个节点上的日志
    9. 执行 Yarn 下面利用的一些额定的服务,比方 MapReduce 的 shuffle 过程
    10. Container

      1. 是 Yarn 框架的计算单元,是具体执行利用 task
      2. 是一组调配的系统资源内存,cpu, 磁盘, 网络等
      3. 每一个应用程序从 ApplicationMaster 开始,它自身就是一个 container(第 0 个),一旦启动,ApplicationMaster 就会依据工作需要与 Resourcemanager 协商更多的 container,在运行过程中,能够动静开释和申请 container。
    11. ApplicationMaster(AM)

      1. ApplicationMaster 负责与 scheduler 协商适合的 container,跟踪应用程序的状态,以及监控它们的进度
      2. 每个应用程序都有本人的 ApplicationMaster,负责与 ResourceManager 协商资源(container)和 NodeManager 协同工作来执行和监控工作
      3.  当一个 ApplicationMaster 启动后,会周期性的向 resourcemanager 发送心跳报告来确认其衰弱和所需的资源状况

Yarn 执行过程

  1. 客户端程序向 ResourceManager 提交利用并申请一个 ApplicationMaster 实例,ResourceManager 在应答中给出一个 applicationID
  2. ResourceManager 找到能够运行一个 Container 的 NodeManager,并在这个 Container 中启动 ApplicationMaster 实例
  3. ApplicationMaster 向 ResourceManager 进行注册,注册之后客户端就能够查问 ResourceManager 取得本人 ApplicationMaster 的详细信息
  4. 在平时的操作过程中,ApplicationMaster 依据 resource-request 协定向 ResourceManager 发送 resource-request 申请,ResourceManager 会依据调度策略尽可能最优的为 ApplicationMaster 调配 container 资源,作为资源申请的应答发给 ApplicationMaster
  5. ApplicationMaster 通过向 NodeManager 发送 container-launch-specification 信息来启动 Container
  6. 应用程序的代码在启动的 Container 中运行,并把运行的进度、状态等信息通过 application-specific 协定发送给 ApplicationMaster,随着作业的执行,ApplicationMaster 将心跳和进度信息发给 ResourceManager,在这些心跳信息中,ApplicationMaster 还能够申请和开释一些 container。
  7. 在利用程序运行期间,提交利用的客户端被动和 ApplicationMaster 交换取得利用的运行状态、进度更新等信息,交换的协定也是 application-specific 协定

Yarn 调度器与调度算法

Yarn Scheduler 策略

  1. FIFO: 将所有的 Applications 放到队列中,先依照作业的优先级高下、再依照达到工夫的先后,为每个 app 分配资源

    1. 长处: 简略, 不须要配置
    2. 毛病: 不适宜共享集群
  2. Capacity Scheduler: 用于一个集群中运行多个 Application 的状况, 指标是最大化吞吐量和集群利用率

    1. CapacityScheduler 容许将整个集群的资源分成多个局部,每个组织应用其中的一部分,即每个组织有一个专门的队列,每个组织的队列还能够进一步划分成层次结构(Hierarchical Queues),从而容许组织外部的不同用户组的应用。每一个队列指定能够应用的资源范畴.
    2. 每一个队列外部, 依照 FIFO 的形式调度 Application. 当某个队列的资源闲暇时, 能够将它的残余资源共享给其余队列.

Yarn 源码

参考

  • https://www.cnblogs.com/dan2/…
  • https://www.cnblogs.com/dan2/…
  • https://www.cnblogs.com/dan2/…
  • https://www.bilibili.com/vide…

正文完
 0