Node 常用内置模块(Stream)
一、Stream理解
Node.js 中的流就是处理流式数据的抽象接口,文件操作系统和网络模块都实现了流接口。
不使用流的常见问题:
同步读取资源文件,用户需要等待数据读取完成;资源文件最终一次性加载至内存,开销较大。
应用流的图解(数据的分段传输):
配合管道对需要的加工:
流处理数据的优势:
时间效率:流的分段处理可以同时操作多个数据 chunk;空间效率:同一时间流无须占据大内存空间;使用方便:流配合管理,扩展程序变得简单。
node.js中流的分类:
Readable:可读流,能够实现数据的读取;Writable:可写流,能够实现数据的写操作;Duplex:双工流,既可读又可写;Transform:转换流,可读可写,还能实现数据转换。
nodejs流特点:
Stream 模块实现了四个具体的抽象;所有流都继承自 EventEmitter。
二、基本API
1.可读流
专门生产供程序消费的数据的流
自定义可读流:
继承 stream 里的 Readable;重写 _read 方法,调用 push 产出数据。
可读流基本原理:
消费数据:
readable 事件:当流中存在可读取的数据时触发;data 事件:当流中数据块传给消费者后触发。
自定义可读流代码实现:
const { Readable } = require('stream');

// Simulated underlying data source.
const source = ['lg', 'zce', 'syy'];

/**
 * Readable stream that emits the items of an array, one chunk per item.
 */
class MyReadable extends Readable {
  /**
   * @param {Array<string|Buffer>} source - Chunks to emit, in order.
   */
  constructor(source) {
    super();
    // Keep a private copy: _read() consumes it with shift(), and draining the
    // stream must not empty the array owned by the caller.
    this.source = [...source];
  }

  /**
   * Invoked by Readable when it wants more data. Pushes the next chunk, or
   * null once everything has been handed over (null signals end-of-stream).
   */
  _read() {
    // Decide on the remaining length, not on the chunk's truthiness, so that a
    // falsy chunk such as '' is not mistaken for "no more data".
    const data = this.source.length > 0 ? this.source.shift() : null;
    this.push(data);
  }
}

const mr = new MyReadable(source);

// A readable stream starts in paused mode. It can be consumed by pulling:
// mr.on('readable', () => {
//   let data = null;
//   while ((data = mr.read(2)) !== null) {
//     console.log(data.toString());
//   }
// });

// ...or by attaching a 'data' listener, which switches it to flowing mode;
// chunks may then be delivered directly without sitting in the buffer.
mr.on('data', (chunk) => {
  console.log(chunk.toString());
});
2.可写流
用于消费数据的流
自定义可写流:
继承 stream 模块的 Writable;重写 _write 方法,调用 write 执行写入。
可写流事件
pipe 事件:可读流调用 pipe() 方法时触发;unpipe 事件:可读流调用 unpipe() 方法时触发。
自定义可写流代码实现:
const { Writable } = require('stream');

/**
 * Writable stream that echoes every chunk to stdout, followed by an arrow
 * marker and a newline.
 */
class MyWriteable extends Writable {
  constructor() {
    super();
  }

  /**
   * Consumes a single chunk.
   *
   * @param {Buffer|string} data - The chunk handed over by write().
   * @param {string} _encoding - Encoding of the chunk (not used here).
   * @param {Function} finished - Tells the stream the chunk has been handled.
   */
  _write(data, _encoding, finished) {
    const line = `${data.toString()}<-----\n`;
    process.stdout.write(line);
    // Acknowledge the chunk asynchronously, on the next tick.
    process.nextTick(finished);
  }
}

const mw = new MyWriteable();

const onWritten = () => {
  console.log('write success');
};

mw.write('江江学习', 'utf-8', onWritten);
3.双工流
Duplex 是双工流,既能生产又能消费,读写相互独立,读操作创建的数据不能当作写操作的数据源去使用
自定义双工流
继承 Duplex 类;重写 _read 方法,调用 push 生产数据;重写 _write 方法,调用 write 消费数据。
代码实现:
const { Duplex } = require('stream');

