共计 3562 个字符,预计需要花费 9 分钟才能阅读完成。
可读流模拟实现
const fs = require("fs");
const EventEmitter = require("events");
/**
 * Simplified imitation of Node's file read stream, built on fs.open / fs.read.
 *
 * Events emitted:
 *   - "open"  (fd)     the file descriptor is ready
 *   - "data"  (Buffer) one chunk of at most `highWaterMark` bytes
 *   - "end"            no more bytes will be delivered
 *   - "close"          the file descriptor has been released
 *   - "error" (Error)  opening or reading failed
 *
 * Reading starts when the first "data" listener is attached and can be
 * suspended / continued with pause() / resume().
 */
class MyFileReadStream extends EventEmitter {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags="r"] - Flags passed to fs.open.
   * @param {number} [options.mode=438] - File mode passed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Close the fd after "end" or a read error.
   * @param {number} [options.start=0] - Offset of the first byte to read.
   * @param {number} [options.end] - Offset of the last byte to read (inclusive).
   * @param {number} [options.highWaterMark=65536] - Maximum chunk size in bytes.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.flag = this.flags; // alias kept for code that read the earlier property name
    this.mode = options.mode || 438;
    // `options.autoClose || true` could never be false; honour an explicit false.
    this.autoClose =
      options.autoClose !== undefined ? Boolean(options.autoClose) : true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.readOffset = this.start; // reading begins at `start`, not always at 0
    this.flowing = null; // null: not started, true: flowing, false: paused
    this.reading = false; // true while a read (or the wait for "open") is pending
    this.ended = false;
    this.closed = false;
    this.open();
    this.on("newListener", (type) => {
      // Only the first "data" listener starts the read loop; starting one loop
      // per listener would deliver every chunk several times.
      if (type === "data" && this.flowing === null) {
        this.flowing = true;
        this.read();
      }
    });
  }

  /** Opens the file and emits "open" with the descriptor, or "error". */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit("error", err);
        return;
      }
      if (this.closed) {
        // close() was requested before the descriptor arrived: release it.
        fs.close(fd, () => {});
        return;
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  /**
   * Reads the next chunk and emits it as "data"; keeps reading until the
   * stream is paused, reaches `end` / end of file, or fails.
   */
  read() {
    if (this.reading || this.ended || this.closed) {
      return;
    }
    if (typeof this.fd !== "number") {
      // The descriptor is obtained asynchronously: retry once "open" fires.
      this.reading = true;
      this.once("open", () => {
        this.reading = false;
        if (this.flowing !== false) {
          this.read();
        }
      });
      return;
    }
    // `end` is inclusive; compare by type so that `end: 0` is honoured.
    const wanted =
      typeof this.end === "number"
        ? Math.min(this.end - this.readOffset + 1, this.highWaterMark)
        : this.highWaterMark;
    // Never request a negative length (e.g. start > end); 0 simply yields "end".
    const howMuchToRead = Math.max(wanted, 0);
    const buf = Buffer.alloc(this.highWaterMark);
    this.reading = true;
    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      this.reading = false;
      if (err) {
        this.emit("error", err);
        if (this.autoClose) {
          this.close();
        }
        return;
      }
      if (readBytes > 0) {
        this.readOffset += readBytes;
        this.emit("data", buf.subarray(0, readBytes));
        // A listener may have paused the stream while handling the chunk.
        if (this.flowing !== false) {
          this.read();
        }
        return;
      }
      this.ended = true;
      this.emit("end");
      if (this.autoClose) {
        this.close();
      }
    });
  }

  /** Stops the read loop after the chunk currently in flight. */
  pause() {
    this.flowing = false;
    return this;
  }

  /** Restarts a paused read loop. */
  resume() {
    this.flowing = true;
    this.read();
    return this;
  }

  /** Releases the file descriptor (if any) and emits "close" exactly once. */
  close() {
    if (this.closed) {
      return;
    }
    this.closed = true;
    if (typeof this.fd !== "number") {
      // Nothing was opened; still notify listeners asynchronously.
      process.nextTick(() => this.emit("close"));
      return;
    }
    fs.close(this.fd, () => {
      this.emit("close");
    });
  }

  /**
   * Forwards every chunk to `ws.write()` and applies back-pressure: reading is
   * paused when `ws.write()` returns a falsy value and resumed on "drain".
   *
   * @param {{write: Function, on: Function}} ws - Destination writable.
   * @returns {object} The destination, to allow chaining.
   */
  pipe(ws) {
    this.on("data", (data) => {
      if (!ws.write(data)) {
        this.pause();
      }
    });
    ws.on("drain", () => {
      this.resume();
    });
    return ws;
  }
}
// Demo: stream "test.txt" up to byte offset 7 in 3-byte chunks and log what arrives.
const readOptions = { end: 7, highWaterMark: 3 };
const rs = new MyFileReadStream("test.txt", readOptions);

// The "open", "error" and "end" events can be observed the same way, e.g.
//   rs.on("open", (fd) => console.log("open", fd));
//   rs.on("error", (err) => console.log(err));
//   rs.on("end", () => console.log("end"));

// Print every chunk as it is delivered.
function logChunk(chunk) {
  console.log(chunk);
}

