Stream

stream 是一个抽象的接口,而且在 Node.js 中被多种对象进行了继承和扩展,比如 http.IncomingMessage 就是一个 stream。stream 是可读、可写或可读写的。所有的 stream 都是 EventEmitter 的实例。

通过 require('stream') 可以加载 Stream 的基类,提供的基类包括 Readable stream / Writable stream / Duplex stream 和 Transform stream。

本节文档主要分为三个部分:

  1. 第一部分介绍了开发者需要在开发中使用 steam 所涉及的 API
  2. 第二部分介绍了开发者创建自定义 stream 所需要的 API
  3. 第三部分深入解析了 stream 的工作机制,包括一些内部机制

stream 常用 API

Stream 可以是可读、可写或双工的(可读写)。

所有的 stream 都是 EventEmitter 的实例,但是它们也有一些独有的方法和属性。

如果 stream 是可读写的,那么它一定拥有以下所有的方法和事件。所以即使 Duplex 和 Transform stream 间存在差异,这一部分所介绍的 API 也会完整的描述它们的功能。

不要为了刻意使用 stream 而实现 Stream 接口,如果要实现自定义的 Stream,请参考第二部分的文档。

对于大多数的 Node.js 程序来说,无论多么简单都有可能会用到 Stream。下面是一个使用 stream 的示例:

  1. const http = require('http');
  2. var server = http.createServer( (req, res) => {
  3. // req is an http.IncomingMessage, which is a Readable Stream
  4. // res is an http.ServerResponse, which is a Writable Stream
  5. var body = '';
  6. // we want to get the data as utf8 strings
  7. // If you don't set an encoding, then you'll get Buffer objects
  8. req.setEncoding('utf8');
  9. // Readable streams emit 'data' events once a listener is added
  10. req.on('data', (chunk) => {
  11. body += chunk;
  12. });
  13. // the end event tells you that you have entire body
  14. req.on('end', () => {
  15. try {
  16. var data = JSON.parse(body);
  17. } catch (er) {
  18. // uh oh! bad json!
  19. res.statusCode = 400;
  20. return res.end(`error: ${er.message}`);
  21. }
  22. // write back something interesting to the user:
  23. res.write(typeof data);
  24. res.end();
  25. });
  26. });
  27. server.listen(1337);
  28. // $ curl localhost:1337 -d '{}'
  29. // object
  30. // $ curl localhost:1337 -d '"foo"'
  31. // string
  32. // $ curl localhost:1337 -d 'not json'
  33. // error: Unexpected token o

Class: stream.Duplex

Duplex stream 是同时实现了 Readable 和 Writable 接口的 stream。

Duplex stream 的示例包括:

Class: stream.Readable

Readable stream 接口是对数据源的抽象。换言之,数据由 Readable stream 产生。

Readable stream 并不主动开始发送数据,直到显式表明可以接收数据时才会发送数据(类似于惰性加载)。

Readable stream 拥有两种模式:流动模式(flowing mode)和暂停模式(paused mode)。在流动模式中,数据从操作系统底层获取并尽可能快的传递给开发者;在暂停模式中,开发者必须显式调用 stream.read() 才能获取数据。其中,默认使用暂停模式。

注意,如果没有为 data 事件设置监听器,没有设置 stream.pipe() 的输出对象,那么 stream 就会自动切换为流动模式,且数据会丢失。

开发者可以通过以下方式切换为流动模式:

  • 添加 data 事件处理器监听数据
  • 调用 stream.resume() 显式打开数据流
  • 调用 stream.pipe() 将数据发送给 Writable stream

通过以下模式可以切换为暂停模式:

  • 调用 stream.pasue() 时不传递输出对象
  • 移除 data 事件处理器且调用 stream.pasue() 时不传递输出对象

注意,为了保持向后兼容性,移除 data 事件处理器并不会自动暂停 stream。此外,如果存在输出对象,则调用 stream.pause() 并不能保证输出对象为空且要求获取数据时仍保持暂停状态。

下面是一个使用 Readable stream 的示例:

事件:’close’

当 stream 或底层资源(比如文件描述符)关闭时就会触发该事件。该事件也用于指示之后再没有其他事件会被触发,也不会再有任何计算。

并不是所有的 stream 都会触发 close 事件。

事件:’data’

  • chunk,Buffer 实例或字符串,数据块

给未显式暂停的 stream 绑定 data 事件监听器会让 stream 切换为流动模式,数据会被可能快的传送出去。

如果开发者只是想从 stream 尽快获取所有的数据,下面是最好的方式:

  1. var readable = getReadableStreamSomehow();
  2. readable.on('data', (chunk) => {
  3. console.log('got %d bytes of data', chunk.length);
  4. });

事件:’end’

当没有数据可以读取时就会触发该事件。

注意,除非所有的数据都被处理了,否则不会触发 end 事件。可以通过切换到流动模式或反复调用 stream.read() 直到结束来实现。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('data', (chunk) => {
  3. console.log('got %d bytes of data', chunk.length);
  4. });
  5. readable.on('end', () => {
  6. console.log('there will be no more data.');
  7. });

事件:’error’

  • Error 实例

如果接受到的数据存在错误就会触发该事件。

事件:’readable’

当可以读取 stream 中的数据块时,就会触发 readable 事件。

在某些情况下,如果数据还没有准备好,那么监听 readable 事件将会从系统底层读取数据并放入内部缓冲中:

  1. var readable = getReadableStreamSomehow();
  2. readable.on('readable', () => {
  3. // there is some data to read now
  4. });

一旦内部缓存为空且所有的数据准备完成时,就会再次触发 readable 事件。

唯一的异常是在流动模式中,stream 数据传送完成时不会触发 readable 事件。