/**
 * Duplex stream whose two sides are independent: the readable side emits the
 * items of an array, the writable side echoes written chunks to stdout.
 */
class MyDuplex extends Duplex {
  /**
   * @param {Array<string|Buffer>} source - Chunks for the readable side.
   */
  constructor(source) {
    super();
    // Private copy: _read() consumes it with shift() and must not drain the
    // array owned by the caller.
    this.source = [...source];
  }

  /**
   * Readable side: pushes the next chunk, or null when none are left.
   */
  _read() {
    // Check the remaining length rather than the chunk's truthiness so that a
    // falsy chunk such as '' does not end the stream early.
    const data = this.source.length > 0 ? this.source.shift() : null;
    this.push(data);
  }

  /**
   * Writable side: prints the chunk followed by an arrow marker.
   *
   * @param {Buffer|string} chunk - Data passed to write().
   * @param {string} en - Encoding of the chunk (unused).
   * @param {Function} next - Signals that the chunk has been handled.
   */
  _write(chunk, en, next) {
    process.stdout.write(`${chunk.toString()}<-----\n`);
    process.nextTick(next);
  }
}

const source = ['hello', 'world', '!'];
const md = new MyDuplex(source);

md.write('江江', () => {
  console.log('write success');
});

md.on('data', (chunk) => {
  console.log(chunk.toString());
});
Transform
Transform也是一个双工流,读写操作进行了联通
Transform自定义实现:
继承 Transform 类;重写 _transform 方法,调用 push 和 callback;重写 _flush 方法,处理剩余数据。
transform自定义代码实现:
const { Transform } = require('stream');

/**
 * Transform stream that upper-cases everything written to it.
 */
class MyTransform extends Transform {
  constructor() {
    super();
  }

  /**
   * Converts one written chunk into its upper-case form.
   *
   * @param {Buffer|string} input - Chunk written to the stream.
   * @param {string} _encoding - Encoding of the chunk (unused).
   * @param {Function} done - Completion callback; its second argument is
   *   forwarded to the readable side.
   */
  _transform(input, _encoding, done) {
    const shouted = input.toString().toUpperCase();
    done(null, shouted);
  }
}

const t = new MyTransform();

t.write('hello');

t.on('data', (piece) => {
  console.log(piece.toString());
});
三、文件读写流
1.文件可读流
文件可读流代码中应用:
const fs = require('fs');
const path = require('path');

// File read stream consumed in paused mode through the 'readable' event.
const rs = fs.createReadStream('test.txt', {
  flags: 'r',
  encoding: null, // null: chunks are delivered as Buffers
  fd: null, // no pre-opened descriptor; descriptors handed out by the OS start at 3 because 0, 1 and 2 are stdin, stdout and stderr
  mode: 0o666, // permission bits (438 in decimal)
  autoClose: true, // close the descriptor on end/error; with false nothing in this script would ever close it
  start: 0, // byte offset at which reading starts
  // end: 10, // byte offset at which reading stops (inclusive)
  highWaterMark: 16, // bytes fetched per underlying read and pushed into the buffer (fs read streams default to 64 KiB)
});

// Flowing-mode alternative: consume through 'data' and throttle with pause()/resume().
// rs.on('data', (chunk) => {
//   console.log(chunk.toString());
//   rs.pause(); // flowing mode -> paused mode
//   setTimeout(() => {
//     rs.resume(); // back to flowing mode
//   }, 1000);
// });

rs.on('readable', () => {
  let data;
  // Pull 3 bytes at a time from the internal buffer until read() returns null.
  while ((data = rs.read(3)) !== null) {
    console.log(data.toString());
    // Bytes still buffered, via the public accessor instead of _readableState.
    console.log('------', rs.readableLength);
  }
});

// Without an 'error' listener a missing or unreadable file crashes the process.
rs.on('error', (err) => {
  console.error(err);
});
其它事件:
const fs = require('fs');
const path = require('path');

