为什么需要大文件上传

通常我们上传是通过 <input /> 标签以提交表单的形式上传到服务器上,像下面这样:

<input type="file" />

通常我们都是以上传小文件为主,比如图片、excel 表格文件之类,直接上传没有问题,发送给后端的速度很快

但难免有需要上传大文件的时候,比如视频,数据大的 excel 表格文件,如果还是直接上传,可能会有几个结果:

  • 页面卡死:上传过程中文件会占用浏览器内存
  • 上传慢:网络请求需要时间,文件越大等待时间越长
  • 上传失败:请求时间越长,容易因为各种因素中断上传操作,比如网络请求有时间限制,容易因为超时而请求失败,
  • 重新上传:上传失败后导致需要从头开始等待上传

切片

所谓切片就是将大文件切割成多个切片后上传,每个切片就是组成大文件的一部分数据,后端收到所有切片后再合并成完整的大文件,这样做有 3 个好处:

  • 提升上传速度:利用 HTTP 并发性,同时上传多个切片
  • 实现断点续传:如果上传流程中断,后续可以通过检查已上传的切片判断从哪里开始继续上传

首先要实现对大文件的切割,需要使用 Blob 对象的 slice 方法,这个方法类似数组的 slice 方法从数组中提取指定索引范围内的元素,不同的是这个方法作用于 Blob 对象,即从一个大的二进制数据中提取指定字节范围内的二进制数据

/**
 * 每个切片的大小
 */
const CHUNK_SIZE = 10 * 1024 * 1024
/**
 * 将文件拆分成多个切片
 * @param file 上传的文件对象
 * @param size 每个切片的大小
 */
const createFileChunk = (file: File, size = CHUNK_SIZE) => {
  const fileChunkList: Blob[] = []
  let cur = 0
  while (cur < file.size) {
    fileChunkList.push(file.slice(cur, cur + size))
    cur += size
  }
  return fileChunkList
}

文件和 Blob 对象有什么关系?首先每个上传的文件都是一个 File 对象,File 对象继承自 Blob 对象,具有 Blob 对象所有功能,Blob 对象用于存储二进制数据,文件本质也是二进制数据,只是相比普通的二进制数据多了文件名、最后修改时间等额外信息

现在我们就得到文件对应一组切片,需要同时上传到后端

那么后端应该在什么时候合并所有切片呢?自然是所有切片上传完成后,那么后端怎么知道什么时候所有上传完成呢?这里可以通过前端在所有切片上传完成后额外发送一个合并请求通知后端进行合并来实现

// 并发请求并等待所有请求完成
await Promise.all(requestList)
// 发起合并请求
await mergeRequest()

后端知道什么时候合并切片后,又该怎么知道如何合并切片呢?因为字节数据的合并顺序会影响最终合并文件是否和切割前一致,因此切片的合并顺序要和切割后的切片顺序保持一致,又因为每个切片完成上传的顺序不一定,因此在请求前要通过什么方式对切片标记顺序,这里可以在切片的名称加个顺序后缀

// 每个切片添加顺序后缀
data = fileChunkList.map((file, index) => ({
		chunk: file,
        chunkName: fileName + '-' + index
}))

后端在收到每个切片后会将切片数据临时存储起来,收到合并请求后按照切片名称排序后再合并,最后删除临时的切片数据

到这里,能实现个基本需求,不过还有优化的地方,假设文件切割出上百份切片并同时发起请求,你觉得会发生什么?浏览器会在建立 TCP 连接的时候就卡死了,因此我们要限制一下并发请求的数量

/**
 * 控制并发请求数量
 * @param forms 提交数据列表
 * @param max 最大请求数量
 */
