Combination operators

There are many operators that allow you to combine the values from two or more sources. They behave a bit differently from each other, though, and it's important to know the difference.

combineLatest

The signature on this one is:

  Rx.Observable.combineLatest([ source_1, ... source_n ])

For example:

  let source1 = Rx.Observable.interval(100)
    .map( val => "source1 " + val ).take(5);
  let source2 = Rx.Observable.interval(50)
    .map( val => "source2 " + val ).take(2);
  let stream$ = Rx.Observable.combineLatest(
    source1,
    source2
  );
  stream$.subscribe(data => console.log(data));
  // emits source1: 0, source2: 0 | source1: 0, source2: 1 | source1: 1, source2: 1, etc

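In essence, it takes the latest value from each source and emits them together as an array with one element per source. It emits nothing until every source has produced at least one value; after that, it emits each time any source produces a new one.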

As you can see, source2 stops emitting after 2 values, yet its last value keeps being paired with each new value from source1.
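To make the "latest value per source" rule concrete, here is a minimal sketch in plain JavaScript that models the behaviour without RxJS. The function `combineLatestModel` and the `timeline` data are hypothetical, invented for illustration; the timestamps are made up and only their order matters.

```javascript
// Plain-JavaScript model of combineLatest; this is not the RxJS implementation.
// `events` is a hypothetical timeline of { time, source, value } records,
// where `source` is the index of the stream that produced the value.
function combineLatestModel(sourceCount, events) {
  const latest = new Array(sourceCount);
  const seen = new Array(sourceCount).fill(false);
  const output = [];
  // Process events in time order (Array.prototype.sort is stable for ties).
  const ordered = [...events].sort((a, b) => a.time - b.time);
  for (const { source, value } of ordered) {
    latest[source] = value;
    seen[source] = true;
    // Emit only once every source has produced at least one value.
    if (seen.every(Boolean)) {
      output.push([...latest]);
    }
  }
  return output;
}

// Made-up timeline loosely mirroring the example above.
const timeline = [
  { time: 50, source: 1, value: "source2 0" },
  { time: 100, source: 0, value: "source1 0" },
  { time: 101, source: 1, value: "source2 1" },
  { time: 200, source: 0, value: "source1 1" },
];
const combined = combineLatestModel(2, timeline);
console.log(combined);
```

Note that the first event produces no output, because source1 has not emitted yet at that point.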

Business case

Use combineLatest when you care about the most recent value from each source, past values are of little interest, and you have more than one source to combine.

concat

The signature is:

  Rx.Observable.concat( source_1, ... source_n )

Looking at the following code, it’s easy to assume that the output order depends on when each value is emitted:

  let source1 = Rx.Observable.interval(100)
    .map( val => "source1 " + val ).take(5);
  let source2 = Rx.Observable.interval(50)
    .map( val => "source2 " + val ).take(2);
  let stream$ = Rx.Observable.concat(
    source1,
    source2
  );
  stream$.subscribe( data => console.log('Concat ' + data));
  // source1 : 0, source1 : 1, source1 : 2, source1 : 3, source1 : 4
  // source2 : 0, source2 : 1

The result, however, is that the resulting observable emits all the values from the first source first and only then the values from the second. It subscribes to source2 only after source1 has completed, so the order in which the sources are passed to concat() matters.

So if you have a case where one source should take priority over another, this is the operator for you.
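The sequencing described above can be sketched in plain JavaScript. The helper `concatModel` below is hypothetical and not part of RxJS; it assumes each source behaves like interval(period).take(count) and that the next source starts its clock only when the previous one completes.

```javascript
// Plain-JavaScript model of concat over interval-like sources.
// Each source is described as { name, period, count }, mirroring
// interval(period).take(count). This is an illustration, not RxJS itself.
function concatModel(sources) {
  const output = [];
  let clock = 0; // simulated time in milliseconds
  for (const { name, period, count } of sources) {
    // The source is "subscribed" at the current clock value, so its
    // first value arrives one period later.
    for (let i = 0; i < count; i++) {
      clock += period;
      output.push({ time: clock, value: `${name} ${i}` });
    }
    // The source completes with its last value; the next one starts here.
  }
  return output;
}

const concatTimeline = concatModel([
  { name: "source1", period: 100, count: 5 },
  { name: "source2", period: 50, count: 2 },
]);
console.log(concatTimeline);
```

In this model source2's first value arrives at 550 ms rather than 50 ms, because its timer does not start until source1 has finished at 500 ms.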

merge

This operator enables you to merge several streams into one.

  let merged$ = Rx.Observable.merge(
    Rx.Observable.of(1).delay(500),
    Rx.Observable.of(3,2,5)
  );
  let observer = {
    next : data => console.log(data)
  };
  merged$.subscribe(observer);
  // logs 3, 2, 5 right away, then 1 after 500 ms

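The point of this operator is to combine several streams into one. As you can see above, timing operators such as delay() are respected: values appear in the merged stream in the order they are actually emitted, not in the order the sources were listed.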
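As a rough illustration of how merge interleaves by emission time, here is a plain-JavaScript sketch. `mergeModel` is a hypothetical helper, not an RxJS function, and each source is represented as an array of { time, value } records with assumed timestamps.

```javascript
// Plain-JavaScript model of merge; this is not the RxJS implementation.
// Every source is an array of { time, value } records. The merged
// result is all records ordered by time; ties keep listing order.
function mergeModel(...sources) {
  return sources
    .flat()
    .map((event, index) => ({ ...event, index }))
    .sort((a, b) => a.time - b.time || a.index - b.index)
    .map(event => event.value);
}

// Models Rx.Observable.of(1).delay(500)
const delayed = [{ time: 500, value: 1 }];
// Models Rx.Observable.of(3,2,5), which emits immediately
const immediate = [
  { time: 0, value: 3 },
  { time: 0, value: 2 },
  { time: 0, value: 5 },
];

const merged = mergeModel(delayed, immediate);
console.log(merged); // [ 3, 2, 5, 1 ]
```

Even though the delayed source is listed first, its value comes last, which is the opposite of what concat would produce.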

zip

  let stream$ = Rx.Observable.zip(
    Promise.resolve(1),
    Rx.Observable.of(2,3,4),
    Rx.Observable.of(7)
  );
  stream$.subscribe(observer);

This gives us 1,2,7.

Let’s look at another example:

  let stream$ = Rx.Observable.zip(
    Rx.Observable.of(1,5),
    Rx.Observable.of(2,3,4),
    Rx.Observable.of(7,9)
  );
  stream$.subscribe(observer);

This gives us 1,2,7 and 5,3,9, so zip joins values column by column, that is, by position. It emits only as many groups as the shortest source has values, which in this case is 2, so the 4 in the 2,3,4 sequence is never emitted. As the first example shows, it’s also possible to mix different async concepts, such as Promises and Observables, because an internal conversion happens.
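The column-wise pairing can be sketched in plain JavaScript by treating each source as an array of the values it would emit. `zipModel` is a hypothetical helper written for this illustration and ignores timing entirely.

```javascript
// Plain-JavaScript model of zip; this is not the RxJS implementation.
// Each source is an array of the values it would emit, in order.
function zipModel(...sources) {
  if (sources.length === 0) {
    return [];
  }
  // Only as many groups as the shortest source has values.
  const length = Math.min(...sources.map(source => source.length));
  const groups = [];
  for (let i = 0; i < length; i++) {
    // Take the i-th value from every source.
    groups.push(sources.map(source => source[i]));
  }
  return groups;
}

const zipped = zipModel([1, 5], [2, 3, 4], [7, 9]);
console.log(zipped); // [ [ 1, 2, 7 ], [ 5, 3, 9 ] ]
```

The unmatched 4 never appears because no other source has a third value to pair it with.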

Business case

Use zip() when you care about what the different sources emitted at a given position. If, say, you need the 2nd response from each of your sources together, then zip() is your operator.