shareReplay

签名: shareReplay(bufferSize?: number, windowTime?: number, scheduler?I IScheduler): Observable

共享源 observable 并重放指定次数的发出。

为什么使用 shareReplay

通常啊,当有副作用或繁重的计算时,你不希望在多个订阅者之间重复执行时,会使用 shareReplay
当你知道流的后来订阅者也需要访问之前发出的值,shareReplay 在这种场景下也是有价值的。
这种在订阅过程中重放值的能力是区分 shareshareReplay 的关键。

例如,加入你有一个发出最后访问 url 的 observable 。
在第一个示例中,我们将使用 share:

  1. // 使用 subject 模拟 url 的变化
  2. const routeEnd = new Subject<{data: any, url: string}>();
  3. // 提取 url 并与后来订阅者共享
  4. const lastUrl = routeEnd.pipe(
  5. pluck('url'),
  6. share()
  7. );
  8. // 起始订阅者是必须的
  9. const initialSubscriber = lastUrl.subscribe(console.log);
  10. // 模拟路由变化
  11. routeEnd.next({data: {}, url: 'my-path'});
  12. // 没有任何输出
  13. const lateSubscriber = lastUrl.subscribe(console.log);

上面的示例中,lateSubscriber 订阅源 observable 后没有任何输出。
现在我们想要访问订阅中的最新发出值,可以通过 shareReplay 来完成:

  1. import { Subject } from 'rxjs/Subject';
  2. import { ReplaySubject } from 'rxjs/ReplaySubject';
  3. import { pluck, share, shareReplay, tap } from 'rxjs/operators';
  4. // 使用 subject 模拟 url 的变化
  5. const routeEnd = new Subject<{data: any, url: string}>();
  6. // 提取 url 并与后来订阅者共享
  7. const lastUrl = routeEnd.pipe(
  8. tap(_ => console.log('executed')),
  9. pluck('url'),
  10. // 默认为重放最后一个值
  11. shareReplay()
  12. );
  13. // 起始订阅者是必须的
  14. const initialSubscriber = lastUrl.subscribe(console.log);
  15. // 模拟路由变化
  16. // 输出: 'executed', 'my-path'
  17. routeEnd.next({data: {}, url: 'my-path'});
  18. // 输出: 'my-path'
  19. const lateSubscriber = lastUrl.subscribe(console.log);

注意,如果使用 ReplaySubject 订阅 lastUrl 流,然后再订阅 ReplaySubject
这种行为与使用 shareReplay 类似:

  1. // 使用 subject 模拟 url 的变化
  2. const routeEnd = new Subject<{data: any, url: string}>();
  3. // 使用 ReplaySubject 来替代 shareReplay
  4. const shareWithReplay = new ReplaySubject();
  5. // 取 url 并与后来订阅者共享
  6. const lastUrl = routeEnd.pipe(
  7. pluck('url')
  8. )
  9. .subscribe(val => shareWithReplay.next(val));
  10. // 模拟路由变化
  11. routeEnd.next({data: {}, url: 'my-path'});
  12. // 订阅 ReplaySubject
  13. // 输出: 'my path'
  14. shareWithReplay.subscribe(console.log);

事实上,如果深入源码,我们可以发现两者之间使用的技术是类似的。
当订阅发生后,shareReplay 会订阅源 observable,并通过内部的 ReplaySubject
来发送值:

(
source
)

  1. return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
  2. refCount++;
  3. if (!subject || hasError) {
  4. hasError = false;
  5. subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
  6. subscription = source.subscribe({
  7. next(value) { subject.next(value); },
  8. error(err) {
  9. hasError = true;
  10. subject.error(err);
  11. },
  12. complete() {
  13. isComplete = true;
  14. subject.complete();
  15. },
  16. });
  17. }
  18. const innerSub = subject.subscribe(this);
  19. return () => {
  20. refCount--;
  21. innerSub.unsubscribe();
  22. if (subscription && refCount === 0 && isComplete) {
  23. subscription.unsubscribe();
  24. }
  25. };
  26. };
  27. }
shareReplay - 图1

示例

示例 1: 多个订阅者共享源 observable

( Stackblitz )

  1. import { Subject } from 'rxjs/Subject';
  2. import { ReplaySubject } from 'rxjs/ReplaySubject';
  3. import { pluck, share, shareReplay, tap } from 'rxjs/operators';
  4. // 使用 subject 模拟 url 的变化
  5. const routeEnd = new Subject<{data: any, url: string}>();
  6. // 提取 url 并与后来订阅者共享
  7. const lastUrl = routeEnd.pipe(
  8. tap(_ => console.log('executed')),
  9. pluck('url'),
  10. // 默认为重放最后一个值
  11. shareReplay()
  12. );
  13. // 起始订阅者是必须的
  14. const initialSubscriber = lastUrl.subscribe(console.log)
  15. // 模拟路由变化
  16. // 输出: 'executed', 'my-path'
  17. routeEnd.next({data: {}, url: 'my-path'});
  18. // 输出: 'my-path'
  19. const lateSubscriber = lastUrl.subscribe(console.log);

其他资源


:file_folder: 源码:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/shareReplay.ts