乐趣区

关于java:Ceph分布式存储实践应用之集群测试验证Rados运用

1. 创立 Cephfs

集群创立完后,默认没有文件系统,要是实现文件的存储操作,咱们还需创立一个 Cephfs 能够反对对外拜访的文件系统。

  1. 创立两个存储池, 执行两条命令:

    ceph osd pool create cephfs_data 128
    ceph osd pool create cephfs_metadata 64

    少于 5 个 OSD 可把 pg_num 设置为 128

    OSD 数量在 5 到 10,能够设置 pg_num 为 512

    OSD 数量在 10 到 50,能够设置 pg_num 为 4096

    OSD 数量大于 50,须要计算 pg_num 的值

    通过上面命令能够列出以后创立的存储池:

    ceph osd lspools
  2. 创立 fs, 名称为 fs_test:

    ceph fs new fs_test cephfs_metadata cephfs_data
  3. 状态查看,以下信息代表失常:

    [root@CENTOS7-1 mgr-dashboard]# ceph fs ls
    name: fs_test, metadata pool: cephfs_metadata, data pools: [cephfs_data]
    [root@CENTOS7-1 mgr-dashboard]# ceph mds stat
    fs_test-1/1/1 up  {0=centos7-1=up:active}

    附:如果创立谬误,须要删除,执行:

    ceph fs rm fs_test --yes-i-really-mean-it
    ceph osd pool delete cephfs_data cephfs_data  --yes-i-really-really-mean-it

    确保在 ceph.conf 中开启以下配置:

    [mon]
    mon allow pool delete = true
  4. 采纳 fuse 挂载

    先确定 ceph-fuse 命令能执行,如果没有,则装置:

     yum -y install ceph-fuse
  5. 创立挂载目录

    mkdir -p /usr/local/cephfs_directory
  6. 挂载 cephfs

    [root@node3 ~]# ceph-fuse -k /etc/ceph/ceph.client.admin.keyring -m 10.10.20.11:6789 /usr/local/cephfs_directory
    ceph-fuse[6687]: starting ceph client
    2019-07-14 21:39:09.644181 7fa5be56e040 -1 init, newargv = 0x7fa5c940b500 newargc=9
    ceph-fuse[6687]: starting fuse
  7. 查看磁盘挂载信息

    [root@CENTOS7-1 mgr-dashboard]# df -h
    Filesystem               Size  Used Avail Use% Mounted on
    /dev/mapper/centos-root   38G  3.0G   35G   8% /
    devtmpfs                 1.9G     0  1.9G   0% /dev
    tmpfs                    1.9G     0  1.9G   0% /dev/shm
    tmpfs                    1.9G   20M  1.9G   2% /run
    tmpfs                    1.9G     0  1.9G   0% /sys/fs/cgroup
    /dev/sda1                197M  167M   31M  85% /boot
    tmpfs                    378M     0  378M   0% /run/user/0
    tmpfs                    1.9G   24K  1.9G   1% /var/lib/ceph/osd/ceph-0
    ceph-fuse                 27G     0   27G   0% /usr/local/cephfs_directory
    tmpfs                    378M     0  378M   0% /run/user/1000
    

    /usr/local/cephfs_directory 目录已胜利挂载。

