可读流模拟实现
const fs = require("fs");
const EventEmitter = require("events");

/**
 * Minimal file read stream built on fs.open / fs.read / fs.close.
 *
 * Events emitted: "open" (fd), "data" (Buffer), "end", "close", "error" (Error).
 * Reading starts when the first "data" listener is attached (or when read() /
 * resume() is called) and continues chunk by chunk until the end of the
 * requested range or of the file.
 */
class MyFileReadStream extends EventEmitter {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags="r"] - Flags passed to fs.open.
   * @param {number} [options.mode=0o666] - Mode passed to fs.open.
   * @param {boolean} [options.autoClose=true] - Close the fd after "end" or a read error.
   * @param {number} [options.start=0] - Offset of the first byte to read.
   * @param {number} [options.end] - Offset of the last byte to read (inclusive).
   * @param {number} [options.highWaterMark=65536] - Maximum size of one chunk.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    // Legacy alias: earlier revisions stored the value under `flag` only.
    this.flag = this.flags;
    this.mode = options.mode || 0o666;
    // An explicit `false` must survive, so `||` cannot be used here.
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.readOffset = this.start;

    this.paused = false; // set by pause(), cleared by resume()
    this.reading = false; // a read (or the wait for "open") is in flight
    this.ended = false; // "end" has been emitted
    this.closed = false; // close() has been called

    // Attaching a "data" listener starts the read loop; read() itself
    // guarantees that only one loop runs however many listeners are added.
    this.on("newListener", (type) => {
      if (type === "data" && !this.paused) {
        this.read();
      }
    });

    this.open();
  }

  /** Opens the file and emits "open" with the descriptor, or "error". */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit("error", err);
        return;
      }
      if (this.closed) {
        // close() was called while the open was pending: release the fd.
        fs.close(fd, () => {});
        return;
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  /**
   * Reads the next chunk, emits it as "data" and keeps going until the stream
   * is paused, exhausted or closed. A no-op while a read is already in flight.
   */
  read() {
    if (this.reading || this.ended || this.closed) {
      return;
    }
    this.reading = true;

    // The descriptor only exists after the asynchronous open has finished.
    if (typeof this.fd !== "number") {
      this.once("open", () => {
        this.reading = false;
        this.read();
      });
      return;
    }

    const buf = Buffer.alloc(this.highWaterMark);
    // `end` is inclusive; compare against undefined so that `end: 0` works,
    // and clamp at 0 so that `end < start` yields an empty stream.
    const howMuchToRead =
      this.end !== undefined
        ? Math.min(Math.max(this.end - this.readOffset + 1, 0), this.highWaterMark)
        : this.highWaterMark;

    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, bytesRead) => {
      this.reading = false;
      if (this.closed) {
        return;
      }
      if (err) {
        if (this.autoClose) {
          this.close();
        }
        this.emit("error", err);
        return;
      }
      if (bytesRead > 0) {
        this.readOffset += bytesRead;
        this.emit("data", buf.subarray(0, bytesRead));
        if (!this.paused) {
          this.read();
        }
        return;
      }
      this.ended = true;
      this.emit("end");
      if (this.autoClose) {
        this.close();
      }
    });
  }

  /**
   * Stops the read loop after the chunk currently in flight.
   * @returns {this}
   */
  pause() {
    this.paused = true;
    return this;
  }

  /**
   * Restarts the read loop after pause().
   * @returns {this}
   */
  resume() {
    this.paused = false;
    this.read();
    return this;
  }

  /** Closes the descriptor (if any) and emits "close". Safe to call twice. */
  close() {
    if (this.closed) {
      return;
    }
    this.closed = true;

    if (typeof this.fd !== "number") {
      // Nothing to release (open still pending or failed).
      process.nextTick(() => this.emit("close"));
      return;
    }
    fs.close(this.fd, (err) => {
      // Report a close failure only when someone listens, so that a
      // best-effort close never becomes an unhandled "error".
      if (err && this.listenerCount("error") > 0) {
        this.emit("error", err);
      }
      this.emit("close");
    });
  }

  /**
   * Forwards every chunk to `ws.write()`, pausing while `ws` reports
   * back-pressure (falsy return value) and resuming on its "drain" event.
   *
   * @param {{write: Function, on: Function}} ws - Destination.
   * @returns {object} `ws`, to allow chaining.
   */
  pipe(ws) {
    this.on("data", (data) => {
      const flag = ws.write(data);
      if (!flag) {
        this.pause();
      }
    });
    ws.on("drain", () => {
      this.resume();
    });
    return ws;
  }
}

let rs = new MyFileReadStream("test.txt", {
  end: 7,
  highWaterMark: 3,
});

