面向流实现者的API


无论实现任何形式的流,模式都是一样的:

  1. 在你的子类中扩展合适的父类。(util.inherits() 方法对此很有帮助。)

  2. 在你的构造函数中调用合适的父类的构造函数,以确保内部机制设置正确。

  3. 实现一个或多个特定的方法,如下详述。

类扩充和实现方法取决于你要编写哪种流类:

使用情景 要实现的方法
只读 Readable _read
只写 Writable _write, _writev
读写 Duplex _read, _write, _writev
操作写入的数据,然后读取结果 Transform _transform, _flush

在你的实现代码中,需要强调的是绝对不要调用面向流消费者的 API 中描述的方法。否则,可能会导致在消费你的流接口的过程中产生不良副作用。

stream.Readable类

stream.Readable 是一个被设计为拓展底层实现 stream._read(size) 方法的抽象类。

如何在你的程序中消费流,请参阅面向流消费者的 API 。下文解释了如何在你的程序中实现可读流。

new stream.Readable([options])

  • options {Object}

    • highWaterMark {Number} 停止从底层资源读取前能够存储在内部缓冲区的最大字节数。默认:16384(16kb)或 objectMode 流中的 16

    • encoding {String} 如果指定,则缓冲区将使用指定的编码字符串解码。默认:null

    • objectMode {Boolean} 此流是否应该表现为对象流。意味着 stream.read(n) 返回单个值用于替代一个 n 大小的 Buffer 。默认:false

    • read {Function} 实现 stream._read() 的方法

在拓展自可读(Readable)类的类中,请确保调用 Readable 构造函数,以便缓冲设置可以被正确地初始化。

readable._read(size)

  • size {Number} 异步读取的字节数

注意:请实现这个方法,但不要直接调用它

这是一个带有下划线前缀的方法,因为它是在类内部定义的,应该仅被 Readable 类内部的方法的调用。所有的 Readable 流实现都必须提供一个 _read 方法用于从底层资源获取数据。

当调用 _read() 时,如果从资源获取的数据可用,_read() 的实现应该通过调用 this.push(dataChunk) 开始将数据推入到读取队列中。_read() 应该继续从资源处读取并推送数据知道推送返回 false,此时应停止从资源处读取。仅当 _read() 在停止后被再次调用时,它应该开始从资源处读取更多的数据,并将数据推送到队列中。

注意:一旦调用 _read() 方法,直到 stream.push() 方法被调用前将不能被再次调用。

size 参数仅做参考。实现中 “read” 是一个单一的回调,返回的数据可以用这个来知道有多少数据获取。实现中不相关的,如 TCP 或 TLS ,可能会忽略此参数,并且简单的提供可用的数据。没有必要在调用 stream.push(chunk) 前,比如 “wait” 直到 size 字节的数据可用。

readable.push(chunk[, encoding])

  • chunk {Buffer} | {Null} | {String} 推入读队列的数据块

  • encoding {String} 字符串块的编码。必须是一个有效的 Buffer 编码,比如 'utf8''ascii'

  • 返回 {Boolean} 是否应该执行更多的推入

注意:该方法应该在 Readable 流的实现者中调用,而不是 Readable 流的消费者

如果传了一个非 null 值,push() 方法会将数据块放入到流处理器随后消费的队列里。如果传了 null ,它标志着已到达流的末尾(EOF),之后没有更多的数据可以被写入。

当触发 ‘readable’ 时间时,用 push() 添加的数据可以通过调用 stream.read() 方法拉取。

这个 API 被设计成尽可能地灵活。例如,你可以包裹有某种暂停/恢复机制的低级资源和一个数据回调。在这种情况下,你可以通过这样做来包裹低级资源对象:

  1. // source is an object with readStop() and readStart() methods,
  2. // and an `ondata` member that gets called when it has data, and
  3. // an `onend` member that gets called when the data is over.
  4. util.inherits(SourceWrapper, Readable);
  5. function SourceWrapper(options) {
  6. Readable.call(this, options);
  7. this._source = getLowlevelSourceObject();
  8. // Every time there's data, we push it into the internal buffer.
  9. this._source.ondata = (chunk) => {
  10. // if push() returns false, then we need to stop reading from source
  11. if (!this.push(chunk))
  12. this._source.readStop();
  13. };
  14. // When the source ends, we push the EOF-signaling `null` chunk
  15. this._source.onend = () => {
  16. this.push(null);
  17. };
  18. }
  19. // _read will be called when the stream wants to pull more data in
  20. // the advisory size argument is ignored in this case.
  21. SourceWrapper.prototype._read = function (size) {
  22. this._source.readStart();
  23. };

例子:一种计数流

