concatAll

signature: concatAll(): Observable

Collect observables and subscribe to next when previous completes.


:warning: Be wary of
backpressure
when the source emits at a faster pace than inner observables complete!

:bulb: In many cases you can use concatMap as
a single operator instead!


concatAll - 图3

Examples

(
example tests
)

Example 1: concatAll with observable

( StackBlitz |
jsBin |
jsFiddle )

  1. import { map, concatAll } from 'rxjs/operators';
  2. import { of } from 'rxjs/observable/of';
  3. import { interval } from 'rxjs/observable/interval';
  4. //emit a value every 2 seconds
  5. const source = interval(2000);
  6. const example = source.pipe(
  7. //for demonstration, add 10 to and return as observable
  8. map(val => of(val + 10)),
  9. //merge values from inner observable
  10. concatAll()
  11. );
  12. //output: 'Example with Basic Observable 10', 'Example with Basic Observable 11'...
  13. const subscribe = example.subscribe(val =>
  14. console.log('Example with Basic Observable:', val)
  15. );
Example 2: concatAll with promise

( StackBlitz |
jsBin |
jsFiddle )

  1. import { map, concatAll } from 'rxjs/operators';
  2. import { interval } from 'rxjs/observable/interval';
  3. //create and resolve basic promise
  4. const samplePromise = val => new Promise(resolve => resolve(val));
  5. //emit a value every 2 seconds
  6. const source = interval(2000);
  7. const example = source.pipe(
  8. map(val => samplePromise(val)),
  9. //merge values from resolved promise
  10. concatAll()
  11. );
  12. //output: 'Example with Promise 0', 'Example with Promise 1'...
  13. const subscribe = example.subscribe(val =>
  14. console.log('Example with Promise:', val)
  15. );
Example 3: Delay while inner observables complete

( StackBlitz |
jsBin |
jsFiddle )

  1. import { take, concatAll } from 'rxjs/operators';
  2. import { interval } from 'rxjs/observable/interval';
  3. import { of } from 'rxjs/observable/of';
  4. const obs1 = interval(1000).pipe(take(5));
  5. const obs2 = interval(500).pipe(take(2));
  6. const obs3 = interval(2000).pipe(take(1));
  7. //emit three observables
  8. const source = of(obs1, obs2, obs3);
  9. //subscribe to each inner observable in order when previous completes
  10. const example = source.pipe(concatAll());
  11. /*
  12. output: 0,1,2,3,4,0,1,0
  13. How it works...
  14. Subscribes to each inner observable and emit values, when complete subscribe to next
  15. obs1: 0,1,2,3,4 (complete)
  16. obs2: 0,1 (complete)
  17. obs3: 0 (complete)
  18. */
  19. const subscribe = example.subscribe(val => console.log(val));

Additional Resources


:file_folder: Source Code:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/concatAll.ts