Table of Contents

  • Background
  • Implementing whole-file upload for small files

    • Setting up the front end
    • Setting up the server
  • How to implement chunked upload for large files

    • Client: splitting the file into chunks
    • Client: computing the hash
    • Client: uploading the chunks
    • Server: the verify endpoint
    • Server: the chunk upload endpoint
    • Server: the merge endpoint
    • Client: pause/resume
    • Client: progress bars
    • Client: instant upload
    • bingo
  • Summary

Background

If you work on an open platform, file upload requests come up sooner or later. For the occasional image under 10 MB, a single upload endpoint is enough. But once a file passes 100 MB, or reaches 1 GB and beyond, pushing it through one request makes for a clearly poor user experience. We want to give the user a progress bar that reports upload progress in real time; let them pause (say the network drops, or they picked the wrong file) and resume at any point; and when they re-upload a file the system already has, report an instant upload. In short: upload behavior much like Baidu Netdisk's.

  • Demo: whole-file upload for a small file

  • Demo: chunked upload for a large file

Below we build the front end and the server from scratch, implement small-file upload first, and then work step by step up to large-file upload.

The stack is mainly React, AntD and TypeScript on the front end; ts-node and Express on the server...

This article was first published at @lan-react/upload; please credit the source when reposting. Client code repo, server code repo.

Implementing whole-file upload for small files

Setting up the front end

Create the project with create-react-app --template typescript

Install antd with yarn add antd, then run the project with yarn start
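Depending on the antd version you may also need to import the global stylesheet once (required for antd v4; v5 styles components via CSS-in-JS):

```ts
// e.g. at the top of src/index.tsx (antd v4 path; not needed for antd v5)
import 'antd/dist/antd.css';
```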

Write the upload component

```tsx
import React, { ChangeEvent, useState, useEffect } from 'react';
import { Row, Col, Input } from 'antd';

interface UploadProps {
  width?: number;
}

interface CurrentFile {
  file: File;
  dataUrl?: string;
  type?: string;
}

const isImage = (type: string = ''): boolean => type.includes('image');

const Upload: React.FC<UploadProps> = (props) => {
  const { width = 300 } = props;
  const [currentFile, setCurrentFile] = useState<CurrentFile>();

  const onFileChange = (event: ChangeEvent<HTMLInputElement>) => {
    const file: File = event.target.files![0];
    if (file) {
      const reader = new FileReader();
      reader.addEventListener('load', () => {
        setCurrentFile({
          file: file,
          dataUrl: reader.result as string,
          type: file.type,
        });
      });
      reader.readAsDataURL(file);
    }
  };

  return (
    <div>
      <Input type="file" style={{ width: width }} onChange={onFileChange} />
      {isImage(currentFile?.type) ? (
        <img src={currentFile?.dataUrl} style={{ width: 100 }} alt={currentFile?.file.name} />
      ) : null}
    </div>
  );
};

export default Upload;
```
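For completeness, a hypothetical way to mount the component (not part of the original article):

```tsx
// Hypothetical App.tsx that renders the Upload component
import React from 'react';
import Upload from './Upload';

const App: React.FC = () => <Upload width={400} />;

export default App;
```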

Small-file upload uses FormData

```diff
return (
  <div>
    <Input type="file" style={{ width: width }} onChange={onFileChange} />
+   <Button type="primary" onClick={() => onFileUpload(UploadType.WHOLE)}>Upload small file</Button>
  </div>
)
```

Remember to add Button to the existing antd import alongside Input.

Wire the button up and handle the upload

```tsx
// Upload type (message also comes from antd: import { message } from 'antd')
enum UploadType {
  WHOLE,
  PART,
}

// Size check
const checkSize = (size: number = 0, maxSize: number = 2 * 1024 * 1024 * 1024): boolean => {
  if (size > maxSize) {
    message.error('File size must not exceed 2 GB');
    return false;
  }
  return true;
};

const onFileUpload = (type: UploadType = UploadType.WHOLE) => {
  if (!currentFile?.file) {
    message.error('Please choose a file~');
    return;
  }
  if (!checkSize(currentFile?.file?.size)) return;
  switch (type) {
    case UploadType.WHOLE:
      wholeUpload();
      break;
  }
};

// Whole-file upload
const wholeUpload = async () => {
  const formData = new FormData();
  formData.append('file', currentFile?.file as File);
  formData.append('name', currentFile?.file.name as string);
  await request({
    url: '/wholeUpload',
    method: 'POST',
    data: formData,
  });
  message.success('Upload succeeded');
};
```

Then write a simple request wrapper

```ts
export interface Config {
  baseUrl?: string;
  url?: string;
  method?: string;
  headers?: any;
  data?: any;
}

export const request = (conf: Config): Promise<any> => {
  const config: Config = {
    method: 'GET',
    baseUrl: 'http://localhost:8000',
    headers: {},
    data: {},
    ...conf,
  };
  return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest();
    xhr.open(config.method as string, `${config.baseUrl}${config.url}`);
    for (const key in config.headers) {
      if (Object.prototype.hasOwnProperty.call(config.headers, key)) {
        const value = config.headers[key];
        xhr.setRequestHeader(key, value);
      }
    }
    xhr.responseType = 'json';
    xhr.onreadystatechange = () => {
      if (xhr.readyState === 4) {
        if (xhr.status === 200) {
          resolve(xhr.response);
        } else {
          reject(xhr.response);
        }
      }
    };
    xhr.send(config.data);
  });
};
```

Setting up the server

Use nodemon and ts-node (cross-env sets the PORT variable in the script below)

"scripts": {  "dev": "cross-env PORT=8000 nodemon --exec ts-node --files ./src/www.ts"},

Spin the server up with the http module in ./src/www.ts

```ts
import app from './app';
import http from 'http';

const port = process.env.PORT || 8000;
const server = http.createServer(app);

const onError = (error: any) => {
  console.error(error);
};

const onListening = () => {
  console.log(`Listening on port ${port}`);
};

server.listen(port);
server.on('error', onError);
server.on('listening', onListening);
```

Then write app.ts

```ts
import express, { Request, Response, NextFunction } from 'express';
import path from 'path';
import fs from 'fs-extra';
import logger from 'morgan';
import cors from 'cors';
import multiparty from 'multiparty';
import createError from 'http-errors';
import { INTERNAL_SERVER_ERROR } from 'http-status-codes';

const app = express();
const PUBLIC_DIR = path.resolve(__dirname, 'public');

app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
app.use(cors());
app.use(express.static(PUBLIC_DIR));

// Route renamed to match the client's `/wholeUpload` request
app.post('/wholeUpload', async (req: Request, res: Response, next: NextFunction) => {
  const form = new multiparty.Form();
  form.parse(req, async (err: any, fields, files) => {
    if (err) return next(err);
    const name = fields.name[0];
    const file = files.file[0];
    await fs.move(file.path, path.resolve(PUBLIC_DIR, name), { overwrite: true });
    res.json({
      success: true,
    });
  });
});

app.use((_req: Request, _res: Response, next: NextFunction) => {
  next(createError(404));
});

app.use((error: any, _req: Request, res: Response, _next: NextFunction) => {
  res.status(error.status || INTERNAL_SERVER_ERROR);
  res.json({
    success: false,
    error,
  });
});

export default app;
```

Files are stored locally, in the public directory at the server project root.

Then start the server with npm run dev

With the steps above, small-file upload is complete.

Next up: chunked upload for large files.

How to implement chunked upload for large files

The idea behind chunked upload

  • The client splits the large file. (Chunk granularity here is 10 MB; chunks are named fileName-${index} so the server can merge them in order.)
  • To support instant upload, compute a hash over the file contents. (Hashing is slow, so it runs in a Worker, with its own progress bar.)
  • The client uploads the resulting chunks one by one.
  • The server provides the upload endpoint. (Chunks are stored in a temporary directory.)
  • After all chunks are uploaded, the client calls the merge endpoint.
  • The server provides the merge endpoint. (It merges the chunks by the sorted names above into the full file and stores it locally.)
  • The client supports pause/resume. (Pause calls xhr.abort(); resume re-runs the upload.)
  • In particular, before uploading, the client calls a verify endpoint. (Has the file been uploaded already? Which parts of it?)
  • The server provides the verify endpoint. (It reads the chunks for that file name from the temp directory and, if any exist, returns their info to the client.)
  • The client acts on the response. (A finished file means instant upload; existing chunks mean only the remainder is uploaded.)
  • Done.

That is the rough plan; the sections below cover the implementation details and the points to watch.
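In code terms, the client half of this plan boils down to roughly the sketch below. The helpers are implemented in the sections that follow; extNameOf is a hypothetical stand-in for the extension extraction done inline in partUpload:

```ts
// High-level client flow (sketch only; see the real implementations below)
const partUploadFlow = async (file: File) => {
  const partList = createChunks(file);                 // 1. split into 10 MB chunks
  const hash = await generateHash(partList);           // 2. hash the contents in a Worker
  const fileName = `${hash}${extNameOf(file.name)}`;   // extNameOf: hypothetical helper
  const { data } = await request({ url: `/verify/${fileName}` });
  if (!data.needUpload) return;                        // instant upload: server already has it
  await Promise.all(createRequestList(partList, data.uploadedList, fileName)); // 3. missing parts only
  await request({ url: `/merge/${fileName}` });        // 4. server stitches the chunks
};
```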

Client: splitting the file into chunks

```diff
const onFileUpload = (type: UploadType = UploadType.WHOLE) => {
  // ...
  switch (type) {
    case UploadType.WHOLE:
      wholeUpload();
      break;
+   case UploadType.PART:
+     partUpload();
+     break;
  }
}
```
```ts
interface Part {
  chunk: Blob;
  size: number;
  fileName?: string;
  chunkName?: string;
  loaded?: number;
  percent?: number;
  xhr?: XMLHttpRequest;
}

const partUpload = async () => {
  setUploadStatus(UploadStatus.UPLOADING);
  // 1. Split the file into chunks
  // 2. Compute the file hash from the chunks
  // 3. Upload the chunks
  const partList = createChunks(currentFile?.file as File);
  const fileHash = await generateHash(partList);
  console.log(fileHash, 'fileHash');
  const lastDotIdx = currentFile?.file.name.lastIndexOf('.');
  const extName = currentFile?.file.name.slice(lastDotIdx);
  const fileName = `${fileHash}${extName}`;
  partList.forEach((part: Part, index: number) => {
    part.fileName = fileName;
    part.chunkName = `${fileName}-${index}`;
    part.loaded = 0;
    part.percent = 0;
  });
  setFileName(fileName);
  setPartList(partList);
  await uploadParts(partList, fileName);
};
```
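Note that partUpload relies on state and constants the article never lists explicitly (UploadStatus, DEAFULT_SIZE, and the fileName/partList/hashPercent hooks). A minimal sketch consistent with how they are used, with names taken from the surrounding code:

```ts
// Assumed component state and constants (not shown in the original)
enum UploadStatus {
  INIT,       // nothing in flight yet
  UPLOADING,  // chunks are being uploaded
  PAUSE,      // user pressed pause
}

// Chunk granularity: 10 MB, matching the server's DEAFULT_SIZE
const DEAFULT_SIZE = 1024 * 1024 * 10;

const [uploadStatus, setUploadStatus] = useState<UploadStatus>(UploadStatus.INIT);
const [fileName, setFileName] = useState<string>('');
const [partList, setPartList] = useState<Part[]>([]);
const [hashPercent, setHashPercent] = useState<number>(0);
```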

Next, implement the two core methods, createChunks and generateHash

```ts
const createChunks = (file: File, size: number = DEAFULT_SIZE): Part[] => {
  let current: number = 0;
  const partList: Part[] = [];
  while (current < file.size) {
    const chunk: Blob = file.slice(current, current + size);
    partList.push({
      chunk,
      size: chunk.size,
    });
    current += size;
  }
  return partList;
};
```

Client: computing the hash

```ts
const generateHash = (partList: Part[]): Promise<any> => {
  return new Promise((resolve, reject) => {
    const worker = new Worker('/generateHash.js');
    worker.postMessage({ partList });
    worker.onmessage = (event) => {
      const { percent, hash } = event.data;
      setHashPercent(percent);
      if (hash) {
        resolve(hash);
      }
    };
    worker.onerror = (error) => {
      reject(error);
    };
  });
};
```

This relies on a Worker; create a generateHash.js file under public

```js
self.importScripts('https://cdn.bootcss.com/spark-md5/3.0.0/spark-md5.js');

self.onmessage = async (event) => {
  const { partList } = event.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  let percent = 0;
  const perSize = 100 / partList.length;
  const buffers = await Promise.all(partList.map(({ chunk }) => new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(chunk);
    reader.onload = (e) => {
      percent += perSize;
      self.postMessage({ percent: Number(percent.toFixed(2)) });
      resolve(e.target.result);
    };
    reader.onerror = reject; // surface read failures instead of leaving Promise.all hanging
  })));
  buffers.forEach((buffer) => spark.append(buffer));
  self.postMessage({ percent: 100, hash: spark.end() });
  self.close();
};
```
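One design note: Promise.all reads every chunk at once, so the whole file ends up buffered in ArrayBuffers before hashing. If memory is a concern, here is a sketch of a sequential variant that holds at most one chunk at a time, using the same incremental spark-md5 API:

```js
// Sequential variant (sketch): append chunk by chunk so memory stays bounded
self.onmessage = async (event) => {
  const { partList } = event.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  for (let i = 0; i < partList.length; i++) {
    const buffer = await new Promise((resolve, reject) => {
      const reader = new FileReader();
      reader.onload = (e) => resolve(e.target.result);
      reader.onerror = reject;
      reader.readAsArrayBuffer(partList[i].chunk);
    });
    spark.append(buffer); // incremental MD5 update
    self.postMessage({ percent: Number((((i + 1) / partList.length) * 100).toFixed(2)) });
  }
  self.postMessage({ percent: 100, hash: spark.end() });
  self.close();
};
```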

Each chunk posts its share of the progress back to generateHash via postMessage, and setHashPercent syncs it to the UI in real time.

Client: uploading the chunks

```ts
const uploadParts = async (partList: Part[], fileName: string) => {
  const res = await request({
    url: `/verify/${fileName}`,
  });
  if (res.code === 200) {
    if (!res.data.needUpload) {
      message.success('Instant upload succeeded');
      setPartList(partList.map((part: Part) => ({
        ...part,
        loaded: DEAFULT_SIZE,
        percent: 100,
      })));
      reset();
      return;
    }
    try {
      const { uploadedList } = res.data;
      const requestList = createRequestList(partList, uploadedList, fileName);
      const partsRes = await Promise.all(requestList);
      if (partsRes.every((item) => item.code === 200)) {
        const mergeRes = await request({
          url: `/merge/${fileName}`,
        });
        if (mergeRes.code === 200) {
          message.success('Upload succeeded');
          reset();
        } else {
          message.error('Upload failed, please try again later~');
        }
      } else {
        message.error('Upload failed, please try again later~');
      }
    } catch (error) {
      message.error('Upload failed or paused');
      console.error(error);
    }
  }
};
```
  • Before uploading, the client calls the verify endpoint. (Has the file been uploaded already? Which parts of it?)
  • The client uploads the chunks one by one.
  • After all chunks are uploaded, the client calls the merge endpoint.

Server: the verify endpoint

```ts
export const PUBLIC_DIR = path.resolve(__dirname, 'public');
export const TEMP_DIR = path.resolve(__dirname, 'temp');
const DEAFULT_SIZE = 1024 * 1024 * 10;

// ...

app.get('/verify/:fileName', async (req: Request, res: Response, _next: NextFunction) => {
  const { fileName } = req.params;
  const filePath = path.resolve(PUBLIC_DIR, fileName);
  const existFile = await fs.pathExists(filePath);
  if (existFile) {
    res.json({
      code: 200,
      msg: 'success',
      data: {
        needUpload: false,
      },
    });
    return;
  }
  const folderPath = path.resolve(TEMP_DIR, fileName);
  const existFolder = await fs.pathExists(folderPath);
  let uploadedList: any[] = [];
  if (existFolder) {
    uploadedList = await fs.readdir(folderPath);
    // parameter renamed to `chunkName` to avoid shadowing the outer `fileName`
    uploadedList = await Promise.all(uploadedList.map(async (chunkName: string) => {
      const stat = await fs.stat(path.resolve(folderPath, chunkName));
      return {
        fileName: chunkName,
        size: stat.size,
      };
    }));
  }
  res.json({
    code: 200,
    msg: 'success',
    data: {
      needUpload: true,
      uploadedList,
    },
  });
});
```
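For reference, these are the two response shapes the client can receive from /verify (file names and sizes here are hypothetical):

```ts
// Finished file already in public/ -> instant-upload path
// { code: 200, msg: 'success', data: { needUpload: false } }

// Only some chunks in temp/ -> client re-sends just what's missing
// {
//   code: 200,
//   msg: 'success',
//   data: {
//     needUpload: true,
//     uploadedList: [
//       { fileName: 'abc123.mp4-0', size: 10485760 }, // complete chunk
//       { fileName: 'abc123.mp4-1', size: 4194304 },  // partial chunk
//     ],
//   },
// }
```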

Server: the chunk upload endpoint

```ts
app.post('/partUpload/:fileName/:start/:chunkName', async (req: Request, res: Response, _next: NextFunction) => {
  const { fileName, chunkName, start } = req.params;
  const folderPath = path.resolve(TEMP_DIR, fileName);
  const existFolder = await fs.pathExists(folderPath);
  if (!existFolder) {
    await fs.mkdirs(folderPath);
  }
  const filePath = path.resolve(folderPath, chunkName);
  const ws = fs.createWriteStream(filePath, {
    start: Number(start),
    flags: 'a',
  });
  req.on('end', () => {
    ws.close();
    res.json({
      code: 200,
      msg: 'success',
      data: true,
    });
  });
  req.on('error', () => {
    ws.close();
  });
  req.on('close', () => {
    ws.close();
  });
  req.pipe(ws);
});
```
  • fileName: a temp directory named after the file is created to hold its chunks
  • chunkName: the chunk's name, in the format fileName-${index}
  • start: how many bytes of this chunk are already stored, i.e. the offset to append from

Server: the merge endpoint

```ts
app.get('/merge/:fileName', async (req: Request, res: Response, _next: NextFunction) => {
  const { fileName } = req.params;
  try {
    await mergeChunks(fileName);
    res.json({
      code: 200,
      msg: 'success',
      data: true,
    });
  } catch (error) {
    res.json({
      code: 1,
      msg: 'error',
      data: false,
    });
  }
});
```

This mirrors the client: the merge rule is the reverse of the split rule.

```ts
// WriteStream is the type from 'fs' (re-exported by fs-extra)
const getIndex = (str: string) => {
  const matched = str.match(/-(\d{1,})$/);
  return matched ? Number(matched[1]) : 0;
};

const pipeStream = (filePath: string, ws: WriteStream) => new Promise<void>((resolve, _reject) => {
  const rs = fs.createReadStream(filePath);
  rs.on('end', async () => {
    await fs.unlink(filePath);
    resolve();
  });
  rs.pipe(ws);
});

export const mergeChunks = async (fileName: string, size: number = DEAFULT_SIZE) => {
  const filePath = path.resolve(PUBLIC_DIR, fileName);
  const folderPath = path.resolve(TEMP_DIR, fileName);
  const folderFiles = await fs.readdir(folderPath);
  folderFiles.sort((a, b) => getIndex(a) - getIndex(b));
  await Promise.all(folderFiles.map((chunk: string, index: number) => pipeStream(
    path.resolve(folderPath, chunk),
    fs.createWriteStream(filePath, {
      start: index * size,
    }),
  )));
  await fs.rmdir(folderPath);
};
```

Streams are used instead of reading whole files into memory and writing them out in one go. The offset arithmetic works because every chunk except the last is exactly size (10 MB) bytes, so chunk i can be written starting at i * size.

Client: pause/resume

```tsx
<Row>
  <Col span={24}>
    {
      uploadStatus === UploadStatus.INIT && <Button type="primary" onClick={() => onFileUpload(UploadType.PART)}>Upload large file in chunks</Button>
    }
    {
      uploadStatus === UploadStatus.UPLOADING && <Button type="primary" onClick={() => onFilePause()}>Pause</Button>
    }
    {
      uploadStatus === UploadStatus.PAUSE && <Button type="primary" onClick={() => onFileResume()}>Resume</Button>
    }
  </Col>
</Row>
```

Pausing means calling xhr.abort(), so request.ts first needs a hook that hands the xhr instance back to the caller

```ts
// request.ts
if (config.setXhr) {
  config.setXhr(xhr);
}

// upload.tsx
const onFilePause = () => {
  partList.forEach((part: Part) => part.xhr && part.xhr.abort());
  setUploadStatus(UploadStatus.PAUSE);
};
```
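createRequestList below also passes an onProgress callback, so request.ts presumably grows two optional Config fields. A sketch of the additions, assuming xhr.upload.onprogress is the wiring point:

```ts
export interface Config {
  baseUrl?: string;
  url?: string;
  method?: string;
  headers?: any;
  data?: any;
  setXhr?: (xhr: XMLHttpRequest) => void;      // hands the xhr back so the caller can abort()
  onProgress?: (event: ProgressEvent) => void; // upload progress callback
}

// Inside request(), after xhr.open(...):
if (config.setXhr) {
  config.setXhr(xhr);
}
if (config.onProgress) {
  // fires repeatedly while the request body is being sent
  xhr.upload.onprogress = config.onProgress;
}
```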

Before this works, the createRequestList method needs fleshing out

```ts
const createRequestList = (partList: Part[], uploadedList: Uploaded[], fileName: string): Promise<any>[] => {
  return partList.filter((part: Part) => {
    const uploadedFile = uploadedList.find((item) => item.fileName === part.chunkName);
    if (!uploadedFile) { // this chunk has never been uploaded
      part.loaded = 0;
      part.percent = 0;
      return true;
    }
    if (uploadedFile.size < part.chunk.size) { // this chunk was partially uploaded
      part.loaded = uploadedFile.size;
      part.percent = Number((part.loaded / part.chunk.size * 100).toFixed(2));
      return true;
    }
    // already fully uploaded
    return false;
  }).map((part: Part) => request({
    url: `/partUpload/${fileName}/${part.loaded}/${part.chunkName}`,
    method: 'POST',
    headers: {
      'Content-Type': 'application/octet-stream',
    },
    setXhr: (xhr: XMLHttpRequest) => { // + expose the xhr for pause
      part.xhr = xhr;
    },
    onProgress: (event: ProgressEvent) => { // + progress bar
      part.percent = Number(((part.loaded! + event.loaded) / part.chunk.size * 100).toFixed(2));
      console.log('part percent: ', part.percent);
      setPartList([...partList]);
    },
    data: part.chunk.slice(part.loaded),
  }));
};
```
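A worked example with hypothetical numbers: suppose chunk abc123.mp4-2 is a full 10 MB, but /verify reports only 4 MB of it stored. createRequestList then resumes it like this:

```ts
// /verify returned (hypothetical): { fileName: 'abc123.mp4-2', size: 4194304 }
// createRequestList resumes the chunk as follows:
//   part.loaded  = 4194304
//   URL          = POST /partUpload/abc123.mp4/4194304/abc123.mp4-2
//   request body = part.chunk.slice(4194304)   // only the missing ~6 MB
// Server side, fs.createWriteStream(filePath, { start: 4194304, flags: 'a' })
// appends the new bytes right after what was already stored.
```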

Then implement resume, which simply issues the upload again

```ts
const onFileResume = async () => {
  // flip the status first so the UI shows "uploading" while the requests run
  setUploadStatus(UploadStatus.UPLOADING);
  await uploadParts(partList, fileName);
};
```

Client: progress bars

As the createRequestList code above shows, onProgress delivers the upload progress in real time

```tsx
const columns = [
  {
    title: 'Chunk name',
    dataIndex: 'chunkName',
    key: 'chunkName',
    width: '20%',
  },
  {
    title: 'Progress',
    dataIndex: 'percent',
    key: 'percent',
    width: '80%',
    render: (value: number) => {
      return <Progress percent={value} />;
    },
  },
];

// rounded so the Progress label stays readable
const totalPercent = partList.length > 0
  ? Number((partList.reduce((memo: number, curr: Part) => memo + curr.percent!, 0) / partList.length).toFixed(2))
  : 0;

{ uploadStatus !== UploadStatus.INIT ? (
  <>
    <Row>
      <Col span={4}>
        Hash progress
      </Col>
      <Col>
        <Progress percent={hashPercent} />
      </Col>
    </Row>
    <Row>
      <Col span={4}>
        Total progress
      </Col>
      <Col>
        <Progress percent={totalPercent} />
      </Col>
    </Row>
    <Table
      columns={columns}
      dataSource={partList}
      rowKey={(row) => row.chunkName as string}
    />
  </>
) : null }
```

Client: instant upload

This is already handled in the uploadParts method above

```ts
const uploadParts = async (partList: Part[], fileName: string) => {
  const res = await request({
    url: `/verify/${fileName}`,
  });
  if (res.code === 200) {
    if (!res.data.needUpload) {
      message.success('Instant upload succeeded');
      setPartList(partList.map((part: Part) => ({
        ...part,
        loaded: DEAFULT_SIZE,
        percent: 100,
      })));
      reset();
      return;
    }
    // ...
  }
};
```

bingo

The result looks like this

Summary

  • Small-file upload uses FormData; large-file chunk upload sets 'Content-Type': 'application/octet-stream'. FormData can carry extra fields alongside the file; with octet-stream, the parameters go in the URL instead.
```ts
formData.append('file', currentFile?.file as File)
formData.append('name', currentFile?.file.name as string)

request({ url: `/partUpload/${fileName}/${part.loaded}/${part.chunkName}` })
```
  • Since File inherits from Blob, the client can split a large file with Blob.slice; the server stores the chunks and merges them in split order via the merge endpoint (using createWriteStream/createReadStream).
  • For instant upload, the file needs a unique fingerprint; when the server verifies that the file already exists, it returns success (and the access URL) straight away.

    • Use a Worker to compute the fingerprint of a large file in the background, so the page does not freeze.
    • Use spark-md5 to compute the file's MD5 as that fingerprint.
  • Progress bars

    • While computing the MD5, Worker.postMessage reports progress to the UI at chunk granularity.
    • While uploading chunks, xhr.upload.onprogress reports upload progress in real time.
    • The UI renders it all with AntD Progress/Table.
  • Pause/resume

    • Pause aborts the in-flight requests with xhr.abort().
    • Resume re-checks what has been uploaded and sends only the missing parts: the server reads the chunk state from the temp directory, the client slices again with Blob.slice(part.loaded), and the server appends via fs.createWriteStream(filePath, { start: Number(start), flags: 'a' }).