readable 事件用于表示 stream 接收到了新的消息:要么是新数据可用了要么是 stream 的数据全部发送完毕了。对于前一种情况,stream.read() 将会返回新数据;对于后一种情况,stream.read() 将会返回 null。举例来说,在下面的代码中,foo.txt 是一个空文件:

  1. const fs = require('fs');
  2. var rr = fs.createReadStream('foo.txt');
  3. rr.on('readable', () => {
  4. console.log('readable:', rr.read());
  5. });
  6. rr.on('end', () => {
  7. console.log('end');
  8. });

运行后的输出结果:

  1. $ node test.js
  2. readable: null
  3. end

readable.isPaused()

  • 返回值类型:布尔值

该方法返回一个布尔值,表示 readable stream 是否是通过客户端代码(使用 stream.pause())显式暂停的。

  1. var readable = new stream.Readable
  2. readable.isPaused() // === false
  3. readable.pause()
  4. readable.isPaused() // === true
  5. readable.resume()
  6. readable.isPaused() // === false

readable.pause()

  • 返回值类型:this

该方法会让处于流动模式中的 stream 停止触发 data 事件,并切换为暂停模式。所有可用的数据都会保留在内部缓存中。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('data', (chunk) => {
  3. console.log('got %d bytes of data', chunk.length);
  4. readable.pause();
  5. console.log('there will be no more data for 1 second');
  6. setTimeout(() => {
  7. console.log('now data will start flowing again');
  8. readable.resume();
  9. }, 1000);
  10. });

readable.pipe(destination[, options])

  • destination,stream.Writable 的实例,被写入数据的目标对象
  • options,对象,包含以下属性:
    • end,布尔值,是否读取到了结束符,默认值为 true

该方法从 readable stream 拉取所有的数据,并将其写入 destination,整个过程是由系统自动管理的,所以不用担心 destination 会被 readable stream 的吞吐量压垮。

pipe() 可以接受多个 distination

  1. var readable = getReadableStreamSomehow();
  2. var writable = fs.createWriteStream('file.txt');
  3. // All the data from readable goes into 'file.txt'
  4. readable.pipe(writable);

由于该方法返回目标对象 stream,所以可以执行链式调用:

  1. var r = fs.createReadStream('file.txt');
  2. var z = zlib.createGzip();
  3. var w = fs.createWriteStream('file.txt.gz');
  4. r.pipe(z).pipe(w);

下面代码模拟了 Unix 的 cat 命令:

  1. process.stdin.pipe(process.stdout);

默认情况下当源 stream 触发了 end 事件之后,就会使用 destination 调用 stream.end(),所以 distination 将不再可写。设置 options{ end: false } 可以保持 destination stream 的开启状态。

下面的代码会让 writer 一直处于开启状态,所以 end 事件之后还可以写入数据 Goodbye

  1. reader.pipe(writer, { end: false });
  2. reader.on('end', () => {
  3. writer.end('Goodbye\n');
  4. });

注意,{ end: false }process.stderrprocess.stdout 没有效用,只有进程退出时它们才会被关闭。

readable.read([size])

  • size,数值,可选参数,用于指定读取的数据量
  • 返回值类型:字符串、Buffer 实例或 Null

read() 从内部缓存读取数据并返回该数据。如果没有可用的数据,则返回 null

如果传入了 size,则返回指定长度的数据。如果 size 长的数据不可能,除非是最后的数据块(返回剩余所有),否则返回 null。

如果未指定 size 参数,则会返回内部缓存中的所有数据。

该方法只能在暂停模式中调用,因为在流动模式中,系统会自动调用该方法直到内部缓存为空。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('readable', () => {
  3. var chunk;
  4. while (null !== (chunk = readable.read())) {
  5. console.log('got %d bytes of data', chunk.length);
  6. }
  7. });

如果该方法返回了数据块,那么它也会触发 data 事件。

注意,在 end 事件之后调用 stream.read([size]) 会返回 null,而不会抛出任何运行时错误。

readable.resume()

  • 返回值类型:this

该方法用于恢复 readable stream 的 data 事件。

该方法会将 stream 切换为流动模式。如果你不想要使用来自 stream 的数据,只想触发 end 事件,可以使用 stream.resume() 启动数据的流动模式:

  1. var readable = getReadableStreamSomehow();
  2. readable.resume();
  3. readable.on('end', () => {
  4. console.log('got to the end, but did not read anything');
  5. });

readable.setEncoding(encoding)

  • encoding,字符串,使用的编码格式
  • 返回值类型:this

该方法根据指定的编码格式返回字符串,而不是返回 Buffer 对象。举例来说,如果调用 readable.setEncoding('utf8'),则输出的数据就会被解析为 UTF-8 数据,并以字符串的形式返回。如果调用 readable.setEncoding('hex'),则会返回十六进制格式的字符串数据。

该方法可以正确处理多字节字符,但如果开发者直接获取 Buffer 数据并使用 buf.toString(encoding) 处理,则无法正确处理多字节字符。如果你想以字符串的形式读取数据,那么最好一直使用该方法。

当然,开发者也可以以 readable.setEncoding(null) 的形式调用该方法并禁用任何编码格式。该方法对于处理二进制数据或大量的多字节字符串非常有用。

  1. var readable = getReadableStreamSomehow();
  2. readable.setEncoding('utf8');
  3. readable.on('data', (chunk) => {
  4. assert.equal(typeof chunk, 'string');
  5. console.log('got %d characters of string data', chunk.length);
  6. });

readable.unpipe([destination])

  • destination,stream Writeable 实例,可选参数,解除指定 stream

该方法用于移除调用 stream.pipe() 之前的钩子方法。

如果未指定 destination,则移除所有的 pipe。