// File read stream used to demonstrate the open / data / end / close / error events.
const rs = fs.createReadStream('test.txt', {
  flags: 'r',
  encoding: null, // null: chunks are delivered as Buffers
  fd: null, // no pre-opened descriptor; descriptors handed out by the OS start at 3 because 0, 1 and 2 are stdin, stdout and stderr
  mode: 0o666, // permission bits (438 in decimal)
  autoClose: true, // must be true here: with false the descriptor is never closed and the 'close' listener below never runs
  start: 0, // byte offset at which reading starts
  // end: 10, // byte offset at which reading stops (inclusive)
  highWaterMark: 16, // bytes fetched per underlying read (fs read streams default to 64 KiB)
});

rs.on('open', (fd) => {
  // The original message claimed the file had been closed.
  console.log('fd', fd, '文件打开了');
});

rs.on('close', () => {
  console.log('文件关闭了');
});

// Collect the chunks and join them once the stream has been fully consumed.
const bufferArr = [];
rs.on('data', (chunk) => {
  bufferArr.push(chunk);
});

rs.on('end', () => {
  console.log(Buffer.concat(bufferArr).toString());
  console.log('数据被清空之后');
});

rs.on('error', (err) => {
  // Report the actual error instead of only announcing that something failed.
  console.log('出错了', err);
});
2.文件可写流
可写流罕用事件:
const fs = require('fs');
const path = require('path');

// File write stream used to demonstrate write(), end() and the open / close / error events.
const ws = fs.createWriteStream('test.txt', {
  flags: 'w',
  mode: 0o666, // permission bits (438 in decimal)
  fd: null,
  encoding: 'utf-8',
  start: 0,
  highWaterMark: 16, // bytes; the default is 16 KiB
});

// Register the error listener before writing so no failure can go unobserved.
ws.on('error', (err) => {
  console.log('出错了', err);
});

ws.on('open', (fd) => {
  console.log('open', fd);
});

// 'close' fires only after every pending write has completed.
ws.on('close', () => {
  // The original message claimed the file had been opened.
  console.log('文件关闭了');
});

ws.write('拉钩教育', () => {
  console.log('拉钩教育-数据写完了');
});

// Only strings and Buffers are accepted by a file write stream:
// ws.write(123456, () => {
//   console.log('123456-数据写完了');
// });

ws.write('0');

// end() marks the end of writing; it may carry one final chunk.
ws.end('jiang');
// Writing after end() is an error:
// ws.write('2');
write执行流程:
drain事件与读写速度:
/**
 * Goal: write "江江学习" into a file
 *   01 in a single call
 *   02 in batches
 * Comparison: the two approaches put different pressure on memory.
 */
const fs = require('fs');

const ws = fs.createWriteStream('test.txt', {
  highWaterMark: 3,
});

// Single-call variant:
// ws.write('江江学习');

// Spread splits by code point, so characters outside the BMP stay intact.
const source = [...'江江学习'];
let num = 0;
let flag = true;

/**
 * Writes pending characters until either all are written or write() reports
 * that the internal buffer has reached highWaterMark. Ends the stream once
 * everything is written and the buffer has room again.
 */
function executeWrite() {
  while (num !== source.length && flag) {
    // write() returns false once the buffered bytes reach highWaterMark.
    flag = ws.write(source[num++]);
  }
  // Ending only while `flag` is true keeps the final 'drain' notification and
  // still releases the file descriptor, which the original never did.
  if (num === source.length && flag) {
    ws.end();
  }
}

executeWrite();

