exhaustMap

签名: exhaustMap(project: function, resultSelector: function): Observable

映射成内部 observable,忽略其他值直到该 observable 完成。

exhaustMap - 图1

示例

示例 1: 使用 interval 的 exhaustMap

( Stackblitz |
jsBin |
jsFiddle )

  1. import { interval } from 'rxjs/observable/interval';
  2. import { merge } from 'rxjs/observable/merge';
  3. import { of } from 'rxjs/observable/of';
  4. import { delay, take, exhaustMap } from 'rxjs/operators';
  5. const sourceInterval = interval(1000);
  6. const delayedInterval = sourceInterval.pipe(delay(10), take(4));
  7. const exhaustSub = merge(
  8. // 延迟10毫秒,然后开始 interval 并发出4个值
  9. delayedInterval,
  10. // 立即发出
  11. of(true)
  12. )
  13. .pipe(exhaustMap(_ => sourceInterval.pipe(take(5))))
  14. /*
  15. * 第一个发出的值 (of(true)) 会被映射成每秒发出值、
  16. * 5秒后完成的 interval observable 。
  17. * 因为 delayedInterval 的发送是晚于前者的,虽然 observable
  18. * 仍然是活动的,但它们会被忽略。
  19. *
  20. * 与类似的操作符进行下对比:
  21. * concatMap 会进行排队
  22. * switchMap 会在每次发送时切换成新的内部 observable
  23. * mergeMap 会为每个发出值维护新的 subscription
  24. */
  25. // 输出: 0, 1, 2, 3, 4
  26. .subscribe(val => console.log(val));
示例 2: 另一个使用 interval 的 exhaustMap

( Stackblitz |
jsBin |
jsFiddle )

  1. import { interval } from 'rxjs/observable/interval';
  2. import { exhaustMap, tap, take } from 'rxjs/operators';
  3. const firstInterval = interval(1000).pipe(take(10));
  4. const secondInterval = interval(1000).pipe(take(2));
  5. const exhaustSub = firstInterval
  6. .pipe(
  7. exhaustMap(f => {
  8. console.log(`Emission Corrected of first interval: ${f}`);
  9. return secondInterval;
  10. })
  11. )
  12. /*
  13. 当我们订阅第一个 interval 时,它开始发出值(从0开始)。
  14. 这个值会映射成第二个 interval,然后它开始发出值(从0开始)。
  15. 当第二个 interval 出于激活状态时,第一个 interval 的值会被忽略。
  16. 我们可以看到 firstInterval 发出的数字为3,6,等等...
  17. 输出:
  18. Emission of first interval: 0
  19. 0
  20. 1
  21. Emission of first interval: 3
  22. 0
  23. 1
  24. Emission of first interval: 6
  25. 0
  26. 1
  27. Emission of first interval: 9
  28. 0
  29. 1
  30. */
  31. .subscribe(s => console.log(s));

外部示例

exhaustMap 用于 @ngrx 示例应用 的 login Effect

(
Source
)

  1. @Effect()
  2. login$ = this.actions$.pipe(
  3. ofType(AuthActionTypes.Login),
  4. map((action: Login) => action.payload),
  5. exhaustMap((auth: Authenticate) =>
  6. this.authService
  7. .login(auth)
  8. .pipe(
  9. map(user => new LoginSuccess({ user })),
  10. catchError(error => of(new LoginFailure(error)))
  11. )
  12. )
  13. );

其他资源


:file_folder: 源码: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/exhaustMap.ts