如果指定了 destination,但没有对应的 pipe,则该操作无效。

  1. var readable = getReadableStreamSomehow();
  2. var writable = fs.createWriteStream('file.txt');
  3. // All the data from readable goes into 'file.txt',
  4. // but only for the first second
  5. readable.pipe(writable);
  6. setTimeout(() => {
  7. console.log('stop writing to file.txt');
  8. readable.unpipe(writable);
  9. console.log('manually close the file stream');
  10. writable.end();
  11. }, 1000);

readable.unshift(chunk)

  • chunk,Buffer 实例或字符串,将数据块插入到读取队列

如果某个 stream 中的数据已经被解析过了,但是又需要解析前的数据,那么就可以使用该方法进行你操作将 stream 传递给第三方。

注意,stream.unshift(chunk) 不能再 end 事件之后调用,否则会抛出运行时错误。

如果开发者发现在程序中必须多次调用 stream.unshift(chunk),那么请考虑实现一个 Transform stream。

  1. // Pull off a header delimited by \n\n
  2. // use unshift() if we get too much
  3. // Call the callback with (error, header, stream)
  4. const StringDecoder = require('string_decoder').StringDecoder;
  5. function parseHeader(stream, callback) {
  6. stream.on('error', callback);
  7. stream.on('readable', onReadable);
  8. var decoder = new StringDecoder('utf8');
  9. var header = '';
  10. function onReadable() {
  11. var chunk;
  12. while (null !== (chunk = stream.read())) {
  13. var str = decoder.write(chunk);
  14. if (str.match(/\n\n/)) {
  15. // found the header boundary
  16. var split = str.split(/\n\n/);
  17. header += split.shift();
  18. var remaining = split.join('\n\n');
  19. var buf = new Buffer(remaining, 'utf8');
  20. if (buf.length)
  21. stream.unshift(buf);
  22. stream.removeListener('error', callback);
  23. stream.removeListener('readable', onReadable);
  24. // now the body of the message can be read from the stream.
  25. callback(null, header, stream);
  26. } else {
  27. // still reading the header.
  28. header += str;
  29. }
  30. }
  31. }
  32. }

注意,与 stream.push(chunk) 不同,stream.unshift(chunk) 不会通过重置 stream 的内部读取状态来中断读取继承。如果在读取期间调用 unshift(),则有可能出现未知的结果。如果在调用 unshift() 惠州立即调用 stream.push(''),将会重置读取状态,但是最好的方式还是在读取过程中不要调用 unshift()

readable.wrap(stream)

  • stream,Stream 实例,旧式的 readable stream

在 Node.js v0.10 之前,Stream 并没有实现完整的 Stream API。

如果你正在使用旧式的 Node.js 库,那么就只能使用 data 事件和 stream.pasue() 方法,通过 wrap() 方法可以创建一个使用旧式 stream 的 Readable stream。

应该尽量少用该函数,该函数存在的价值只是为了便于与旧版的 Node.js 程序和库进行交互。

  1. const OldReader = require('./old-api-module.js').OldReader;
  2. const Readable = require('stream').Readable;
  3. const oreader = new OldReader;
  4. const myReader = new Readable().wrap(oreader);
  5. myReader.on('readable', () => {
  6. myReader.read(); // etc.
  7. });

Class: stream.Transform

Transform stream 是 Duplex stream,其中输入是由输出计算而来的。它们都实现了 Readable 和 Writable 的接口。

下面是一些使用 Transform stream 的实例:

Class: stream.Writable

Writable stream 接口是对接收数据的 destination 的抽象。

下面是一些使用 writable stream 的实例:

事件:’drain’

如果 stream.write(chunk) 返回 false,drain 事件就会通知开发者何时适合向 stream 写入更多的数据。

  1. // Write the data to the supplied writable stream one million times.
  2. // Be attentive to back-pressure.
  3. function writeOneMillionTimes(writer, data, encoding, callback) {
  4. var i = 1000000;
  5. write();
  6. function write() {
  7. var ok = true;
  8. do {
  9. i -= 1;
  10. if (i === 0) {
  11. // last time!
  12. writer.write(data, encoding, callback);
  13. } else {
  14. // see if we should continue, or wait
  15. // don't pass the callback, because we're not done yet.
  16. ok = writer.write(data, encoding);
  17. }
  18. } while (i > 0 && ok);
  19. if (i > 0) {
  20. // had to stop early!
  21. // write some more once it drains
  22. writer.once('drain', write);
  23. }
  24. }
  25. }

事件:’error’

  • Error 实例

如果写入或 pipe 数据的时候出现了错误就会触发该事件。

事件:’finish’

调用 stream.end() 并且所有数据都被刷新到系统底层后,就会触发该事件。

  1. var writer = getWritableStreamSomehow();
  2. for (var i = 0; i < 100; i ++) {
  3. writer.write('hello, #${i}!\n');
  4. }
  5. writer.end('this is the end\n');
  6. writer.on('finish', () => {
  7. console.error('all writes are now complete.');
  8. });

事件:’pipe’

  • src,stream.Readable,发起 pipe 写入操作的源 stream

当 readable stream 调用 stream.pipe() 方法时就会触发该事件,并添加 writable stream 到 destination 上。

  1. var writer = getWritableStreamSomehow();
  2. var reader = getReadableStreamSomehow();
  3. writer.on('pipe', (src) => {
  4. console.error('something is piping into the writer');
  5. assert.equal(src, reader);
  6. });
  7. reader.pipe(writer);

事件:’unpipe’

  • src,Readable stream,发起 unpipe 写入操作的源 stream

当 readable stream 调用 stream.unpipe() 方法时就会触发该事件,并移除 destination 中的 writable stream。

  1. var writer = getWritableStreamSomehow();
  2. var reader = getReadableStreamSomehow();
  3. writer.on('unpipe', (src) => {
  4. console.error('something has stopped piping into the writer');
  5. assert.equal(src, reader);
  6. });
  7. reader.pipe(writer);
  8. reader.unpipe(writer);

