多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# 流 ~~~ 稳定度: 2 - 不稳定 ~~~ 流是一个抽象接口,被 Node 中的很多对象所实现。比如[对一个 HTTP 服务器的请求](#)是一个流,[stdout](#) 也是一个流。流是可读、可写或兼具两者的。所有流都是 [EventEmitter](#) 的实例。 您可以通过 `require('stream')` 加载 Stream 基类,其中包括了 [Readable](#) 流、[Writable](#) 流、[Duplex](#) 流和 [Transform](#) 流的基类。 本文档分为三个章节。第一章节解释了您在您的程序中使用流时需要了解的那部分 API,如果您不打算自己实现一个流式 API,您可以只阅读这一章节。 第二章节解释了当您自己实现一个流时需要用到的那部分 API,这些 API 是为了方便您这么做而设计的。 第三章节深入讲解了流的工作方式,包括一些内部机制和函数,除非您明确知道您在做什么,否则尽量不要改动它们。 ### 面向流消费者的 API 流可以是可读([Readable](#))或可写([Writable](#)),或者兼具两者([Duplex](#),双工)的。 所有流都是 EventEmitter,但它们也具有其它自定义方法和属性,取决于它们是 Readable、Writable 或 Duplex。 如果一个流既可读(Readable)也可写(Writable),则它实现了下文所述的所有方法和事件。因此,这些 API 同时也涵盖了 [Duplex](#) 或 [Transform](#) 流,即便它们的实现可能有点不同。 为了消费流而在您的程序中自己实现 Stream 接口是没有必要的。如果您**确实**正在您自己的程序中实现流式接口,请同时参考下文[面向流实现者的 API](#)。 几乎所有 Node 程序,无论多简单,都在某种途径用到了流。这里有一个使用流的 Node 程序的例子: ~~~ var http = require('http'); <!-- endsection --> <!-- section:5dd53fb86ef5aa2fb0a6e831e46cc135 --> var server = http.createServer(function (req, res) { // req 为 http.IncomingMessage,是一个可读流(Readable Stream) // res 为 http.ServerResponse,是一个可写流(Writable Stream) <!-- endsection --> <!-- section:fd5e086becb475ded97300c6e8b1f889 --> var body = ''; // 我们打算以 UTF-8 字符串的形式获取数据 // 如果您不设置编码,您将得到一个 Buffer 对象 req.setEncoding('utf8'); <!-- endsection --> <!-- section:bb5a4bf69e5c71de2331fe85918ed96b --> // 一旦监听器被添加,可读流会触发 'data' 事件 req.on('data', function (chunk) { body += chunk; }) <!-- endsection --> <!-- section:5768f3afd395c860ba272f79026a6799 --> // 'end' 事件表明您已经得到了完整的 body req.on('end', function () { try { var data = JSON.parse(body); } catch (er) { // uh oh! bad json! res.statusCode = 400; return res.end('错误: ' + er.message); } <!-- endsection --> <!-- section:812496c72ef4682c63a7ba8837f9610a --> // 向用户回写一些有趣的信息 res.write(typeof data); res.end(); }) }) <!-- endsection --> <!-- section:3bbc30d951532659ecc70a505ea1e985 --> server.listen(1337); <!-- endsection --> <!-- section:f0dea661693acf21ed203ec804a4f05a --> // $ curl localhost:1337 -d '{}' // object // $ curl localhost:1337 -d '"foo"' // string // $ curl localhost:1337 -d 'not json' // 错误: Unexpected token o ~~~ ### 类: stream.Readable Readable(可读)流接口是对您正在读取的数据的*来源*的抽象。换言之,数据*出自*一个 Readable 流。 在您表明您就绪接收之前,Readable 流并不会开始发生数据。 Readable 流有两种“模式”:**流动模式**和**暂停模式**。当处于流动模式时,数据由底层系统读出,并尽可能快地提供给您的程序;当处于暂停模式时,您必须明确地调用 `stream.read()` 来取出若干数据块。流默认处于暂停模式。 **注意**:如果没有绑定 data 事件处理器,并且没有 [`pipe()`](#) 目标,同时流被切换到流动模式,那么数据会流失。 您可以通过下面几种做法切换到流动模式: - 添加一个 [`'data'` 事件](#)处理器来监听数据。 - 调用 [`resume()`](#) 方法来明确开启数据流。 - 调用 [`pipe()`](#) 方法将数据发送到一个 [Writable](#)。 您可以通过下面其中一种做法切换回暂停模式: - 如果没有导流目标,调用 [`pause()`](#) 方法。 - 如果有导流目标,移除所有 [`'data'` 事件][] 处理器、调用 [`unpipe()`](#) 方法移除所有导流目标。 请注意,为了向后兼容考虑,移除 `'data'` 事件监听器并**不会**自动暂停流。同样的,当有导流目标时,调用 `pause()` 并不能保证流在那些目标排空并请求更多数据时*维持*暂停状态。 一些可读流的例子: - [客户端上的 HTTP 响应](#) - [服务器上的 HTTP 请求](#) - [fs 读取流](#) - [zlib 流](#) - [crypto 流](#) - [TCP 嵌套字](#) - [子进程的 stdout 和 stderr](#) - [process.stdin](#) #### 事件: 'readable' 当一个数据块可以从流中被读出时,它会触发一个 `'readable'` 事件。 在某些情况下,假如未准备好,监听一个 `'readable'` 事件会使得一些数据从底层系统被读出到内部缓冲区中。 ~~~ var readable = getReadableStreamSomehow(); readable.on('readable', function() { // 现在有数据可以读了 }) ~~~ 当内部缓冲区被排空后,一旦更多数据时,一个 `readable` 事件会被再次触发。 #### 事件: 'data' - `chunk` {Buffer | String} 数据块。 绑定一个 `data` 事件监听器到一个未被明确暂停的流会将流切换到流动模式,数据会被尽可能地传递。 如果您想从流尽快取出所有数据,这是最理想的方式。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('得到了 %d 字节的数据', chunk.length); }) ~~~ #### 事件: 'end' 该事件会在没有更多数据能够提供时被触发。 请注意,`end` 事件在数据被完全消费之前**不会被触发**。这可通过切换到流动模式,或者在到达末端前不断调用 `read()` 来实现。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('得到了 %d 字节的数据', chunk.length); }) readable.on('end', function() { console.log('读取完毕。'); }); ~~~ #### 事件: 'close' 当底层数据源(比如,源头的文件描述符)被关闭时触发。并不是所有流都会触发这个事件。 #### 事件: 'error' 当数据接收时发生错误时触发。 #### readable.read([size]) - `size` {Number} 可选参数,指定要读取多少数据。 - 返回 {String | Buffer | null} `read()` 方法从内部缓冲区中拉取并返回若干数据。当没有更多数据可用时,它会返回 `null`。 若您传入了一个 `size` 参数,那么它会返回相当字节的数据;当 `size` 字节不可用时,它则返回 `null`。 若您没有指定 `size` 参数,那么它会返回内部缓冲区中的所有数据。 该方法仅应在暂停模式时被调用。在流动模式中,该方法会被自动调用直到内部缓冲区排空。 ~~~ var readable = getReadableStreamSomehow(); readable.on('readable', function() { var chunk; while (null !== (chunk = readable.read())) { console.log('得到了 %d 字节的数据', chunk.length); } }); ~~~ 当该方法返回了一个数据块,它同时也会触发 [`'data'` 事件](#)。 #### readable.setEncoding(encoding) - `encoding` {String} 要使用的编码。 - 返回: `this` 调用此函数会使得流返回指定编码的字符串而不是 Buffer 对象。比如,当您 `readable.setEncoding('utf8')`,那么输出数据会被作为 UTF-8 数据解析,并以字符串返回。如果您 `readable.setEncoding('hex')`,那么数据会被编码成十六进制字符串格式。 该方法能正确处理多字节字符。假如您不这么做,仅仅直接取出 Buffer 并对它们调用 `buf.toString(encoding)`,很可能会导致字节错位。因此如果您打算以字符串读取数据,请总是使用这个方法。 ~~~ var readable = getReadableStreamSomehow(); readable.setEncoding('utf8'); readable.on('data', function(chunk) { assert.equal(typeof chunk, 'string'); console.log('得到了 %d 个字符的字符串数据', chunk.length); }) ~~~ #### readable.resume() - 返回: `this` 该方法让可读流继续触发 `data` 事件。 该方法会将流切换到流动模式。如果您*不想*从流中消费数据,但您*想*得到它的 `end` 事件,您可以调用 [`readable.resume()`](#) 来启动数据流。 ~~~ var readable = getReadableStreamSomehow(); readable.resume(); readable.on('end', function(chunk) { console.log('到达末端,但并未读取任何东西'); }) ~~~ #### readable.pause() - 返回: `this` 该方法会使一个处于流动模式的流停止触发 `data` 事件,切换到非流动模式,并让后续可用数据留在内部缓冲区中。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('取得 %d 字节数据', chunk.length); readable.pause(); console.log('接下来 1 秒内不会有数据'); setTimeout(function() { console.log('现在数据会再次开始流动'); readable.resume(); }, 1000); }) ~~~ #### readable.pipe(destination, [options]) - `destination` {[Writable](#) Stream} 写入数据的目标 - `options` {Object} 导流选项 - `end` {Boolean} 在读取者结束时结束写入者。缺省为 `true` 该方法从可读流中拉取所有数据,并写入到所提供的目标。该方法能自动控制流量以避免目标被快速读取的可读流所淹没。 可以导流到多个目标。 ~~~ var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // 所有来自 readable 的数据会被写入到 'file.txt' readable.pipe(writable); ~~~ 该函数返回目标流,因此您可以建立导流链: ~~~ var r = fs.createReadStream('file.txt'); var z = zlib.createGzip(); var w = fs.createWriteStream('file.txt.gz'); r.pipe(z).pipe(w); ~~~ 例如,模拟 Unix 的 `cat` 命令: ~~~ process.stdin.pipe(process.stdout); ~~~ 缺省情况下当来源流触发 `end` 时目标的 [`end()`](#) 会被调用,所以此时 `destination` 不再可写。传入 `{ end: false }` 作为 `options` 可以让目标流保持开启状态。 这将让 `writer` 保持开启,因此最后可以写入 "Goodbye"。 ~~~ reader.pipe(writer, { end: false }); reader.on('end', function() { writer.end('Goodbye\n'); }); ~~~ 请注意 `process.stderr` 和 `process.stdout` 在进程结束前都不会被关闭,无论是否指定选项。 #### readable.unpipe([destination]) - `destination` {[Writable](#) Stream} 可选,指定解除导流的流 该方法会移除之前调用 `pipe()` 所设定的钩子。 如果不指定目标,所有导流都会被移除。 如果指定了目标,但并没有与之建立导流,则什么事都不会发生。 ~~~ var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // 来自 readable 的所有数据都会被写入 'file.txt', // 但仅发生在第 1 秒 readable.pipe(writable); setTimeout(function() { console.log('停止写入到 file.txt'); readable.unpipe(writable); console.log('自行关闭文件流'); writable.end(); }, 1000); ~~~ #### readable.unshift(chunk) - `chunk` {Buffer | String} 要插回读取队列开头的数据块 该方法在许多场景中都很有用,比如一个流正在被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”回来源,以便流能将它传递给其它消费者。 如果您发现您需要在您的程序中频繁调用 `stream.unshift(chunk)`,请考虑实现一个 [Transform](#) 流。(详见下文面向流实现者的 API。) ~~~ // 取出以 \n\n 分割的头部并将多余部分 unshift() 回去 // callback 以 (error, header, stream) 形式调用 var StringDecoder = require('string_decoder').StringDecoder; function parseHeader(stream, callback) { stream.on('error', callback); stream.on('readable', onReadable); var decoder = new StringDecoder('utf8'); var header = ''; function onReadable() { var chunk; while (null !== (chunk = stream.read())) { var str = decoder.write(chunk); if (str.match(/\n\n/)) { // 找到头部边界 var split = str.split(/\n\n/); header += split.shift(); var remaining = split.join('\n\n'); var buf = new Buffer(remaining, 'utf8'); if (buf.length) stream.unshift(buf); stream.removeListener('error', callback); stream.removeListener('readable', onReadable); // 现在可以从流中读取消息的主体了 callback(null, header, stream); } else { // 仍在读取头部 header += str; } } } } ~~~ #### readable.wrap(stream) - `stream` {Stream} 一个“旧式”可读流 Node v0.10 版本之前的流并未实现现今所有流 API。(更多信息详见下文“兼容性”章节。) 如果您正在使用早前版本的 Node 库,它触发 `'data'` 事件并且有一个