ws.on('drain', () => {
  // Fired once the buffered data has been flushed.
  console.log('drain 执行了');
  flag = true;
  executeWrite();
});
四、背压机制
让数据在生产者与消费者之间平滑流动的机制
1.问题发现
看一段代码发现问题:
数据从磁盘读取出来的速度远远大于写入磁盘的速度(消费者的速度跟不上生产者的速度)。Writable 内部维护了一个队列,不能及时消费数据导致的产能过剩就会放入该队列中;但队列长度是有上限的,所以在读写过程中,如果没有实现背压机制的话,就可能会导致:
内存溢出、其它进程运行变慢、GC 频繁调用
理解读写机制:
Readable运行机制:
Writeable运行机制:
背压机制基本原理代码:
const fs = require('fs');

// Manual backpressure: stop reading while the writer is saturated and
// continue once it has drained.
const rs = fs.createReadStream('test.txt', {
  highWaterMark: 4, // bytes per read (fs read streams default to 64 KiB)
});

const ws = fs.createWriteStream('test1.txt', {
  highWaterMark: 1,
});

let flag = true;

rs.on('data', (chunk) => {
  flag = ws.write(chunk, () => {
    console.log('写完了');
  });
  // write() returned false: the writer's buffer is full, stop producing.
  if (!flag) {
    rs.pause();
  }
});

ws.on('drain', () => {
  rs.resume();
});

// Finish the destination once the source is exhausted; the original left the
// write stream open.
rs.on('end', () => {
  ws.end();
});

// pipe() implements the same mechanism:
// rs.pipe(ws);
2.模仿可读流
代码实现:
const fs = require('fs');
const EventEmitter = require('events');

/**
 * Simplified file read stream built on fs.open / fs.read / fs.close.
 *
 * Events: 'open' (fd), 'data' (Buffer), 'end', 'close', 'error' (Error).
 * Reading starts when the first 'data' listener is attached.
 */
class MyFileReadStream extends EventEmitter {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags='r'] - Flags passed to fs.open.
   * @param {number} [options.mode=438] - Mode passed to fs.open.
   * @param {boolean} [options.autoClose=true] - Close the fd after 'end' or a read error.
   * @param {number} [options.start=0] - Byte offset of the first byte to read.
   * @param {number} [options.end] - Byte offset of the last byte to read (inclusive).
   * @param {number} [options.highWaterMark=65536] - Maximum bytes per read.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'r';
    this.mode = options.mode || 438;
    // `options.autoClose || true` was always true; honour an explicit false.
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    // Begin at `start`; the original always began at offset 0.
    this.readOffset = this.start;
    // null: never started, true: flowing, false: explicitly paused.
    this.flowing = null;
    this.reading = false; // an fs.read call is in flight
    this.ended = false; // 'end' has been emitted
    this.destroyed = false; // close() has been called

    this.open();

    // 'newListener' fires whenever a listener is about to be added.
    this.on('newListener', (type) => {
      // Only the first 'data' listener starts the loop; starting one loop per
      // listener would deliver every chunk several times.
      if (type === 'data' && this.flowing === null) {
        this.flowing = true;
        this.read();
      }
    });
  }

  /** Opens the file and emits 'open' with the descriptor, or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        // Runs asynchronously, after the caller has attached its listeners.
        this.emit('error', err);
        return; // do not announce an 'open' that never happened
      }
      this.fd = fd;
      this.emit('open', fd);
    });
  }

  /**
   * Reads the next chunk and emits it as 'data', repeating until the file (or
   * the `end` offset) is exhausted, then emits 'end'.
   */
  read() {
    if (this.destroyed) {
      return;
    }
    if (typeof this.fd !== 'number') {
      // Not opened yet: retry once the descriptor is available.
      this.once('open', () => this.read());
      return;
    }
    if (this.reading || this.ended || this.flowing === false) {
      return;
    }

    this.reading = true;
    const buf = Buffer.alloc(this.highWaterMark);
    // `end` is inclusive, hence the +1. The typeof test keeps `end: 0` meaningful.
    const remaining = typeof this.end === 'number'
      ? this.end - this.readOffset + 1
      : this.highWaterMark;
    const howMuchToRead = Math.max(0, Math.min(remaining, this.highWaterMark));

    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      this.reading = false;
      if (err) {
        // A failed read is an error, not an end-of-file.
        this.emit('error', err);
        if (this.autoClose) {
          this.close();
        }
        return;
      }
      if (readBytes) {
        this.readOffset += readBytes;
        this.emit('data', buf.subarray(0, readBytes));
        this.read();
      } else {
        this.ended = true;
        this.emit('end');
        if (this.autoClose) {
          this.close();
        }
      }
    });
  }

  /** Stops the read loop after the chunk currently in flight. */
  pause() {
    this.flowing = false;
    return this;
  }

  /** Restarts the read loop after pause(). */
  resume() {
    if (this.flowing !== true) {
      this.flowing = true;
      this.read();
    }
    return this;
  }

  /** Closes the descriptor (if one was opened) and emits 'close'. */
  close() {
    if (this.destroyed) {
      return;
    }
    this.destroyed = true;
    if (typeof this.fd !== 'number') {
      return;
    }
    fs.close(this.fd, () => {
      this.emit('close');
    });
  }
}