writable.cork()

该方法强制系统缓存所有写入的数据。

调用 stream.uncork()stream.end() 时都会刷新缓存数据。

writable.end([chunk][, encoding][, callback])

  • chunk,字符串或 Buffer 实例,写入的数据
  • encoding,字符串,如果 chunk 是字符串,则该参数指定编码格式
  • callback,函数,stream 结束时执行的回调函数

当没有数据需要写入到 stream 时可以调用该方法。如果指定了 callback,则该参数会被绑定为 finish 事件的监听器。

stream.end() 之后调用 stream.write() 将会触发错误。

  1. // write 'hello, ' and then end with 'world!'
  2. var file = fs.createWriteStream('example.txt');
  3. file.write('hello, ');
  4. file.end('world!');
  5. // writing more now is not allowed!

writable.setDefaultEncoding(encodign)

  • encoding,字符串,编码格式

该方法用于设置 writable stream 的默认字符串编码格式。

writable.uncork()

该方法用于刷新调用 stream.cork() 之后缓存的所有数据。

writable.write(chunk[, coding][, callback])

  • chunk,字符串或 Buffer 实例,写入的数据
  • encoding,字符串,如果 chunk 是字符串,则该参数指定编码格式
  • callback,函数,数据块刷新后执行的回调函数
  • 返回值类型:布尔值,如果数据处理完成则返回 true

该方法用于向系统底层写入数据,并在数据完全写入后指定回调函数。如果出现了错误,将无法确定是否会执行 callback,所以为了检测错误,请监听 error 事件。

该方法的返回值用于通知开发者是否应该继续写入数据。如果数据已在内部缓存,那么就会返回 false,否则返回 true。

该返回值主要是建议性的,开发者还是可以继续写入数据,即使返回的是 false。不过,写入的数据会被缓存在内存中,所以最好不要这么做。相反,开发者可以等出发了 drain 事件之后再继续写入数据。

自定义 Stream 的 API

实现自定义 Stream 的模式:

  1. 从恰当的父类创建你需要的子类,util.inherits() 方法对此很有帮助
  2. 在子类的构造器中合理调用父类的构造器,确保内部机制初始化成功
  3. 实现一个或多个方法,如下所示

所要扩展的类和要实现的方法取决于开发者编写的 stream 的类型:

用途 实现的方法
只读 Readable _read
只写 Writable _write, _writev
读写 Duplex _read, _write, _writev
写数据读结果 Transform _transform, _flush

在开发者的实现代码里,千万不要调用第一部分的代码,否则会引起不利的副作用。

Class: stream.Duplex

Duplex stream 是可读写的 stream,比如 TCP socket 连接。

注意,stream.Duplex 是一个抽象类,也是 stream._read(size)stream._write(chunk, encoding, callback) 的底层基础。

因为 JavaScript 没有多原型继承机制,所以该类继承自 Readable,而又寄生于 Writable。从而允许开发者实现底层的 stream._read(n)stream._write(chunk, encoding, callback) 方法来扩展 Duplex 类。

new stream.Duplex(options)

  • options 是一个对象,同时传递给 Writable 和 Readable 的构造器,具有以下属性:
    • allowHalfOpen,布尔值,默认值为 true。如果值为 false,则当 writable 端停止时 readable 端的 stream 也会自动停止,反之异然
    • readableObjectMode,布尔值,默认值为 false。为 readable stream 设置 objectMode。如果 objectMode === true,则没有任何效果
    • writableObjectMode,布尔值,默认值为 false。为 writable stream 设置 objectMode。如果 objectMode === true,则没有任何效果

对于继承了 Duplex class 的类,只有正确调用构造器才能确保成功初始化缓存设置。

Class: stream.PassThrough

该类是 Transform stream 的一个实现,相对而言并不重要,只是简单的将输入的数据传送给输出。创建该类的目的主要是为了演示和测试,但也偶尔用做新型 stream 的构建基块。

Class: stream.Readable

stream.Readable 是一个可扩展的底层抽象类,常服务于 stream._read(size) 方法。

new stream.Readable([options])

  • options,对象
    • highwatermark,数值,表示内部缓存可以存储的最大字节量,默认值为 16384(16kb),对于 objectMode stream,默认值是 16
    • encoding,字符串,如果指定了该参数,则 Buffer 实例会被转换为指定格式的字符串,默认值为 null
    • objectmode,布尔值,表示 stream 的行为是否应该像是对象 stream。也就是说,stream.read(n) 返回一个单值,而不是长度为 n 的 Buffer 实例,默认值为 false
    • read,函数,stream._read() 方法的实现

对于继承了 Readable class 的类,只有正确调用构造器才能确保成功初始化缓存设置。

readable._read(size)

  • size,数值,异步读取的字节量

注意,可以实现该方法,但不要直接使用该方法。

该方法使用下划线作为前缀,这表明它是类的内部方法,只应该在 Readable 类内部使用。所有 Readable stream 的实现都应该提供一个 _read 方法从系统底层获取资源和数据。

当调用 _read() 时,如果资源数据可以使用了,则 _read() 的实现应该通过调用 this.push(dataChunk) 将数据加入到读取队列中。_read() 需要持续读取资源并将其添加到队列中,直到返回 false 则停止读取数据。

只有在数据读取停止之后再次调用 _read() 才能读取更多的数据并将数据添加到队列。

注意,一旦调用了 _read() 方法,那么只有调用 stream.push() 之后才会再次调用 _read()

size 参数只具有建议性而不具有强制性,该参数只是用来通知读取的数据量。但与具体的实现无关,比如对于 TCP 和 TLS,它们可能会忽略该参数,只要数据可用就会输出数据。举例来说,调用 stream.push(chunk) 之前也没必要等到 size 长的数据块准备完毕。

