乐趣区

关于node.js:基于nodejs的minio的sdk自建服务中转文件的断点续传方案

背景

  • 公司业务问题,就不过多论述,总的需要就是须要一个断点续传的性能
  • 公司的文件存储是应用的 amazon s3,这个是通用的文件存储,相似阿里云的 oss,一个文件能够当成是一个对象,能够给文件对象做一些操作,amazon s3 是一个服务,能够创立一个一个桶用于存储文件,对于某个桶,有三种级别的权限:1、私有读,私有写;2、私有读,公有写;3、公有读,公有写
  • 为什么这个性能须要前端来做?因为后端只提供数据接口,不提供 web 服务,web 服务都是前端本人开发和解决
  • 如果你不理解 amazon s3 或者阿里云的 oss,那可能文章看的会有艰难

简略文件上传

amazon 的 s3,文件间接上传的话有这几种计划,分为:前端间接上传和应用 sdk 上传

一、前端间接上传,分私有写和公有写

​ 私有写:能够间接拼接 url:http://\<address\>/\<bucketname\>/\<objectname\>,在前端间接将文件发动 put 申请这个 url 即可,其中 address 是服务提供的域名,bucketname 是咱们的桶名称,objectname 是对象名(即文件名)

​ 公有写:必须要用到 minio 的 sdk,应用 sdk 提供的 presignedPutObject 办法生成一个上传地址,前端拿到链接后,也是在前端间接发动 put 申请即可,要留神,sdk 只能运行在服务端,也就是必须要有本人的服务

二、应用 sdk 上传,sdk 只能运行在服务端,流程基本上是,前端上传文件到服务端,服务端应用 sdk 将文件上传到 S3 的文件服务上

这两种计划,除了私有写的状况,其余的都须要有自建的服务端,而且从平安的角度上来说,设置私有写是不合理的

断点续传方案设计

要做这个断点续传的性能,是必须要有本人的服务的,和大多数人一样,做这个断点续传的性能的时候,我也是去网上搜他人的做法,而后找到适合的办法后,联合咱们的我的项目,我设计了这样一个计划:

  1. 用户抉择文件后,前端应用 file.slice 对文件进行分片
  2. 计算文件 hash,也就是文件的 md5,只有文件内容不变,文件的 hash 是不会变的,计算 hash 是一个同步工作,文件太大的话会把浏览器卡住,我应用的是 spark-md5+ 浏览器的 requestIdleCallback 的 api 来解决的,能够依据本人我的项目的状况应用 webworker 也是能够的
  3. 应用 hash 查问是否已有对应的文件,这个能够本人有本人的数据库,存储 hash 对应的文件链接,如果是用 hash 作为文件名的话,也能够调用 minio 的 sdk 的 statObject 办法晓得这个文件是否已上传,如果曾经有了文件信息,就能够不必上传文件,间接拿到链接即可,这个就是秒传的性能
  4. 如果未上传,则读取服务器本地,以这个 hash 命名的文件夹下曾经上传了哪些分片,将已上传的分片返回给前端,前端选择性的上传未上传的分片即可,这个就是断点续传的性能
  5. 前端上传分片的时候,会将文件的 hash、第几个分片等这些信息作为参数给到服务端,服务端拿到文件后,会以 hash 命名文件夹,以第几个分片命名该分片,将分片存储在服务端
  6. 所有分片上传实现后,在服务端对文件进行合并,再调用 minio 的 putObject 办法将文件上传到 S3 文件服务器,上传实现后,将服务器本地的文件删除

整个断点续传性能是实现了,不过这个计划有个不完满的中央,就是分片文件要在服务端存储和合并,合并后才上传到文件服务上,据我所知,规范的 S3 自身是有断点续传和合并文件的性能的,可不可以间接上传分片的时候,将分片上传到 S3 文件服务器上,分片都上传实现后,间接在 S3 文件服务器合并文件?答案在前面,不过过后的确没有找到能够用的计划,惟一找到最靠近的计划是百度的智能云提供的 api:https://cloud.baidu.com/doc/B… 然而 minio 的 sdk 不提供 uploadPart 办法,这个办法也行不通,所以只能先作罢

问题

下面的计划有一个致命的问题没有思考到,就是线上是有多台机器的,这些分片会上传到不同的机器,合并文件的时候是没法合并的,导致下面设计的计划都不能用了,发现这个问题还是因为在解决一个合并文件的时候发现有分片未找到的问题的时候才思考到的,所以要重新考虑新的计划,着重点还是上传的分片怎么间接上传到 S3 文件服务器上,在 S3 文件服务器上合并

为了解决这个计划,我看了 minio 的源码,看了 putObject 的源码后,理解到 putObject 的外围流程如下:

  1. 应用 block-stream2 将文件进行分块
  2. 应用 objectName 调用 findUploadId 查问 uploadId,如果没有 uploadId,会调用 initiateNewMultipartUpload 初始化一个 uploadId,通过本人的一些测试能够失去一些信息:

    2.1 每次调用 initiateNewMultipartUpload 返回的 uploadId 都不一样

    2.2 findUploadId 会返回最新的 uploadId

    2.3 通过查找别的信息得悉 uploadId 有 7 天的有效期

  3. 调用 listParts 取得已上传的 part
  4. 组合参数,调用 makeRequest 上传分片
  5. 调用 completeMultipartUpload 实现分片上传,这个办法会将所有分片合并,并返回合并后的文件 etag

