shareReplay

signature: shareReplay(bufferSize?: number, windowTime?: number, scheduler?: IScheduler): Observable

Share the source and replay a specified number of emissions on subscription.

Why use shareReplay?

You generally want to use shareReplay when you have side effects or taxing
computations that you do not want executed again for each subscriber. It may
also be valuable when you know you will have late subscribers to a stream
that need access to previously emitted values. This ability to replay values
on subscription is what differentiates shareReplay from share.
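
The first point, running a side effect once instead of once per subscriber, can
be sketched without RxJS at all. The block below is only a rough model under
stated assumptions: Source, Listener, expensiveSource and shareLatest are
hypothetical names invented for this illustration, and the sketch ignores
errors, completion and unsubscription, all of which the real operator handles.

```typescript
// Hypothetical stand-ins, not RxJS types: a "source" is a function that
// performs its work every time it is called with a listener.
type Listener<T> = (value: T) => void;
type Source<T> = (listener: Listener<T>) => void;

let executions = 0;

// A cold source: every direct subscription repeats the work.
const expensiveSource: Source<number> = listener => {
  executions++;    // the side effect we do not want to repeat
  listener(6 * 7); // stands in for a taxing computation
};

// Assumed helper: subscribe to the source once, then hand the latest
// value to every later subscriber instead of re-running the source.
function shareLatest<T>(source: Source<T>): Source<T> {
  let started = false;
  let hasValue = false;
  let latest: T | undefined;
  const listeners: Listener<T>[] = [];

  return listener => {
    listeners.push(listener);
    if (!started) {
      started = true;
      source(value => {
        hasValue = true;
        latest = value;
        listeners.forEach(l => l(value));
      });
    } else if (hasValue) {
      listener(latest as T); // replay for a late subscriber
    }
  };
}

// Unshared: two subscribers, two executions.
expensiveSource(() => {});
expensiveSource(() => {});
const unsharedExecutions = executions;

// Shared: two subscribers, one execution, both receive the value.
executions = 0;
const results: number[] = [];
const shared = shareLatest(expensiveSource);
shared(v => results.push(v));
shared(v => results.push(v));
const sharedExecutions = executions;

console.log(unsharedExecutions, sharedExecutions, results); // 2 1 [ 42, 42 ]
```

The second subscriber never triggers the source again; it only receives the
value that was remembered, which is the behavior shareReplay gives you for
real observables.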

For instance, suppose you have an observable that emits the last visited URL. In
the first example we are going to use share:

    import { Subject } from 'rxjs/Subject';
    import { pluck, share } from 'rxjs/operators';

    // simulate url change with subject
    const routeEnd = new Subject<{data: any, url: string}>();
    // grab url and share with subscribers
    const lastUrl = routeEnd.pipe(
      pluck('url'),
      share()
    );
    // initial subscriber required
    const initialSubscriber = lastUrl.subscribe(console.log);
    // simulate route change
    // logged by initialSubscriber: 'my-path'
    routeEnd.next({data: {}, url: 'my-path'});
    // nothing logged: share does not replay past values to late subscribers
    const lateSubscriber = lastUrl.subscribe(console.log);

In the above example nothing is logged when lateSubscriber subscribes, because
share does not replay values emitted before the subscription. Now suppose we
instead want late subscribers to receive the last emitted value on
subscription. We can accomplish this with shareReplay:

    import { Subject } from 'rxjs/Subject';
    import { pluck, shareReplay, tap } from 'rxjs/operators';

    // simulate url change with subject
    const routeEnd = new Subject<{data: any, url: string}>();
    // grab url and share with subscribers
    const lastUrl = routeEnd.pipe(
      tap(_ => console.log('executed')),
      pluck('url'),
      // buffer size defaults to all values, so keep and replay only the last one
      shareReplay(1)
    );
    // requires initial subscription
    const initialSubscriber = lastUrl.subscribe(console.log);
    // simulate route change
    // logged: 'executed', 'my-path'
    routeEnd.next({data: {}, url: 'my-path'});
    // logged: 'my-path'
    const lateSubscriber = lastUrl.subscribe(console.log);

Note that this is similar behavior to what you would see if you subscribed a
ReplaySubject to the lastUrl stream, then subscribed to that Subject:

    import { Subject } from 'rxjs/Subject';
    import { ReplaySubject } from 'rxjs/ReplaySubject';
    import { pluck } from 'rxjs/operators';

    // simulate url change with subject
    const routeEnd = new Subject<{data: any, url: string}>();
    // instead of using shareReplay, use a ReplaySubject that keeps the last value
    const shareWithReplay = new ReplaySubject<string>(1);
    // grab url
    const lastUrl = routeEnd.pipe(
      pluck('url')
    );
    // forward every url into the ReplaySubject
    lastUrl.subscribe(val => shareWithReplay.next(val));
    // simulate route change
    routeEnd.next({data: {}, url: 'my-path'});
    // subscribe to ReplaySubject instead
    // logged: 'my-path'
    shareWithReplay.subscribe(console.log);

In fact, if we dig into the source code we can see a very similar technique is
being used. When a subscription is made, shareReplay will subscribe to the
source, sending values through an internal ReplaySubject:

    // excerpt: subject, refCount, subscription, hasError and isComplete are
    // declared in the enclosing function, whose opening is not shown here
      return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
        refCount++;
        if (!subject || hasError) {
          hasError = false;
          subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
          subscription = source.subscribe({
            next(value) { subject.next(value); },
            error(err) {
              hasError = true;
              subject.error(err);
            },
            complete() {
              isComplete = true;
              subject.complete();
            },
          });
        }
        const innerSub = subject.subscribe(this);
        return () => {
          refCount--;
          innerSub.unsubscribe();
          if (subscription && refCount === 0 && isComplete) {
            subscription.unsubscribe();
          }
        };
      };
    }
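
To make the role of that internal ReplaySubject more concrete, here is a
minimal, dependency-free sketch of a replay buffer. It is not the RxJS
implementation: MiniReplaySubject and Listener are hypothetical names used only
for this illustration, and windowTime, schedulers, errors and completion are
left out on purpose.

```typescript
// Hypothetical toy model of a replay buffer; not the RxJS ReplaySubject.
type Listener<T> = (value: T) => void;

class MiniReplaySubject<T> {
  private buffer: T[] = [];
  private listeners: Listener<T>[] = [];
  private bufferSize: number;

  // Like the excerpt above, an omitted bufferSize means "keep everything".
  constructor(bufferSize: number = Number.POSITIVE_INFINITY) {
    this.bufferSize = bufferSize;
  }

  next(value: T): void {
    this.buffer.push(value);
    // Keep only the newest `bufferSize` values.
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    // Iterate over a copy so unsubscribing during delivery is safe.
    this.listeners.slice().forEach(listener => listener(value));
  }

  subscribe(listener: Listener<T>): () => void {
    // Replay buffered values first, then register for live values.
    this.buffer.forEach(value => listener(value));
    this.listeners.push(listener);
    // Return a teardown function, similar in spirit to innerSub.unsubscribe().
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}

const subject = new MiniReplaySubject<string>(1);
const early: string[] = [];
const late: string[] = [];

subject.subscribe(v => early.push(v));
subject.next('first-path');
subject.next('my-path');
// The late subscriber only receives what is still in the buffer.
subject.subscribe(v => late.push(v));

console.log(early); // [ 'first-path', 'my-path' ]
console.log(late);  // [ 'my-path' ]
```

With a buffer size of 1 the late subscriber sees only the most recent value.
Constructing the toy subject without an argument would replay every value seen
so far.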

Examples

Example 1: Multiple subscribers sharing source

    import { Subject } from 'rxjs/Subject';
    import { pluck, shareReplay, tap } from 'rxjs/operators';

    // simulate url change with subject
    const routeEnd = new Subject<{data: any, url: string}>();
    // grab url and share with subscribers
    const lastUrl = routeEnd.pipe(
      tap(_ => console.log('executed')),
      pluck('url'),
      // buffer size defaults to all values, so keep and replay only the last one
      shareReplay(1)
    );
    // requires initial subscription
    const initialSubscriber = lastUrl.subscribe(console.log);
    // simulate route change
    // logged: 'executed', 'my-path'
    routeEnd.next({data: {}, url: 'my-path'});
    // logged: 'my-path'
    const lateSubscriber = lastUrl.subscribe(console.log);

Additional Resources


Source Code:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/shareReplay.ts