2. 客户端连贯验证(Rados Java)

  1. 装置好 JDK、GIT 和 MAVEN。
  2. 下载 rados java 客户端源码

    git clone https://github.com/ceph/rados…

    下载目录地位:

    [root@CENTOS7-1 rados-java]# pwd
    /usr/local/sources/rados-java
  3. 执行 MAVEN 装置, 疏忽测试用例:

    [root@CENTOS7-1 rados-java]# mvn install -Dmaven.test.skip=true

    生成 jar 包,rados-0.6.0-SNAPSHOT.jar

    [root@CENTOS7-1 target]# ll
    total 104
    drwxr-xr-x 3 root root     17 Jul 14 19:31 classes
    drwxr-xr-x 2 root root     27 Jul 14 19:31 dependencies
    drwxr-xr-x 3 root root     25 Jul 14 19:31 generated-sources
    drwxr-xr-x 2 root root     28 Jul 14 19:31 maven-archiver
    drwxr-xr-x 3 root root     35 Jul 14 19:31 maven-status
    -rw-r--r-- 1 root root 105570 Jul 14 19:31 rados-0.6.0-SNAPSHOT.jar
  4. 创立软链接,退出 CLASSPATH

    ln -s /usr/local/sources/rados-java/target/rados-0.6.0-SNAPSHOT.jar /usr/local/jdk1.8.0_181/jre/lib/ext/rados-0.6.0-SNAPSHOT.jar

    装置 jna

    yum -y install jna

    创立软链接

    ln -s /usr/share/java/jna.jar /usr/local/jdk1.8.0_181/jre/lib/ext/jna.jar

    查看

    [root@CENTOS7-1 target]# ll /usr/local/jdk1.8.0_181/jre/lib/ext/jna.jar 
    lrwxrwxrwx 1 root root 23 Jul 14 10:23 /usr/local/jdk1.8.0_181/jre/lib/ext/jna.jar -> /usr/share/java/jna.jar
    [root@CENTOS7-1 target]# ll /usr/local/jdk1.8.0_181/jre/lib/ext/rados-0.6.0-SNAPSHOT.jar 
    lrwxrwxrwx 1 root root 40 Jul 14 10:25 /usr/local/jdk1.8.0_181/jre/lib/ext/rados-0.6.0-SNAPSHOT.jar -> /usr/share/java/rados-0.6.0-SNAPSHOT.jar
  5. 创立 JAVA 测试类

    CephClient 类,留神,最新版 0.6 的异样解决包地位已发生变化。

    import com.ceph.rados.Rados;
    import com.ceph.rados.exceptions.*;
    
    import java.io.File;
    
    public class CephClient {public static void main (String args[]){
    
                    try {Rados cluster = new Rados("admin");
                            System.out.println("Created cluster handle.");
    
                            File f = new File("/etc/ceph/ceph.conf");
                            cluster.confReadFile(f);
                            System.out.println("Read the configuration file.");
    
                            cluster.connect();
                            System.out.println("Connected to the cluster.");
    
                    } catch (RadosException e) {System.out.println(e.getMessage() + ":" + e.getReturnValue());
                    }
            }
    }
  6. 运行验证

    须要在 linux 环境下运行,且要在 client 节点。

    编译并运行:

    [root@CENTOS7-1 sources]# javac CephClient.java 
    [root@CENTOS7-1 sources]# java CephClient
    Created cluster handle.
    Read the configuration file.
    Connected to the cluster.

    胜利与 ceph 建设连贯。