这是一个可读(Readable)流的基本示例。它触发从 1 到 1,000,000 的升序操作,然后结束。

  1. const Readable = require('stream').Readable;
  2. const util = require('util');
  3. util.inherits(Counter, Readable);
  4. function Counter(opt) {
  5. Readable.call(this, opt);
  6. this._max = 1000000;
  7. this._index = 1;
  8. }
  9. Counter.prototype._read = function () {
  10. var i = this._index++;
  11. if (i > this._max)
  12. this.push(null);
  13. else {
  14. var str = '' + i;
  15. var buf = new Buffer(str, 'ascii');
  16. this.push(buf);
  17. }
  18. };

例子:简单的协议 V1(次优)

这类似于此处所描述的 parseHeader 函数,但作为一个自定义的流来实现。另请注意,这种实现不会将输入的数据转换为字符串。

而然,最好通过转换(Transform)流来实现。参阅简单的协议分析器 V2,一个更好的实现。

  1. // A parser for a simple data protocol.
  2. // The "header" is a JSON object, followed by 2 \n characters, and
  3. // then a message body.
  4. //
  5. // NOTE: This can be done more simply as a Transform stream!
  6. // Using Readable directly for this is sub-optimal. See the
  7. // alternative example below under the Transform section.
  8. const Readable = require('stream').Readable;
  9. const util = require('util');
  10. util.inherits(SimpleProtocol, Readable);
  11. function SimpleProtocol(source, options) {
  12. if (!(this instanceof SimpleProtocol))
  13. return new SimpleProtocol(source, options);
  14. Readable.call(this, options);
  15. this._inBody = false;
  16. this._sawFirstCr = false;
  17. // source is a readable stream, such as a socket or file
  18. this._source = source;
  19. source.on('end', () => {
  20. this.push(null);
  21. });
  22. // give it a kick whenever the source is readable
  23. // read(0) will not consume any bytes
  24. source.on('readable', () => {
  25. this.read(0);
  26. });
  27. this._rawHeader = [];
  28. this.header = null;
  29. }
  30. SimpleProtocol.prototype._read = function (n) {
  31. if (!this._inBody) {
  32. var chunk = this._source.read();
  33. // if the source doesn't have data, we don't have data yet.
  34. if (chunk === null)
  35. return this.push('');
  36. // check if the chunk has a \n\n
  37. var split = -1;
  38. for (var i = 0; i < chunk.length; i++) {
  39. if (chunk[i] === 10) { // '\n'
  40. if (this._sawFirstCr) {
  41. split = i;
  42. break;
  43. } else {
  44. this._sawFirstCr = true;
  45. }
  46. } else {
  47. this._sawFirstCr = false;
  48. }
  49. }
  50. if (split === -1) {
  51. // still waiting for the \n\n
  52. // stash the chunk, and try again.
  53. this._rawHeader.push(chunk);
  54. this.push('');
  55. } else {
  56. this._inBody = true;
  57. var h = chunk.slice(0, split);
  58. this._rawHeader.push(h);
  59. var header = Buffer.concat(this._rawHeader).toString();
  60. try {
  61. this.header = JSON.parse(header);
  62. } catch (er) {
  63. this.emit('error', new Error('invalid simple protocol data'));
  64. return;
  65. }
  66. // now, because we got some extra data, unshift the rest
  67. // back into the read queue so that our consumer will see it.
  68. var b = chunk.slice(split);
  69. this.unshift(b);
  70. // calling unshift by itself does not reset the reading state
  71. // of the stream; since we're inside _read, doing an additional
  72. // push('') will reset the state appropriately.
  73. this.push('');
  74. // and let them know that we are done parsing the header.
  75. this.emit('header', this.header);
  76. }
  77. } else {
  78. // from there on, just provide the data to our consumer.
  79. // careful not to push(null), since that would indicate EOF.
  80. var chunk = this._source.read();
  81. if (chunk) this.push(chunk);
  82. }
  83. };
  84. // Usage:
  85. // var parser = new SimpleProtocol(source);
  86. // Now parser is a readable stream that will emit 'header'
  87. // with the parsed header data.

stream.Writable类

stream.Writable 是一个被设计为拓展底层实现 stream._write(chunk, encoding, callback) 方法的抽象类。

如何在你的程序中消费可写流,请参阅面向流消费者的 API 。下文解释了如何在你的程序中实现可写流。

new stream.Writable([options])

  • options {Object}

    • highWaterMark {Number} 当 stream.write() 开始返回 false 时的 Buffer 等级。默认:16384(16kb)或 objectMode 流中的 16

    • decodeStrings {Boolean} 在传递给 stream._write() 前是否解码字符串到缓冲区。默认:true

    • objectMode {Boolean} stream.write(anyObj) 是否是一个有效的操作。如果设置,你可以写入任意的数据,而不只是 Buffer / String 数据。默认:false

    • write {Function} stream._write() 方法的实现

    • writev {Function} stream._writev() 方法的实现