const rs = new MyFileReadStream('test.txt', {
  end: 7, // inclusive offset of the last byte to read
  highWaterMark: 3,
});

rs.on('open', (fd) => {
  // Registered synchronously, i.e. before the asynchronous open completes.
  console.log('open', fd);
});

rs.on('error', (err) => {
  console.log(err);
});

rs.on('data', (chunk) => {
  console.log(chunk);
});

rs.on('end', () => {
  console.log('end');
});

rs.on('close', () => {
  console.log('close');
});
五、链表
使用 write 时,有些被写入的内容需要放入缓存中排队等待,而且要遵循先进先出的规则,这里使用链表的数据结构来保存这些数据
为什么不应用数组:
数组存储数据的长度具有上限;数组存在塌陷问题(从头部取出元素后,其余元素需要整体前移)。
模仿链表实现队列:
/** Singly linked list node: a value plus a reference to the following node. */
class Node {
  /**
   * @param {*} element - Value stored in the node.
   * @param {Node|null} [next=null] - Following node, or null for the tail.
   */
  constructor(element, next = null) {
    this.element = element;
    this.next = next;
  }
}

/** Singly linked list addressed by zero-based index. */
class LinkedList {
  constructor() {
    this.head = null;
    this.size = 0;
  }

  /**
   * Returns the live node stored at `index`.
   * @param {number} index - Integer position in [0, size).
   * @returns {Node}
   * @throws {Error} When the index is not an integer or is out of range.
   */
  _getNode(index) {
    if (!Number.isInteger(index) || index < 0 || index >= this.size) {
      throw new Error('getNode --> index error');
    }
    let currentNode = this.head;
    for (let step = 0; step < index; step += 1) {
      currentNode = currentNode.next;
    }
    return currentNode;
  }

  /**
   * Ensures `index` addresses an existing node.
   * @param {number} index
   * @throws {Error} When the index is not an integer or is out of range.
   */
  _checkIndex(index) {
    // Non-integers are rejected as well: undefined or NaN slipped through the
    // plain range comparison and then corrupted the chain.
    if (!Number.isInteger(index) || index < 0 || index >= this.size) {
      throw new Error('index 参数谬误');
    }
  }

  /**
   * Inserts a value. `add(element)` appends; `add(index, element)` inserts
   * before the node currently at `index` (`index === size` appends).
   * @param {number|*} index - Position, or the value when called with one argument.
   * @param {*} [element] - Value to insert when a position is given.
   * @throws {Error} When the position is not an integer in [0, size].
   */
  add(index, element) {
    // The arity decides the form, so `add(0, undefined)` remains an insert.
    const appendOnly = arguments.length === 1;
    const value = appendOnly ? index : element;
    const position = appendOnly ? this.size : index;

    if (!Number.isInteger(position) || position < 0 || position > this.size) {
      throw new Error('index 参数谬误');
    }

    const newNode = new Node(value);
    if (position === 0) {
      // Inserting at the head needs no predecessor.
      newNode.next = this.head;
      this.head = newNode;
    } else {
      // Splice in after the node that precedes the target position.
      const prevNode = this._getNode(position - 1);
      newNode.next = prevNode.next;
      prevNode.next = newNode;
    }
    this.size += 1;
  }