// Report that the file descriptor has been released.
function logClose() {
  console.log("close");
}

rs.on("data", logChunk);
rs.on("close", logClose);
可写流模拟实现
const fs = require("fs");
const EventEmitter = require("events");
const {Queue} = require("./linkedList"); // 队列
/**
 * Simplified imitation of Node's file write stream, built on fs.open / fs.write.
 *
 * Writes are serialised: while one chunk is being written, later chunks wait
 * in a FIFO queue. write() returns false once the number of pending bytes
 * reaches `highWaterMark`, and "drain" is emitted when the queue is empty again.
 *
 * Events emitted: "open" (fd), "drain", "error" (Error).
 */
class MyWriteStream extends EventEmitter {
  /**
   * @param {string} path - File to write.
   * @param {object} [options]
   * @param {string} [options.flags="w"] - Flags passed to fs.open.
   * @param {number} [options.mode=438] - File mode passed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Stored for callers; not acted on by this class.
   * @param {number} [options.start=0] - File position of the first byte written.
   * @param {string} [options.encoding="utf8"] - Default encoding for string chunks.
   * @param {number} [options.highWaterMark=16384] - Pending-byte threshold for back-pressure.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "w";
    this.mode = options.mode || 438;
    // `options.autoClose || true` could never be false; honour an explicit false.
    this.autoClose =
      options.autoClose !== undefined ? Boolean(options.autoClose) : true;
    this.start = options.start || 0;
    this.encoding = options.encoding || "utf8";
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.writeOffset = this.start; // file position of the next write
    this.writting = false; // true while a chunk is being written or queued chunks remain
    this.writLen = 0; // bytes accepted but not yet written
    this.needDrain = false; // a write() returned false, so "drain" is owed
    this.cache = new Queue();
    this.open();
  }

  /** Opens the file and emits "open" with the descriptor, or "error". */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        // Stop here: there is no descriptor to publish.
        this.emit("error", err);
        return;
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  /**
   * Accepts a chunk for writing.
   *
   * @param {string|Buffer} chunk - Data to write (only strings and Buffers are considered).
   * @param {string|Function} [encoding] - Encoding for string chunks, or the callback.
   * @param {Function} [cb] - Called once the chunk has been written (with the error, if any).
   * @returns {boolean} false when the caller should wait for "drain" before writing more.
   */
  write(chunk, encoding, cb) {
    const callback = typeof encoding === "function" ? encoding : cb;
    const chunkEncoding = typeof encoding === "string" ? encoding : this.encoding;
    const data = Buffer.isBuffer(chunk)
      ? chunk
      : Buffer.from(chunk, typeof chunk === "string" ? chunkEncoding : undefined);

    this.writLen += data.length;
    const belowMark = this.writLen < this.highWaterMark;
    if (!belowMark) {
      // Only ever raise the flag here: a later small write must not cancel a
      // "drain" that an earlier caller is still waiting for.
      this.needDrain = true;
    }

    if (this.writting) {
      // A write is in flight: queue this chunk behind it.
      this.cache.enQueue({ chunk: data, encoding: chunkEncoding, cb: callback });
    } else {
      this.writting = true;
      this._write(data, chunkEncoding, (err) => {
        if (callback) {
          callback(err);
        }
        if (!err) {
          this._clearBuffer();
        }
      });
    }
    return belowMark;
  }

  /**
   * Writes one chunk at the current file position, waiting for "open" first
   * if the descriptor is not available yet.
   */
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this._write(chunk, encoding, cb));
    }
    // The third argument is the offset inside `chunk`, so it is always 0; the
    // file position is `writeOffset`, which already accounts for `start`.
    fs.write(this.fd, chunk, 0, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        if (cb) {
          cb(err);
        }
        this.emit("error", err);
        return;
      }
      this.writeOffset += written;
      this.writLen -= written;
      if (cb) {
        cb();
      }
    });
  }

  /** Writes queued chunks one by one; emits "drain" when the queue is empty. */
  _clearBuffer() {
    const node = this.cache.deQueue();
    if (node) {
      const { chunk, encoding, cb } = node.element;
      this._write(chunk, encoding, (err) => {
        if (cb) {
          cb(err);
        }
        if (!err) {
          this._clearBuffer();
        }
      });
      return;
    }
    // Queue is empty: the next write() may go straight to disk again.
    this.writting = false;
    if (this.needDrain) {
      this.needDrain = false;
      this.emit("drain");
    }
  }
}
// Demo: with highWaterMark 1 every write reports back-pressure, so "drain"
// is logged once the queued chunks have been written.
const ws = new MyWriteStream("test.txt", { highWaterMark: 1 });

// Subscribe with on(): calling ws.open("open", cb) would open the file a
// second time and never invoke the callback.
ws.on("open", (fd) => {
  console.log("open===>", fd);
});

let flag = ws.write("1", "utf8", () => {
  console.log("ok1");
});
// console.log(flag);
flag = ws.write("10", "utf8", () => {
  console.log("ok2");
});
// console.log(flag);
flag = ws.write("前端前端", "utf8", () => {
  console.log("ok222");
});

ws.on("drain", () => {
  console.log("drain");
});
正文完