node罕用内置模块(Stream)

一、Stream理解

node.js中的流就是解决流式数据的形象接口,文件操作系统和网络模块实现了流接口

不应用流的常见问题:

同步读取资源文件,用户与要期待数据读取实现资源文件最终一次性加载至内存,开销较大

应用流的图解(数据的分段传输):

配合管道对需要的加工:

流解决数据的劣势:

工夫效率:流的分段解决能够同时操作多个数据chunk空间效率:同一时间流毋庸占据大内存空间使用方便:流配合治理,扩大程序变得简略

node.js中流的分类:

Readalbe:可读流,可能是实现数据的读取Writealbe:可写流,可能实现数据的写操作Duplex:双工流,既可读又可写Transform:转换流,可读可写,还是实现数据转换

nodejs流特点:

Stream模块实现了四个具体的形象所有流都继承自EventEmitter

二、根本API

1.可读流

专门生产供程序生产数据的流

自定义可读流:

继承stream里的Readable重写_read办法调用push产出数据

可读流基本原理:

生产数据:

readable事件:当流中存在可读取的数据是触发data事件:当流中数据块传给消费者后触发

自定义可读流代码实现:

const { Readable } = require('stream');// 模仿底层数据let source = ['lg', 'zce', 'syy'];class MyReadable extends Readable {    constructor(source) {        super();        this.source = source;    }    _read() {        let data = this.source.shift() || null;     // 如果没有数据,则返回 null        this.push(data);                            // 将数据推入到流中    }}let mr = new MyReadable(source);// reaadable 默认是暂停模式 // mr.on('readable', () => {//     let data = null;//     while(data = mr.read(2)){//         console.log(data.toString());//     }// })// 有可能都不放入缓存中,间接输入mr.on('data', (chunk) => {    console.log(chunk.toString());})

2.可写流

用于生产数据的流

自定义可写流:

继承stream模块的Writeable重写_write办法,调用write执行写入

可写流事件

pipe事件:可读流调用pipe()办法时触发unpipe事件:可读流调用unpipe()办法时触发

自定义可写流代码实现:

const { Writable } = require('stream');class MyWriteable extends Writable {    constructor() {        super();    }    _write(chunk, encoding, done) {        process.stdout.write(chunk.toString() + '<-----\n');        process.nextTick(done);    }}let mw = new MyWriteable();mw.write('江江学习', 'utf-8', () => {    console.log('write success');})

3.双工流

Duplex是双工流,既能生产又能生产,读写互相独立,读操作创立的数据不能当作写操作的数据源去应用

自定义双工流

继承Duplex类重写_read办法,调用push生产数据重写_write办法,调用write生产数据

代码实现:

let { Duplex } = require('stream');class MyDuplex extends Duplex {    constructor(source) {        super();        this.source = source;    }    _read() {        let data = this.source.shift() || null;        this.push(data);    }    _write(chunk, en, next) {        process.stdout.write(chunk.toString() + '<-----\n');        process.nextTick(next);    }}let source = ['hello', 'world', '!'];let md = new MyDuplex(source);md.write('江江',()=>{    console.log('write success');})md.on('data', (chunk) => {    console.log(chunk.toString());})

Transform

Transform也是一个双工流,读写操作进行了联通

Transform自定义实现:

继承Transform类重写_transform办法,调用push和callback重写_flush办法,解决残余数据

transform自定义代码实现:

let { Transform } = require('stream');class MyTransform extends Transform{    constructor(){        super();    }    _transform(chunk,en,cb){        this.push(chunk.toString().toUpperCase());        cb(null);    }}let t = new MyTransform();t.write('hello');t.on('data',(chunk)=>{    console.log(chunk.toString());})

三、文件读写流

1.文件可读流

文件可读流代码中应用:

const fs = require('fs');const path = require('path');let rs = fs.createReadStream('test.txt', {    flags: 'r',    encoding: null,         // 返回buffer    fd: null,               // 默认值从3开始的, 0、1、2被输出、输入、谬误占用了    mode: 438,              // 权限管制    autoClose: false,       // 是否主动敞开文件    start: 0,               // 从文件的某个地位开始读取    // end: 10,             // 在文件的某个地位完结读取    highWaterMark: 16        // 每次筹备多少个字节的数据让读取(调用push放入缓存区外面),Readable中默认16个,文件可读流中(此处)默认64个})// data 事件// rs.on('data',(chunk)=>{//     console.log(chunk.toString());//     rs.pause();         // 流动模式切换到暂停模式//     setTimeout(()=>{//         rs.resume();    // 复原到流动模式//     },1000)// })// readable 事件rs.on('readable', () => {    // let data = rs.read();    // console.log(data)    let data = null;    while(data = rs.read(3)){            // 每次从缓存中读取多少个字节        console.log(data.toString());        console.log('------',rs._readableState.length);     // 残余多少个字节    }})

其它事件:

const fs = require('fs');const path = require('path');let rs = fs.createReadStream('test.txt', {    flags: 'r',    encoding: null,         // 返回buffer    fd: null,               // 默认值从3开始的, 0、1、2被输出、输入、谬误占用了    mode: 438,              // 权限管制    autoClose: false,       // 是否主动敞开文件    start: 0,               // 从文件的某个地位开始读取    // end: 10,             // 在文件的某个地位完结读取    highWaterMark: 16        // 每次筹备多少个字节的数据让读取(调用push放入缓存区外面),Readable中默认16个,文件可读流中(此处)默认64个})rs.on('open', (fd) => {    console.log('fd', fd,'文件关上了');})rs.on('close',()=>{    console.log('文件敞开了')})let bufferArr = [];rs.on('data',(chunk)=>{    bufferArr.push(chunk)})rs.on('end',()=>{    console.log(Buffer.concat(bufferArr).toString())    console.log('数据被清空之后')})rs.on('error',()=>{    console.log('出错了')})

2.文件可写流

可写流罕用事件:

const fs = require('fs');const path = require('path');const ws = fs.createWriteStream('test.txt', {    flags: 'w',    mode: 438,    fd: null,    encoding: 'utf-8',    start: 0,    highWaterMark: 16    // 默认16kb})ws.write('拉钩教育', () => {    console.log('拉钩教育-数据写完了')})// 字符串 或者 buffer ===> fs rs// ws.write(123456,()=>{//     console.log('123456-数据写完了')// })ws.on('open', (fd) => {    console.log('open', fd)})// colose 是在数据写入操作全副实现之后再执行ws.on('close',()=>{    console.log('文件敞开了');})ws.write('0');// end 执行之后就意味着数据写入操作实现ws.end('jiang');  // 可最初写入一次// ws.write('2');ws.on('error',(err)=>{    console.log('出错了');})

write执行流程:

drain事件与读写速度:

/** * 需要:"江江学习" 写入指定的文件 * 01 一次性写入 * 02 分批写入 * 比照:对内存的压力不同 */const fs = require('fs');let ws = fs.createWriteStream('test.txt', {    highWaterMark: 3});// ws.write('江江学习');let source = '江江学习'.split('');let num = 0;let flag = true;function executeWrite() {        while (num != source.length && flag) {        flag = ws.write(source[num++]);  // 当写入的数据大于等于hightWaterMark时,会返回false    }}executeWrite();ws.on('drain',()=>{     // 缓存中的数据曾经被生产完了,才触发    console.log('drain 执行了');    flag = true;    executeWrite();})

四、背压机制

让数据在的生产者与消费者平滑流动的机制

1.问题发现

看一段代码发现问题:

数据从磁盘读取进去的速度是远远大于写入磁盘的速度的(消费者的速度跟不到生产者的速度的),WriteAble外部保护了一个队列,不能即便的生产数据导致的产能过剩,就会放入该队列中,但队列长度是有下限的,所以在当读写的过程中,如果没有实现被压机制的化,就可能会导致

内存溢出其它过程运行变慢GC频繁调用

理解读写机制:

Readable运行机制:

Writeable运行机制:

背压机制基本原理代码:

let fs = require('fs');let rs = fs.createReadStream('test.txt', {    highWaterMark: 4       // Readable默认是16,fs中createReadStream默认为64})let ws = fs.createWriteStream('test1.txt', {    highWaterMark: 1})let flag = true;rs.on('data',(chunk)=>{    flag = ws.write(chunk,()=>{        console.log('写完了');    })    if(!flag){        rs.pause();    }})ws.on('drain',()=>{    rs.resume();})// 能够间接应用pipe// rs.pipe(ws);

2.模仿可读流

代码实现:

const fs = require('fs');const EventEmitter = require('events');class MyFileReadStream extends EventEmitter {    constructor(path, options = {}) {        super();        this.path = path;        this.flags = options.flags || 'r';        this.mode = options.mode || 438;        this.autoClose = options.autoClose || true;        this.start = options.start || 0;        this.end = options.end;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.readOffset = 0;        this.open();        // 当监听新的事件时,会被触发        this.on('newListener', (type) => {            if (type == 'data') {                this.read();            }        })    }    open() {        fs.open(this.path, this.flags, this.mode, (err, fd) => {            if (err) {                // 触发自生error事件,这里是回调函数,执行在同步代之后                this.emit('error', err);            }            this.fd = fd;            this.emit('open', this.fd);        });    }    read() {        if (typeof this.fd != 'number') {            return this.once('open', this.read);        }        let buf = Buffer.alloc(this.highWaterMark);        // let howMuchToRead;      // 每次读多少        // if (this.end) {        //     // 判断end是否有存在        //     howMuchToRead = Math.min(this.end - this.readOffset + 1, this.highWaterMark);       // 应用残余未读的字节数与highWaterMark中较小的一个        // } else {        //     howMuchToRead = this.highWaterMark;       // 应用残余未读的字节数与highWaterMark中较小的一个        // }                                                            // 能够取到开端end下标的值,所以这里要加一        let howMuchToRead = this.end?Math.min(this.end - this.readOffset + 1, this.highWaterMark):this.highWaterMark        fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {            if (readBytes) {                this.readOffset += readBytes;                this.emit('data', buf.slice(0, readBytes))                this.read()            } else {                this.emit('end')                this.close()            }        })    }    close() {        fs.close(this.fd, () => {            this.emit('close')        });    }}let rs = new MyFileReadStream('test.txt', {    end: 7, // 完结地位的下标,能够取到    highWaterMark: 3});rs.on('open', (fd) => { // 这里是同步代码,监听该事件在触发事件之前    console.log('open', fd);})rs.on('error', (err) => {    console.log(err);})rs.on('data', (chunk) => {    console.log(chunk)})rs.on('end', () => {    console.log('end')})rs.on('close', () => {    console.log('close')})

五、链表

应用wirte时,有些被写入的内容须要放入缓存中被排队期待,而且要遵循先进先出的规定,这里应用链表的数据结构来保留这些数据

为什么不应用数组:

数组存储数据的长度具备下限数组存在塌陷问题

模仿链表实现队列:

class Node {    constructor(element, next = null) {        this.element = element;        this.next = next;    }}class LinkedList {    constructor() {        this.head = null;        this.size = 0    }    // 获取指定地位节点    _getNode(index) {        if (index < 0 || index >= this.size) {            throw new Error('getNode --> index error')        }        let currentNode = this.head;        while (index--) {            currentNode = currentNode.next;        }        return currentNode;    }    // 确保该下标的地位非法    _checkIndex(index) {        if (index < 0 || index >= this.size) {            throw new Error('index 参数谬误')        }    }    add(index, element) {        if (arguments.length == 1) {            element = index;            index = this.size;        }        if (index < 0 || index > this.size) {            throw new Error('index 参数谬误')        }        let newNode = new Node(element);        // index == 1 与 index != 1解决形式不同        if (index == 0) {            newNode.next = this.head;            this.head = newNode;        } else {            // 获取指定地位的前一个节点            let prevNode = this._getNode(--index);            newNode.next = prevNode.next;            prevNode.next = newNode;        }        this.size++;    }    remove(index) {        if (this.size == 0) return undefined;        this._checkIndex(index);        let currentNode = this._getNode(index);        if (index == 0) {            this.head = currentNode.next;        } else {            let prevNode = this._getNode(index - 1);            prevNode.next = currentNode.next;        }        this.size--;        currentNode.next = null;        return currentNode;    }    set(index, element) {        this._checkIndex(index);        this._getNode(index).element = element;    }    get(index) {        this._checkIndex(index);        let currentNode = this._getNode(index);        currentNode.next = null;        return currentNode;    }    clear() {        this.head = null;        this.size = 0;    }}class Queue {    constructor() {        this.linkedList = new LinkedList();    }    enQueue(data) {        this.linkedList.add(data);    }    deQueue() {        return this.linkedList.remove(0);    }}const q = new Queue();q.enQueue('node1');q.enQueue('node2');console.log(q.deQueue());console.log(q.deQueue());console.log(q.deQueue());console.log(q)

模仿可写流:

const fs = require('fs');const EventsEmitter = require('events');const Queue = require('./linkedlist');class MyWriteStream extends EventsEmitter {    constructor(path, options = {}) {        super();        this.path = path;        this.flags = options.flags || 'w';        this.mode = options.mode || 438;        this.autoClose = options.autoClose || true;        this.start = options.start || 0;        this.end = options.end        this.encoding = options.encoding || 'utf8';        this.highWaterMark = options.highWaterMark || 16 * 1024;        this.writeOffset = this.start;        this.writing = false;        this.writeLen = 0;        this.needDrain = false;        this.cache = new Queue();        this.open();    }    open() {        // 原生 fs.open        fs.open(this.path, this.flags, (err, fd) => {            if (err) {                this.emit('error', err);                return;            }            this.fd = fd;            this.emit('open', fd);        })    }    write(chunk, encoding, cb) {        // 对立成buffer        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);        this.writeLen += chunk.length;        let flag = this.writeLen < this.highWaterMark;        this.needDrain = !flag;        if (this.writing) {            // 以后 是 正在写入状态,所以在这里将数据存入队列            this.cache.enQueue({                chunk,                encoding,                cb            })        } else {            // 以后 不是 正在写入状态,所以在这里执行写入            this.writing = true;            this._write(chunk, encoding, cb);            // this.writing = false;        }        return flag;    }    _write(chunk, encoding, cb) {        if (typeof this.fd != 'number') {            return this.once('open', () => {                return this._write(chunk, encoding, () => {                    cb()                    // 清空排队的内容                    this._clearBuffer();                });            })        }        fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => {            this.writeOffset += written;            this.writeLen -= written;            cb && cb();        })    }    _clearBuffer() {        let data = this.cache.deQueue();        if(data){            this._write(data.element.chunk,data.element.encoding,()=>{                data.element.cb();                this._clearBuffer();            })        }else{            if(this.needDrain){                this.needDrain = false;                this.emit('drain')            }        }    }}let mws = new MyWriteStream('f04.txt', {    highWaterMark: 4});mws.on('open', (fd) => {    console.log('open--->', fd)})

pipe办法的应用:

const fs = require('fs');const rs = fs.createReadStream('./f04.txt', {    highWaterMark: 4     // 默认64kb});const ws = fs.createWriteStream('./f04_copy.txt', {    highWaterMark: 1     // 默认16kb})rs.pipe(ws);// data 须要查看数据,可监听rs data事件

自定义的pipe办法(有问题,没找进去):

const fs = require('fs');const EventEmitter = require('events');class MyFileReadStream extends EventEmitter {    constructor(path, options = {}) {...}    open() {...}    read() {...}    close() {...}    pipe(ws){        this.on('data',(data)=>{            let flag = ws.write(data);            if(!flag){                // 读数据的缓存满了。开启暂停                this.pause();                // 找不到该办法            }        });        this.on('drain',()=>{            // 缓存中的数据被生产完了,持续开启数据读入缓存            this.resume();        })    }}