3. Ceph 与我的项目集成应用

  1. 工程设计

    演示常常应用的文件上传与下载性能,看 java 是如何在我的项目中应用。

  2. 工程构造
    创立一个 Spring Boot 工程,创立一个是启动类和一个 ceph 操作封装类。
  3. 工程实现

    CephDemoApplication 启动类:

    @SpringBootApplication
    public class CephDemoApplication {public static void main(String[] args) {System.out.println("start....");
            String username = "admin";
            String monIp = "10.10.20.11:6789;10.10.20.12:6789;10.10.20.13:6789";
            String userKey = "AQBZBypdMchvBRAAbWVnIGyYNvxWQZ2UkuiYew==";
            String mountPath = "/";
            CephOperator cephOperate = null;
            try {String opt = (args == null || args.length < 1)? "" : args[0];
                cephOperate = new CephOperator(username, monIp, userKey, mountPath);
                if("upload".equals(opt)) {cephOperate.uploadFileByPath("/temp_upload_fs", args[1]);
                }else if("download".equals(opt)) {cephOperate.downloadFileByPath("/temp_download_fs", args[1]);
                }else {System.out.println("Unrecognized Command! Usage  opt[upload|download] filename[path]!");
                }
            }catch(Exception e) {e.printStackTrace();
            }finally {if(null != cephOperate) {cephOperate.umount();
                }
            }
            System.out.println("end....");
    
        }
    
    }

    monIp 为 ceph client 节点连贯地址与端口,反对多个;

    userKey 为密钥,对应 ceph.client.admin.keyring 中的 key 值。

    启动接管两个参数,一个是标识上传或下载,另一个是标识文件名称。

    CephOperator 操作类的实现:

    public class CephOperator {
    
        private CephMount mount;
        private String username;
        private String monIp;
        private String userKey;
    
        public CephOperator(String username, String monIp, String userKey, String mountPath) {
            this.username = username;
            this.monIp = monIp;
            this.userKey = userKey;
            this.mount = new CephMount(username);
            this.mount.conf_set("mon_host", monIp);
            mount.conf_set("key", userKey);
            mount.mount(mountPath);
        }
    
        // 查看目录列表
        public void listDir(String path) throws IOException {String[] dirs = mount.listdir(path);
            System.out.println("contents of the dir:" + Arrays.asList(dirs));
        }
    
        // 新建目录
        public void mkDir(String path) throws IOException {mount.mkdirs(path, 0755);// 0 示意十进制
        }
    
        // 删除目录
        public void delDir(String path) throws IOException {mount.rmdir(path);
        }
    
        // 重命名目录 or 文件
        public void renameDir(String oldName, String newName) throws IOException {mount.rename(oldName, newName);
        }
    
        // 删除文件
        public void delFile(String path) throws IOException {mount.unlink(path);
        }
    
        /**
         * 上传指定门路文件
         * @param filePath
         * @param fileName
         * @return
         */
        public Boolean uploadFileByPath(String filePath, String fileName) {
    
            // 如果 mount 操作单元为空则间接返回
            if (this.mount == null) {return null;}
            // 文件形容信息定义
            char pathChar = File.separatorChar;
            String fileFullName = "";
            Long fileLength = 0l;
            Long uploadedLength = 0l;
            File file = null;
    
            // 定义文件流
            FileInputStream fis = null;
    
            // 获取本地文件信息
            fileFullName = filePath + pathChar + fileName;
            file = new File(fileFullName);
            if (!file.exists()) {return false;}
            fileLength = file.length();
    
            // 获取本地文件流
            try {fis = new FileInputStream(file);
            } catch (FileNotFoundException e) {e.printStackTrace();
            }
    
            // 判断文件是否曾经存在
            String[] dirList = null;
            Boolean fileExist = false;
            try {dirList = this.mount.listdir("/");
                for (String fileInfo : dirList) {if (fileInfo.equals(fileName)) {fileExist = true;}
                }
            } catch (FileNotFoundException e) {e.printStackTrace();
            }
    
            if (!fileExist) {
                try {
                    // 创立文件并设置为写入模式
                    this.mount.open(fileName, CephMount.O_CREAT, 0);
                    int fd = this.mount.open(fileName, CephMount.O_RDWR, 0);
    
                    // 开始文件传输
                    int length = 0;
                    byte[] bytes = new byte[1024];
                    while ((length = fis.read(bytes, 0, bytes.length)) != -1) {
                        // 上传写入数据
                        this.mount.write(fd, bytes, length, uploadedLength); 
    
                        // 更新上传进度
                        uploadedLength += length;
    
                        // 输入上传百分比
                        float rate = (float) uploadedLength * 100 / (float) fileLength;
                        String rateValue = (int) rate + "%";
                        System.out.println(rateValue);
    
                        // 上传实现后退出
                        if (uploadedLength == fileLength) {break;}
                    }
                    System.out.println("文件传输胜利!");
    
                    // 设置文件权限
                    this.mount.fchmod(fd, 0666);
    
                    // 敞开操作
                    this.mount.close(fd);
                    if (fis != null) {fis.close();
                    }
                    return true;
                } catch (Exception e) {e.printStackTrace();
                }
            } else if (fileExist) {
                try {
                    // 获取文件大小
                    CephStat stat = new CephStat();
                    this.mount.stat(fileName, stat);
                    long lastLen= stat.size;
                    int fd = this.mount.open(fileName, CephMount.O_RDWR, 0);
    
                    // 开始文件传输
                    int length = 0;
                    byte[] bytes = new byte[1024];
                    long uploadActLen= 0;
                    while ((length = fis.read(bytes, 0, bytes.length)) != -1) {
                        // 更新写入
                        this.mount.write(fd, bytes, length, lastLen);
    
                        // 更新文件大小
                        uploadActLen += length;
                        // 更新文件上传百分比
                        float rate = (float) uploadActLen * 100 / (float) fileLength;
                        String rateValue = (int) rate + "%";
                        System.out.println(rateValue);
    
                        // complete flag
                        if (uploadActLen == fileLength) {break;}
                    }
                    // 屡次上传会进行追加
                    System.out.println("追加文件传输胜利!");
    
                    // 设置文件权限
                    this.mount.fchmod(fd, 0666);
    
                    // 敞开操作
                    this.mount.close(fd);
                    if (fis != null) {fis.close();
                    }
                    return true;
                } catch (Exception e) {e.printStackTrace();
                }
            } else {
                try {if (fis != null) {fis.close();
                    }
                } catch (Exception e) {e.printStackTrace();
                }
                return false;
            }
    
            return false;
        }
    
        // 设置以后的工作上传目录
        public void setWorkDir(String path) throws IOException {mount.chdir(path);
        }
     
        // 内部获取 mount
        public CephMount getMount() {return this.mount;}
    
        // 勾销文件挂载
        public void umount() {mount.unmount();
        }
       
        /**
         * 下载文件到指定门路
         * @param filePath
         * @param fileName
         * @return
         */
        public Boolean downloadFileByPath(String filePath, String fileName) {
    
            // 如果 mount 操作单元为空则间接返回
            if (this.mount == null) {return null;}
    
            // 文件形容信息定义
            char pathChar = File.separatorChar;
            String fileFullName = "";
            Long fileLength = 0l;
            Long downloadedLength = 0l;
            File file = null;
    
            // 定义文件流
            FileOutputStream fos = null;
            RandomAccessFile raf = null;
    
            // 创立新的文件
            fileFullName = filePath + pathChar + fileName;
            file = new File(fileFullName);
    
            // 获取 cephfs 的文件大小
            try {CephStat stat = new CephStat();
                this.mount.stat(fileName, stat);
                fileLength = stat.size;
            } catch (Exception e) {e.printStackTrace();
            }
    
            if (fileLength != 0) {if (!file.exists()) {
                    // 下载文件
                    int length = 10240;
                    byte[] bytes = new byte[length];
                    try {int fd = this.mount.open(fileName, CephMount.O_RDONLY, 0);
                        fos = new FileOutputStream(file);
                        float rate = 0;
                        String rateValue = "";
                        while ((fileLength - downloadedLength) >= length && (this.mount.read(fd, bytes, (long) length, downloadedLength)) != -1) {fos.write(bytes, 0, length);
                            fos.flush();
                            downloadedLength += (long) length;
    
                            // 输入进度百分比
                            rate = (float) downloadedLength * 100 / (float) fileLength;
                            rateValue = (int) rate + "%";
                            System.out.println(rateValue);
    
                            if (downloadedLength == fileLength) {break;}
                        }
                        if (downloadedLength != fileLength) {this.mount.read(fd, bytes, fileLength - downloadedLength, downloadedLength);
                            fos.write(bytes, 0, (int) (fileLength - downloadedLength));
                            fos.flush();
                            downloadedLength = fileLength;
    
                            // 输入进度百分比
                            rate = (float) downloadedLength * 100 / (float) fileLength;
                            rateValue = (int) rate + "%";
                            System.out.println(rateValue);
                        }
    
                        System.out.println("Download Success!");
                        fos.close();
                        this.mount.close(fd);
                        return true;
                    } catch (Exception e) {e.printStackTrace();
                    }
                } else if (file.exists()) {
                    // 下载文件
                    int length = 10240;
                    byte[] bytes = new byte[length];
                    Long filePoint = file.length();
                    try {int fd = this.mount.open(fileName, CephMount.O_RDONLY, 0);
                        raf = new RandomAccessFile(file, "rw");
                        raf.seek(filePoint);
                        downloadedLength = filePoint;
                        float rate = 0;
                        String rateValue = "";
                        while ((fileLength - downloadedLength) >= length && (this.mount.read(fd, bytes, (long) length, downloadedLength)) != -1) {raf.write(bytes, 0, length);
                            downloadedLength += (long) length;
    
                            // 输入进度百分比
                            rate = (float) downloadedLength * 100 / (float) fileLength;
                            rateValue = (int) rate + "%";
                            System.out.println(rateValue);
    
                            if (downloadedLength == fileLength) {break;}
                        }
                        if (downloadedLength != fileLength) {this.mount.read(fd, bytes, fileLength - downloadedLength, downloadedLength);
                            raf.write(bytes, 0, (int) (fileLength - downloadedLength));
                            downloadedLength = fileLength;
    
                            // 输入进度百分比
                            rate = (float) downloadedLength * 100 / (float) fileLength;
                            rateValue = (int) rate + "%";
                            System.out.println(rateValue);
                        }
                        // 如果下载中断, 会从上一次下载完结地位进行上传
                        System.out.println("Cut Point Download Success!");
                        raf.close();
                        this.mount.close(fd);
                        return true;
                    } catch (Exception e) {e.printStackTrace();
                    }
                } else {return false;}
            }else {System.out.println("the file is empty!");
            } 
            return true;   
        }   
    }

    POM 文件配置:

    <dependencies>
        <!-- Spring Boot 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <!-- Rados Java Api 依赖 -->
        <dependency>
            <groupId>com.ceph</groupId>
            <artifactId>rados</artifactId>
            <version>0.6.0</version>
        </dependency>
        <!-- Cephfs 文件系统依赖 -->
        <dependency>
            <groupId>com.ceph</groupId>
            <artifactId>libcephfs</artifactId>
            <version>0.80.5</version>
        </dependency>
    </dependencies>
    
    <!--Spring Boot 打包插件 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
  4. 测试验证

    通过 maven 命令 clean install 打包生成 jar 文件。

    • rz 命令上传至 client node 服务器

      [root@CENTOS7-1 sources]# ll ceph-demo.jar 
      -rw-r--r-- 1 root root 16915296 Jul 14  2019 ceph-demo.jar
    • 代码中的上传目录为 /temp_upload_fs,创立名为 upload.txt 的文件,内容为 abc123

      [root@CENTOS7-1 sources]# cat /temp_upload_fs/upload.txt
      abc123
    • 上传至 cephfs 服务

      [root@CENTOS7-1 sources]# java -jar ceph-demo.jar upload upload.txt
      start....
      100%
      文件传输胜利!end....
    • 下载 cephfs 文件

      文件名称为上传时创立的 upload.txt

      [root@CENTOS7-1 sources]# java -jar ceph-demo.jar download upload.txt
      start....
      100%
      Download Success!
      end....
    • 查看下载内容

      [root@CENTOS7-1 sources]# cat /temp_download_fs/upload.txt 
      abc123

      通过演示,能够看到可能通过 java client 在我的项目中胜利操作 ceph。

      ![file](/img/bVcQ44K)
      
  5. FAQ 问题

    1. 如果运行过程当中呈现 jni 找不到动静库,须要装置相干依赖:

      yum -y install libcephfs2 libcephfs_jni-devel

      并查看相应的软链接:

      [root@CENTOS7-1 sources]# ll /usr/lib/libcephfs_jni.so.2 
      lrwxrwxrwx 1 root root 25 Jul 14 11:34 /usr/lib/libcephfs_jni.so.2 -> /usr/lib64/libcephfs.so.2
    2. 如果 rados 版本 0.6.0 依赖,须要手工上传至 MAVEN 仓库,命令:

      mvn deploy:deploy-file -DgroupId=com.ceph -DartifactId=rados -Dversion=2.0.1 -Dpackaging=jar -Dfile=d:/TestCode/rados-0.6.0-SNAPSHOT.jar -url=http://192.168.19.102:8082/repository/maven-releases/ -DrepositoryId=nexus-releases

本文由 mirson 创作分享,如需进一步交换,请加 QQ 群:19310171 或拜访 www.softart.cn

退出移动版