  /**
   * Unlinks and returns the node at `index`.
   * @param {number} index
   * @returns {Node|undefined} The detached node, or undefined when the list is empty.
   * @throws {Error} When the list is non-empty and the index is invalid.
   */
  remove(index) {
    if (this.size === 0) {
      return undefined;
    }
    this._checkIndex(index);

    let currentNode;
    if (index === 0) {
      currentNode = this.head;
      this.head = currentNode.next;
    } else {
      // One traversal: reach the predecessor and take the target from it.
      const prevNode = this._getNode(index - 1);
      currentNode = prevNode.next;
      prevNode.next = currentNode.next;
    }
    this.size -= 1;
    currentNode.next = null;
    return currentNode;
  }

  /**
   * Replaces the value stored at `index`.
   * @param {number} index
   * @param {*} element
   * @throws {Error} When the index is invalid.
   */
  set(index, element) {
    this._checkIndex(index);
    this._getNode(index).element = element;
  }

  /**
   * Returns a detached copy of the node at `index` (`next` is null).
   * The original nulled `next` on the live node, which cut off every node
   * after it while `size` still counted them.
   * @param {number} index
   * @returns {Node}
   * @throws {Error} When the index is invalid.
   */
  get(index) {
    this._checkIndex(index);
    return new Node(this._getNode(index).element);
  }

  /** Removes every node. */
  clear() {
    this.head = null;
    this.size = 0;
  }
}

/** FIFO queue backed by LinkedList. */
class Queue {
  constructor() {
    this.linkedList = new LinkedList();
  }

  /**
   * Appends a value at the back of the queue.
   * @param {*} data
   */
  enQueue(data) {
    this.linkedList.add(data);
  }

  /**
   * Removes the front entry.
   * @returns {Node|undefined} Node holding the value under `element`, or
   *   undefined when the queue is empty.
   */
  deQueue() {
    return this.linkedList.remove(0);
  }
}

// Make Queue available to CommonJS consumers (`require('./linkedlist')`).
if (typeof module !== 'undefined' && module.exports) {
  module.exports = Queue;
}

// Demo: runs when the file is executed directly, not when it is required.
if (typeof require === 'undefined' || typeof module === 'undefined' || require.main === module) {
  const q = new Queue();
  q.enQueue('node1');
  q.enQueue('node2');
  console.log(q.deQueue());
  console.log(q.deQueue());
  console.log(q.deQueue());
  console.log(q);
}
模仿可写流:
const fs = require('fs');
const EventsEmitter = require('events');
const Queue = require('./linkedlist');

/**
 * Simplified file write stream built on fs.open / fs.write.
 *
 * One chunk is written at a time; chunks arriving in the meantime wait in a
 * FIFO queue. Events: 'open' (fd), 'drain', 'error' (Error).
 */
