Node stream 比拟难了解,也比拟难用,但“流”是个很重要而且会越来越常见的概念(fetch
返回值就是流),所以咱们有必要认真学习 stream。
好在继 node stream 之后,又推出了比拟好用,好了解的 web streams API,咱们联合 Web Streams Everywhere (and Fetch for Node.js)、2016 – the year of web streams、ReadableStream、WritableStream 这几篇文章学一下。
node stream 与 web stream 能够互相转换:
.fromWeb()
将 web stream 转换为 node stream;.toWeb()
将 node stream 转换为 web stream。
精读
stream(流)是什么?
stream 是一种形象 API。咱们能够和 promise 做一下类比,如果说 promise 是异步规范 API,则 stream 心愿成为 I/O 的规范 API。
什么是 I/O?就是输入输出,即信息的读取与写入,比方看视频、加载图片、浏览网页、编码解码器等等都属于 I/O 场景,所以并不一定非要大数据量才算 I/O,比方读取一个磁盘文件算 I/O,同样读取 "hello world"
字符串也能够算 I/O。
stream 就是当下对 I/O 的规范形象。
为了更好了解 stream 的 API 设计,以及让你了解的更粗浅,咱们先本人想一想一个规范 I/O API 应该如何设计?
I/O 场景应该如何形象 API?
read()
、write()
是咱们第一个想到的 API,持续补充的话还有 open()
、close()
等等。
这些 API 的确能够称得上 I/O 场景规范 API,而且也足够简略。但这些 API 有一个有余,就是不足对大数据量下读写的优化思考。什么是大数据量的读写?比方读一个几 GB 的视频文件,在 2G 慢网络环境下拜访网页,这些状况下,如果咱们只有 read
、write
API,那么可能一个读取命令须要 2 个小时能力返回,而一个写入命令须要 3 个小时执行工夫,同时对用户来说,不论是看视频还是看网页,都无奈承受这么长的白屏工夫。
但为什么咱们看视频和看网页的时候没有期待这么久?因为看网页时,并不是期待所有资源都加载结束能力浏览与交互的,许多资源都是在首屏渲染后再异步加载的,视频更是如此,咱们不会加载完 30GB 的电影后再开始播放,而是先下载 300kb 片头后就能够开始播放了。
无论是视频还是网页,为了疾速响应内容,资源都是 在操作过程中继续加载的 ,如果咱们设计一个反对这种模式的 API,无论资源大还是小都能够笼罩,天然比 read
、wirte
设计更正当。
这种继续加载资源的行为就是 stream(流)。
什么是 stream
stream 能够认为在形容资源继续流动的状态,咱们须要把 I/O 场景看作一个继续的场景,就像把一条河的河水导流到另一条河。
做一个类比,咱们在发送 http 申请、浏览网页、看视频时,能够看作一个南水北调的过程,把 A 河的水继续调到 B 河。
在发送 http 申请时,A 河就是后端服务器,B 河就是客户端;浏览网页时,A 河就是他人的网站,B 河就是你的手机;看视频时,A 河是网络上的视频资源(当然也可能是本地的),B 河是你的视频播放器。
所以流是一个继续的过程,而且可能有多个节点,不仅网络申请是流,资源加载到本地硬盘后,读取到内存,视频解码也是流,所以这个南水北调过程中还有许多中途蓄水池节点。
将这些事件都思考到一起,最初造成了 web stream API。
一共有三种流,别离是:writable streams、readable streams、transform streams,它们的关系如下:
<img width=400 src=”https://z3.ax1x.com/2021/10/23/52vHne.png”>
- readable streams 代表 A 河流,是数据的源头,因为是数据源头,所以只可读不可写。
- writable streams 代表 B 河流,是数据的目的地,因为要继续蓄水,所以是只可写不可读。
- transform streams 是两头对数据进行变换的节点,比方 A 与 B 河两头有一个大坝,这个大坝能够通过蓄水的形式管制水运输的速度,还能够装置滤网污染水源,所以它一头是 writable streams 输出 A 河流的水,另一头提供 readable streams 供 B 河流读取。
乍一看很简单的概念,但映射到河水引流就十分天然了,stream 的设计十分贴近生活概念。
要了解 stream,须要思考上面三个问题:
- readable streams 从哪来?
- 是否要应用 transform streams 进行中间件加工?
- 生产的 writable streams 逻辑是什么?
还是再解释一下,为什么相比 read()
、write()
,stream 要多这三个思考:stream 既然将 I/O 形象为流的概念,也就是具备持续性,那么读取的资源就必须是一个 readable 流,所以咱们要结构一个 readable streams(将来可能越来越多函数返回值就是流,也就是在流的环境下工作,就不必思考如何结构流了)。对流的读取是一个继续的过程,所以不是调用一个函数一次性读取那么简略,因而 writable streams 也有肯定 API 语法。正是因为对资源进行了形象,所以无论是读取还是生产,都被包装了一层 stream API,而一般的 read
函数读取的资源都是其自身,所以才没有这些额定思维累赘。
好在 web streams API 设计都比较简单易用,而且作为一种标准规范,更加有把握的必要,上面别离阐明:
readable streams
读取流不可写,所以只有初始化时能力设置值:
const readableStream = new ReadableStream({start(controller) {controller.enqueue('h')
controller.enqueue('e')
controller.enqueue('l')
controller.enqueue('l')
controller.enqueue('o')
controller.close()}
})
controller.enqueue()
能够填入任意值,相当于是将值退出队列,controller.close()
敞开后,就无奈持续 enqueue
了,并且这里的敞开机会,会在 writable streams 的 close
回调响应。
下面只是 mock 的例子,理论场景中,读取流往往是一些调用函数返回的对象,最常见的就是 fetch
函数:
async function fetchStream() {const response = await fetch('https://example.com')
const stream = response.body;
}
可见,fetch
函数返回的 response.body
就是一个 readable stream。
咱们能够通过以下形式间接生产读取流:
readableStream.getReader().read().then({value, done} => {})
也能够 readableStream.pipeThrough(transformStream)
到一个转换流,也能够 readableStream.pipeTo(writableStream)
到一个写入流。
不论是手动 mock 还是函数返回,咱们都能猜到, 读取流不肯定一开始就充斥数据 ,比方 response.body
就可能因为读的比拟早而须要期待,就像接入的水管水流较慢,而源头水池的水很多一样。咱们也能够手动模仿读取较慢的状况:
const readableStream = new ReadableStream({start(controller) {controller.enqueue('h')
controller.enqueue('e')
setTimeout(() => {controller.enqueue('l')
controller.enqueue('l')
controller.enqueue('o')
controller.close()}, 1000)
}
})
下面例子中,如果咱们一开始就用写入流对接,必然要期待 1s 能力失去残缺的 'hello'
数据,但如果 1s 后再对接写入流,那么霎时就能读取整个 'hello'
。另外,写入流可能解决的速度也会慢,如果写入流解决每个单词的工夫都是 1s,那么写入流无论何时执行,都比读取流更慢。
所以能够领会到,流的设计就是为了让整个数据处理过程最大水平的高效,无论读取流数据 ready 的多迟、开始对接写入流的工夫有多晚、写入流解决的多慢,整个链路都是尽可能最高效的:
- 如果 readableStream ready 的迟,咱们能够晚一点对接,让 readableStream 筹备好再开始疾速生产。
- 如果 writableStream 解决的慢,也只是这一处生产的慢,对接的“水管”readableStream 可能早就 ready 了,此时换一个高效生产的 writableStream 就能晋升整体效率。
writable streams
写入流不可读,能够通过如下形式创立:
const writableStream = new WritableStream({write(chunk) {
return new Promise(resolve => {
// 生产的中央,能够执行插入 dom 等等操作
console.log(chunk)
resolve()});
},
close() {// 写入流 controller.close() 时,这里被调用
},
})
写入流不必关怀读取流是什么,所以只有关怀数据写入就行了,实现写入回调 write
。
write
回调须要返回一个 Promise,所以如果咱们生产 chunk
的速度比较慢,写入流执行速度就会变慢,咱们能够了解为 A 河流引水到 B 河流,就算 A 河流的河道很宽,一下就把河水全副灌入了,但 B 河流的河道很窄,无奈解决那么大的水流量,所以受限于 B 河流河道宽度,整体水流速度还是比较慢的(当然这里不可能产生洪灾)。
那么 writableStream 如何触发写入呢?能够通过 write()
函数间接写入:
writableStream.getWriter().write('h')
也能够通过 pipeTo()
间接对接 readableStream,就像原本是手动滴水,当初间接对接一个水管,这样咱们只管解决写入就行了:
readableStream.pipeTo(writableStream)
当然通过最原始的 API 也能够拼装出 pipeTo
的成果,为了了解的更粗浅,咱们用原始办法模仿一个 pipeTo
:
const reader = readableStream.getReader()
const writer = writableStream.getWriter()
function tryRead() {reader.read().then(({done, value}) => {if (done) {return}
writer.ready().then(() => writer.write(value))
tryRead()})
}
tryRead()
transform streams
转换流外部是一个写入流 + 读取流,创立转换流的形式如下:
const decoder = new TextDecoder()
const decodeStream = new TransformStream({transform(chunk, controller) {controller.enqueue(decoder.decode(chunk, {stream: true}))
}
})
chunk
是 writableStream 拿到的包,controller.enqueue
是 readableStream 的入列办法,所以它其实底层实现就是两个流的叠加,API 上简化为 transform
了,能够一边写入读到的数据,一边转化为读取流,供前面的写入流生产。
当然有很多原生的转换流能够用,比方 TextDecoderStream
:
const textDecoderStream = TextDecoderStream()
readable to writable streams
上面是一个蕴含了编码转码的残缺例子:
// 创立读取流
const readableStream = new ReadableStream({start(controller) {const textEncoder = new TextEncoder()
const chunks = textEncoder.encode('hello', { stream: true})
chunks.forEach(chunk => controller.enqueue(chunk))
controller.close()}
})
// 创立写入流
const writableStream = new WritableStream({write(chunk) {const textDecoder = new TextDecoder()
return new Promise(resolve => {const buffer = new ArrayBuffer(2);
const view = new Uint16Array(buffer);
view[0] = chunk;
const decoded = textDecoder.decode(view, { stream: true});
console.log('decoded', decoded)
setTimeout(() => {resolve()
}, 1000)
});
},
close() {console.log('writable stream close')
},
})
readableStream.pipeTo(writableStream)
首先 readableStream 利用 TextEncoder
以极快的速度霎时将 hello
这 5 个字母退出队列,并执行 controller.close()
,意味着这个 readableStream 霎时就实现了初始化,并且前面无奈批改,只能读取了。
咱们在 writableStream 的 write
办法中,利用 TextDecoder
对 chunk
进行解码,一次解码一个字母,并打印到控制台,而后过了 1s 才 resolve
,所以写入流会每隔 1s 打印一个字母:
h
# 1s later
e
# 1s later
l
# 1s later
l
# 1s later
o
writable stream close
这个例子转码解码解决的还不够优雅,咱们不须要将转码与解码写在流函数里,而是写在转换流中,比方:
readableStream
.pipeThrough(new TextEncoderStream())
.pipeThrough(customStream)
.pipeThrough(new TextDecoderStream())
.pipeTo(writableStream)
这样 readableStream 与 writableStream 都不须要解决编码与解码,但流在两头被转化为了 Uint8Array,不便被其它转换流解决,最初通过解码转换流转换为文字后,再 pipeTo
给写入流,这样写入流拿到的就是文字了。
但也并不总是这样,比方咱们要传输一个视频流,可能 readableStream 原始值就曾经是 Uint8Array,所以具体要不要对接转换流看状况。
总结
streams 是对 I/O 形象的规范解决 API,其反对继续小片段数据处理的个性并不是偶尔,而是对 I/O 场景进行形象后的必然。
咱们通过水流的例子类比了 streams 的概念,当 I/O 产生时,源头的流转换是有固定速度的 x M/s,指标客户端比方视频的转换也是有固定速度的 y M/s,网络申请也有速度并且是个继续的过程,所以 fetch
人造也是一个流,速度时 z M/s,咱们最终看到视频的速度就是 min(x, y, z)
,当然如果服务器提前将 readableStream 提供好,那么 x 的速度就能够疏忽,此时看到视频的速度是 min(y, z)
。
不仅视频如此,关上文件、关上网页等等都是如此,浏览器解决 html 也是一个流的过程:
new Response(stream, {headers: { 'Content-Type': 'text/html'},
})
如果这个 readableStream 的 controller.enqueue
过程被刻意解决的比较慢,网页甚至能够一个字一个字的逐渐出现:Serving a string, slowly Demo。
只管流的场景如此广泛,但也没有必要将所有代码都改成流式解决,因为代码在内存中执行速度很快,变量的赋值是没必要应用流解决的,但如果这个变量的值来自于一个关上的文件,或者网络申请,那么应用流进行解决是最高效的。
探讨地址是:精读《web streams》· Issue #363 · dt-fe/weekly
如果你想参加探讨,请 点击这里,每周都有新的主题,周末或周一公布。前端精读 – 帮你筛选靠谱的内容。
关注 前端精读微信公众号
<img width=200 src=”https://img.alicdn.com/tfs/TB165W0MCzqK1RjSZFLXXcn2XXa-258-258.jpg”>
版权申明:自在转载 - 非商用 - 非衍生 - 放弃署名(创意共享 3.0 许可证)