大数据系列5hdfs的学习

35次阅读

共计 5861 个字符,预计需要花费 15 分钟才能阅读完成。

1. hdfs(分布式文件系统)

1.1 分布式文件系统

数据集的大小超过一台独立的计算机的存储能力时, 就要通过网络中的多个机器来存储数据集, 把管理网络中多台计算机组成的文件系统, 称为分布式文件系统

1.2 hdfs 的特点

  • 分布式

    • 数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统 ,
  • 高可用

    • 副本机制
  • 通透性

    • 实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般

1.3 hdfs 的体系架构

  • namenode

    • 名称节点
    • 文件系统的管理节点
    • 维护着整个文件系统的文件目录树
    • 接收用户的请求
  • datanode

    • 数据节点
    • 存储 block(一个 block 在 hadoop1.x 的版本中 64mb, 在 hadoop2.x 的版本中是 128mb)

1.4 hdfs 的设计

  • 流式数据的访问

    • 一次写入多次读取
  • 商用硬件

    • hadoop 不需要运行在昂贵的商业机器上 (ibm 的小型机等), 只需要普通的机器即可
  • 低时间延时的数据访问

    • 要求几十毫秒获取响应结果的应用数据不能使用 hdfs 来存储
    • 虽然 hdfs 不能解决低延迟的访问, 但是基于 hdfs 的 hbase 能解决延迟问题
  • 大量的小文件

    • 每个文件在 namenode 中, 存储文件目录信息,block 信息, 约占 150byte
    • hdfs 不适合存储小文件
  • 多用户写入, 任意修改文件

    • 存储在 hdfs 中的文件只能有一个写入者 (writer)
    • 只能在文件末尾追加数据, 不能在任意位置修改文件

1.4 block 的大小规划

  • block: 数据块

    • 大数据集存储的基本单位
    • block 在 hadoop1.x 的版本中 64mb, 在 hadoop2.x 的版本中是 128mb
    • 为什么会有以上的设计

      • 硬盘有个寻址时间 (10ms)
      • 寻址时间占传输时间的 1%
      • 硬盘的读取速率一般为 100mb/s

1.5 secondary namenode

  • 合并 edits 与 fsimage
  • 合并的时机

    • 3600s
    • 64mb

2. hdfs 的操作

2.1 图形化操作

2.2 shell 操作

2.3 API 操作

3. hdfs 的操作 (图形界面)

3.1 hdfs 的启动流程

  1. 进入安全模式
  2. 加载 fsimage
  3. 加载 edits
  4. 保存检查点 (融合 fsimage 和 edits 文件, 生成新的 fsimage)
  5. 退出安全模式

3.2 通过浏览器访问

http://namenode:50070

4. hdfs 的操作 (shell 操作)

  • hdfs dfs
  • hadoop fs

5. hdfs 的操作 (API 操作)

5.1 依赖 POM

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.4</version>
</dependency>

5.2 hdfs 读写文件

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Test;
public class HdfsTest {
    /**
     * 写文件操作
     */
    @Test
    public void testWriteFile() throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://uplooking01:8020");
        // 创建文件系统对象
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/test002.txt");
        FSDataOutputStream fsDataOutputStream = fs.create(path, true);
        fsDataOutputStream.write("hello".getBytes());
        fsDataOutputStream.flush();
        fsDataOutputStream.close();}

    /**
     * 读文件操作
     */
    @Test
    public void testReadFile() throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://uplooking01:8020");
        // 创建文件系统对象
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/test002.txt");
        FSDataInputStream fsDataInputStream = fs.open(path);
        IOUtils.copy(fsDataInputStream, System.out);
    }


    /**
     * 上传文件操作
     */
    @Test
    public void testuploadFile() throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://uplooking01:8020");
        // 创建文件系统对象
        FileSystem fs = FileSystem.get(conf);
        Path fromPath = new Path("file:///f:/test01.txt");
        Path toPath = new Path("/test01.txt");
        fs.copyFromLocalFile(false, fromPath, toPath);
    }

    /**
     * 下载文件操作
     */
    @Test
    public void testdownloadFile() throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://uplooking01:8020");
        // 创建文件系统对象
        FileSystem fs = FileSystem.get(conf);
        Path fromPath = new Path("/test01.txt");
        Path toPath = new Path("file:///f:/test01.txt");
        fs.copyToLocalFile(false, fromPath, toPath);
    }


    /**
     * 下载文件操作
     */
    @Test
    public void testOtherFile() throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://uplooking01:8020");
        // 创建文件系统对象
        FileSystem fs = FileSystem.get(conf);
