错误处理

由于错误隔离是 Reactive 协定中一个很重要的部分,Stream API 已经实现了容错管道和相关的服务调用。

错误隔离可以防止 onNextonSubscribeonComplete 回调函数弹出任何异常。作为替代,这些异常被传递给 onError 回调函数,并传播给下游。一些 Action 可以积极或消极的对待这些信号,类如 when() 仅仅观察错误,而 onErrorResumeNext() 则可以切换至备用发布者

将传播过程逆向至消费侧而非生产侧是将数据生产者同管道错误隔离,保证生产者快活力的一种响应式的模式。

最后,错误将通过 onError() 回调函数通知给链中的最后一个订阅者。假设 订阅者 是一个 ConsumerActon,如果没有一个通过 Stream.consume(dataConsumer, errorConsumer) 注册的 errorConsumer 回调函数存在,Reactor 将重新路由错误信号。路由将触发当前环境的错误日志(如果有设定),默认使用 SLF4J 记录错误。

Reactor致命异常的处理也不同常规,特别是在 onSubscribe 执行过程中。这些异常将不被孤立,且不会传递给下游 subscriber(s):

  • CancelException
    • 如果 onNext 信号传播时没有可用的订阅者,此异常将被触发,例如在 onNext 信号传输时,订阅者被异步的取消了。
    • 使用 JVM 属性 -Dreactor.trace.cancel=true 可以开启 CancelException 的详细模式,并将其记录在默认的环境日志中。如果不设置,环境日志中不会记录异常以及相关的错误堆栈。
  • ReactorFatalException
    • 此异常在 Reactor 遇到不可恢复的情况时触发,例如在 Timer 的调配不能匹配条件时。
  • JVM unsafe exceptions:
    • StackOverflowError
    • VirtualMachineError
    • ThreadDeath
    • LinkageError

很多章节中都可以看到明确设定时间限制的好习惯,timeout() + retry() 将是你对付网络分裂问题的最好伴侣。流向 Stream 的数据越多,它就越应具有自愈性和良好的服务可用性。

理论上,在Reactive Streams中最多有一个错误能够穿过通道,因此你实不必在一个订阅者 上两次重复 onError(e)。而实践中,我们实现 Rx 的 retry()retryWhen() 操作符将在 onError 时进行取消和重订阅操作。就是说,在新的通道,带着新的操作示例,被同名的物化时,我们依然遵循着协定。这也意味着在这种情形下,像 buffer() 这样状态化的 Action 应当谨慎使用,因为我们只是取消了对它们的引用,它们的状态可能会丢失。我们正在研究替代方案,一个想法就是为安全的状态化 Action 引入外部持久化。你可以在相关章节 窥见一斑。回退流很有趣

良好的串联回滚
  1. Broadcaster<String> broadcaster = Broadcaster.create();
  2. Promise<List<String>> promise =
  3. broadcaster
  4. .timeout(1, TimeUnit.SECONDS, Streams.fail(new Exception("another one!")))
  5. .onErrorResumeNext(Streams.just("Alternative Message"))
  6. .toList();
  7. broadcaster.onNext("test1");
  8. broadcaster.onNext("test2");
  9. Thread.sleep(1500);
  10. try {
  11. broadcaster.onNext("test3");
  12. } catch (CancelException ce) {
  13. //Broadcaster has no subscriber, timeout disconnected the pipeline
  14. }
  15. promise.await();
  16. assertEquals(promise.get().get(0), "test1");
  17. assertEquals(promise.get().get(1), "test2");
  18. assertEquals(promise.get().get(2), "Alternative Message");
  • 当给定的时间段内没有数据发出时,TimeoutAction 可以提供回滚,但这个示例中它仅仅发出了另一个异常……
  • ……不过,我们很幸运有可以捕捉此异常的 onErrorResumeNext(Publisher) ,它传递了一些有效的字符串负荷。 另一个经典的管道容错示例在手册一节
表18,错误处理
Stream<T> API作用
when(Class<Throwable>, Consumer<Throwable>) 观察来自onError(Throwable) 信号的特定的异常类型(以及它们的子类)
oberveError(Class<Throwable>, BiConsumer<Object,Throwable>) when() 相似,但在异常初次触发时可以对失败的 onNext(Object) 内省。
onErrorReturn(Class<Throwable, Function<Throwable,T>) 提供一个信号 T的回退,给定一个匹配传递类型的异常。通常用于自愈服务。
onErrorResume(Class<Throwable, Publisher<T>) 提供一个信号 T的回退序列,给定一个匹配传递类型的异常。通常用于自愈服务。
materialize() dematerialize() 将上游信号转换为 Signal<T> ,并将其视为 onNext(Signal<T>) 信号进行处理。直接效果就是:它将接收错误信号和完结信号,因此可以用来高效的处理错误。一旦有错误产生,我们可以将 dematerialize() 回调中的 Signal<T> 转化到 Reactive Streams,保证服务的运行。
retry(int, Predicate<Throwable) 对父流进行取消操作和重订阅操作,次数由可选的 tries 限制,如果提供了 Predicate ,也需要进行匹配。
retryWhen(Function<Stream<Throwable>,Publisher<?>>) 当传递的 Function 中的 Publisher 发出 onNext(Object) 信号时,对父流进行取消操作和重订阅操作。函数将在生成的Publisher被订阅时被调用。如果 Publisher 发出 onError(e)onComplete() 信号,它们将被发送到下游。Function 接收一个可能在任一管道中出错的 Stream。可以同计数延迟配合,提供有界并高速的重试策略。
recover(Class<Throwable>, Subscriber<Object>) 只是一个 retryWhen() 的快捷方式,当onError(Throwable) 匹配给定类型时对父流进行取消操作和重订阅操作。当恢复成功时,传递的 Subscriber 参数将接受到同异常关联的最初的 onNext(Object) 信号。
ignoreError(Predicate<Throwable>) 将匹配的 onError(Throwable) 信号转换为 onComplete() 信号。如果没有给出参数,那么所有的异常都将被转化。
抛出 CancelException 这可能是我们唯一依次提及异常冒泡。在所有 onNext(T) 回调中抛出 CancelException.INSTANCE 的方法很简单,可以对传入值进行 no-ack 处理,并通知同位的(同一线程栈中的)发布者,例如核心处理器,在之后重新分配这个数据。