在拓展自可写(Writable)类的类中,请确保调用 Writable 构造函数,以便缓冲设置可以被正确地初始化。

writable._write(chunk, encoding, callback)

  • chunk {Buffer} | {String} 被写入的(数据)块。总会是一个 buffer ,除非 decodeStrings 参数被设置成 false

  • encoding {String} 如果该块是一个字符串,那么这是编码类型。如果该块是一个 buffer ,那么这是一个指定的值 - ‘buffer’,在这种情况下请忽略它。

  • callback {Function} 当你处理完成所提供的块时调用此函数(有一个可选的 error 参数)。

所有的 Writable 流实现都必须提供一个 _write 方法用于向底层资源发送数据。

注意:此函数禁止被直接调用。它应该由子类来实现,并且仅被可写(Writable)类的内部方法调用。

该回调函数采用标准的 callback(error) 模式来表明写入成功完成还是遇到错误。

如果构造函数选项中设定了 decodeStrings 标志,则 chunk 可能会是字符串而不是 Buffer,并且 encoding 表明了字符串的格式。这种设计是为了支持对某些字符串数据编码提供优化处理的实现。如果您没有明确地将 decodeStrings 选项设定为 false ,那么您可以安全地忽略 encoding 参数,并假定 chunk 总是一个 Buffer。

这是一个带有下划线前缀的方法,因为它是在类内部定义的,并且不应该由用户程序直接调用。但是,我们希望你在自己的扩展类中重写此方法。

writable._writev(chunks, callback)

  • chunks {Array} 被写入的块。每个块都是以下格式:{ chunk: ..., encoding: ... }

  • callback {Function} 当你处理完成所提供的块时调用此函数(有一个可选的 error 参数)。

注意:此函数禁止被直接调用。它可能是由子类实现,并且仅被可写(Writable)类的内部方法调用。

此函数是完全可选的实现。在大多数情况下,它是不必要的。如果实现,它将被所有滞留在写入队列中的数据块调用。

stream.Duplex类

“双工”(duplex)流兼具可读和可写特性,比如一个 TCP 嵌套字连接。

值得注意的是,stream.Duplex 是一个被设计为拓展底层实现 stream._read(size)stream._write(chunk, encoding, callback) 方法的抽象类,就像你实现可读(Readable)或可写(Writable)类所做的那样。

由于 JavaScript 并不具备多原型继承能力,这个类实际上继承自 Readable,并寄生自 Writable。从而让用户在双工(Duplex)类的拓展中能同时实现低级的 stream._read(n)stream._write(chunk, encoding, callback) 方法。

new stream.Duplex(options)

  • options {Object} 同时传入可读(Readable)和可写(Writable)构造函数。同时有以下字段:

    • allowHalfOpen {Boolean} 默认:true。如果设置为 false,那么当写入端结束后流将自动结束读取端,反之亦然。

    • readableObjectMode {Boolean} 默认:false。设置流读取端的 objectMode。如果 objectModetrue 也没有影响。

    • writableObjectMode {Boolean} 默认:false。设置流读取端的 objectMode。如果 objectModetrue 也没有影响。

在拓展自双工(Duplex)类的类中,请确保调用其构造函数,以便缓冲设置可以被正确地初始化。

stream.Transform类

“转换”(transform)流实际上是一个输出与输入存在因果关系的双工(duplex)流,比如 zlib 流或 crypto 流。

它并不要求输入和输出需要相同大小、相同块数或同时到达。例如,一个 Hash 流只会在输入结束时产生一个数据块的输出;一个 zlib 流会产生比输入小得多或大得多的输出。

转换(Transform)类必须实现 stream._transform() 方法,而不是 stream._read()stream._write() 方法,同时也可以选择性地实现 stream._flush() 方法。(详见下文。)

new stream.Transform([options])

在拓展自转换(Transform)类的类中,请确保调用其构造函数,以便缓冲设置可以被正确地初始化。

‘finish’和’end’事件

‘finish’‘end’ 事件分别来自可写(Writable)父类和可读(Readable)父类。'finish' 事件在调用 stream.end() 和所有的块都已被 stream._transform() 处理后触发。'end' 在调用回调函数 stream._flush() 输出所有数据后触发。

transform._transform(chunk, encoding, callback)

  • chunk {Buffer} | {String} 需要被转换的数据块。总会是一个 buffer,除非 decodeStrings 选项被设置成 false

  • encoding {String} 如果该块是一个字符串,那么这是编码类型。如果该块是一个 buffer ,那么这是一个指定的值 - ‘buffer’,在这种情况下请忽略它。

  • callback {Function} 当你处理完成所提供的块时调用此函数(有一个可选的 error 参数)