// Event demo
// rs.on("open", (fd) => {
//   console.log("open", fd);
// });
// Without an "error" listener a missing test.txt would crash the process.
rs.on("error", (err) => {
  console.log(err);
});
rs.on("data", (chunk) => {
  console.log(chunk);
});
// rs.on("end", () => {
//   console.log("end");
// });
rs.on("close", () => {
  console.log("close");
});
可写流模拟实现
const fs = require("fs");
const EventEmitter = require("events");
const { Queue } = require("./linkedList"); // queue holding writes that wait their turn

/**
 * Minimal file write stream built on fs.open / fs.write.
 *
 * Events emitted: "open" (fd), "drain", "error" (Error).
 * Only one fs.write is in flight at a time; writes issued meanwhile are
 * queued and flushed in order.
 */
class MyWriteStream extends EventEmitter {
  /**
   * @param {string} path - File to write.
   * @param {object} [options]
   * @param {string} [options.flags="w"] - Flags passed to fs.open.
   * @param {number} [options.mode=0o666] - Mode passed to fs.open.
   * @param {boolean} [options.autoClose=true] - Stored on the instance; not consulted by this class.
   * @param {number} [options.start=0] - File offset of the first byte written.
   * @param {string} [options.encoding="utf8"] - Default encoding for string chunks.
   * @param {number} [options.highWaterMark=16384] - Pending-byte threshold at which write() returns false.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "w";
    this.mode = options.mode || 0o666;
    // An explicit `false` must survive, so `||` cannot be used here.
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
    this.start = options.start || 0;
    this.encoding = options.encoding || "utf8";
    this.highWaterMark = options.highWaterMark || 16 * 1024;

    this.writeOffset = this.start; // file position of the next write
    this.writting = false; // true while a write is in flight
    this.writLen = 0; // bytes accepted but not yet written
    this.needDrain = false; // a write() returned false since the last "drain"
    this.cache = new Queue();

    this.open();
  }

  /** Opens the file and emits "open" with the descriptor, or "error". */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit("error", err);
        return; // there is no descriptor to publish
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  /**
   * Accepts a chunk for writing.
   *
   * @param {string|Buffer} chunk - Data to write (only strings and Buffers are supported).
   * @param {string|Function} [encoding] - Encoding for string chunks, or the callback.
   * @param {Function} [cb] - Called once this chunk has been written.
   * @returns {boolean} false once the pending bytes reach highWaterMark; the
   *   caller should then wait for "drain" before writing more.
   */
  write(chunk, encoding, cb) {
    const done = typeof encoding === "function" ? encoding : cb;
    const enc = typeof encoding === "string" ? encoding : this.encoding;
    const data = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, enc);

    this.writLen += data.length;
    const flag = this.writLen < this.highWaterMark;
    // Only ever raise the flag here: a later small write must not cancel
    // the "drain" owed to an earlier caller that was told to wait.
    if (!flag) {
      this.needDrain = true;
    }

    if (this.writting) {
      this.cache.enQueue({ chunk: data, encoding: enc, cb: done });
    } else {
      this.writting = true;
      this._write(data, enc, () => {
        done && done();
        this._clearBuffer();
      });
    }
    return flag;
  }

  /**
   * Writes one chunk at the current file position, waiting for "open" first
   * if necessary. On failure "error" is emitted and `cb` is not called, so
   * queue processing stops.
   */
  _write(chunk, encoding, cb) {
    // The descriptor only exists after the asynchronous open has finished.
    if (typeof this.fd !== "number") {
      this.once("open", () => this._write(chunk, encoding, cb));
      return;
    }
    // The third argument is the offset inside `chunk`, not a file position.
    fs.write(this.fd, chunk, 0, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        this.emit("error", err);
        return;
      }
      this.writeOffset += written;
      this.writLen -= written;
      cb && cb();
    });
  }

  /** Flushes queued writes one by one, then emits "drain" if it is owed. */
  _clearBuffer() {
    const data = this.cache.deQueue();
    if (data) {
      this._write(data.element.chunk, data.element.encoding, () => {
        data.element.cb && data.element.cb();
        this._clearBuffer();
      });
      return;
    }
    // Queue empty: the next write() may go straight to disk again.
    this.writting = false;
    if (this.needDrain) {
      this.needDrain = false;
      this.emit("drain");
    }
  }
}

const ws = new MyWriteStream("test.txt", { highWaterMark: 1 });
ws.on("open", (fd) => {
  console.log("open===>", fd);
});
let flag = ws.write("1", "utf8", () => {
  console.log("ok1");
});
// console.log(flag);
flag = ws.write("10", "utf8", () => {
  console.log("ok2");
});
// console.log(flag);
flag = ws.write("前端前端", "utf8", () => {
  console.log("ok222");
});
ws.on("drain", () => {
  console.log("drain");
});