Subject

A Subject is a double nature. It has both the behaviour from an Observer and an Observable. Thus the following is possible:

Emitting values

  1. subject.next( 1 )
  2. subject.next( 2 )

Subscribing to values

  1. const subscription = subject.subscribe( (value) => console.log(value) )

To sum it up the following operations exist on it:

  1. next([value])
  2. error([error message])
  3. complete()
  4. subscribe()
  5. unsubscribe()

Acting as a proxy

A Subject can act as a proxy, i.e receive values from another stream that the subscriber of the Subject can listen to.

  1. let source$ = Rx.Observable.interval(500).take(3);
  2. const proxySubject = new Rx.Subject();
  3. let subscriber = source$.subscribe( proxySubject );
  4. proxySubject.subscribe( (value) => console.log('proxy subscriber', value ) );
  5. proxySubject.next( 3 );

So essentially proxySubject listens to source$

But it can also add its own contribution

  1. proxySubject.next( 3 ) // emits 3 and then 0 1 2 ( async )

GOTCHA
Any next() that happens before a subscription is created is lost. There are other Subject types that can cater to this below.

Business case

So what’s interesting about this? It can listen to some source when that data arrives as well as it has the ability to emit its own data and all arrives to the same subscriber. Ability to communicate between components in a bus like manner is the most obvious use case I can think of. Component 1 can place its value through next() and Component 2 can subscribe and conversely Component 2 can emit values in turn that Component 1 can subscribe to.

  1. sharedService.getDispatcher = function(){
  2. return subject;
  3. }
  4. sharedService.dispatch = function(value){
  5. subject.next(value)
  6. }

ReplaySubject

prototype:

  1. new Rx.ReplaySubject([bufferSize], [windowSize], [scheduler])

example:

  1. let replaySubject = new Rx.ReplaySubject( 2 );
  2. replaySubject.next( 0 );
  3. replaySubject.next( 1 );
  4. replaySubject.next( 2 );
  5. // 1, 2
  6. let replaySubscription = replaySubject.subscribe((value) => {
  7. console.log('replay subscription', value);
  8. });

Wow, what happened here, what happened to the first number?
So a .next() that happens before the subscription is created, is normally lost. But in the case of a ReplaySubject we have a chance to save emitted values in the cache. Upon creation the cache has been decided to save two values.

Let’s illustrate how this works:

  1. replaySubject.next( 3 )
  2. let secondSubscriber( (value) => console.log(value) ) // 2,3

GOTCHA
It matters both when the .next() operation happens, the size of the cache as well as when your subscription is created.

In the example above it was demonstrated how to use the constructor using bufferSize argument in the constructor. However there also exist a windowSize argument where you can specify how long the values should remain in the cache. Set it to null and it remains in the cache indefinite.

Business case

It’s quite easy to imagine the business case here. You fetch some data and want the app to remember what was fetched latest, and what you fetched might only be relevant for a certain time and when enough time has passed you clear the cache.

AsyncSubject

  1. let asyncSubject = new Rx.AsyncSubject();
  2. asyncSubject.subscribe(
  3. (value) => console.log('async subject', value),
  4. (error) => console.error('async error', error),
  5. () => console.log('async completed')
  6. );
  7. asyncSubject.next( 1 );
  8. asyncSubject.next( 2 );

Looking at this we expect 1,2 to be emitted right? WRONG.
Nothing will be emitted unless complete() happen

  1. asyncSubject.next( 3 )
  2. asyncSubject.complete()
  3. // emit 3

complete() needs to happen regardless of the finishing operation before it succeeds or fails so

  1. asyncSubject( 3 )
  2. asyncSubject.error('err')
  3. asyncSubject.complete()
  4. // will emit 'err' as the last action

Business case

When you care about preserving the last state just before the stream ends, be it a value or an error. So NOT last emitted state generally but last before closing time. With state I mean value or error.

BehaviourSubject

This Subject emits the following:

  • the initial value
  • the values emitted generally
  • last emitted value.

methods:

  1. next()
  2. complete()
  3. constructor([start value])
  4. getValue()
  1. let behaviorSubject = new Rx.BehaviorSubject(42);
  2. behaviorSubject.subscribe((value) => console.log('behaviour subject',value) );
  3. console.log('Behaviour current value',behaviorSubject.getValue());
  4. behaviorSubject.next(1);
  5. console.log('Behaviour current value',behaviorSubject.getValue());
  6. behaviorSubject.next(2);
  7. console.log('Behaviour current value',behaviorSubject.getValue());
  8. behaviorSubject.next(3);
  9. console.log('Behaviour current value',behaviorSubject.getValue());
  10. // emits 42
  11. // current value 42
  12. // emits 1
  13. // current value 1
  14. // emits 2
  15. // current value 2
  16. // emits 3
  17. // current value 3

Business case

This is quite similar to ReplaySubject. There is a difference though, we can utilize a default / start value that we can show initially if it takes some time before the first values starts to arrive. We can inspect the latest emitted value and of course listen to everything that has been emitted. So think of ReplaySubject as more long term memory and BehaviourSubject as short term memory with default behaviour.