const sendRequest = (forms: { formData: FormData; index: number; status: number }[], max = 4) => {
  return new Promise((resolve, reject) => {
    let cur = max
    let counter = 0
    const start = () => {
      while (counter < forms.length && cur > 0) {
        const idx = forms.findIndex(
          (item) => item.status === Status.WAIT
        )
        // 表示只剩下正在上传的切片停止循环
        if (idx === -1) break
        const formData = forms[idx].formData
        const index = forms[idx].index
        request({
          url: 'http://localhost:3000',
          data: formData,
          onProgress: createProgressHandler(data.value[index])
        })
          .then(() => {
            cur++
            counter++
            forms[idx].status = Status.DONE
            if (counter === forms.length) {
              resolve(true)
            } else {
              start()
            }
          })
        cur--
        forms[idx].status = Status.UPLOADING
      }
    }
    start()
  })
}

简单总结一下就是:

  • 指定最大的并发数
  • 不断轮询判断是否存在空闲的请求通道并且是否存在未上传的切片
  • 未上传的切片不停地占用空闲的请求通道
  • 直到所有切片上传完

秒传

所谓秒传其实通过向服务端校验文件是否已存在,已存在则直接提示上传成功来达到欺骗用户以为上传很快的效果

首先需要根据文件内容计算出 hash 作为上传到后端存储的文件名,保证文件内容不重复才能校验是否存在,不然可能出现文件名相同但文件内容不同的误判

import SparkMD5 from 'spark-md5'
 
const reader = new FileReader()
reader.readAsArrayBuffer(file)
reader.onload = (e) => {
	if (e.target && e.target.result instanceof ArrayBuffer) {
		spark.append(e.target.result)
		spark.end()
    }
}

这个计算过程还是挺耗时的,建议放到 worker 线程中进行,避免阻塞主线程,同时记得计算完成后关闭就行

/**
 * 计算 hash
 * @param fileChunkList 切片列表
 */
const calculateHash = (fileChunkList: Blob[]): Promise<string> => {
  return new Promise((resolve) => {
    const worker = new Worker('/hash.js')
 
    worker.postMessage({ fileChunkList })
 
    worker.onmessage = (e) => {
      const { precentage, hash } = e.data
 
      hashPrecentage.value = precentage
 
      if (hash) {
        resolve(hash)
      }
    }
  })
}

worker 线程中

self.importScripts('/spark-md5.min.js')
 
self.onmessage = (e) => {
  const { fileChunkList } = e.data
 
  const spark = new self.SparkMD5.ArrayBuffer()
 
  let count = 0
 
  const loadNext = (index) => {
    const reader = new FileReader()
 
    reader.readAsArrayBuffer(fileChunkList[index])
 
    reader.onload = (e) => {
      count++
 
      spark.append(e.target.result)
 
      if (count === fileChunkList.length) {
        self.postMessage({
          precentage: 100,
          hash: spark.end()
        })
 
        self.close()
      } else {
        loadNext(count)
      }
    }
  }
 
  loadNext(0)
}

除了使用 worker 线程,还可以使用 requestIdleCallback 函数计算 hash,这个函数会将回调函数延迟到主线程空闲时执行,也能避免阻塞主线程,而且代码可读性和可组织性更好,这种设计又叫“时间分片”,将一个长耗时任务拆分到多个空闲时间段中进行

import SparkMD5 from 'spark-md5'
/**
 * 计算 hash
 * @param fileChunkList 切片列表
 */
const calculateHash = (file: File): Promise<string> => {
  return new Promise((resolve) => {
    const spark = new SparkMD5.ArrayBuffer()
 
    const appendToSpark = async (file: Blob) => {
      return new Promise((resolve) => {
        const reader = new FileReader()
        reader.readAsArrayBuffer(file)
        reader.onload = (e) => {
          if (e.target && e.target.result instanceof ArrayBuffer) {
            spark.append(e.target.result)
          }
          resolve(true)
        }
      })
    }
 
    let count = 0
 
    const workLoop = async (deadline: IdleDeadline) => {
      while (count < chunks.length && deadline.timeRemaining() > 0) {
        await appendToSpark(chunks[count])
 
        count++
      }
 
      if (count === chunks.length) {
        resolve(spark.end())
      } else {
        requestIdleCallback(workLoop)
      }
    }
 
    requestIdleCallback(workLoop)
  })
}

