关于node.js:模拟可读流可写流实现

8次阅读

共计 3562 个字符,预计需要花费 9 分钟才能阅读完成。

可读流模仿实现

const fs = require("fs");
const EventEmitter = require("events");

class MyFileReadStream extends EventEmitter {constructor(path, options = {}) {super();
    this.path = path;
    this.flag = options.flags || "r";
    this.mode = options.mode || 438;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;

    this.readOffset = 0;

    this.open();
    this.on("newListener", (type) => {
      // 判断监听类型
      console.log(type);
      if (type === "data") {this.read();
      }
    });
  }

  open() {
    //   原生 open 办法关上指定地位文件
    fs.open(this.path, this.flags, this.mode, (err, fd) => {if (err) {this.emit("error", err);
      } else {
        this.fd = fd;
        this.emit("open", fd); //  发送文件标识符
      }
    });
  }

  read() {
    // 这里无奈间接获取 fd, 这里监听一下 open 工夫,管制执行程序
    if (typeof this.fd !== "number") {return this.once("open", this.read);
    }
    let buf = Buffer.alloc(this.highWaterMark);
    let howMuchToRead;
    // 用户设置了 end 值
    howMuchToRead = this.end
      ? Math.min(this.end - this.readOffset + 1, this.highWaterMark)
      : this.highWaterMark;
    fs.read(
      this.fd,
      buf,
      0,
      howMuchToRead,
      this.readOffset,
      (err, readBytes) => {if (readBytes) {
          this.readOffset += readBytes;
          this.emit("data", buf.slice(0, readBytes)); // 返回数据
          this.read(); // 持续读} else {this.emit("end");
          this.close();}
      }
    );
  }

  close() {fs.close(this.fd, () => {this.emit("close");
    });
  }

  // pipe 简化了操作
  pipe(ws) {this.on("data", (data) => {let flag = ws.write(data);
      if (!flag) {this.pause();
      }
    });
    ws.on("drain", () => {this.resume();
    });
  }
}

let rs = new MyFileReadStream("test.txt", {
  end: 7,
  highWaterMark: 3,
});

// 事件触发测试
// rs.on("open", (fd) => {//   console.log("open", fd);
// });
// rs.on("error", (err) => {//   console.log(err);
// });

rs.on("data", (chunk) => {console.log(chunk);
});

// rs.on("end", () => {//   console.log("end");
// });

rs.on("close", () => {console.log("close");
});

可写流模仿实现

const fs = require("fs");
const EventEmitter = require("events");
const {Queue} = require("./linkedList"); // 队列

class MyWriteStream extends EventEmitter {constructor(path, options = {}) {super();
    this.path = path;
    this.flags = options.flags || "w";
    this.mode = options.mode || 438;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.encoding = options.encoding || "utf8";
    this.highWaterMark = options.highWaterMark || 16 * 1024;

    this.open();

    this.writeOffset = this.start;
    this.writting = false; // 是否正在写入标识符
    this.writLen = 0;
    this.needDrain = false;
    this.cache = new Queue();}

  open() {
    // 原生 fs.open
    fs.open(this.path, this.flags, (err, fd) => {if (err) {this.emit("error", err);
      }
      // 失常关上文件
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  write(chunk, encoding, cb) {chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); // 只思考字符串或者 Buffer 两种类型
    this.writLen += chunk.length;
    let flag = this.writLen < this.highWaterMark;
    this.needDrain = !flag;
    // 判断是否正在写入
    if (this.writting) {
      // 正在写入,内容须要排队
      this.cache.enQueue({chunk, encoding, cb});
    } else {
      this.writting = true;
      // 以后不是正在写入那么就执行写入
      this._write(chunk, encoding, () => {cb();
        //   清空排队内容
        this._clearBuffer();});
    }
    return flag;
  }

  _write(chunk, encoding, cb) {
    // 写入操作, 与可读流有同样问题,须要在 open 之后在拿 fd 操作
    if (typeof this.fd !== "number") {return this.once("open", () => {return this._write(chunk, encoding, cb);
      });
    }
    fs.write(
      this.fd,
      chunk,
      this.start,
      chunk.length,
      this.writeOffset,
      (err, written) => {
        this.writeOffset += written;
        this.writLen -= written;
        cb && cb();}
    );
  }

  _clearBuffer() {let data = this.cache.deQueue();
    if (data) {this._write(data.element.chunk, data.element.encoding, () => {data.element.cb && data.element.cb();
        this._clearBuffer();});
    } else {if (this.needDrain) {
        this.needDrain = false;
        this.emit("drain");
      }
    }
  }
}

const ws = new MyWriteStream("test.txt", { highWaterMark: 1});

ws.open("open", (fd) => {console.log("open===>", fd);
});

let flag = ws.write("1", "utf8", () => {console.log("ok1");
});
// console.log(flag);
flag = ws.write("10", "utf8", () => {console.log("ok2");
});

// console.log(flag);
flag = ws.write("前端前端", "utf8", () => {console.log("ok222");
});

ws.on("drain", () => {console.log("drain");
});
正文完
 0