co 模块

基本用法

co 模块是著名程序员 TJ Holowaychuk 于 2013 年 6 月发布的一个小工具,用于 Generator 函数的自动执行。

下面是一个 Generator 函数,用于依次读取两个文件。

  1. var gen = function* () {
  2. var f1 = yield readFile('/etc/fstab');
  3. var f2 = yield readFile('/etc/shells');
  4. console.log(f1.toString());
  5. console.log(f2.toString());
  6. };

co 模块可以让你不用编写 Generator 函数的执行器。

  1. var co = require('co');
  2. co(gen);

上面代码中,Generator 函数只要传入co函数,就会自动执行。

co函数返回一个Promise对象,因此可以用then方法添加回调函数。

  1. co(gen).then(function (){
  2. console.log('Generator 函数执行完成');
  3. });

上面代码中,等到 Generator 函数执行结束,就会输出一行提示。

co 模块的原理

为什么 co 可以自动执行 Generator 函数?

前面说过,Generator 就是一个异步操作的容器。它的自动执行需要一种机制,当异步操作有了结果,能够自动交回执行权。

两种方法可以做到这一点。

(1)回调函数。将异步操作包装成 Thunk 函数,在回调函数里面交回执行权。

(2)Promise 对象。将异步操作包装成 Promise 对象,用then方法交回执行权。

co 模块其实就是将两种自动执行器(Thunk 函数和 Promise 对象),包装成一个模块。使用 co 的前提条件是,Generator 函数的yield命令后面,只能是 Thunk 函数或 Promise 对象。如果数组或对象的成员,全部都是 Promise 对象,也可以使用 co,详见后文的例子。

上一节已经介绍了基于 Thunk 函数的自动执行器。下面来看,基于 Promise 对象的自动执行器。这是理解 co 模块必须的。

基于 Promise 对象的自动执行

还是沿用上面的例子。首先,把fs模块的readFile方法包装成一个 Promise 对象。

  1. var fs = require('fs');
  2. var readFile = function (fileName){
  3. return new Promise(function (resolve, reject){
  4. fs.readFile(fileName, function(error, data){
  5. if (error) return reject(error);
  6. resolve(data);
  7. });
  8. });
  9. };
  10. var gen = function* (){
  11. var f1 = yield readFile('/etc/fstab');
  12. var f2 = yield readFile('/etc/shells');
  13. console.log(f1.toString());
  14. console.log(f2.toString());
  15. };

然后,手动执行上面的 Generator 函数。

  1. var g = gen();
  2. g.next().value.then(function(data){
  3. g.next(data).value.then(function(data){
  4. g.next(data);
  5. });
  6. });

手动执行其实就是用then方法,层层添加回调函数。理解了这一点,就可以写出一个自动执行器。

  1. function run(gen){
  2. var g = gen();
  3. function next(data){
  4. var result = g.next(data);
  5. if (result.done) return result.value;
  6. result.value.then(function(data){
  7. next(data);
  8. });
  9. }
  10. next();
  11. }
  12. run(gen);

上面代码中,只要 Generator 函数还没执行到最后一步,next函数就调用自身,以此实现自动执行。

co 模块的源码

co 就是上面那个自动执行器的扩展,它的源码只有几十行,非常简单。

首先,co 函数接受 Generator 函数作为参数,返回一个 Promise 对象。

  1. function co(gen) {
  2. var ctx = this;
  3. return new Promise(function(resolve, reject) {
  4. });
  5. }

在返回的 Promise 对象里面,co 先检查参数gen是否为 Generator 函数。如果是,就执行该函数,得到一个内部指针对象;如果不是就返回,并将 Promise 对象的状态改为resolved

  1. function co(gen) {
  2. var ctx = this;
  3. return new Promise(function(resolve, reject) {
  4. if (typeof gen === 'function') gen = gen.call(ctx);
  5. if (!gen || typeof gen.next !== 'function') return resolve(gen);
  6. });
  7. }

