multicast

签名: multicast(selector: Function): Observable

使用提供 的 Subject 来共享源 observable

multicast - 图1

示例

示例 1: 使用标准的 Subject 进行 multicast

( jsBin |
jsFiddle )

  1. import { interval } from 'rxjs/observable/of';
  2. import { Subject } from 'rxjs/Subject';
  3. import { take, tap, multicast } 'rxjs/operators';
  4. // 每2秒发出值并只取前5个
  5. const source = interval(2000).pipe(take(5));
  6. const example = source.pipe(
  7. // 因为我们在下面进行了多播,所以副作用只会调用一次
  8. tap(() => console.log('Side Effect #1')),
  9. mapTo('Result!');
  10. );
  11. // 使用 subject 订阅 source 需要调用 connect() 方法
  12. const multi = example.pipe(multicast(() => new Subject()));
  13. /*
  14. 多个订阅者会共享 source
  15. 输出:
  16. "Side Effect #1"
  17. "Result!"
  18. "Result!"
  19. ...
  20. */
  21. const subscriberOne = multi.subscribe(val => console.log(val));
  22. const subscriberTwo = multi.subscribe(val => console.log(val));
  23. // 使用 subject 订阅 source
  24. multi.connect();
示例 2: 使用 ReplaySubject 进行 multicast

( jsBin |
jsFiddle )

  1. import { interval } from 'rxjs/observable/of';
  2. import { take, multicast } 'rxjs/operators';
  3. // 每2秒发出值并只取前5个
  4. const source = interval(2000).pipe(take(5));
  5. // 使用 ReplaySubject 的示例
  6. const example = source.pipe(
  7. // 因为我们在下面进行了多播,所以副作用只会调用一次
  8. tap(_ => console.log('Side Effect #2')),
  9. mapTo('Result Two!')
  10. );
  11. // 可以使用任何类型的 subject
  12. const multi = example.pipe(multicast(() => new Rx.ReplaySubject(5)));
  13. // 使用 subject 订阅 source
  14. multi.connect();
  15. setTimeout(() => {
  16. /*
  17. 因为使用的是 ReplaySubject,订阅者会接收到 subscription 中的之前所有值。
  18. */
  19. const subscriber = multi.subscribe(val => console.group(val));
  20. }, 5000);

其他资源


:file_folder: 源码: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/multicast.ts