组合操作符

有一些操作符允许你组合两个及以上的 source,它们的行为有所不同,重要的是要知道它们之间的区别。

combineLatest

函数签名如下:

  1. Rx.Observable.combineLatest([ source_1, ... source_n])
  1. let source1 = Rx.Observable.interval(100)
  2. .map( val => "source1 " + val ).take(5);
  3. let source2 = Rx.Observable.interval(50)
  4. .map( val => "source2 " + val ).take(2);
  5. let stream$ = Rx.Observable.combineLatest(
  6. source1,
  7. source2
  8. );
  9. stream$.subscribe(data => console.log(data));
  10. // 发出 source1: 0, source2 : 0 | source1 : 0, source2 : 1 | source1 : 1, source2 : 1, 等等

combineLatest 实际上是从每个 source 取最新的响应值然后返回有x个元素的数组。每个 source 对应一个元素。

如你所见,source2 在发出2个值后就停止了,但仍然可以持续发出最新的值。

业务场景

业务场景是当你对每个 source 的最新值感兴趣,而对过往的值不感兴趣,当然你要有一个以上想要组合的 source 。

concat

函数签名如下:

  1. Rx.Observable([ source_1,... sournce_n ])

看看下面输出的数据,很容易可以想到数据是何时发出的:

  1. let source1 = Rx.Observable.interval(100)
  2. .map( val => "source1 " + val ).take(5);
  3. let source2 = Rx.Observable.interval(50)
  4. .map( val => "source2 " + val ).take(2);
  5. let stream$ = Rx.Observable.concat(
  6. source1,
  7. source2
  8. );
  9. stream$.subscribe( data => console.log('Concat ' + data));
  10. // source1 : 0, source1 : 1, source1 : 2, source1 : 3, source1 : 4
  11. // source2 : 0, source2 : 1

从结果可以看出,组合后的 observable 接收了第一个 source 的所有值然后先将它们发出,然后再接收 source 2的所有值,所以说 concat() 操作符中的 source 顺序很重要。

所以当遇到应该优先考虑某个 source 的情况时,就要使用 concat 操作符。

merge

这个操作符可以将多个流合并成一个。

  1. let merged$ = Rx.Observable.merge(
  2. Rx.Observable.of(1).delay(500),
  3. Rx.Observable.of(3,2,5)
  4. )
  5. let observer = {
  6. next : data => console.log(data)
  7. }
  8. merged$.subscribe(observer);

要点是这个操作符组合了几个流,并且就像你在上面所看到的一样,任何像 delay() 这样的时间操作符都是起作用的。

zip

  1. let stream$ = Rx.Observable.zip(
  2. Promise.resolve(1),
  3. Rx.Observable.of(2,3,4),
  4. Rx.Observable.of(7)
  5. );
  6. stream$.subscribe(observer);

我们得到 1,2,7

再来看另外一个示例

  1. let stream$ = Rx.Observable.zip(
  2. Rx.Observable.of(1,5),
  3. Rx.Observable.of(2,3,4),
  4. Rx.Observable.of(7,9)
  5. );
  6. stream$.subscribe(observer);

得到的是1,2,75,3,9,所以它是以列为基础连接值的。它将采用最小的共同标准,在这个案例中是2。2,3,4序列中4会被忽略。正如你在第一个示例中所看见的,它还可以混用不同的异步概念,比如 Promise 和 Observable,这是因为发生了间隔转换。

业务场景

如果你真正关心不同 sources 在同一个位置所发出值的区别,假设所有 sources 的第2个响应值,那么你需要 zip() 操作符。