ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
## Pro 一个`createWriteStream`的简单实现,以求能增加对可写流的理解与应用。 ## 参数配置 ``` /** * createWriteStream * @param1 path * @param2 options */ let fs = require('fs'); let ws = fs.createWriteStream('./1.txt',{ flags:'w'//文件的打开模式 ,mode:0o666//文件的权限设置 ,encoding:'utf8'//写入文件的字符的编码 ,highWaterMark:3//最高水位线 ,start:0 //写入文件的起始索引位置 ,autoClose:true//是否自动关闭文档 }) ``` ## createWriteStream类的实例化 - 实例化一个`createWriteStream`类 - 将`path`,`options`挂载在`createWriteStream`的实例上,除此之外再在实例上挂载以下属性 - `self.fd=null`:文件打开后返回的文件描述符 - `self.pos=self.start`:用于表示文件真正写入时的指针位置 - `self.Buffer=[]`:用来表示文件的缓冲区 - `self.len=null`:用来表示缓冲区此时的大小 - `self.isWriting=false`:用来表示是否正在真正写入文件 - 调用`open`方法,打开文件(发射open事件) ## 实例write方法的执行流程 - `wirte`方法接收三个参数,`chunk`要写入的内容,`encoding`要进行的,`cb`回调函数。 - `write`执行流程: - 判断传入的`chunk`是否为buffer,如果不是,则转换成buffer,用于转化编码依据传入的`encoding`参数。 - 更新`Buffer`缓冲区的`len`长度,让len加上该次chunk的长度 - 判断`len`是否已经超过`highWaterMark`,将值存入`flag` - 判断是否处于`isWriting`状态: - 是,则先加`chunk`写入实例对象下的`Buffer缓冲区`。 - 否,更新`isWriting`,接将参数传递给实例下的`_write`方法写入文件 - 返回`flag` ## 实例_write方法的执行流程 此方法用于真正写入文件 - 查看实例的`fd`属性是否存在(文件是否打开成功) - 成功,调用`fs`模块的`write`方法正式写入数据 - 更新实例对象下的`len`以及`pos`属性 - 调用`clearBuffer`方法将缓冲区的内容写入 - 调用write方法传入的回调函数`cb` - 失败,订阅一个`open事件`(open事件将会在open方法中被发射),在订阅中的回调方法中再次以相同的参数调用_write方法 ## 实例clearBuffer方法 - 从缓冲区中取出一个数据 - 如果数据存在,调用`_write`方法 - 如果数据不存在,将`isWriting`更改为false,发射`drain`事件 ## 注意事项 - 若写入没有暂停过,即写入本地文件的速度远大于write()调用的速度,那么不会走写入缓存,也就不会触发`drain`事件。 ## 实现源码以及测试文件 ``` let fs = require('fs'); let EventEmitter = require('events'); class WriteStream extends EventEmitter { constructor(path, options) { super(); let self = this; Object.assign(self, options); //还需设置默认值 self.path = path; self.isWriting = false; self.Buffer = []; //源码中为链表实现的缓冲区 self.len = null; self.pos = self.start; //初始化写入位置 self.fd = null; self.open(); } open() { let self = this; fs.open(self.path, self.flags, self.mode, (err, fd) => { self.fd = fd; if (err) return self.destroy(err); self.emit('open'); }); } destroy(err) { fs.close(this.fd, () => { this.emit('error', err); }); } write(chunk, encoding, cb) { let self = this , ret = null; encoding = encoding?encoding:self.encoding; //优先使用write传入的编码方式 chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); self.len += chunk.length; ret = self.highWaterMark > self.len; //判断当前最新的缓冲区是否已达到最高水位线 if (self.isWriting) { //说明正在调用底层方法真正写入文件,先写入Buffer self.Buffer.push({ chunk , cb }); } else { self.isWriting = true; self._write(chunk, cb, () => self.clearBuffer()); } return ret; } _write(chunk, cb, clear) { let self = this; if (!self.fd) return self.once('open', () => { self._write(chunk, cb, clear) }); fs.write(self.fd, chunk, 0, chunk.length, self.pos, (err, bytesWritten) => { if (err) { if (self.autoClose) { self.destroy(); self.emit('error', err); } } self.len -= bytesWritten; self.pos += bytesWritten; cb && cb(); clear && clear(); }); } clearBuffer() { let self = this , data = null; data = self.Buffer.shift(); if (data) { self._write(data.chunk, data.cb, () => self.clearBuffer()); } else { //此时说明缓冲区已无数据 self.isWriting = false; self.emit('drain'); } } } module.exports = WriteStream; ``` 测试文件: ```// let fs = require('fs'); let WriteStream = require('./practice'); let ws = new WriteStream('./1.txt',{ flags:'w' ,mode:0o666 ,start:0 ,encoding:'utf8' ,autoClose:true //当流写完之后自动关闭文件 ,highWaterMark:3 }); let n = 9; ws.on('error',(err)=>{ console.log(err) }) function write(){ let flag = true; while(flag&&n>0){ flag = ws.write(n+"",'utf8',()=>{ console.log('ok'); }); n--; console.log('flag=',flag) } ws.once('drain',()=>{ console.log('drain'); write(); }) } // ws.on('drain',()=>{ // console.log('drain'); // write(); // }) write(); ``` --- 参考资料: [https://nodejs.org/dist/latest-v9.x/docs/api/stream.html#stream_writable_streams](https://nodejs.org/dist/latest-v9.x/docs/api/stream.html#stream_writable_streams)