新计划

通过看 putObject 办法的源码,咱们把咱们现有的计划做一些批改

  1. 用户抉择文件后,对文件进行分片
  2. 计算文件 hash,和之前统一,以文件 hash 作为新的文件名,或者加上固定的前缀,必须以固定的规定命名,同一个文件最好不要名字不统一,因为 minio 的服务端名字是惟一的 key
  3. 用文件名检索是否已存在文件,次要是调用的 minio 的 statObject 办法,如果未存在,以文件名获取 uploadId,再用 uploadId 获取已上传的分片( 此处区别于之前,因为分片文件不存在服务器本地,所以分片信息要存入数据库,其实还能够调用 sdk 的 listParts 办法获取到已上传的分片,然而调用 listParts 返回的信息没有带上冀望失去的 partNumber 参数,可能是公司搭建的 S3 服务的起因,所以分片只能入库
  4. 前端拿到已上传信息后,和之前解决统一,如果已存在文件,则不上传,否则计算须要上传分片而后上传
  5. 自行开发一个 uploadPart 的办法,服务端接管到分片后,拿到分片文件的 ArrayBuffer,拿到 uploadId,拼装参数,调用 sdk 的 makeRequest 办法将分片上传到 S3 文件服务器服务器,上传实现后删除文件分片,将上传的分片信息入库
  6. 前端承受到所有分片都上传实现后,调用合并文件接口,服务端合并文件,调用 sdk 的 completeMultipartUpload 办法,会将在 S3 文件服务器服务器上的分片都合并

到此新的计划就实现了,上面贴上一些代码

前端:

文件分片:

function createChunks(file, size = SINGLECHUNKSIZE) {
    let cur = 0,
        index = 1;
    const chunks = [];
    while (cur < file.size) {
        chunks.push({
            start: cur, // 文件开始地位的字节
            file: file.slice(cur, cur + size), // 分片文件
            hash: "", // 文件 hash
            progress: 0, // 上传进度
            uploaded: false, // 是否已上传
            index: index, // 第几个分片
        });
        index++;
        cur += size;
    }

    return chunks;
}

计算文件 hash:

const md5ByRequestIdle = (chunks, { onProgress}) => {return new Promise((resolve) => {const spark = new SparkMD5.ArrayBuffer();
        let count = 0;
        const workLoop = async () => {if (count < chunks.length) {const reader = new FileReader();
                reader.onload = e => {const add = (deadline) => {if (deadline.timeRemaining() > 1) {spark.append(e.target.result);
                            count++;
                            const progress = parseInt((count / chunks.length) * 100) / 100;
                            if (count < chunks.length) {onProgress && onProgress(progress);
                                window.requestIdleCallback(workLoop);
                            } else {onProgress && onProgress(1);
                                resolve(spark.end());
                            }
                        } else {window.requestIdleCallback(add);
                        }
                    }
                    window.requestIdleCallback(add)
                }
                reader.readAsArrayBuffer(chunks[count].file);
            } else {resolve(spark.end());
            }
        }
        window.requestIdleCallback(workLoop);
    });
}

服务端:

上传文件分片:

async uploadPart(file, index, filename, uploadId?) {const part = Number(index);
    if (!uploadId) {uploadId = await this.getUploadIdByFilename(filename)
    }
    const curList = await this.ctx.model.etagCenter.findBy({
      filename,
      part,
      uploadId,
    })
    if (curList.length > 0) {return true}
    const client = new Client({
      endPoint: this.config.S3 文件服务器 v3.endPoint,
      accessKey: this.config.S3 文件服务器 v3.accessKey,
      secretKey: this.config.S3 文件服务器 v3.secretKey,
    })
    const chunk = await fse.readFile(file.filepath)
    const query = querystring.stringify({
      partNumber: part,
      uploadId,
    })
    const options = {
      method: 'PUT',
      query,
      headers: {
        'Content-Length': chunk.length,
        'Content-Type': mime.lookup(filename),
      },
      bucketName: this.config.S3 文件服务器 v3.bucketName,
      objectName: filename,
    }
    const etag = await new Promise((resolve, reject) => {client.makeRequest(options, chunk, [200], '', true, function(
        err,
        response,
      ) {if (err) return reject(err) // In order to aggregate the parts together, we need to collect the etags.

        let etag = response.headers.etag
        if (etag) {etag = etag.replace(/^"/,'').replace(/"$/,'')
        }
        fse.unlink(file.filepath)
        resolve(etag)
      })
    })
    const insertResult = await this.ctx.model.etagCenter.add({
      filename,
      etag,
      part,
      uploadId,
    })
    return insertResult.insertedCount > 0
}

计划和代码还有很多须要改良的中央,也还在思考

后续优化

当初分片文件须要先上传到咱们本人的服务,而后才上传到 S3 文件服务器,两头会消耗一些工夫和效率,后续冀望能间接在前端将分片文件间接上传到 S3 文件服务器上,当然会在保障平安的前提下,不过目前没有工夫思考这个,如果行得通的话后续的优化会朝这个方向来

退出移动版