乐趣区

关于前端:精读web-streams

Node stream 比拟难了解,也比拟难用,但“流”是个很重要而且会越来越常见的概念(fetch 返回值就是流),所以咱们有必要认真学习 stream。

好在继 node stream 之后,又推出了比拟好用,好了解的 web streams API,咱们联合 Web Streams Everywhere (and Fetch for Node.js)、2016 – the year of web streams、ReadableStream、WritableStream 这几篇文章学一下。

node stream 与 web stream 能够互相转换:.fromWeb() 将 web stream 转换为 node stream;.toWeb() 将 node stream 转换为 web stream。

精读

stream(流)是什么?

stream 是一种形象 API。咱们能够和 promise 做一下类比,如果说 promise 是异步规范 API,则 stream 心愿成为 I/O 的规范 API。

什么是 I/O?就是输入输出,即信息的读取与写入,比方看视频、加载图片、浏览网页、编码解码器等等都属于 I/O 场景,所以并不一定非要大数据量才算 I/O,比方读取一个磁盘文件算 I/O,同样读取 "hello world" 字符串也能够算 I/O。

stream 就是当下对 I/O 的规范形象。

为了更好了解 stream 的 API 设计,以及让你了解的更粗浅,咱们先本人想一想一个规范 I/O API 应该如何设计?

I/O 场景应该如何形象 API?

read()write() 是咱们第一个想到的 API,持续补充的话还有 open()close() 等等。

这些 API 的确能够称得上 I/O 场景规范 API,而且也足够简略。但这些 API 有一个有余,就是不足对大数据量下读写的优化思考。什么是大数据量的读写?比方读一个几 GB 的视频文件,在 2G 慢网络环境下拜访网页,这些状况下,如果咱们只有 readwrite API,那么可能一个读取命令须要 2 个小时能力返回,而一个写入命令须要 3 个小时执行工夫,同时对用户来说,不论是看视频还是看网页,都无奈承受这么长的白屏工夫。

但为什么咱们看视频和看网页的时候没有期待这么久?因为看网页时,并不是期待所有资源都加载结束能力浏览与交互的,许多资源都是在首屏渲染后再异步加载的,视频更是如此,咱们不会加载完 30GB 的电影后再开始播放,而是先下载 300kb 片头后就能够开始播放了。

无论是视频还是网页,为了疾速响应内容,资源都是 在操作过程中继续加载的 ,如果咱们设计一个反对这种模式的 API,无论资源大还是小都能够笼罩,天然比 readwirte 设计更正当。

这种继续加载资源的行为就是 stream(流)。

什么是 stream

stream 能够认为在形容资源继续流动的状态,咱们须要把 I/O 场景看作一个继续的场景,就像把一条河的河水导流到另一条河。

做一个类比,咱们在发送 http 申请、浏览网页、看视频时,能够看作一个南水北调的过程,把 A 河的水继续调到 B 河。

在发送 http 申请时,A 河就是后端服务器,B 河就是客户端;浏览网页时,A 河就是他人的网站,B 河就是你的手机;看视频时,A 河是网络上的视频资源(当然也可能是本地的),B 河是你的视频播放器。

所以流是一个继续的过程,而且可能有多个节点,不仅网络申请是流,资源加载到本地硬盘后,读取到内存,视频解码也是流,所以这个南水北调过程中还有许多中途蓄水池节点。

将这些事件都思考到一起,最初造成了 web stream API。

一共有三种流,别离是:writable streams、readable streams、transform streams,它们的关系如下:

<img width=400 src=”https://z3.ax1x.com/2021/10/23/52vHne.png”>

  • readable streams 代表 A 河流,是数据的源头,因为是数据源头,所以只可读不可写。
  • writable streams 代表 B 河流,是数据的目的地,因为要继续蓄水,所以是只可写不可读。
  • transform streams 是两头对数据进行变换的节点,比方 A 与 B 河两头有一个大坝,这个大坝能够通过蓄水的形式管制水运输的速度,还能够装置滤网污染水源,所以它一头是 writable streams 输出 A 河流的水,另一头提供 readable streams 供 B 河流读取。

乍一看很简单的概念,但映射到河水引流就十分天然了,stream 的设计十分贴近生活概念。

要了解 stream,须要思考上面三个问题:

  1. readable streams 从哪来?
  2. 是否要应用 transform streams 进行中间件加工?
  3. 生产的 writable streams 逻辑是什么?