class MyWriteStream extends EventsEmitter {
  /**
   * @param {string} path - File to write.
   * @param {object} [options]
   * @param {string} [options.flags='w'] - Flags passed to fs.open.
   * @param {number} [options.mode=438] - Mode passed to fs.open.
   * @param {boolean} [options.autoClose=true]
   * @param {number} [options.start=0] - File offset of the first byte written.
   * @param {number} [options.end]
   * @param {string} [options.encoding='utf8'] - Default encoding for string chunks.
   * @param {number} [options.highWaterMark=16384] - Pending bytes at which write() returns false.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'w';
    this.mode = options.mode || 438;
    // `options.autoClose || true` was always true; honour an explicit false.
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
    this.start = options.start || 0;
    this.end = options.end;
    this.encoding = options.encoding || 'utf8';
    this.highWaterMark = options.highWaterMark || 16 * 1024;

    this.writeOffset = this.start; // file position of the next write
    this.writing = false; // a chunk is currently being written
    this.writeLen = 0; // bytes accepted but not yet written
    this.needDrain = false; // a caller was told to wait for 'drain'
    this.cache = new Queue();

    this.open();
  }

  /** Opens the file and emits 'open' with the descriptor, or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err);
        return;
      }
      this.fd = fd;
      this.emit('open', fd);
    });
  }

  /**
   * Accepts a chunk for writing.
   *
   * @param {string|Buffer} chunk - Data to write.
   * @param {string|Function} [encoding] - Encoding for string chunks, or the callback.
   * @param {Function} [cb] - Called after this chunk has been written.
   * @returns {boolean} false once the pending bytes reach highWaterMark; the
   *   caller should then wait for 'drain' before writing more.
   */
  write(chunk, encoding, cb) {
    // Support the two-argument form write(chunk, cb).
    const callback = typeof encoding === 'function' ? encoding : cb;
    const chunkEncoding = typeof encoding === 'string' && encoding ? encoding : this.encoding;

    // Normalise to a Buffer, honouring the requested encoding.
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, chunkEncoding);

    this.writeLen += buffer.length;
    const flag = this.writeLen < this.highWaterMark;
    // Sticky until 'drain' is emitted: a later write that fits must not cancel
    // the notification owed to a caller that already received false.
    if (!flag) {
      this.needDrain = true;
    }

    if (this.writing) {
      // A write is in flight: queue this chunk behind it.
      this.cache.enQueue({ chunk: buffer, encoding: chunkEncoding, cb: callback });
    } else {
      this.writing = true;
      this._write(buffer, chunkEncoding, () => {
        if (callback) {
          callback();
        }
        // Continue with queued chunks whether or not the file was already open
        // when write() was called; the original only did so in the latter case.
        this._clearBuffer();
      });
    }
    return flag;
  }

  /**
   * Writes one Buffer at the current file offset.
   *
   * @param {Buffer} chunk
   * @param {string} encoding
   * @param {Function} [cb] - Called after the bytes have been written.
   */
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      // File not opened yet: retry once the descriptor is available.
      this.once('open', () => this._write(chunk, encoding, cb));
      return;
    }
    // The third argument is the offset inside `chunk` and must be 0; the
    // file position is `writeOffset`.
    fs.write(this.fd, chunk, 0, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        this.emit('error', err);
        return;
      }
      this.writeOffset += written;
      this.writeLen -= written;
      if (cb) {
        cb();
      }
    });
  }

  /**
   * Writes the next queued chunk; when the queue is empty, leaves the writing
   * state and emits 'drain' if a caller is waiting for it.
   */
  _clearBuffer() {
    // deQueue() yields a node whose queued entry is stored under `element`.
    const data = this.cache.deQueue();
    if (data) {
      const { chunk, encoding, cb } = data.element;
      this._write(chunk, encoding, () => {
        if (cb) {
          cb();
        }
        this._clearBuffer();
      });
      return;
    }

    this.writing = false;
    if (this.needDrain) {
      this.needDrain = false;
      this.emit('drain');
    }
  }
}

const mws = new MyWriteStream('f04.txt', {
  highWaterMark: 4,
});

mws.on('open', (fd) => {
  console.log('open--->', fd);
});
pipe办法的应用:
const fs = require('fs');

// Source: read 4 bytes at a time (the default for file read streams is 64 KiB).
const readOptions = { highWaterMark: 4 };
// Destination: 1-byte threshold (the default is 16 KiB).
const writeOptions = { highWaterMark: 1 };

const rs = fs.createReadStream('./f04.txt', readOptions);
const ws = fs.createWriteStream('./f04_copy.txt', writeOptions);

// pipe() forwards every chunk and applies backpressure on our behalf.
rs.pipe(ws);

// To inspect the data while it is piped, also listen to rs 'data'.
自定义的pipe方法(有问题,没找出来):
const fs = require('fs');
const EventEmitter = require('events');