readable.push(chunk[, encoding])

  • chunk,Buffer 实例、字符串或 Null,添加到读取队列的数据块
  • encoding,字符串,字符串的编码格式,必须是一个有效的 Buffer 编码格式,比如 utf8 或 ascii
  • 返回值类型:布尔值,用于指定是否应该继续推送数据

注意,Readable 的实现者必须调用该方法,而不是由 Readable stream 的使用调用该方法。

如果传入的值不是 null,则 push() 方法将数据块推送到队列,便于随后的 stream 处理器使用。如果传入的是 null,则是向 stream 发送结束信号(EOF),之后将不会再写入数据。

当触发了 readable 事件之后,通过 stream.read() 可以拉取使用 push() 添加的数据。

该方法设计的非常灵活。举例来说,开发者可以使用该方法封装一个底层资源,包含了一些暂停和回复机制,以及一个数据回调函数:

  1. // source is an object with readStop() and readStart() methods,
  2. // and an `ondata` member that gets called when it has data, and
  3. // an `onend` member that gets called when the data is over.
  4. util.inherits(SourceWrapper, Readable);
  5. function SourceWrapper(options) {
  6. Readable.call(this, options);
  7. this._source = getLowlevelSourceObject();
  8. // Every time there's data, we push it into the internal buffer.
  9. this._source.ondata = (chunk) => {
  10. // if push() returns false, then we need to stop reading from source
  11. if (!this.push(chunk))
  12. this._source.readStop();
  13. };
  14. // When the source ends, we push the EOF-signaling `null` chunk
  15. this._source.onend = () => {
  16. this.push(null);
  17. };
  18. }
  19. // _read will be called when the stream wants to pull more data in
  20. // the advisory size argument is ignored in this case.
  21. SourceWrapper.prototype._read = function(size) {
  22. this._source.readStart();
  23. };

实例:计数 stream

这是一个基础的 Readable stream 实例,它从 1 到 1000000 递增的顺序触发数字直到结束:

  1. const Readable = require('stream').Readable;
  2. const util = require('util');
  3. util.inherits(Counter, Readable);
  4. function Counter(opt) {
  5. Readable.call(this, opt);
  6. this._max = 1000000;
  7. this._index = 1;
  8. }
  9. Counter.prototype._read = function() {
  10. var i = this._index++;
  11. if (i > this._max)
  12. this.push(null);
  13. else {
  14. var str = '' + i;
  15. var buf = new Buffer(str, 'ascii');
  16. this.push(buf);
  17. }
  18. };

实例:简单协议 v1(初始版)

该实例与 parseHeader 方法类似,但它是一个自定义的 stream。值得注意的是,该方法不会将传入的数据转换为字符串。

实际上,更好的方式是使用 Transform stream 实现该方法,详情请查看 SimpleProtocol v2

  1. // A parser for a simple data protocol.
  2. // The "header" is a JSON object, followed by 2 \n characters, and
  3. // then a message body.
  4. //
  5. // NOTE: This can be done more simply as a Transform stream!
  6. // Using Readable directly for this is sub-optimal. See the
  7. // alternative example below under the Transform section.
  8. const Readable = require('stream').Readable;
  9. const util = require('util');
  10. util.inherits(SimpleProtocol, Readable);
  11. function SimpleProtocol(source, options) {
  12. if (!(this instanceof SimpleProtocol))
  13. return new SimpleProtocol(source, options);
  14. Readable.call(this, options);
  15. this._inBody = false;
  16. this._sawFirstCr = false;
  17. // source is a readable stream, such as a socket or file
  18. this._source = source;
  19. source.on('end', () => {
  20. this.push(null);
  21. });
  22. // give it a kick whenever the source is readable
  23. // read(0) will not consume any bytes
  24. source.on('readable', () => {
  25. this.read(0);
  26. });
  27. this._rawHeader = [];
  28. this.header = null;
  29. }
  30. SimpleProtocol.prototype._read = function(n) {
  31. if (!this._inBody) {
  32. var chunk = this._source.read();
  33. // if the source doesn't have data, we don't have data yet.
  34. if (chunk === null)
  35. return this.push('');
  36. // check if the chunk has a \n\n
  37. var split = -1;
  38. for (var i = 0; i < chunk.length; i++) {
  39. if (chunk[i] === 10) { // '\n'
  40. if (this._sawFirstCr) {
  41. split = i;
  42. break;
  43. } else {
  44. this._sawFirstCr = true;
  45. }
  46. } else {
  47. this._sawFirstCr = false;
  48. }
  49. }
  50. if (split === -1) {
  51. // still waiting for the \n\n
  52. // stash the chunk, and try again.
  53. this._rawHeader.push(chunk);
  54. this.push('');
  55. } else {
  56. this._inBody = true;
  57. var h = chunk.slice(0, split);
  58. this._rawHeader.push(h);
  59. var header = Buffer.concat(this._rawHeader).toString();
  60. try {
  61. this.header = JSON.parse(header);
  62. } catch (er) {
  63. this.emit('error', new Error('invalid simple protocol data'));
  64. return;
  65. }
  66. // now, because we got some extra data, unshift the rest
  67. // back into the read queue so that our consumer will see it.
  68. var b = chunk.slice(split);
  69. this.unshift(b);
  70. // calling unshift by itself does not reset the reading state
  71. // of the stream; since we're inside _read, doing an additional
  72. // push('') will reset the state appropriately.
  73. this.push('');
  74. // and let them know that we are done parsing the header.
  75. this.emit('header', this.header);
  76. }
  77. } else {
  78. // from there on, just provide the data to our consumer.
  79. // careful not to push(null), since that would indicate EOF.
  80. var chunk = this._source.read();
  81. if (chunk) this.push(chunk);
  82. }
  83. };
  84. // Usage:
  85. // var parser = new SimpleProtocol(source);
  86. // Now parser is a readable stream that will emit 'header'
  87. // with the parsed header data.

