容错

当 Actor 在处理消息或初始化过程中抛出非预期的异常、失败时,默认情况下,Actor 将停止。

  • 注释:类型化 Actor 和非类型化 Actor 之间的一个重要区别是,如果抛出异常,并且在创建 Actor 时未定义监督策略,则类型化 Actor 将会停止,而非类型化 Actor 则会重启。

请注意,失败和验证错误之间有一个重要区别:

  • 验证错误意味着发送给 Actor 的命令是无效的,这应该作为 Actor 协议的一部分建模,而不是使 Actor 抛出异常。
  • 相反,失败是一些意外的事情,或者是 Actor 本身无法控制的事情,例如断开的数据库连接。与验证错误相反,将协议的某些部分建模为发送 Actor 没有太大的作用。
  • 对于失败,应用“让它崩溃”的理念是很有用的:我们将责任转移到其他地方,而不是混合细粒度恢复和内部状态的更正,这些内部状态可能由于失败而部分无效。在许多情况下,解决方法可以是销毁 Actor,然后用我们知道有效的新状态创建一个新的 Actor。

监督

在 Akka 类型中,这个“其他地方”被称为监督(In Akka Typed this “somewhere else” is called supervision)。监督允许你声明性地描述在 Actor 内部抛出某种类型的异常时应该发生什么。要使用监督,实际的 Actor 行为将使用Behaviors.supervise进行包装,例如重新启动IllegalStateExceptions

  1. Behaviors.supervise(behavior)
  2. .onFailure(IllegalStateException.class, SupervisorStrategy.restart());

或者,要继续,请忽略失败并处理下一条消息,而不是:

  1. Behaviors.supervise(behavior)
  2. .onFailure(IllegalStateException.class, SupervisorStrategy.resume());

可以使用更复杂的重启策略,例如,在 10 秒钟内重启不超过 10 次:

  1. Behaviors.supervise(behavior)
  2. .onFailure(
  3. IllegalStateException.class,
  4. SupervisorStrategy.restart().withLimit(10, FiniteDuration.apply(10, TimeUnit.SECONDS)));

要使用不同的策略处理不同的异常,可以嵌套调用supervise方法:

  1. Behaviors.supervise(
  2. Behaviors.supervise(behavior)
  3. .onFailure(IllegalStateException.class, SupervisorStrategy.restart()))
  4. .onFailure(IllegalArgumentException.class, SupervisorStrategy.stop());

有关策略的完整列表,请参阅有关SupervisorStrategy的公共方法。

包装行为

通过改变行为(behavior)来存储状态是很常见的,例如

  1. interface CounterMessage {}
  2. public static final class Increase implements CounterMessage {}
  3. public static final class Get implements CounterMessage {
  4. final ActorRef<Got> sender;
  5. public Get(ActorRef<Got> sender) {
  6. this.sender = sender;
  7. }
  8. }
  9. public static final class Got {
  10. final int n;
  11. public Got(int n) {
  12. this.n = n;
  13. }
  14. }
  15. public static Behavior<CounterMessage> counter(int currentValue) {
  16. return Behaviors.receive(CounterMessage.class)
  17. .onMessage(
  18. Increase.class,
  19. (context, o) -> {
  20. return counter(currentValue + 1);
  21. })
  22. .onMessage(
  23. Get.class,
  24. (context, o) -> {
  25. o.sender.tell(new Got(currentValue));
  26. return Behaviors.same();
  27. })
  28. .build();
  29. }

进行此项监督时,只需将其添加到顶层:

  1. Behaviors.supervise(counter(1));

每个返回的行为都将自动与监督者重新包装。

当父级正在重新启动时,子 Actor 将停止

子 Actor 通常在重新启动父 Actor 时运行的setup块中启动。每次重新启动父 Actor 时,都会停止子 Actor,以避免资源泄漏,从而创建新的子 Actor。

  1. static Behavior<String> child(long size) {
  2. return Behaviors.receiveMessage(msg -> child(size + msg.length()));
  3. }
  4. static Behavior<String> parent() {
  5. return Behaviors.<String>supervise(
  6. Behaviors.setup(
  7. ctx -> {
  8. final ActorRef<String> child1 = ctx.spawn(child(0), "child1");
  9. final ActorRef<String> child2 = ctx.spawn(child(0), "child2");
  10. return Behaviors.receiveMessage(
  11. msg -> {
  12. // there might be bugs here...
  13. String[] parts = msg.split(" ");
  14. child1.tell(parts[0]);
  15. child2.tell(parts[1]);
  16. return Behaviors.same();
  17. });
  18. }))
  19. .onFailure(SupervisorStrategy.restart());
  20. }

