可读流模仿实现

const fs = require("fs");const EventEmitter = require("events");class MyFileReadStream extends EventEmitter {  constructor(path, options = {}) {    super();    this.path = path;    this.flag = options.flags || "r";    this.mode = options.mode || 438;    this.autoClose = options.autoClose || true;    this.start = options.start || 0;    this.end = options.end;    this.highWaterMark = options.highWaterMark || 64 * 1024;    this.readOffset = 0;    this.open();    this.on("newListener", (type) => {      // 判断监听类型      console.log(type);      if (type === "data") {        this.read();      }    });  }  open() {    //   原生open办法关上指定地位文件    fs.open(this.path, this.flags, this.mode, (err, fd) => {      if (err) {        this.emit("error", err);      } else {        this.fd = fd;        this.emit("open", fd); //  发送文件标识符      }    });  }  read() {    // 这里无奈间接获取fd,这里监听一下open工夫,管制执行程序    if (typeof this.fd !== "number") {      return this.once("open", this.read);    }    let buf = Buffer.alloc(this.highWaterMark);    let howMuchToRead;    // 用户设置了end值    howMuchToRead = this.end      ? Math.min(this.end - this.readOffset + 1, this.highWaterMark)      : this.highWaterMark;    fs.read(      this.fd,      buf,      0,      howMuchToRead,      this.readOffset,      (err, readBytes) => {        if (readBytes) {          this.readOffset += readBytes;          this.emit("data", buf.slice(0, readBytes)); // 返回数据          this.read(); // 持续读        } else {          this.emit("end");          this.close();        }      }    );  }  close() {    fs.close(this.fd, () => {      this.emit("close");    });  }  // pipe简化了操作  pipe(ws) {    this.on("data", (data) => {      let flag = ws.write(data);      if (!flag) {        this.pause();      }    });    ws.on("drain", () => {      this.resume();    });  }}let rs = new MyFileReadStream("test.txt", {  end: 7,  highWaterMark: 3,});//事件触发测试// rs.on("open", (fd) => {//   console.log("open", fd);// });// rs.on("error", (err) => {//   console.log(err);// });rs.on("data", (chunk) => {  console.log(chunk);});// rs.on("end", () => {//   console.log("end");// });rs.on("close", () => {  console.log("close");});

可写流模仿实现

const fs = require("fs");const EventEmitter = require("events");const { Queue } = require("./linkedList"); // 队列class MyWriteStream extends EventEmitter {  constructor(path, options = {}) {    super();    this.path = path;    this.flags = options.flags || "w";    this.mode = options.mode || 438;    this.autoClose = options.autoClose || true;    this.start = options.start || 0;    this.encoding = options.encoding || "utf8";    this.highWaterMark = options.highWaterMark || 16 * 1024;    this.open();    this.writeOffset = this.start;    this.writting = false; // 是否正在写入标识符    this.writLen = 0;    this.needDrain = false;    this.cache = new Queue();  }  open() {    // 原生fs.open    fs.open(this.path, this.flags, (err, fd) => {      if (err) {        this.emit("error", err);      }      // 失常关上文件      this.fd = fd;      this.emit("open", fd);    });  }  write(chunk, encoding, cb) {    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); // 只思考字符串或者Buffer两种类型    this.writLen += chunk.length;    let flag = this.writLen < this.highWaterMark;    this.needDrain = !flag;    // 判断是否正在写入    if (this.writting) {      // 正在写入,内容须要排队      this.cache.enQueue({ chunk, encoding, cb });    } else {      this.writting = true;      // 以后不是正在写入那么就执行写入      this._write(chunk, encoding, () => {        cb();        //   清空排队内容        this._clearBuffer();      });    }    return flag;  }  _write(chunk, encoding, cb) {    // 写入操作,与可读流有同样问题,须要在open之后在拿fd操作    if (typeof this.fd !== "number") {      return this.once("open", () => {        return this._write(chunk, encoding, cb);      });    }    fs.write(      this.fd,      chunk,      this.start,      chunk.length,      this.writeOffset,      (err, written) => {        this.writeOffset += written;        this.writLen -= written;        cb && cb();      }    );  }  _clearBuffer() {    let data = this.cache.deQueue();    if (data) {      this._write(data.element.chunk, data.element.encoding, () => {        data.element.cb && data.element.cb();        this._clearBuffer();      });    } else {      if (this.needDrain) {        this.needDrain = false;        this.emit("drain");      }    }  }}const ws = new MyWriteStream("test.txt", { highWaterMark: 1 });ws.open("open", (fd) => {  console.log("open===>", fd);});let flag = ws.write("1", "utf8", () => {  console.log("ok1");});// console.log(flag);flag = ws.write("10", "utf8", () => {  console.log("ok2");});// console.log(flag);flag = ws.write("前端前端", "utf8", () => {  console.log("ok222");});ws.on("drain", () => {  console.log("drain");});