//        BlockLocation[] blockLocations = fs.getFileBlockLocations(new Path("/test01.txt"), 0, 134217730);
//        System.out.println(blockLocations);
        FileStatus[] listStatus = fs.listStatus(new Path("/test01.txt"));
        System.out.println(listStatus);
    }
}

3. hdfs 的高级操作

回滚 edits: hdfs dfsadmin -rollEdits

进入安全模式: hdfs dfsadmin -safemode | enter | leave| get| wait

融合 edits 和 fsimage: hdfs dfsadmin -saveNamespace:

查看 fsimage: hdfs oiv -i -o -p

查看 edits: hdfs oev -i -o -p

4. hdfs 中的配额管理

  • 目录配额

    • 设置目录配额

      • hdfs dfsadmin -setQuota n dir
      • n: 指的是目录配额的个数, 如果个数为 1, 则不能存放任何文件, 如果为 2 则只能放一个文件, 以此类推.
    • 清除目录配额

      • hdfs dfsadmin -clrQuota dir
  • 空间配额

    • 设置空间配额

      • hdfs dfsadmin -setSpaceQuota n dir

        • n: 指空间的大小
    • 清除空间配额

      • hdfs dfsadmin -clrSpaceQuota dir

5. 获取配置

hdfs getconf -confKey keyname

6. hadoop 中的 RPC

  • RPC(Remote Procedure Call)——远程过程调用协议
  • 它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议
  • 设计目的:

    • 调用远程的方法和调用本地方法一样方便

6.1 编写 RPC 服务端

定义协议

/**
 * 定义协议
 */
public interface IHelloService extends VersionedProtocol {
    public long versionID = 123456798L;// 定义协议的版本
    public String sayHello(String name);// 协议的具体条目
}

定义 RPC 的服务器实例类

/**
 * 实例类, 实现了协议的类
 */
public class HelloServiceImpl implements IHelloService {
    @Override
    public String sayHello(String name) {System.out.println("==================" + name + "==================");
        return "hello" + name;
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {return versionID;}

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {return new ProtocolSignature();
    }
}

定义 RPC 程序的启动程序

public class MyRpcServer {public static void main(String[] args) throws IOException {Configuration conf = new Configuration();
        RPC.Server server = new RPC.Builder(conf)
            .setBindAddress("172.16.4.3")// 配置主机
            .setPort(8899)// 配置端口
            .setProtocol(IHelloService.class)// 配置协议
            .setInstance(new HelloServiceImpl())// 配置实例, 可以配置多个
            .build();
        server.start();
        System.out.println("RPC 服务器启动成功....");
    }
}

6.2 编写 RPC 客户端

定义协议

/**
 * 定义协议
 */
public interface IHelloService extends VersionedProtocol {
    public long versionID = 123456798L;// 定义协议的版本
    public String sayHello(String name);// 协议的具体条目
}

定义客户端启动程序

Configuration conf = new Configuration();
ProtocolProxy<IHelloService> proxy = RPC.getProtocolProxy(IHelloService.class, IHelloService.versionID, new InetSocketAddress("172.16.4.3", 8899), conf);
IHelloService helloService = proxy.getProxy();
String ret = helloService.sayHello("xiaoming");
System.out.println(ret);

7. 独立启动 namenode datanode

hadoop-daemon.sh start namenode

hadoop-daemon.sh start datanode

hadoop-daemon.sh start secondarynamenode

yarn-daemon.sh start resourcemanager

yarn-daemon.sh start nodemanager

8. 节点的服役和退役

  • 动态的添加节点, 不需要停止整个集群
  • hdfs 中维护着一个白名单和一个黑名单

8.1 节点服役

== 在 namenode 中操作 ==

hdfs-site.xm

<!-- 白名单 -->
<property>
    <name>dfs.hosts</name>
    <value>/opt/hadoop/etc/hadoop/dfs.include</value>
</property> 

创建白名单文件

/opt/hadoop/etc/hadoop/dfs.include

​ uplooking03

​ uplooking04

​ uplooking05

​ uplooking06

刷新节点:

hdfs dfsadmin -refreshNodes

8.1 节点退役

  • 从白名单移除
  • 添加到黑名单
  • 刷新节点
  • 从黑名单移除
  • 停止 datanode 进程

正文完
 0