注意:此函数禁止被直接调用。它可能是由子类实现,并且仅被转换(Transform)类的内部方法调用。

所有的转换(Transform)流的实现都必须提供一个 _transform() 方法用于接受输入并产生输出。

_transform() 应当承担特定的 Transform 类中所有处理被写入的字节、并将它们丢给接口的可读部分的职责,进行异步 I/O,处理其它事情等。

调用 transform.push(outputChunk) 零次或多次来从输入块生成输出,这取决于你有多少数据块要作为结果输出。

仅在当前块被完全消费后才调用回调函数。需要注意的是,输出可能会也可能不会作为任何特定的输入块的结果。如果提供了回调函数的第二个参数,它会被传递给 push 方法。换言之,以下是等效的:

  1. transform.prototype._transform = function (data, encoding, callback) {
  2. this.push(data);
  3. callback();
  4. };
  5. transform.prototype._transform = function (data, encoding, callback) {
  6. callback(null, data);
  7. };

这是一个带有下划线前缀的方法,因为它是在类内部定义的,并且不应该由用户程序直接调用。但是,我们希望你在自己的扩展类中重写此方法。

transform._flush(callback)

  • callback {Function} 当你强制刷新任何剩余数据时调用此函数(有一个可选的 error 参数)

注意:此函数禁止被直接调用。它可能是由子类实现,如果是这样的话,它仅被转换(Transform)类的内部方法调用。

在一些情景中,你的转换操作可能需要在流的末尾多发一些数据。例如,一个 Zlib 压缩流会储存一些内部状态以便更好地压缩输出,但在最后它需要尽可能好地处理剩下的东西以使数据完整。

在这些情况下,你可以实现一个 _flush() 方法,它会在所有写入数据被消费后,并且在标志着可读端到达末尾的 ‘end’ 事件触发前的最后一刻被调用。和 stream._transform() 一样,只需在 flush 操作完成时适当地调用 transform.push(chunk) 零或多次。

这是一个带有下划线前缀的方法,因为它是在类内部定义的,并且不应该由用户程序直接调用。但是,我们希望你在自己的扩展类中重写此方法。

例子:简单的协议分析器 V2

这里的简易协议解析器例子能够很简单地使用高级的 Transform 流类实现,类似于 parseHeaderSimpleProtocol v1 示例。

在这个示例中,输入会被导流到解析器中,而不是作为一个输入的参数提供。这种做法更符合 Node.js 流的惯例。

  1. const util = require('util');
  2. const Transform = require('stream').Transform;
  3. util.inherits(SimpleProtocol, Transform);
  4. function SimpleProtocol(options) {
  5. if (!(this instanceof SimpleProtocol))
  6. return new SimpleProtocol(options);
  7. Transform.call(this, options);
  8. this._inBody = false;
  9. this._sawFirstCr = false;
  10. this._rawHeader = [];
  11. this.header = null;
  12. }
  13. SimpleProtocol.prototype._transform = function (chunk, encoding, done) {
  14. if (!this._inBody) {
  15. // check if the chunk has a \n\n
  16. var split = -1;
  17. for (var i = 0; i < chunk.length; i++) {
  18. if (chunk[i] === 10) { // '\n'
  19. if (this._sawFirstCr) {
  20. split = i;
  21. break;
  22. } else {
  23. this._sawFirstCr = true;
  24. }
  25. } else {
  26. this._sawFirstCr = false;
  27. }
  28. }
  29. if (split === -1) {
  30. // still waiting for the \n\n
  31. // stash the chunk, and try again.
  32. this._rawHeader.push(chunk);
  33. } else {
  34. this._inBody = true;
  35. var h = chunk.slice(0, split);
  36. this._rawHeader.push(h);
  37. var header = Buffer.concat(this._rawHeader).toString();
  38. try {
  39. this.header = JSON.parse(header);
  40. } catch (er) {
  41. this.emit('error', new Error('invalid simple protocol data'));
  42. return;
  43. }
  44. // and let them know that we are done parsing the header.
  45. this.emit('header', this.header);
  46. // now, because we got some extra data, emit this first.
  47. this.push(chunk.slice(split));
  48. }
  49. } else {
  50. // from there on, just provide the data to our consumer as-is.
  51. this.push(chunk);
  52. }
  53. done();
  54. };
  55. // Usage:
  56. // var parser = new SimpleProtocol();
  57. // source.pipe(parser)
  58. // Now parser is a readable stream that will emit 'header'
  59. // with the parsed header data.

stream.PassThrough类

这是转换(Transform)流的一个简单实现,将输入的字节简单地传递给输出。它的主要用途是演示和测试,但偶尔也能在构建某种特殊流时派上用场。