一. HDFS 客户端外围代码
1. Configuration
Configuration 提供对配置参数的拜访,通常称之为配置文件类 。次要 用于加载或者设定程序运行时相干的参数属性。
1. Configuration 加载默认配置
首先加载了静态方法和动态代码块,其中在动态代码块中显示默认加载了两个配置文件:
core-default.xml 以及 core-site.xml
2. Configuration 加载用户设置
通过 conf.set 设置的属性也会被加载。
1. FileSystem
FileSystem 类是一个通用的文件系统的形象基类。具体来说它 能够实现为一个分布式的文件系统 , 也能够实现为一个本地文件系统 。所有的可能会应用到 HDFS 的用户代码在进行编写时都应该应用 FileSystem 对象。
代表 本地文件系统的实现是 LocalFileSystem,代表 分布式文件系统的实现是 DistributedFileSystem。当然针对其余 hadoop 反对的文件系统也有不同的具体实现。
因而 HDFS 客户端在进行读写操作之前,须要创立 FileSystem 对象的实例。
1. 获取 FileSystem 实例
FileSystem 对象是通过调用 getInternal 办法失去的。
首先在 getInternal 办法中调用了 createFileSystem 办法,进去该办法:
FileSystem 实例是通过反射的形式取得的,具体实现是通过调用反射工具类 ReflectionUtils 的 newInstance 办法并将 class 对象以及 Configuration 对象作为参数传入最终失去了 FileSystem 实例。
二. HDFS 通信协议
1. 简介
HDFS 作为一个分布式文件系统,它的某些流程是非常复杂的(例如读、写文件等典型流程),经常波及数据节点、名字节点和客户端三者之间的配合、互相调用能力实现。为了升高节点间代码的耦合性,进步单个节点代码的内聚性,HDFS 将这些节点间的调用形象成不同的接口。
HDFS 节点间的接口次要有两种类型:
Hadoop RPC 接口 :基于 Hadoop RPC 框架实现的接口;
流式接口:基于 TCP 或者 HTTP 实现的接口;
2. Hadoop RPC 接口
1. RPC 介绍
RPC 全称 Remote Procedure Call——近程过程调用。就是为了解决近程调用服务的一种技术,使得调用者像调用本地服务一样不便通明。
通信模块: 传输 RPC 申请和响应的网络通信模块,能够基于 TCP 协定,也能够基于 UDP 协定,能够是同步,也能够是异步的。
客户端 Stub 程序: 服务器和客户端都包含 Stub 程序。在客户端,Stub 程序体现的就像本地程序一样,但底层却会将调用申请和参数序列化并通过通信模块发送给服务器。之后 Stub 程序期待服务器的响应信息,将响应信息反序列化并返回给申请程序。
服务器端 Stub 程序: 在服务器端,Stub 程序会将近程客户端发送的调用申请和参数反序列化,依据调用信息触发对应的服务程序,而后将服务程序返回的响应信息序列化并发回客户端。
申请程序: 申请程序会像调用本地办法一样调用客户端 Stub 程序,而后接管 Stub 程序返回的响应信息。
服务程序: 服务器会接管来自 Stub 程序的调用申请,执行对应的逻辑并返回执行后果。
Hadoop RPC 调用使得 HDFS 过程可能像本地调用一样调用另一个过程中的办法,并且能够传递 Java 根本类型或者自定义类作为参数,同时接管返回值。如果近程过程在调用过程中出现异常,本地过程也会收到对应的异样。目前Hadoop RPC 调用是基于 Protobuf 实现的。
Hadoop RPC 接口次要定义在 org.apache.hadoop.hdfs.protocol 包和 org.apache.hadoop.hdfs.server.protocol 包中,外围的接口有:
ClientProtocol、ClientDatanodeProtocol、DatanodeProtocol。
2. ClientProtocol
ClientProtocol 定义了客户端与名字节点间的接口,这个接口定义的办法十分多,客户端对文件系统的所有操作都须要通过这个接口 ,同时 客户端读、写文件等操作也须要先通过这个接口与 Namenode 协商之后,再进行数据块的读出和写入操作。
ClientProtocol 定义了所有由客户端发动的、由 Namenode 响应的操作。这个接口十分大,有 80 多个办法,外围的是:HDFS 文件读相干的操作、HDFS 文件写以及追加写的相干操作。
- 读数据相干的办法
ClientProtocol 中与客户端读取文件相干的办法次要有两个:getBlockLocations()和 reportBadBlocks()
客户端会 调用 ClientProtocol.getBlockLocations)办法获取 HDFS 文件指定范畴内所有数据块的地位信息 。这个办法的 参数是 HDFS 文件的文件名以及读取范畴 , 返回值是文件指定范畴内所有数据块的文件名以及它们的地位信息 ,应用 LocatedBlocks 对象封装。每个数据块的地位信息指的是存储这个数据块正本的所有 Datanode 的信息,这些 Datanode 会以与以后客户端的间隔远近排序。 客户端读取数据时,会首先调用 getBlockLocations()办法获取 HDFS 文件的所有数据块的地位信息,而后客户端会依据这些地位信息从数据节点读取数据块。
客户端会调用 ClientProtocol.reportBadBlocks()办法向 Namenode 汇报谬误的数据块。当客户端从数据节点读取数据块且发现数据块的校验和并不正确时,就会调用这个办法向 Namenode 汇报这个谬误的数据块信息。 - 写、追加数据相干办法
在 HDFS 客户端操作中最重要的一部分就是写入一个新的 HDFS 文件,或者关上一个已有的 HDFS 文件并执行追加写操作。ClientProtocol 中定义了 8 个办法反对 HDFS 文件的写操作 :create()、append()、addBlock()、complete(),abandonBlockO),getAddtionnalDatanodes()、updateBlockForPipeline() 和 updatePipeline()。
create()办法用于在 HDFS 的文件系统目录树中创立一个新的空文件 ,创立的门路由 src 参数指定。这个空文件创建后对于其余的客户端是“可读”的,然而这些客户端不能删除、重命名或者挪动这个文件,直到这个文件被敞开或者租约过期。客户端写一个新的文件时, 会首先调用 create 办法在文件系统目录树中创立一个空文件,而后调用 addBlock 办法获取存储文件数据的数据块的地位信息,最初客户端就能够依据地位信息建设数据流管道,向数据节点写入数据了。
当客户端实现了整个文件的写入操作后,会调用 complete()办法告诉 Namenode。这个操作会提交新写入 HDFS 文件的所有数据块,当这些数据块的正本数量满足系统配置的最小正本系数(默认值为 1),也就是该文件的所有数据块至多有一个无效正本时,complete()办法会返回 true,这时 Namenode 中文件的状态也会从构建中状态转换为失常状态;否则,complete 会返回 false,客户端就须要反复调用 complete 操作,直至该办法返回 true
- ClientDatanodeProtocol
客户端与数据节点间的接口。ClientDatanodeProtocol 中定义的办法次要是用于客户端获取数据节点信息时调用,而真正的数据读写交互则是通过流式接口进行的。
ClientDatanodeProtocol 中定义的接口能够分为两局部:一部分是反对 HDFS 文件读取操作的 ,例如 getReplicaVisibleLength()以及 getBlockLocalPathInfo); 另一部分是反对 DFSAdmin 中与数据节点治理相干的命令。咱们重点关注第一局部。
- getReplicaVisibleLength
客户端会 调用 getReplicaVisibleLength()办法从数据节点获取某个数据块正本实在的数据长度。当客户端读取一个 HDFS 文件时,须要获取这个文件对应的所有数据块的长度,用于建设数据块的输出流,而后读取数据。然而 Namenode 元数据中文件的最初一个数据块长度与 Datanode 理论存储的可能不统一,所以客户端在创立输出流时就须要调用 getReplicaVisibleLength()办法从 Datanode 获取这个数据块的实在长度。 - getBlockLocalPathInfo
HDFS 对于本地读取,也就是 Client 和保留该数据块的 Datanode 在同一台物理机器上时,是有很多优化的。Client 会调用 ClientProtocol.getBlockLocalPathInfo)办法获取指定数据块文件以及数据块校验文件在以后节点上的本地门路,而后利用这个本地门路执行本地读取操作,而不是通过流式接口执行近程读取,这样也就大大优化了读取的性能。 -
DatanodeProtocol
数据节点通过这个接口与名字节点通信,同时名字节点会通过这个接口中办法的返回值向数据节点下发指令。留神,这是名字节点与数据节点通信的惟一形式。这个接口十分重要,数据节点会通过这个接口向名字节点注册、汇报数据块的全量以及增量的存储状况。同时,名字节点也会通过这个接口中办法的返回值,将名字节点指令带回该数据块,依据这些指令,数据节点会执行数据块的复制、删除以及复原操作。
能够将 DatanodeProtocol 定义的办法分为三种类型: Datanode 启动相干、心跳相干以及数据块读写相干。2. 基于 TCP/HTTP 流式接口
HDFS 除了定义 RPC 调用接口外,还定义了流式接口,流式接口是 HDFS 中基于 TCP 或者 HTTP 实现的接口。在 HDFS 中,流式接口包含了基于 TCP 的 DataTransferProtocol 接口,以及 HA 架构中 Active Namenode 和 Standby Namenode 之间的 HTTP 接口。
1. DataTransferProtocol
DataTransferProtocol 是用来形容写入或者读出 Datanode 上数据的基于 TCP 的流式接口,HDFS 客户端与数据节点以及数据节点与数据节点之间的数据块传输就是基于 DataTransferProtocol 接口实现的。HDFS 没有采纳 Hadoop RPC 来实现 HDFS 文件的读写性能,是因为 Hadoop RPC 框架的效率目前还不足以撑持超大文件的读写,而应用基于 TCP 的流式接口有利于批量解决数据,同时进步了数据的吞吐量。
DataTransferProtocol 中最重要的办法就是readBlock()和 writeBlock()。 - readBlock:从以后 Datanode 读取指定的数据块
- writeBlock:将指定数据块写入数据流管道(pipeLine)中。
DataTransferProtocol 接口调用并没有应用 Hadoop RPC 框架提供的性能,而是定义了用于发送 DataTransferProtocol 申请的 Sender 类,以及用于响应 DataTransferProtocol 申请的 Receiver 类。
Sender 类和 Receiver 类都实现了 DataTransferProtocol 接口。咱们假如 DFSClient 发动了一个 DataTransferProtocol.readBlock()操作,那么 DFSClient 会调用 Sender 将这个申请序列化,并传输给远端的 Receiver。远端的 Receiver 接管到这个申请后, 会反序列化申请, 而后调用代码 执行读取操作。
3. 数据写入流程剖析
1. 写入流程图
2. 写入数据代码
package cn.itcast.hdfs.write;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.FileInputStream;
public class HDFSWriteDemo {public static void main(String[] args) throws Exception{
// 设置客户端用户身份:root 具备在 hdfs 读写权限
System.setProperty("HADOOP_USER_NAME","root");
// 创立 Conf 对象
Configuration conf = new Configuration();
// 设置操作的文件系统是 HDFS 默认是 file:///
conf.set("fs.defaultFS","hdfs://node1:8020");
// 创立 FileSystem 对象 其是一个通用的文件系统的形象基类
FileSystem fs = FileSystem.get(conf);
// 设置文件输入的门路
Path path = new Path("/helloworld.txt");
// 调用 create 办法创立文件
FSDataOutputStream out = fs.create(path);
// 创立本地文件输出流
FileInputStream in = new FileInputStream("D:\\datasets\\hdfs\\helloworld.txt");
// IO 工具类实现流对拷贝
IOUtils.copy(in,out);
// 敞开连贯
fs.close();}
}
3. 写入数据流程梳理
1. 客户端申请 NameNode 创立
HDFS 客户端通过对 DistributedFileSystem 对象调用 create()申请创立文件。DistributedFileSystem 为客户端返回 FSDataOutputStream 输入流对象。通过源码正文能够发现 FSDataOutputStream 是一个包装类,所包装的是 DFSOutputStream。
能够通过 create 办法调用一直跟上来,能够发现最终的调用也验证了上述论断,返回的是 DFSOutputStream。
点击进入代码 DFSOutputStream dfsos = dfs.create 能够发现,DFSOutputStream 这个类是从 DFSClient 类的 create 办法中返回过去的
点击进入代码 DFSOutputStream dfsos = dfs.create 能够发现,DFSOutputStream 这个类是从 DFSClient 类的 create 办法中返回过去的。
DFSClient 类中的 DFSOutputStream 实例对象是通过调用 DFSOutputStream 类的 newStreamForCreate 办法产生的。
2. Namenode 执行申请查看
DistributedFileSystem 对 namenode 进行 RPC 调用,申请上传文件。namenode 执行各种查看判断:指标文件是否存在、父目录是否存在、客户端是否具备创立该文件的权限。查看通过,namenode 就会为创立新文件记录一条记录。否则,文件创建失败并向客户端抛出一个 IOException。
3. DataStreamer 类
在之前的 newStreamForCreate 办法中,咱们发现了最终返回的是 out 对象,并且在返回之前,调用了 out 对象的 start 办法。
DataStreamer 类是 DFSOutputSteam 的一个外部类,在这个类中,有一个办法叫做 run 办法,数据写入的要害代码就在这个 run 办法中实现。
4. DataStreamer 写数据
在客户端写入数据时 ,DFSOutputStream 将它分成一个个数据包(packet 默认 64kb), 并 写入一个称之为数据队列(data queue)的外部队列。DataStreamer 申请 NameNode 挑选出适宜存储数据正本的一组 DataNode。这一组 DataNode 采纳 pipeline 机制做数据的发送。默认是 3 正本存储。
DataStreamer 将数据包流式传输到 pipeline 的第一个 datanode, 该 DataNode 存储数据包并将它发送到 pipeline 的第二个 DataNode。同样,第二个 DataNode 存储数据包并且发送给第三个(也是最初一个)DataNode。
DFSOutputStream 也保护着一个外部数据包队列来期待 DataNode 的收到确认回执,称之为确认队列(ack queue), 收到 pipeline 中所有 DataNode 确认信息后,该数据包才会从确认队列删除。
客户端实现数据写入后,将在流上调用 close()办法敞开。该操作将残余的所有数据包写入 DataNode pipeline,并在分割到 NameNode 告知其文件写入实现之前,期待确认。
因为 namenode 曾经晓得文件由哪些块组成(DataStream 申请调配数据块),因而它仅需期待最小复制块即可胜利返回。数据块最小复制是由参数 dfs.namenode.replication.min 指定,默认是 1.
4. 数据读取流程剖析
1. 读取流程图
2. 读取数据代码
package cn.itcast.hdfs.read;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.FileInputStream;
import java.io.FileOutputStream;
public class HDFSReadDemo {public static void main(String[] args) throws Exception{
// 设置客户端用户身份:root 具备在 hdfs 读写权限
System.setProperty("HADOOP_USER_NAME","root");
// 创立 Conf 对象
Configuration conf = new Configuration();
// 设置操作的文件系统是 HDFS 默认是 file:///
conf.set("fs.defaultFS","hdfs://node1:8020");
// 创立 FileSystem 对象 其是一个通用的文件系统的形象基类
FileSystem fs = FileSystem.get(conf);
// 调用 open 办法读取文件
FSDataInputStream in = fs.open(new Path("/helloworld.txt"));
// 创立本地文件输入流
FileOutputStream out = new FileOutputStream("D:\\helloworld.txt");
//IO 工具类实现流对拷贝
IOUtils.copy(in,out);
// 敞开连贯
fs.close();}
}