背景

  • 公司业务问题,就不过多论述,总的需要就是须要一个断点续传的性能
  • 公司的文件存储是应用的amazon s3,这个是通用的文件存储,相似阿里云的oss,一个文件能够当成是一个对象,能够给文件对象做一些操作,amazon s3是一个服务,能够创立一个一个桶用于存储文件,对于某个桶,有三种级别的权限:1、私有读,私有写;2、私有读,公有写;3、公有读,公有写
  • 为什么这个性能须要前端来做?因为后端只提供数据接口,不提供web服务,web服务都是前端本人开发和解决
  • 如果你不理解amazon s3或者阿里云的oss,那可能文章看的会有艰难

简略文件上传

amazon的s3,文件间接上传的话有这几种计划,分为:前端间接上传和应用sdk上传

一、前端间接上传,分私有写和公有写

私有写:能够间接拼接url:http://\<address\>/\<bucketname\>/\<objectname\>,在前端间接将文件发动put申请这个url即可,其中address是服务提供的域名,bucketname是咱们的桶名称,objectname是对象名(即文件名)

公有写:必须要用到minio的sdk,应用sdk提供的presignedPutObject办法生成一个上传地址,前端拿到链接后,也是在前端间接发动put申请即可,要留神,sdk只能运行在服务端,也就是必须要有本人的服务

二、应用sdk上传,sdk只能运行在服务端,流程基本上是,前端上传文件到服务端,服务端应用sdk将文件上传到S3的文件服务上

这两种计划,除了私有写的状况,其余的都须要有自建的服务端,而且从平安的角度上来说,设置私有写是不合理的

断点续传方案设计

要做这个断点续传的性能,是必须要有本人的服务的,和大多数人一样,做这个断点续传的性能的时候,我也是去网上搜他人的做法,而后找到适合的办法后,联合咱们的我的项目,我设计了这样一个计划:

  1. 用户抉择文件后,前端应用file.slice对文件进行分片
  2. 计算文件hash,也就是文件的md5,只有文件内容不变,文件的hash是不会变的,计算hash是一个同步工作,文件太大的话会把浏览器卡住,我应用的是spark-md5+浏览器的requestIdleCallback的api来解决的,能够依据本人我的项目的状况应用webworker也是能够的
  3. 应用hash查问是否已有对应的文件,这个能够本人有本人的数据库,存储hash对应的文件链接,如果是用hash作为文件名的话,也能够调用minio的sdk的statObject办法晓得这个文件是否已上传,如果曾经有了文件信息,就能够不必上传文件,间接拿到链接即可,这个就是秒传的性能
  4. 如果未上传,则读取服务器本地,以这个hash命名的文件夹下曾经上传了哪些分片,将已上传的分片返回给前端,前端选择性的上传未上传的分片即可,这个就是断点续传的性能
  5. 前端上传分片的时候,会将文件的hash、第几个分片等这些信息作为参数给到服务端,服务端拿到文件后,会以hash命名文件夹,以第几个分片命名该分片,将分片存储在服务端
  6. 所有分片上传实现后,在服务端对文件进行合并,再调用minio的putObject办法将文件上传到S3文件服务器,上传实现后,将服务器本地的文件删除

整个断点续传性能是实现了,不过这个计划有个不完满的中央,就是分片文件要在服务端存储和合并,合并后才上传到文件服务上,据我所知,规范的S3自身是有断点续传和合并文件的性能的,可不可以间接上传分片的时候,将分片上传到S3文件服务器上,分片都上传实现后,间接在S3文件服务器合并文件?答案在前面,不过过后的确没有找到能够用的计划,惟一找到最靠近的计划是百度的智能云提供的api:https://cloud.baidu.com/doc/B... 然而minio的sdk不提供uploadPart办法,这个办法也行不通,所以只能先作罢

问题

下面的计划有一个致命的问题没有思考到,就是线上是有多台机器的,这些分片会上传到不同的机器,合并文件的时候是没法合并的,导致下面设计的计划都不能用了,发现这个问题还是因为在解决一个合并文件的时候发现有分片未找到的问题的时候才思考到的,所以要重新考虑新的计划,着重点还是上传的分片怎么间接上传到S3文件服务器上,在S3文件服务器上合并

为了解决这个计划,我看了minio的源码,看了putObject的源码后,理解到putObject的外围流程如下:

  1. 应用block-stream2将文件进行分块
  2. 应用objectName调用findUploadId查问uploadId,如果没有uploadId,会调用initiateNewMultipartUpload初始化一个uploadId,通过本人的一些测试能够失去一些信息:

    2.1 每次调用initiateNewMultipartUpload返回的uploadId都不一样

    2.2 findUploadId会返回最新的uploadId

    2.3 通过查找别的信息得悉uploadId有7天的有效期

  3. 调用listParts取得已上传的part
  4. 组合参数,调用makeRequest上传分片
  5. 调用completeMultipartUpload实现分片上传,这个办法会将所有分片合并,并返回合并后的文件etag

