forkJoin

signature: forkJoin(...args, selector : function): Observable

When all observables complete, emit the last emitted value from each.


:bulb: If you want corresponding emissions from multiple observables as they
occur, try zip!

:warning: If an inner observable does not complete, forkJoin will never emit a
value!
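
The rule and the warning above can be sketched without any library. What follows is a simplified model in plain JavaScript, not RxJS's actual implementation: `miniForkJoin` and the `source(observer)` function shape are hypothetical names invented for this illustration, and only `next` and `complete` are modelled (no errors, no unsubscription).

```javascript
// Simplified model of forkJoin semantics (NOT the RxJS implementation).
// A "source" here is just a function that receives an observer with
// next(value) and complete() callbacks. These shapes are assumptions
// made for this sketch.
function miniForkJoin(sources, onResult) {
  const last = new Array(sources.length);               // last value per source
  const hasValue = new Array(sources.length).fill(false);
  let remaining = sources.length;                       // sources not yet completed

  sources.forEach((source, i) => {
    source({
      next(value) {
        last[i] = value;
        hasValue[i] = true;
      },
      complete() {
        remaining -= 1;
        // Emit once, and only when every source has completed with a value.
        if (remaining === 0 && hasValue.every(Boolean)) {
          onResult(last.slice());
        }
      }
    });
  });
}

// Three hypothetical sources
const oneTwoThree = o => { o.next(1); o.next(2); o.next(3); o.complete(); };
const hello = o => { o.next('Hello'); o.complete(); };
const neverCompletes = o => { o.next('stuck'); }; // emits but never completes

const results = [];
miniForkJoin([oneTwoThree, hello], v => results.push(v));
miniForkJoin([oneTwoThree, neverCompletes], v => results.push(v));

console.log(results); // [[3, 'Hello']]
```

Only the first call produces a result. The second never emits because one of its sources never completes, which is the situation the warning describes.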


Why use forkJoin?

This operator is best used when you have a group of observables and only care
about the final emitted value of each. One common use case is issuing multiple
requests on page load (or some other event) and taking action only once a
response has been received for all of them. In this way it is similar to how
you might use Promise.all.
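
For readers who know Promise.all, the page-load case might look like the following plain-JavaScript sketch. `fakeRequest` is a hypothetical stand-in for a real HTTP call, and the names and delays are assumptions chosen for illustration. forkJoin plays the same role when the requests are observables rather than promises.

```javascript
// Hypothetical stand-in for an HTTP request: resolves after `ms` milliseconds.
const fakeRequest = (name, ms) =>
  new Promise(resolve => setTimeout(() => resolve(`${name} loaded`), ms));

// Like forkJoin, Promise.all waits for every input to finish and delivers
// the results as one array, in input order rather than completion order.
const pageLoad = Promise.all([
  fakeRequest('user', 30),
  fakeRequest('settings', 10),
  fakeRequest('notifications', 20)
]);

pageLoad.then(results => console.log(results));
// logs ['user loaded', 'settings loaded', 'notifications loaded']
```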

Be aware that if any of the inner observables supplied to forkJoin errors, you
will lose the values of any other observables that would have completed, or
have already completed, unless you catch the error on that inner observable.
If you are only concerned with all inner observables completing successfully,
you can catch the error on the outside instead.

It’s also worth noting that if you have an observable that emits more than one
item and you care about the earlier emissions, forkJoin is not the correct
choice. In these cases you may be better off with an operator like
combineLatest or zip.
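
To see why earlier emissions are lost, the sketch below replays a made-up timeline of events from two sources and compares a forkJoin-like combination with a combineLatest-like one. It is a simplified model in plain JavaScript, not RxJS itself: the event shape and the `replay` helper are assumptions of this sketch, and zip is not modelled.

```javascript
// Simplified model (NOT RxJS itself): events are listed in the order
// they would occur in time. The event shape is an assumption of this sketch.
const timeline = [
  { source: 0, value: 'a1' },
  { source: 1, value: 'b1' },
  { source: 0, value: 'a2' },
  { source: 1, value: 'b2' },
  { source: 0, done: true },
  { source: 1, done: true }
];

function replay(events, sourceCount) {
  const latest = new Array(sourceCount);
  const seen = new Array(sourceCount).fill(false);
  let completed = 0;
  const combineLatestLike = []; // one entry per emission once all have emitted
  const forkJoinLike = [];      // at most one entry, after all have completed

  for (const event of events) {
    if (event.done) {
      completed += 1;
      if (completed === sourceCount && seen.every(Boolean)) {
        forkJoinLike.push(latest.slice());
      }
    } else {
      latest[event.source] = event.value;
      seen[event.source] = true;
      if (seen.every(Boolean)) {
        combineLatestLike.push(latest.slice());
      }
    }
  }
  return { combineLatestLike, forkJoinLike };
}

const { combineLatestLike, forkJoinLike } = replay(timeline, 2);
console.log(combineLatestLike); // [['a1','b1'], ['a2','b1'], ['a2','b2']]
console.log(forkJoinLike);      // [['a2','b2']]
```

The forkJoin-like result contains only the final pair, while the combineLatest-like result keeps every intermediate combination.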

Examples

Example 1: Observables completing after different durations

    import { delay, take } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';
    import { interval } from 'rxjs/observable/interval';

    const myPromise = val =>
      new Promise(resolve =>
        setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
      );

    /*
      when all observables complete, give the last
      emitted value from each as an array
    */
    const example = forkJoin(
      //emit 'Hello' immediately
      of('Hello'),
      //emit 'World' after 1 second
      of('World').pipe(delay(1000)),
      //emit 0 after 1 second
      interval(1000).pipe(take(1)),
      //emit 0...1 in 1 second interval
      interval(1000).pipe(take(2)),
      //promise that resolves to 'Promise Resolved' after 5 seconds
      myPromise('RESULT')
    );
    //output: ["Hello", "World", 0, 1, "Promise Resolved: RESULT"]
    const subscribe = example.subscribe(val => console.log(val));

Example 2: Making a variable number of requests

    import { mergeMap } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';

    const myPromise = val =>
      new Promise(resolve =>
        setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
      );

    const source = of([1, 2, 3, 4, 5]);
    //emit array of all 5 results
    const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
    /*
      output:
      [
        "Promise Resolved: 1",
        "Promise Resolved: 2",
        "Promise Resolved: 3",
        "Promise Resolved: 4",
        "Promise Resolved: 5"
      ]
    */
    const subscribe = example.subscribe(val => console.log(val));

Example 3: Handling errors on outside

    import { delay, catchError } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';
    import { _throw } from 'rxjs/observable/throw';

    /*
      forkJoin errors as soon as any inner observable errors;
      catching on the outside replaces the whole result, so the
      values from the other observables are lost
    */
    const example = forkJoin(
      //emit 'Hello' immediately
      of('Hello'),
      //emit 'World' after 1 second
      of('World').pipe(delay(1000)),
      // throw error
      _throw('This will error')
    ).pipe(catchError(error => of(error)));
    //output: 'This will error'
    const subscribe = example.subscribe(val => console.log(val));

Example 4: Getting successful results when one inner observable errors

    import { delay, catchError } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';
    import { _throw } from 'rxjs/observable/throw';

    /*
      when all observables complete, give the last
      emitted value from each as an array
    */
    const example = forkJoin(
      //emit 'Hello' immediately
      of('Hello'),
      //emit 'World' after 1 second
      of('World').pipe(delay(1000)),
      // throw error
      _throw('This will error').pipe(catchError(error => of(error)))
    );
    //output: ["Hello", "World", "This will error"]
    const subscribe = example.subscribe(val => console.log(val));

Example 5: forkJoin in Angular

    import { Component, Injectable } from '@angular/core';
    import { delay } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';

    @Injectable()
    export class MyService {
      makeRequest(value: string, delayDuration: number) {
        // simulate http request
        return of(`Complete: ${value}`).pipe(
          delay(delayDuration)
        );
      }
    }

    @Component({
      selector: 'my-app',
      template: `
        <div>
          <h2>forkJoin Example</h2>
          <ul>
            <li> {{propOne}} </li>
            <li> {{propTwo}} </li>
            <li> {{propThree}} </li>
          </ul>
        </div>
      `,
    })
    export class App {
      public propOne: string;
      public propTwo: string;
      public propThree: string;

      constructor(private _myService: MyService) {}

      ngOnInit() {
        // simulate 3 requests with different delays
        forkJoin(
          this._myService.makeRequest('Request One', 2000),
          this._myService.makeRequest('Request Two', 1000),
          this._myService.makeRequest('Request Three', 3000)
        )
        .subscribe(([res1, res2, res3]) => {
          // results arrive in argument order, not completion order
          this.propOne = res1;
          this.propTwo = res2;
          this.propThree = res3;
        });
      }
    }

Additional Resources


:file_folder: Source Code:
https://github.com/ReactiveX/rxjs/blob/master/src/observable/ForkJoinObservable.ts