为了进一步提高 hash 的计算效率,我们可以采用抽样 hash 的策略,只取每个切片一小部分合并起来计算 hash,虽然牺牲了一点命中率,但大大提高计算的效率

/**
 * 计算 hash
 * @param fileChunkList 切片列表
 */
const calculateHash = (file: File): Promise<string> => {
  return new Promise((resolve) => {
    const spark = new SparkMD5.ArrayBuffer()
 
    const appendToSpark = async (file: Blob) => {
      return new Promise((resolve) => {
        const reader = new FileReader()
        reader.readAsArrayBuffer(file)
        reader.onload = (e) => {
          if (e.target && e.target.result instanceof ArrayBuffer) {
            spark.append(e.target.result)
          }
          resolve(true)
        }
      })
    }
 
    let count = 0
 
    const workLoop = async (deadline: IdleDeadline) => {
      while (count < chunks.length && deadline.timeRemaining() > 0) {
        await appendToSpark(chunks[count])
 
        hashPrecentage.value = Math.ceil(((count + 1) / chunks.length) * 100)
 
        count++
      }
 
      if (count === chunks.length) {
        resolve(spark.end())
      } else {
        requestIdleCallback(workLoop)
      }
    }
 
    const offset = 2 * 1024 * 1024
 
    let chunks = [file.slice(0, offset)]
 
    let cur = offset
 
    const getSampleLoop = (deadline: IdleDeadline) => {
      while (cur < file.size && deadline.timeRemaining() > 0) {
        if (cur + offset >= file.size) {
          chunks.push(file.slice(cur, cur + offset))
        } else {
          const mid = cur + offset / 2
          const end = cur + offset
 
          chunks.push(file.slice(cur, cur + 2))
          chunks.push(file.slice(mid, mid + 2))
          chunks.push(file.slice(end - 2, end))
        }
 
        cur += offset
      }
 
      if (cur < file.size) {
        requestIdleCallback(getSampleLoop)
      } else {
        requestIdleCallback(workLoop)
      }
    }
 
    requestIdleCallback(getSampleLoop)
  })
}

hash 计算完成就要带着 hash 发送校验请求,后端收到校验请求后,校验 hash 对应的文件名是否存在,不存在则响应客户端继续上传

/**
 * 校验服务器是否已经存在文件
 * @param fileName 文件名
 * @param fileHash 文件 hash
 */
const verifyUpload = async (fileName: string, fileHash: string): Promise<Record<string, any>> => {
  const res: any = await request({
    url: 'http://localhost:3000/verify',
    headers: {
      'content-type': 'application/json'
    },
    data: JSON.stringify({
      fileName,
      fileHash
    })
  })
  return JSON.parse(res.data)
}

断点

所谓断点就是在上传过程中的某个时间点进行中断,中断又分为主动和被动,被动不用说也知道,各种网络波动,服务器异常都会导致请求被动中断,而主动就是用户自己中止了剩余切片的上传操作

实现原理就是将所有请求对象缓存起来,然后分别调用用请求对象的中止方法即可

/**
 * 暂停上传
 */
const handlePause = () => {
  requestList.forEach((xhr) => xhr.abort())
  requestList.length = 0
}

续传

所谓续传,就是接着上次断点的位置继续上传

要实现这个,就需要知道上次断点的位置在哪里,换句话说,哪些切片是已经上传过的,然后后续上传时过滤掉这些切片即可,前面实现秒传时有一个校验的步骤,可以在这个步骤中返回已上传的上切片

/**
 * 恢复上传
 */
const handleResume = async () => {
  const { shouldUpload, uploadedList } = await verifyUpload(currentFile.name, currentFileHash)
  if (shouldUpload) {
    await uploadChunks(uploadedList)
  } else {
    alert('跳过')
  }
}

源码

https://github.com/ReinerLau/file-upload