/**
 * Simplified file read stream with pause()/resume() and a pipe() helper.
 *
 * Events: 'open' (fd), 'data' (Buffer), 'end', 'close', 'error' (Error).
 */
class MyFileReadStream extends EventEmitter {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags='r'] - Flags passed to fs.open.
   * @param {number} [options.mode=438] - Mode passed to fs.open.
   * @param {boolean} [options.autoClose=true] - Close the fd after 'end' or a read error.
   * @param {number} [options.start=0] - Byte offset of the first byte to read.
   * @param {number} [options.end] - Byte offset of the last byte to read (inclusive).
   * @param {number} [options.highWaterMark=65536] - Maximum bytes per read.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'r';
    this.mode = options.mode || 438;
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.readOffset = this.start;
    // null: never started, true: flowing, false: explicitly paused.
    this.flowing = null;
    this.reading = false; // an fs.read call is in flight
    this.ended = false; // 'end' has been emitted
    this.destroyed = false; // close() has been called

    this.open();

    // The first 'data' listener switches the stream into flowing mode.
    this.on('newListener', (type) => {
      if (type === 'data' && this.flowing === null) {
        this.flowing = true;
        this.read();
      }
    });
  }

  /** Opens the file and emits 'open' with the descriptor, or 'error'. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err);
        return;
      }
      this.fd = fd;
      this.emit('open', fd);
    });
  }

  /**
   * Reads the next chunk and emits it as 'data'; repeats while the stream is
   * flowing, then emits 'end' when nothing is left.
   */
  read() {
    if (this.destroyed) {
      return;
    }
    if (typeof this.fd !== 'number') {
      // Not opened yet: retry once the descriptor is available.
      this.once('open', () => this.read());
      return;
    }
    if (this.reading || this.ended || this.flowing === false) {
      return;
    }

    this.reading = true;
    const buf = Buffer.alloc(this.highWaterMark);
    // `end` is inclusive, hence the +1.
    const remaining = typeof this.end === 'number'
      ? this.end - this.readOffset + 1
      : this.highWaterMark;
    const howMuchToRead = Math.max(0, Math.min(remaining, this.highWaterMark));

    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      this.reading = false;
      if (err) {
        this.emit('error', err);
        if (this.autoClose) {
          this.close();
        }
        return;
      }
      if (readBytes) {
        this.readOffset += readBytes;
        this.emit('data', buf.subarray(0, readBytes));
        // No-op when a listener paused the stream while handling the chunk.
        this.read();
      } else {
        this.ended = true;
        this.emit('end');
        if (this.autoClose) {
          this.close();
        }
      }
    });
  }

  /** Stops the read loop after the chunk currently in flight. */
  pause() {
    this.flowing = false;
    return this;
  }

  /** Restarts the read loop after pause(). */
  resume() {
    if (this.flowing !== true) {
      this.flowing = true;
      this.read();
    }
    return this;
  }

  /** Closes the descriptor (if one was opened) and emits 'close'. */
  close() {
    if (this.destroyed) {
      return;
    }
    this.destroyed = true;
    if (typeof this.fd !== 'number') {
      return;
    }
    fs.close(this.fd, () => {
      this.emit('close');
    });
  }

  /**
   * Forwards every chunk to `ws`, pausing while the destination is saturated.
   *
   * @param {object} ws - Destination offering write(chunk) -> boolean and a 'drain' event.
   * @returns {object} The destination, so calls can be chained.
   */
  pipe(ws) {
    // 'drain' is emitted by the destination, not by this reader; listening on
    // `this` (as the original did) meant a paused reader never resumed.
    ws.on('drain', () => {
      this.resume();
    });

    this.on('data', (data) => {
      // write() returned false: the destination's buffer is full.
      if (!ws.write(data)) {
        this.pause();
      }
    });

    // Finish the destination when the source is exhausted, if it supports it.
    this.on('end', () => {
      if (typeof ws.end === 'function') {
        ws.end();
      }
    });

    return ws;
  }
}