可以重写此项,以便在重新启动父 Actor 时不影响子 Actor。然后,重新启动的父实例将具有与失败前相同的子实例。

如果子 Actor 是从setup创建的,如前一个示例中所示,并且在重新启动父 Actor 时它们应该保持完整(不停止),则应将supervise放置在setup中,并使用SupervisorStrategy.restart().withStopChildren(false),如下所示:

  1. static Behavior<String> parent2() {
  2. return Behaviors.setup(
  3. ctx -> {
  4. final ActorRef<String> child1 = ctx.spawn(child(0), "child1");
  5. final ActorRef<String> child2 = ctx.spawn(child(0), "child2");
  6. // supervision strategy inside the setup to not recreate children on restart
  7. return Behaviors.<String>supervise(
  8. Behaviors.receiveMessage(
  9. msg -> {
  10. // there might be bugs here...
  11. String[] parts = msg.split(" ");
  12. child1.tell(parts[0]);
  13. child2.tell(parts[1]);
  14. return Behaviors.same();
  15. }))
  16. .onFailure(SupervisorStrategy.restart().withStopChildren(false));
  17. });
  18. }

这意味着setup块只在父 Actor 第一次启动时运行,而不是在重新启动时运行。

失败在整个层次结构中冒泡

在某些情况下,在 Actor 层次结构中向上推送关于如何处理失败的决策,并让父 Actor 处理失败(在非类型化的 Akka Actor 中,这是默认的工作方式)可能很有用。

当一个子 Actor 被终止时,父 Actor 要得到通知,就必须watch这个子 Actor。如果由于故障而停止子进程,则将收到包含原因的ChildFailed信号。ChildFailed扩展了Terminated,因此如果你的用例不需要区分停止和失败,你可以使用Terminated信号来处理这两个用例。

如果父级反过来不处理Terminated消息,那么它本身将失败,并抛出akka.actor.typed.DeathPactException

在某些情况下,如果希望原始异常在层次结构中冒泡,可以通过处理Terminated信号并在每个 Actor 中重新抛出异常来完成。

  1. public class BubblingSample {
  2. interface Message {}
  3. public static class Fail implements Message {
  4. public final String text;
  5. public Fail(String text) {
  6. this.text = text;
  7. }
  8. }
  9. public static Behavior<Message> failingChildBehavior =
  10. Behaviors.receive(Message.class)
  11. .onMessage(
  12. Fail.class,
  13. (context, message) -> {
  14. throw new RuntimeException(message.text);
  15. })
  16. .build();
  17. public static Behavior<Message> middleManagementBehavior =
  18. Behaviors.setup(
  19. (context) -> {
  20. context.getLog().info("Middle management starting up");
  21. final ActorRef<Message> child = context.spawn(failingChildBehavior, "child");
  22. // we want to know when the child terminates, but since we do not handle
  23. // the Terminated signal, we will in turn fail on child termination
  24. context.watch(child);
  25. // here we don't handle Terminated at all which means that
  26. // when the child fails or stops gracefully this actor will
  27. // fail with a DeathWatchException
  28. return Behaviors.receive(Message.class)
  29. .onMessage(
  30. Message.class,
  31. (innerCtx, message) -> {
  32. // just pass messages on to the child
  33. child.tell(message);
  34. return Behaviors.same();
  35. })
  36. .build();
  37. });
  38. public static Behavior<Message> bossBehavior =
  39. Behaviors.setup(
  40. (context) -> {
  41. context.getLog().info("Boss starting up");
  42. final ActorRef<Message> middleManagement =
  43. context.spawn(middleManagementBehavior, "middle-management");
  44. context.watch(middleManagement);
  45. // here we don't handle Terminated at all which means that
  46. // when middle management fails with a DeathWatchException
  47. // this actor will also fail
  48. return Behaviors.receive(Message.class)
  49. .onMessage(
  50. Message.class,
  51. (innerCtx, message) -> {
  52. // just pass messages on to the child
  53. middleManagement.tell(message);
  54. return Behaviors.same();
  55. })
  56. .build();
  57. });
  58. public static void main(String[] args) {
  59. final ActorSystem<Message> system = ActorSystem.create(bossBehavior, "boss");
  60. system.tell(new Fail("boom"));
  61. // this will now bubble up all the way to the boss and as that is the user guardian it means
  62. // the entire actor system will stop
  63. }
  64. }

英文原文链接Fault Tolerance.