Class: stream.Transform

Transform stream 是 Duplex stream,输入和输出具有因果关系,比如 zlib stream 和 crypto stream。

输入和输出没有数据块大小、数据块数量以及到达时间的要求。举例来说,一个哈希 stream 只会在结束时向输出发送一个单一的数据块;一个 zlib stream 则会生成比输入或大或小的输出结果。

Transform class 必须实现 stream._transform() 方法,可以选择性地实现 stream._flush() 方法。

new stream.Transform([options])

  • options,对象,将会传递给 Writable 和 Readable class 的构造器,包含以下属性:
    • transform,函数,是 stream._transform() 的实现
    • flush,函数,是 stream._flush() 的实现

对于继承了 Transform class 的类,只有正确调用构造器才能确保成功初始化缓存设置。

事件:’finish’ 和 ‘end’

finishend 事件分别来自于 Writable 和 Readable class。调用 stream.read() 并使用 stream._transform() 方法处理完所有的数据块之后就会触发 finish() 事件;stream._flush() 调用完内部的回调函数并输出完所有的数据之后触发 end 事件。

transform._flush(callback)

  • callback,函数,当开发者刷新完所有的剩余数据之后执行该回调函数

注意,一定不要直接调用该函数。可以在子类中实现该方法,且只允许 Transform class 的内部方法调用它。

在某些情况下爱,transform 操作需要在 stream 的最后触发额外的数据。举例来说,一个 zlib 压缩 stream 会存储一些优化压缩结果的内部状态。

在这些情况下,开发者可以实现一个 _flush() 方法,该方法会在所有写入的数据被处理之后、触发 end 事件结束 readable stream 之前被调用。与 stream._transform() 类似,当刷新操作完成之后,会调用 transform.push(chunk) 零次或多次,最后调用 callback

该方法名使用了下划线的前缀,表示它是类的内部方法,不应该被开发者的程序直接调用,而是希望开发者在自定义的扩展类中重写该方法。

transform._transform(chunk, encoding, callback)

  • chunk,Buffer 实例或字符串,用于传输的数据块。除非 decodeStrigns === false,否则该参数都是 Buffer 实例
  • encoding,字符串,如果 chunk 是一个字符串,则该参数指定字符串的编码格式。如果 chunk 是一个 Buffer 实例,则该参数是一个特殊值 “buffer”,在这种情况下请忽略该值。
  • callback,函数,当处理完输入的数据块之后将会执行该回调函数

注意,一定不要直接调用该函数。可以在子类中实现该该方法,且只允许 Transform class 的内部方法调用它。

所有的 Transform stream 实现都必须提供一个 _transform() 方法接收输入并生成输出数据。

_transform 可以处理 Transform class 中规定的任何事情,比如处理写入的字节、将它们传给 readable stream、处理异步 I/O等任务。

调用 transform.push(outputChunk) 根据输入生成输出数据的次数取决于开发者想要输出的数据量。

之后当前数据块被完全处理之后才可以调用回调函数。注意,输入块或许有也或许没有对应的输出块。如果给回调函数设置了第二个参数,则该参数会被传递给 push 方法。换言之,以下代码相等:

  1. transform.prototype._transform = function (data, encoding, callback) {
  2. this.push(data);
  3. callback();
  4. };
  5. transform.prototype._transform = function (data, encoding, callback) {
  6. callback(null, data);
  7. };

该方法名使用了下划线的前缀,表示它是类的内部方法,不应该被开发者的程序直接调用,而是希望开发者在自定义的扩展类中重写该方法。

实例:SimpleProtocol 解析器 v2

上面的简单协议解析器可以使用高阶的 Transform stream class 来实现,处理方式与 parseHeaderSimpleProtocol v1 类似。

在下面的代码中,并没有将输入作为参数,而是将其 pipe 进了解析器,这种方案更符合 Node.js stream 的使用习惯:

  1. const util = require('util');
  2. const Transform = require('stream').Transform;
  3. util.inherits(SimpleProtocol, Transform);
  4. function SimpleProtocol(options) {
  5. if (!(this instanceof SimpleProtocol))
  6. return new SimpleProtocol(options);
  7. Transform.call(this, options);
  8. this._inBody = false;
  9. this._sawFirstCr = false;
  10. this._rawHeader = [];
  11. this.header = null;
  12. }
  13. SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  14. if (!this._inBody) {
  15. // check if the chunk has a \n\n
  16. var split = -1;
  17. for (var i = 0; i < chunk.length; i++) {
  18. if (chunk[i] === 10) { // '\n'
  19. if (this._sawFirstCr) {
  20. split = i;
  21. break;
  22. } else {
  23. this._sawFirstCr = true;
  24. }
  25. } else {
  26. this._sawFirstCr = false;
  27. }
  28. }
  29. if (split === -1) {
  30. // still waiting for the \n\n
  31. // stash the chunk, and try again.
  32. this._rawHeader.push(chunk);
  33. } else {
  34. this._inBody = true;
  35. var h = chunk.slice(0, split);
  36. this._rawHeader.push(h);
  37. var header = Buffer.concat(this._rawHeader).toString();
  38. try {
  39. this.header = JSON.parse(header);
  40. } catch (er) {
  41. this.emit('error', new Error('invalid simple protocol data'));
  42. return;
  43. }
  44. // and let them know that we are done parsing the header.
  45. this.emit('header', this.header);
  46. // now, because we got some extra data, emit this first.
  47. this.push(chunk.slice(split));
  48. }
  49. } else {
  50. // from there on, just provide the data to our consumer as-is.
  51. this.push(chunk);
  52. }
  53. done();
  54. };
  55. // Usage:
  56. // var parser = new SimpleProtocol();
  57. // source.pipe(parser)
  58. // Now parser is a readable stream that will emit 'header'
  59. // with the parsed header data.