还是再解释一下,为什么相比 read()write(),stream 要多这三个思考:stream 既然将 I/O 形象为流的概念,也就是具备持续性,那么读取的资源就必须是一个 readable 流,所以咱们要结构一个 readable streams(将来可能越来越多函数返回值就是流,也就是在流的环境下工作,就不必思考如何结构流了)。对流的读取是一个继续的过程,所以不是调用一个函数一次性读取那么简略,因而 writable streams 也有肯定 API 语法。正是因为对资源进行了形象,所以无论是读取还是生产,都被包装了一层 stream API,而一般的 read 函数读取的资源都是其自身,所以才没有这些额定思维累赘。

好在 web streams API 设计都比较简单易用,而且作为一种标准规范,更加有把握的必要,上面别离阐明:

readable streams

读取流不可写,所以只有初始化时能力设置值:

const readableStream = new ReadableStream({start(controller) {controller.enqueue('h')
    controller.enqueue('e')
    controller.enqueue('l')
    controller.enqueue('l')
    controller.enqueue('o')
    controller.close()}
})

controller.enqueue() 能够填入任意值,相当于是将值退出队列,controller.close() 敞开后,就无奈持续 enqueue 了,并且这里的敞开机会,会在 writable streams 的 close 回调响应。

下面只是 mock 的例子,理论场景中,读取流往往是一些调用函数返回的对象,最常见的就是 fetch 函数:

async function fetchStream() {const response = await fetch('https://example.com')
  const stream = response.body;
}

可见,fetch 函数返回的 response.body 就是一个 readable stream。

咱们能够通过以下形式间接生产读取流:

readableStream.getReader().read().then({value, done} => {})

也能够 readableStream.pipeThrough(transformStream) 到一个转换流,也能够 readableStream.pipeTo(writableStream) 到一个写入流。

不论是手动 mock 还是函数返回,咱们都能猜到, 读取流不肯定一开始就充斥数据 ,比方 response.body 就可能因为读的比拟早而须要期待,就像接入的水管水流较慢,而源头水池的水很多一样。咱们也能够手动模仿读取较慢的状况:

const readableStream = new ReadableStream({start(controller) {controller.enqueue('h')
    controller.enqueue('e')

    setTimeout(() => {controller.enqueue('l')
      controller.enqueue('l')
      controller.enqueue('o')
      controller.close()}, 1000)
  }
})

下面例子中,如果咱们一开始就用写入流对接,必然要期待 1s 能力失去残缺的 'hello' 数据,但如果 1s 后再对接写入流,那么霎时就能读取整个 'hello'。另外,写入流可能解决的速度也会慢,如果写入流解决每个单词的工夫都是 1s,那么写入流无论何时执行,都比读取流更慢。

所以能够领会到,流的设计就是为了让整个数据处理过程最大水平的高效,无论读取流数据 ready 的多迟、开始对接写入流的工夫有多晚、写入流解决的多慢,整个链路都是尽可能最高效的:

  • 如果 readableStream ready 的迟,咱们能够晚一点对接,让 readableStream 筹备好再开始疾速生产。
  • 如果 writableStream 解决的慢,也只是这一处生产的慢,对接的“水管”readableStream 可能早就 ready 了,此时换一个高效生产的 writableStream 就能晋升整体效率。

writable streams

写入流不可读,能够通过如下形式创立:

const writableStream = new WritableStream({write(chunk) {
    return new Promise(resolve => {
      // 生产的中央,能够执行插入 dom 等等操作
      console.log(chunk)

      resolve()});
  },
  close() {// 写入流 controller.close() 时,这里被调用
  },
})

写入流不必关怀读取流是什么,所以只有关怀数据写入就行了,实现写入回调 write

write 回调须要返回一个 Promise,所以如果咱们生产 chunk 的速度比较慢,写入流执行速度就会变慢,咱们能够了解为 A 河流引水到 B 河流,就算 A 河流的河道很宽,一下就把河水全副灌入了,但 B 河流的河道很窄,无奈解决那么大的水流量,所以受限于 B 河流河道宽度,整体水流速度还是比较慢的(当然这里不可能产生洪灾)。

那么 writableStream 如何触发写入呢?能够通过 write() 函数间接写入:

writableStream.getWriter().write('h')

也能够通过 pipeTo() 间接对接 readableStream,就像原本是手动滴水,当初间接对接一个水管,这样咱们只管解决写入就行了:

readableStream.pipeTo(writableStream)

当然通过最原始的 API 也能够拼装出 pipeTo 的成果,为了了解的更粗浅,咱们用原始办法模仿一个 pipeTo

const reader = readableStream.getReader()
const writer = writableStream.getWriter()

function tryRead() {reader.read().then(({done, value}) => {if (done) {return}

    writer.ready().then(() => writer.write(value))

    tryRead()})
}