接着,co 将 Generator 函数的内部指针对象的next方法,包装成onFulfilled函数。这主要是为了能够捕捉抛出的错误。

  1. function co(gen) {
  2. var ctx = this;
  3. return new Promise(function(resolve, reject) {
  4. if (typeof gen === 'function') gen = gen.call(ctx);
  5. if (!gen || typeof gen.next !== 'function') return resolve(gen);
  6. onFulfilled();
  7. function onFulfilled(res) {
  8. var ret;
  9. try {
  10. ret = gen.next(res);
  11. } catch (e) {
  12. return reject(e);
  13. }
  14. next(ret);
  15. }
  16. });
  17. }

最后,就是关键的next函数,它会反复调用自身。

  1. function next(ret) {
  2. if (ret.done) return resolve(ret.value);
  3. var value = toPromise.call(ctx, ret.value);
  4. if (value && isPromise(value)) return value.then(onFulfilled, onRejected);
  5. return onRejected(
  6. new TypeError(
  7. 'You may only yield a function, promise, generator, array, or object, '
  8. + 'but the following object was passed: "'
  9. + String(ret.value)
  10. + '"'
  11. )
  12. );
  13. }

上面代码中,next函数的内部代码,一共只有四行命令。

第一行,检查当前是否为 Generator 函数的最后一步,如果是就返回。

第二行,确保每一步的返回值,是 Promise 对象。

第三行,使用then方法,为返回值加上回调函数,然后通过onFulfilled函数再次调用next函数。

第四行,在参数不符合要求的情况下(参数非 Thunk 函数和 Promise 对象),将 Promise 对象的状态改为rejected,从而终止执行。

处理并发的异步操作

co 支持并发的异步操作,即允许某些操作同时进行,等到它们全部完成,才进行下一步。

这时,要把并发的操作都放在数组或对象里面,跟在yield语句后面。

  1. // 数组的写法
  2. co(function* () {
  3. var res = yield [
  4. Promise.resolve(1),
  5. Promise.resolve(2)
  6. ];
  7. console.log(res);
  8. }).catch(onerror);
  9. // 对象的写法
  10. co(function* () {
  11. var res = yield {
  12. 1: Promise.resolve(1),
  13. 2: Promise.resolve(2),
  14. };
  15. console.log(res);
  16. }).catch(onerror);

下面是另一个例子。

  1. co(function* () {
  2. var values = [n1, n2, n3];
  3. yield values.map(somethingAsync);
  4. });
  5. function* somethingAsync(x) {
  6. // do something async
  7. return y
  8. }

上面的代码允许并发三个somethingAsync异步操作,等到它们全部完成,才会进行下一步。

实例:处理 Stream

Node 提供 Stream 模式读写数据,特点是一次只处理数据的一部分,数据分成一块块依次处理,就好像“数据流”一样。这对于处理大规模数据非常有利。Stream 模式使用 EventEmitter API,会释放三个事件。

  • data事件:下一块数据块已经准备好了。
  • end事件:整个“数据流”处理完了。
  • error事件:发生错误。

使用Promise.race()函数,可以判断这三个事件之中哪一个最先发生,只有当data事件最先发生时,才进入下一个数据块的处理。从而,我们可以通过一个while循环,完成所有数据的读取。

  1. const co = require('co');
  2. const fs = require('fs');
  3. const stream = fs.createReadStream('./les_miserables.txt');
  4. let valjeanCount = 0;
  5. co(function*() {
  6. while(true) {
  7. const res = yield Promise.race([
  8. new Promise(resolve => stream.once('data', resolve)),
  9. new Promise(resolve => stream.once('end', resolve)),
  10. new Promise((resolve, reject) => stream.once('error', reject))
  11. ]);
  12. if (!res) {
  13. break;
  14. }
  15. stream.removeAllListeners('data');
  16. stream.removeAllListeners('end');
  17. stream.removeAllListeners('error');
  18. valjeanCount += (res.toString().match(/valjean/ig) || []).length;
  19. }
  20. console.log('count:', valjeanCount); // count: 1120
  21. });

上面代码采用 Stream 模式读取《悲惨世界》的文本文件,对于每个数据块都使用stream.once方法,在dataenderror三个事件上添加一次性回调函数。变量res只有在data事件发生时才有值,然后累加每个数据块之中valjean这个词出现的次数。