Class: stream.Writable

stream.Writable 是一个可扩展的抽象类,可用于 stream._write(chunk, encoding, callback) 等方法的底层实现。

new stream.Writable([options])

  • options,对象
    • highwatermark,数值,当 stream.write() 开始返回 false 时的缓存级别,默认值是 16384(16kb),对于 objectMode stream,默认值是 16
    • decodeString,布尔值,该参数决定是否在讲字符串传递给 stream._write() 之前将其转换为 Buffer,默认值为 true
    • objectmode,布尔值,决定 stream.write(anyObj) 是否是一个有效操作。如果值为 true,则可以写入任意类型的数据,而不只是 Buffer 和字符串数据,默认值为 false
    • write,函数,stream._write() 的实现
    • writev,函数,stream._writev() 的实现

对于继承了 Writable class 的类,只有正确调用构造器才能确保成功初始化缓存设置。

writable._write(chunk, encoding, callback)

  • chunk,Buffer 实例或字符串,写入的数据块。除非 decodeStrings === false,否则该参数只能是 Buffer 实例
  • encoding,字符串,如果 chunk 是一个字符串,则该参数指定字符串的编码格式。如果 chunk 是一个 Buffer 实例,则该参数是一个特殊值 “buffer”,在这种情况下请忽略该值。
  • callback,函数,当处理完输入的数据块之后将会执行该回调函数

注意,一定不能直接调用该方法。可以在子类中实现该方法,且只允许 Writable class 的内部方法调用它。

调用 callback(err) 用于通知系统数据写入完成或者出现了错误。

如果在构造器中设置了 decodeStrings 选项,那么 chunk 就只能是字符串而不能是 Buffer 实例,encoding 参数用于表示字符串的编码格式。这种实现是为了优化某些字符串的处理。如果没有显式设置 decodeStrings === false,那么系统会忽略 encoding 参数,并假设 chunk 是一个 Buffer 实例。

该方法名使用了下划线的前缀,表示它是类的内部方法,不应该被开发者的程序直接调用,而是希望开发者在自定义的扩展类中重写该方法。

writable._writev(chunks, callback)

  • chunk,数组,写入的数据。每一个数据块都遵循如下格式:{ chunk: ..., encoding: ... }
  • callback,函数,当处理完数据块之后将会执行该回调函数

注意,一定不能直接调用该方法。可以在子类中实现该方法,且只允许 Writable class 的内部方法调用它

该方法名使用了下划线的前缀,表示它是类的内部方法,不应该被开发者的程序直接调用,而是希望开发者在自定义的扩展类中重写该方法。

简化构造器 API

在某些简单的情况下,不通过继承创建 stream 也大有用处。

通过向构造器传递恰当的方法即可实现这一目标。

Duplex

  1. var duplex = new stream.Duplex({
  2. read: function(n) {
  3. // sets this._read under the hood
  4. // push data onto the read queue, passing null
  5. // will signal the end of the stream (EOF)
  6. this.push(chunk);
  7. },
  8. write: function(chunk, encoding, next) {
  9. // sets this._write under the hood
  10. // An optional error can be passed as the first argument
  11. next()
  12. }
  13. });
  14. // or
  15. var duplex = new stream.Duplex({
  16. read: function(n) {
  17. // sets this._read under the hood
  18. // push data onto the read queue, passing null
  19. // will signal the end of the stream (EOF)
  20. this.push(chunk);
  21. },
  22. writev: function(chunks, next) {
  23. // sets this._writev under the hood
  24. // An optional error can be passed as the first argument
  25. next()
  26. }
  27. });

Readable

  1. var readable = new stream.Readable({
  2. read: function(n) {
  3. // sets this._read under the hood
  4. // push data onto the read queue, passing null
  5. // will signal the end of the stream (EOF)
  6. this.push(chunk);
  7. }
  8. });

Transform

  1. var transform = new stream.Transform({
  2. transform: function(chunk, encoding, next) {
  3. // sets this._transform under the hood
  4. // generate output as many times as needed
  5. // this.push(chunk);
  6. // call when the current chunk is consumed
  7. next();
  8. },
  9. flush: function(done) {
  10. // sets this._flush under the hood
  11. // generate output as many times as needed
  12. // this.push(chunk);
  13. done();
  14. }
  15. });

Writable

  1. var writable = new stream.Writable({
  2. write: function(chunk, encoding, next) {
  3. // sets this._write under the hood
  4. // An optional error can be passed as the first argument
  5. next()
  6. }
  7. });
  8. // or
  9. var writable = new stream.Writable({
  10. writev: function(chunks, next) {
  11. // sets this._writev under the hood
  12. // An optional error can be passed as the first argument
  13. next()
  14. }
  15. });

Stream: 内部玄机

缓存

Writable 和 Readable stream 都会在对象内部缓存数据,该对象可以通过 _writableState.getBuffer()_readableState.buffer 获取。

缓存的总量取决于构造器中接收的 highWaterMark 配置信息。

调用 stream.push(chunk) 可以将数据缓存到 Readable stream 中。如果数据没有经过 stream.read() 处理,就会一直待在内部队列,直到被拉去和处理。

当开发者反复 stream.write(chunk) 时就会将数据缓存到 Writable stream 中,直到 stream.write(chunk) 返回 false

设计 stream 的初衷,尤其是对于 stream.pipe() 方法,是为了在可控范围内限制数据的缓存量,所以即使输入资源和输出的目标对象之间的速度存在差异,都不会过度影响内存的使用。