tryRead()

transform streams

转换流外部是一个写入流 + 读取流,创立转换流的形式如下:

const decoder = new TextDecoder()
const decodeStream = new TransformStream({transform(chunk, controller) {controller.enqueue(decoder.decode(chunk, {stream: true}))
  }
})

chunk 是 writableStream 拿到的包,controller.enqueue 是 readableStream 的入列办法,所以它其实底层实现就是两个流的叠加,API 上简化为 transform 了,能够一边写入读到的数据,一边转化为读取流,供前面的写入流生产。

当然有很多原生的转换流能够用,比方 TextDecoderStream

const textDecoderStream = TextDecoderStream()

readable to writable streams

上面是一个蕴含了编码转码的残缺例子:

// 创立读取流
const readableStream = new ReadableStream({start(controller) {const textEncoder = new TextEncoder()
    const chunks = textEncoder.encode('hello', { stream: true})
    chunks.forEach(chunk => controller.enqueue(chunk))
    controller.close()}
})

// 创立写入流
const writableStream = new WritableStream({write(chunk) {const textDecoder = new TextDecoder()
    return new Promise(resolve => {const buffer = new ArrayBuffer(2);
      const view = new Uint16Array(buffer);
      view[0] = chunk;
      const decoded = textDecoder.decode(view, { stream: true});
      console.log('decoded', decoded)

      setTimeout(() => {resolve()
      }, 1000)
    });
  },
  close() {console.log('writable stream close')
  },
})

readableStream.pipeTo(writableStream)

首先 readableStream 利用 TextEncoder 以极快的速度霎时将 hello 这 5 个字母退出队列,并执行 controller.close(),意味着这个 readableStream 霎时就实现了初始化,并且前面无奈批改,只能读取了。

咱们在 writableStream 的 write 办法中,利用 TextDecoderchunk 进行解码,一次解码一个字母,并打印到控制台,而后过了 1s 才 resolve,所以写入流会每隔 1s 打印一个字母:

h
# 1s later
e
# 1s later
l
# 1s later
l
# 1s later
o
writable stream close

这个例子转码解码解决的还不够优雅,咱们不须要将转码与解码写在流函数里,而是写在转换流中,比方:

readableStream
  .pipeThrough(new TextEncoderStream())
  .pipeThrough(customStream)
  .pipeThrough(new TextDecoderStream())
  .pipeTo(writableStream)

这样 readableStream 与 writableStream 都不须要解决编码与解码,但流在两头被转化为了 Uint8Array,不便被其它转换流解决,最初通过解码转换流转换为文字后,再 pipeTo 给写入流,这样写入流拿到的就是文字了。

但也并不总是这样,比方咱们要传输一个视频流,可能 readableStream 原始值就曾经是 Uint8Array,所以具体要不要对接转换流看状况。

总结

streams 是对 I/O 形象的规范解决 API,其反对继续小片段数据处理的个性并不是偶尔,而是对 I/O 场景进行形象后的必然。

咱们通过水流的例子类比了 streams 的概念,当 I/O 产生时,源头的流转换是有固定速度的 x M/s,指标客户端比方视频的转换也是有固定速度的 y M/s,网络申请也有速度并且是个继续的过程,所以 fetch 人造也是一个流,速度时 z M/s,咱们最终看到视频的速度就是 min(x, y, z),当然如果服务器提前将 readableStream 提供好,那么 x 的速度就能够疏忽,此时看到视频的速度是 min(y, z)

不仅视频如此,关上文件、关上网页等等都是如此,浏览器解决 html 也是一个流的过程:

new Response(stream, {headers: { 'Content-Type': 'text/html'},
})

如果这个 readableStream 的 controller.enqueue 过程被刻意解决的比较慢,网页甚至能够一个字一个字的逐渐出现:Serving a string, slowly Demo。

只管流的场景如此广泛,但也没有必要将所有代码都改成流式解决,因为代码在内存中执行速度很快,变量的赋值是没必要应用流解决的,但如果这个变量的值来自于一个关上的文件,或者网络申请,那么应用流进行解决是最高效的。

探讨地址是:精读《web streams》· Issue #363 · dt-fe/weekly

如果你想参加探讨,请 点击这里,每周都有新的主题,周末或周一公布。前端精读 – 帮你筛选靠谱的内容。

关注 前端精读微信公众号

<img width=200 src=”https://img.alicdn.com/tfs/TB165W0MCzqK1RjSZFLXXcn2XXa-258-258.jpg”>

版权申明:自在转载 - 非商用 - 非衍生 - 放弃署名(创意共享 3.0 许可证)

退出移动版