新计划

通过看putObject办法的源码,咱们把咱们现有的计划做一些批改

  1. 用户抉择文件后,对文件进行分片
  2. 计算文件hash,和之前统一,以文件hash作为新的文件名,或者加上固定的前缀,必须以固定的规定命名,同一个文件最好不要名字不统一,因为minio的服务端名字是惟一的key
  3. 用文件名检索是否已存在文件,次要是调用的minio的statObject办法,如果未存在,以文件名获取uploadId,再用uploadId获取已上传的分片(此处区别于之前,因为分片文件不存在服务器本地,所以分片信息要存入数据库,其实还能够调用sdk的listParts办法获取到已上传的分片,然而调用listParts返回的信息没有带上冀望失去的partNumber参数,可能是公司搭建的S3服务的起因,所以分片只能入库
  4. 前端拿到已上传信息后,和之前解决统一,如果已存在文件,则不上传,否则计算须要上传分片而后上传
  5. 自行开发一个uploadPart的办法,服务端接管到分片后,拿到分片文件的ArrayBuffer,拿到uploadId,拼装参数,调用sdk的makeRequest办法将分片上传到S3文件服务器服务器,上传实现后删除文件分片,将上传的分片信息入库
  6. 前端承受到所有分片都上传实现后,调用合并文件接口,服务端合并文件,调用sdk的completeMultipartUpload办法,会将在S3文件服务器服务器上的分片都合并

到此新的计划就实现了,上面贴上一些代码

前端:

文件分片:

function createChunks(file, size = SINGLECHUNKSIZE) {    let cur = 0,        index = 1;    const chunks = [];    while (cur < file.size) {        chunks.push({            start: cur, // 文件开始地位的字节            file: file.slice(cur, cur + size), // 分片文件            hash: "", // 文件hash            progress: 0, // 上传进度            uploaded: false, // 是否已上传            index: index, // 第几个分片        });        index++;        cur += size;    }    return chunks;}

计算文件hash:

const md5ByRequestIdle = (chunks, { onProgress }) => {    return new Promise((resolve) => {        const spark = new SparkMD5.ArrayBuffer();        let count = 0;        const workLoop = async () => {            if (count < chunks.length) {                const reader = new FileReader();                reader.onload = e => {                    const add = (deadline) => {                        if (deadline.timeRemaining() > 1) {                            spark.append(e.target.result);                            count++;                            const progress = parseInt((count / chunks.length) * 100) / 100;                            if (count < chunks.length) {                                onProgress && onProgress(progress);                                window.requestIdleCallback(workLoop);                            } else {                                onProgress && onProgress(1);                                resolve(spark.end());                            }                        } else {                            window.requestIdleCallback(add);                        }                    }                    window.requestIdleCallback(add)                }                reader.readAsArrayBuffer(chunks[count].file);            } else {                resolve(spark.end());            }        }        window.requestIdleCallback(workLoop);    });}

服务端:

上传文件分片:

async uploadPart(file, index, filename, uploadId?) {    const part = Number(index);    if (!uploadId) {      uploadId = await this.getUploadIdByFilename(filename)    }    const curList = await this.ctx.model.etagCenter.findBy({      filename,      part,      uploadId,    })    if (curList.length > 0) {      return true    }    const client = new Client({      endPoint: this.config.S3文件服务器v3.endPoint,      accessKey: this.config.S3文件服务器v3.accessKey,      secretKey: this.config.S3文件服务器v3.secretKey,    })    const chunk = await fse.readFile(file.filepath)    const query = querystring.stringify({      partNumber: part,      uploadId,    })    const options = {      method: 'PUT',      query,      headers: {        'Content-Length': chunk.length,        'Content-Type': mime.lookup(filename),      },      bucketName: this.config.S3文件服务器v3.bucketName,      objectName: filename,    }    const etag = await new Promise((resolve, reject) => {      client.makeRequest(options, chunk, [200], '', true, function(        err,        response,      ) {        if (err) return reject(err) // In order to aggregate the parts together, we need to collect the etags.        let etag = response.headers.etag        if (etag) {          etag = etag.replace(/^"/, '').replace(/"$/, '')        }        fse.unlink(file.filepath)        resolve(etag)      })    })    const insertResult = await this.ctx.model.etagCenter.add({      filename,      etag,      part,      uploadId,    })    return insertResult.insertedCount > 0}

计划和代码还有很多须要改良的中央,也还在思考

后续优化

当初分片文件须要先上传到咱们本人的服务,而后才上传到S3文件服务器,两头会消耗一些工夫和效率,后续冀望能间接在前端将分片文件间接上传到S3文件服务器上,当然会在保障平安的前提下,不过目前没有工夫思考这个,如果行得通的话后续的优化会朝这个方向来