兼容性

在 Node.js v0.10 之前的版本,Readable stream 的接口非常简单,功能贫乏,实用性不强。

  • 在老版本中,系统不会等待开发者调用 stream.read() 方法,而是触发 data 事件。如果开发者要决定如何处理数据,那么就需要手动缓存数据块
  • 在老版本中,stream.pause() 只是建议性的方法,而不是绝对有效的。这也就是说,即使 stream 处于暂停状态,开发者仍有可能接收到 data 事件。

在 Node.js v0.10,添加了 Readable。为了保持向后兼容性,当添加了 data 事件处理器之后或调用了 stream.resume() 之后,Readable stream 就会切换为流动模式。这么做的好处是,即使你没有使用 stream.read()readable 事件,也无需担心会丢失数据块。

虽然大多数的程序都会正常运行,但还是有必要介绍一些边缘用例:

  • 没有添加任何 data 事件处理器
  • 从没调用过 stream.resume() 方法
  • stream 没有 pipe 到任何 writable destination

举例来说,思考一下下面的代码:

  1. // WARNING! BROKEN!
  2. net.createServer((socket) => {
  3. // we add an 'end' method, but never consume the data
  4. socket.on('end', () => {
  5. // It will never get here.
  6. socket.end('I got your message (but didnt read it)\n');
  7. });
  8. }).listen(1337);

在 Node.js v0.10 之前,新传入的消息数据会被直接丢弃。不过从 Node.js v0.10 之后,socket 会保持暂停状态。

这种情况下的变通方法就是使用 stream.resume() 启动数据流:

  1. // Workaround
  2. net.createServer((socket) => {
  3. socket.on('end', () => {
  4. socket.end('I got your message (but didnt read it)\n');
  5. });
  6. // start the flow of data, discarding it.
  7. socket.resume();
  8. }).listen(1337);

为了让新的 Readable stream 切换到流动模式,在 v0.10 之前 stream 可以通过 stream.wrap() 包装成 Readable stream。

Object Mode

通常来说,stream 只适用于对字符串和 Buffer 实例的操作。

但处于 object mode 的 stream 则可以处理任何 JavaScript 值。

在 object mode 中,不论调用 stream.read(size)size 是多少,Readable stream 都会返回一个单元素。

在 object mode 中,不论调用 stream.write(data, encoding)encoding 是什么,Writable stream 都会忽略该参数。

特殊值 null 在 object mode 中仍然保持了特殊性。也就是说,对于 object mode 中的 Readable stream,如果 stream.read() 返回了 null,表示没有数据了;如果调用 stream.push(null),表示 stream 的数据推送结束了。

Node.js 的核心 stream 没有一个是 object mode stream。object mode 只存在于用户的 stream 库中。

开发者应该在 stream 子类的构造器中设置 objectMode 配置信息,在其他地方设置则不安全。

对于 Duplex stream 的 objectMode 参数,可以通过 readableObjectModewritableObjectMode 设置为 readable 或 writable。这些选项可以通过 Transform stream 来实现解析器和序列化器。

  1. const util = require('util');
  2. const StringDecoder = require('string_decoder').StringDecoder;
  3. const Transform = require('stream').Transform;
  4. util.inherits(JSONParseStream, Transform);
  5. // Gets \n-delimited JSON string data, and emits the parsed objects
  6. function JSONParseStream() {
  7. if (!(this instanceof JSONParseStream))
  8. return new JSONParseStream();
  9. Transform.call(this, { readableObjectMode : true });
  10. this._buffer = '';
  11. this._decoder = new StringDecoder('utf8');
  12. }
  13. JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
  14. this._buffer += this._decoder.write(chunk);
  15. // split on newlines
  16. var lines = this._buffer.split(/\r?\n/);
  17. // keep the last partial line buffered
  18. this._buffer = lines.pop();
  19. for (var l = 0; l < lines.length; l++) {
  20. var line = lines[l];
  21. try {
  22. var obj = JSON.parse(line);
  23. } catch (er) {
  24. this.emit('error', er);
  25. return;
  26. }
  27. // push the parsed object out to the readable consumer
  28. this.push(obj);
  29. }
  30. cb();
  31. };
  32. JSONParseStream.prototype._flush = function(cb) {
  33. // Just handle any leftover
  34. var rem = this._buffer.trim();
  35. if (rem) {
  36. try {
  37. var obj = JSON.parse(rem);
  38. } catch (er) {
  39. this.emit('error', er);
  40. return;
  41. }
  42. // push the parsed object out to the readable consumer
  43. this.push(obj);
  44. }
  45. cb();
  46. };

stream.read(0)

在某些情况下,开发者只想刷新底层的 readable stream 机制,而不处理任何数据,那么就可以使用 stream.read(0),该方法返回 null。

如果内部的 Buffer 数据长度小于 highWaterMark,且尚未被 stream 读取,那么调用 stream.read(0) 就调用发底层的 stream._read()

一般来说没有调用该方法的必要。不过,开发者可能会发现在 Node.js 内部有这样的调用,特别是 Readable stream 的内部。

stream.push(‘’)

推送一个零字节的字符串或 Buffer 实例数据会发生一些有趣的现象。因为调用了 stream.push(),所以会终止 reading 进程。不过,实际上并没有向 readable buffer 添加任何数据,也就不需要开发者处理任何数据了。

虽然现在很少有空数据的情况,但是通过调用 stream.read(0) 可以检查是否有任务在处理你的 stream。出于这种目的,你还是有可能调用 stream.push('') 的。

到目前为止,只在 tls.CryptoStream 类中使用过该手段,但该方法在 Node.js/io.js v1.0 已被抛弃了。如果你必须使用 stream.push('') 方法,请先思考是否有其他处理方式,因为这种做法会被视为发生了极其严重的错误。