Observable 剖析

Observable 的 subscribe 方法签名如下:

  1. stream.subscribe(fnValue, fnError, fnComplete)

下面所演示的是第一个参数 fnValue

  1. let stream$ = Rx.Observable.create((observer) => {
  2. observer.next(1)
  3. });
  4. stream$.subscribe((data) => {
  5. console.log('Data', data);
  6. })
  7. // 1

当执行 observer.next(<value>) 时, fnValue 就会被调用。

第二个回调函数 fnError 是错误回调,通过下面的代码来调用,例如 observer.error(<message>)

  1. let stream$ = Rx.Observable.create((observer) => {
  2. observer.error('error message');
  3. })
  4. stream$.subscribe(
  5. (data) => console.log('Data', data)),
  6. (error) => console.log('Error', error)

最后是 fnComplete,当流完成时调用,并且不会再发出任何值。它是通过 observer.complete() 来触发的,像这样:

  1. let stream$ = Rx.Observable.create((observer) => {
  2. // 多次调用 observer.next(<value>)
  3. observer.complete();
  4. })

Unsubscribe

目前为止,我们创建的是一个不负责任的 Observable 。在这里不负责任是指它并没有清理它自身。那么我们来看下如何做到这点:

  1. let stream$ = new Rx.Observable.create((observer) => {
  2. let i = 0;
  3. let id = setInterval(() => {
  4. observer.next(i++);
  5. },1000)
  6. return function(){
  7. clearInterval( id );
  8. }
  9. })
  10. let subscription = stream$.subscribe((value) => {
  11. console.log('Value', value)
  12. });
  13. setTimeout(() => {
  14. subscription.unsubscribe() // 在这我们调用了清理函数
  15. }, 3000)

所以你要确保

  • 定义一个清理函数
  • 通过调用 subscription.unsubscribe() 隐式的调用清理函数