# 流
~~~
稳定度: 2 - 不稳定
~~~
流是一个抽象接口,被 Node 中的很多对象所实现。比如[对一个 HTTP 服务器的请求](#)是一个流,[stdout](#) 也是一个流。流是可读、可写或兼具两者的。所有流都是 [EventEmitter](#) 的实例。
您可以通过 `require('stream')` 加载 Stream 基类,其中包括了 [Readable](#) 流、[Writable](#) 流、[Duplex](#) 流和 [Transform](#) 流的基类。
本文档分为三个章节。第一章节解释了您在您的程序中使用流时需要了解的那部分 API,如果您不打算自己实现一个流式 API,您可以只阅读这一章节。
第二章节解释了当您自己实现一个流时需要用到的那部分 API,这些 API 是为了方便您这么做而设计的。
第三章节深入讲解了流的工作方式,包括一些内部机制和函数,除非您明确知道您在做什么,否则尽量不要改动它们。
### 面向流消费者的 API
流可以是可读([Readable](#))或可写([Writable](#)),或者兼具两者([Duplex](#),双工)的。
所有流都是 EventEmitter,但它们也具有其它自定义方法和属性,取决于它们是 Readable、Writable 或 Duplex。
如果一个流既可读(Readable)也可写(Writable),则它实现了下文所述的所有方法和事件。因此,这些 API 同时也涵盖了 [Duplex](#) 或 [Transform](#) 流,即便它们的实现可能有点不同。
为了消费流而在您的程序中自己实现 Stream 接口是没有必要的。如果您**确实**正在您自己的程序中实现流式接口,请同时参考下文[面向流实现者的 API](#)。
几乎所有 Node 程序,无论多简单,都在某种途径用到了流。这里有一个使用流的 Node 程序的例子:
~~~
var http = require('http');
<!-- endsection -->
<!-- section:5dd53fb86ef5aa2fb0a6e831e46cc135 -->
var server = http.createServer(function (req, res) {
// req 为 http.IncomingMessage,是一个可读流(Readable Stream)
// res 为 http.ServerResponse,是一个可写流(Writable Stream)
<!-- endsection -->
<!-- section:fd5e086becb475ded97300c6e8b1f889 -->
var body = '';
// 我们打算以 UTF-8 字符串的形式获取数据
// 如果您不设置编码,您将得到一个 Buffer 对象
req.setEncoding('utf8');
<!-- endsection -->
<!-- section:bb5a4bf69e5c71de2331fe85918ed96b -->
// 一旦监听器被添加,可读流会触发 'data' 事件
req.on('data', function (chunk) {
body += chunk;
})
<!-- endsection -->
<!-- section:5768f3afd395c860ba272f79026a6799 -->
// 'end' 事件表明您已经得到了完整的 body
req.on('end', function () {
try {
var data = JSON.parse(body);
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('错误: ' + er.message);
}
<!-- endsection -->
<!-- section:812496c72ef4682c63a7ba8837f9610a -->
// 向用户回写一些有趣的信息
res.write(typeof data);
res.end();
})
})
<!-- endsection -->
<!-- section:3bbc30d951532659ecc70a505ea1e985 -->
server.listen(1337);
<!-- endsection -->
<!-- section:f0dea661693acf21ed203ec804a4f05a -->
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// 错误: Unexpected token o
~~~
### 类: stream.Readable
Readable(可读)流接口是对您正在读取的数据的*来源*的抽象。换言之,数据*出自*一个 Readable 流。
在您表明您就绪接收之前,Readable 流并不会开始发生数据。
Readable 流有两种“模式”:**流动模式**和**暂停模式**。当处于流动模式时,数据由底层系统读出,并尽可能快地提供给您的程序;当处于暂停模式时,您必须明确地调用 `stream.read()` 来取出若干数据块。流默认处于暂停模式。
**注意**:如果没有绑定 data 事件处理器,并且没有 [`pipe()`](#) 目标,同时流被切换到流动模式,那么数据会流失。
您可以通过下面几种做法切换到流动模式:
- 添加一个 [`'data'` 事件](#)处理器来监听数据。
- 调用 [`resume()`](#) 方法来明确开启数据流。
- 调用 [`pipe()`](#) 方法将数据发送到一个 [Writable](#)。
您可以通过下面其中一种做法切换回暂停模式:
- 如果没有导流目标,调用 [`pause()`](#) 方法。
- 如果有导流目标,移除所有 [`'data'` 事件][] 处理器、调用 [`unpipe()`](#) 方法移除所有导流目标。
请注意,为了向后兼容考虑,移除 `'data'` 事件监听器并**不会**自动暂停流。同样的,当有导流目标时,调用 `pause()` 并不能保证流在那些目标排空并请求更多数据时*维持*暂停状态。
一些可读流的例子:
- [客户端上的 HTTP 响应](#)
- [服务器上的 HTTP 请求](#)
- [fs 读取流](#)
- [zlib 流](#)
- [crypto 流](#)
- [TCP 嵌套字](#)
- [子进程的 stdout 和 stderr](#)
- [process.stdin](#)
#### 事件: 'readable'
当一个数据块可以从流中被读出时,它会触发一个 `'readable'` 事件。
在某些情况下,假如未准备好,监听一个 `'readable'` 事件会使得一些数据从底层系统被读出到内部缓冲区中。
~~~
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
// 现在有数据可以读了
})
~~~
当内部缓冲区被排空后,一旦更多数据时,一个 `readable` 事件会被再次触发。
#### 事件: 'data'
- `chunk` {Buffer | String} 数据块。
绑定一个 `data` 事件监听器到一个未被明确暂停的流会将流切换到流动模式,数据会被尽可能地传递。
如果您想从流尽快取出所有数据,这是最理想的方式。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('得到了 %d 字节的数据', chunk.length);
})
~~~
#### 事件: 'end'
该事件会在没有更多数据能够提供时被触发。
请注意,`end` 事件在数据被完全消费之前**不会被触发**。这可通过切换到流动模式,或者在到达末端前不断调用 `read()` 来实现。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('得到了 %d 字节的数据', chunk.length);
})
readable.on('end', function() {
console.log('读取完毕。');
});
~~~
#### 事件: 'close'
当底层数据源(比如,源头的文件描述符)被关闭时触发。并不是所有流都会触发这个事件。
#### 事件: 'error'
当数据接收时发生错误时触发。
#### readable.read([size])
- `size` {Number} 可选参数,指定要读取多少数据。
- 返回 {String | Buffer | null}
`read()` 方法从内部缓冲区中拉取并返回若干数据。当没有更多数据可用时,它会返回 `null`。
若您传入了一个 `size` 参数,那么它会返回相当字节的数据;当 `size` 字节不可用时,它则返回 `null`。
若您没有指定 `size` 参数,那么它会返回内部缓冲区中的所有数据。
该方法仅应在暂停模式时被调用。在流动模式中,该方法会被自动调用直到内部缓冲区排空。
~~~
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('得到了 %d 字节的数据', chunk.length);
}
});
~~~
当该方法返回了一个数据块,它同时也会触发 [`'data'` 事件](#)。
#### readable.setEncoding(encoding)
- `encoding` {String} 要使用的编码。
- 返回: `this`
调用此函数会使得流返回指定编码的字符串而不是 Buffer 对象。比如,当您 `readable.setEncoding('utf8')`,那么输出数据会被作为 UTF-8 数据解析,并以字符串返回。如果您 `readable.setEncoding('hex')`,那么数据会被编码成十六进制字符串格式。
该方法能正确处理多字节字符。假如您不这么做,仅仅直接取出 Buffer 并对它们调用 `buf.toString(encoding)`,很可能会导致字节错位。因此如果您打算以字符串读取数据,请总是使用这个方法。
~~~
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('得到了 %d 个字符的字符串数据', chunk.length);
})
~~~
#### readable.resume()
- 返回: `this`
该方法让可读流继续触发 `data` 事件。
该方法会将流切换到流动模式。如果您*不想*从流中消费数据,但您*想*得到它的 `end` 事件,您可以调用 [`readable.resume()`](#) 来启动数据流。
~~~
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
console.log('到达末端,但并未读取任何东西');
})
~~~
#### readable.pause()
- 返回: `this`
该方法会使一个处于流动模式的流停止触发 `data` 事件,切换到非流动模式,并让后续可用数据留在内部缓冲区中。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('取得 %d 字节数据', chunk.length);
readable.pause();
console.log('接下来 1 秒内不会有数据');
setTimeout(function() {
console.log('现在数据会再次开始流动');
readable.resume();
}, 1000);
})
~~~
#### readable.pipe(destination, [options])
- `destination` {[Writable](#) Stream} 写入数据的目标
- `options` {Object} 导流选项
- `end` {Boolean} 在读取者结束时结束写入者。缺省为 `true`
该方法从可读流中拉取所有数据,并写入到所提供的目标。该方法能自动控制流量以避免目标被快速读取的可读流所淹没。
可以导流到多个目标。
~~~
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 所有来自 readable 的数据会被写入到 'file.txt'
readable.pipe(writable);
~~~
该函数返回目标流,因此您可以建立导流链:
~~~
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
~~~
例如,模拟 Unix 的 `cat` 命令:
~~~
process.stdin.pipe(process.stdout);
~~~
缺省情况下当来源流触发 `end` 时目标的 [`end()`](#) 会被调用,所以此时 `destination` 不再可写。传入 `{ end: false }` 作为 `options` 可以让目标流保持开启状态。
这将让 `writer` 保持开启,因此最后可以写入 "Goodbye"。
~~~
reader.pipe(writer, { end: false });
reader.on('end', function() {
writer.end('Goodbye\n');
});
~~~
请注意 `process.stderr` 和 `process.stdout` 在进程结束前都不会被关闭,无论是否指定选项。
#### readable.unpipe([destination])
- `destination` {[Writable](#) Stream} 可选,指定解除导流的流
该方法会移除之前调用 `pipe()` 所设定的钩子。
如果不指定目标,所有导流都会被移除。
如果指定了目标,但并没有与之建立导流,则什么事都不会发生。
~~~
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 来自 readable 的所有数据都会被写入 'file.txt',
// 但仅发生在第 1 秒
readable.pipe(writable);
setTimeout(function() {
console.log('停止写入到 file.txt');
readable.unpipe(writable);
console.log('自行关闭文件流');
writable.end();
}, 1000);
~~~
#### readable.unshift(chunk)
- `chunk` {Buffer | String} 要插回读取队列开头的数据块
该方法在许多场景中都很有用,比如一个流正在被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”回来源,以便流能将它传递给其它消费者。
如果您发现您需要在您的程序中频繁调用 `stream.unshift(chunk)`,请考虑实现一个 [Transform](#) 流。(详见下文面向流实现者的 API。)
~~~
// 取出以 \n\n 分割的头部并将多余部分 unshift() 回去
// callback 以 (error, header, stream) 形式调用
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
var decoder = new StringDecoder('utf8');
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// 找到头部边界
var split = str.split(/\n\n/);
header += split.shift();
var remaining = split.join('\n\n');
var buf = new Buffer(remaining, 'utf8');
if (buf.length)
stream.unshift(buf);
stream.removeListener('error', callback);
stream.removeListener('readable', onReadable);
// 现在可以从流中读取消息的主体了
callback(null, header, stream);
} else {
// 仍在读取头部
header += str;
}
}
}
}
~~~
#### readable.wrap(stream)
- `stream` {Stream} 一个“旧式”可读流
Node v0.10 版本之前的流并未实现现今所有流 API。(更多信息详见下文“兼容性”章节。)
如果您正在使用早前版本的 Node 库,它触发 `'data'` 事件并且有一个