第 1 部分: Actor 的体系结构

依赖

在你的项目中添加如下依赖:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-actor_2.11</artifactId>
  5. <version>2.5.19</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.19'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.19"

简介

使用 Akka 可以让你从为 Actor 系统创建基础设施和编写控制基本行为所需的初级(low-level)代码中解脱出来。为了理解这一点,让我们看看你在代码中创建的 Actor 与 Akka 在内部为你创建和管理的 Actor 之间的关系,Actor 的生命周期和失败处理。

Akka 的 Actor 层级

Akka 的 Actor 总是属于父 Actor。通常,你可以通过调用getContext().actorOf()来创建 Actor。与创建一个“独立的” Actor 不同,这会将新 Actor 作为一个子节点注入到已经存在的树中:创建 Actor 的 Actor 成为新创建的子 Actor 的父级。你可能会问,你创造的第一个 Actor 的父节点是谁?

如下图所示,所有的 Actor 都有一个共同的父节点,即用户守护者。可以使用system.actorOf()在当前 Actor 下创建新的 Actor 实例。正如我们在「快速入门 Akka Java 指南」中介绍的那样,创建 Actor 将返回一个有效的 URL 引用。例如,如果我们用system.actorOf(…, "someActor")创建一个名为someActor的 Actor,它的引用将包括路径/user/someActor

Part 1: Actor Architecture

事实上,在你在代码中创建 Actor 之前,Akka 已经在系统中创建了三个 Actor 。这些内置的 Actor 的名字包含guardian,因为他们守护他们所在路径下的每一个子 Actor。守护者 Actor 包括:

  • /,根守护者(root guardian)。这是系统中所有 Actor 的父 Actor,也是系统本身终止时要停止的最后一个 Actor。

  • /user,守护者(guardian)。这是用户创建的所有 Actor 的父 Actor。不要让用户名混淆,它与最终用户和用户处理无关。使用 Akka 库创建的每个 Actor 都将有一个事先准备的固定路径/user/

  • /system,系统守护者(system guardian)。这是除上述三个 Actor 外,系统创建的所有 Actor 的父 Actor,

Hello World示例中,我们已经看到system.actorOf()如何直接在/user下创建 Actor。我们称之为顶级 Actor,尽管实际上它只是在用户定义的层次结构的顶部。你的ActorSystem中通常只有一个(或极少数)顶级 Actor。我们通过从现有的 Actor 调用context.actorOf()来创建子 Actor 或非顶级 Actor。context.actorOf()方法具有与system.actorOf()相同的签名,后者是其对应的顶级。

查看 Actor 层次结构的最简单方法是打印ActorRef实例。在这个小实验中,我们创建了一个 Actor,打印了它的引用,创建了这个 Actor 的一个子 Actor,并打印了这个子 Actor 的引用。我们从Hello World项目开始,如果你还没有下载它,请从「Lightbend Tech Hub」下载 QuickStart 项目。

在你的Hello World项目中,导航到com.example包并创建一个新的名为ActorHierarchyExperiments.java的 Java 文件。将下面代码段中的代码复制并粘贴到此新源文件中。保存文件并运行sbt "runMain com.example.ActorHierarchyExperiments"来观察输出。

  1. package com.example;
  2. import akka.actor.AbstractActor;
  3. import akka.actor.AbstractActor.Receive;
  4. import akka.actor.ActorRef;
  5. import akka.actor.ActorSystem;
  6. import akka.actor.Props;
  7. public class ActorHierarchyExperiments {
  8. public static void main(String[] args) throws java.io.IOException {
  9. ActorSystem system = ActorSystem.create("testSystem");
  10. ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor");
  11. System.out.println("First: " + firstRef);
  12. firstRef.tell("printit", ActorRef.noSender());
  13. System.out.println(">>> Press ENTER to exit <<<");
  14. try {
  15. System.in.read();
  16. } finally {
  17. system.terminate();
  18. }
  19. }
  20. }
  21. class PrintMyActorRefActor extends AbstractActor {
  22. static Props props() {
  23. return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new);
  24. }
  25. @Override
  26. public Receive createReceive() {
  27. return receiveBuilder()
  28. .matchEquals("printit", p -> {
  29. ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
  30. System.out.println("Second: " + secondRef);
  31. })
  32. .build();
  33. }
  34. }

注意信息要求第一个 Actor 完成工作的方式。我们使用父 Actor 的引用firstRef.tell("printit", ActorRef.noSender())发送消息。当代码执行时,输出包括第一个 Actor 的引用,以及匹配printit模式时创建的子 Actor 的引用。你的输出应该与下面的内容相似:

  1. First: Actor[akka://testSystem/user/first-actor#1053618476]
  2. Second: Actor[akka://testSystem/user/first-actor/second-actor#-1544706041]

注意 Actor 引用的结构:

  • 两条路径都以akka://testSystem/开头。因为所有 Actor 的引用都是有效的 URL,所以akka://是协议字段的值。
  • 接下来,就像在万维网(World Wide Web)上一样,URL 标识系统。在本例中,系统名为testSystem,但它可以是任何其他名称。如果启用了多个系统之间的远程通信,则 URL 的这一部分包括主机名,以便其他系统可以在网络上找到它。
  • 因为第二个 Actor 的引用包含路径/first-actor/,这个标识它为第一个 Actor 的子 Actor。
  • Actor 引用的最后一部分,即#1053618476#-1544706041是一个在大多数情况下可以忽略的唯一标识符。

既然你了解了 Actor 层次结构的样子,你可能会想:为什么我们需要这个层次结构?它是用来干什么的?

层次结构的一个重要作用是安全地管理 Actor 的生命周期。接下来,我们来考虑一下,这些知识如何帮助我们编写更好的代码。

Actor 的生命周期

Actor 在被创建时就会出现,然后在用户请求时被停止。每当一个 Actor 被停止时,它的所有子 Actor 也会被递归地停止。这种行为大大简化了资源清理,并有助于避免诸如由打开的套接字和文件引起的资源泄漏。事实上,在处理初级多线程代码时,一个通常被忽视的困难是各种并发资源的生命周期管理。

要停止 Actor,建议的模式是调用 Actor 内部的getContext().stop(getSelf())来停止自身,通常是对某些用户定义的停止消息的响应,或者当 Actor 完成其任务时。从技术上讲,通过调用getContext().stop(actorRef)是可以停止另一个 Actor 的,但通过这种方式停止任意的 Actor 被认为是一种糟糕的做法:停止 Actor 的一个比较好的方法是,尝试向他们发送一个“毒丸(PoisonPill)”或自定义的停止消息

Akka Actor 的 API 暴露了许多生命周期的钩子,你可以在 Actor 的实现中覆盖这些钩子。最常用的是preStart()postStop()方法。

  • preStart()在 Actor 启动之后但在处理其第一条消息之前调用。
  • postStop()在 Actor 停止之前调用,在此时之后将不再处理任何消息。

让我们在一个简单的实验中使用生命周期中的preStart()postStop()钩子来观察停止一个 Actor 时的行为。首先,将以下两个 Actor 类添加到项目中:

  1. class StartStopActor1 extends AbstractActor {
  2. static Props props() {
  3. return Props.create(StartStopActor1.class, StartStopActor1::new);
  4. }
  5. @Override
  6. public void preStart() {
  7. System.out.println("first started");
  8. getContext().actorOf(StartStopActor2.props(), "second");
  9. }
  10. @Override
  11. public void postStop() {
  12. System.out.println("first stopped");
  13. }
  14. @Override
  15. public Receive createReceive() {
  16. return receiveBuilder()
  17. .matchEquals("stop", s -> {
  18. getContext().stop(getSelf());
  19. })
  20. .build();
  21. }
  22. }
  23. class StartStopActor2 extends AbstractActor {
  24. static Props props() {
  25. return Props.create(StartStopActor2.class, StartStopActor2::new);
  26. }
  27. @Override
  28. public void preStart() {
  29. System.out.println("second started");
  30. }
  31. @Override
  32. public void postStop() {
  33. System.out.println("second stopped");
  34. }
  35. // Actor.emptyBehavior is a useful placeholder when we don't
  36. // want to handle any messages in the actor.
  37. @Override
  38. public Receive createReceive() {
  39. return receiveBuilder()
  40. .build();
  41. }
  42. }

接下来,创建一个主函数来启动 Actors,然后向他们发送stop消息:

  1. ActorRef first = system.actorOf(StartStopActor1.props(), "first");
  2. first.tell("stop", ActorRef.noSender());

你可以再次使用sbt命令来启动这个项目,其输出结果应该如下面这样:

  1. first started
  2. second started
  3. second stopped
  4. first stopped

当我们停止first Actor 时,它会在停止自身之前,先停止了它的子 Actor second。这个顺序是严格的,在调用父 Actor 的postStop()钩子之前,会先调用所有子 Actor 的postStop()钩子。

在 Akka 参考手册的「Actor Lifecycle」部分提供了关于 Actor 的全套生命周期钩子的详细信息。

失败处理

父 Actor 和子 Actor 在他们的生命周期中是相互联系的。当一个 Actor 失败(抛出一个异常或从接收中冒出一个未处理的异常)时,它将暂时挂起。如前所述,失败信息被传播到父 Actor,然后父 Actor 决定如何处理由子 Actor 引起的异常。这样,父 Actor 就可以作为子 Actor 的监督者(supervisors)。默认的监督策略是停止并重新启动子 Actor。如果不更改默认策略,所有失败都会导致重新启动。

让我们在一个简单的实验中观察默认策略。将下面的类添加到项目中,就像之前的类一样:

  1. class SupervisingActor extends AbstractActor {
  2. static Props props() {
  3. return Props.create(SupervisingActor.class, SupervisingActor::new);
  4. }
  5. ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor");
  6. @Override
  7. public Receive createReceive() {
  8. return receiveBuilder()
  9. .matchEquals("failChild", f -> {
  10. child.tell("fail", getSelf());
  11. })
  12. .build();
  13. }
  14. }
  15. class SupervisedActor extends AbstractActor {
  16. static Props props() {
  17. return Props.create(SupervisedActor.class, SupervisedActor::new);
  18. }
  19. @Override
  20. public void preStart() {
  21. System.out.println("supervised actor started");
  22. }
  23. @Override
  24. public void postStop() {
  25. System.out.println("supervised actor stopped");
  26. }
  27. @Override
  28. public Receive createReceive() {
  29. return receiveBuilder()
  30. .matchEquals("fail", f -> {
  31. System.out.println("supervised actor fails now");
  32. throw new Exception("I failed!");
  33. })
  34. .build();
  35. }
  36. }

然后运行:

  1. ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor");
  2. supervisingActor.tell("failChild", ActorRef.noSender());

你应该看到类似如下的输出结果:

  1. supervised actor started
  2. supervised actor fails now
  3. supervised actor stopped
  4. supervised actor started
  5. [ERROR] [03/29/2017 10:47:14.150] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/supervising-actor/supervised-actor] I failed!
  6. java.lang.Exception: I failed!
  7. at tutorial_1.SupervisedActor$$anonfun$receive$4.applyOrElse(ActorHierarchyExperiments.scala:57)
  8. at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
  9. at tutorial_1.SupervisedActor.aroundReceive(ActorHierarchyExperiments.scala:47)
  10. at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519)
  11. at akka.actor.ActorCell.invoke(ActorCell.scala:488)
  12. at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
  13. at akka.dispatch.Mailbox.run(Mailbox.scala:224)
  14. at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  15. at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  16. at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  17. at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  18. at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

我们看到失败后,被监督的 Actor 停止并立即重新启动。我们还看到一个日志条目,报告处理的异常,在本例中是我们的测试异常。在这个例子中,我们使用了preStart()postStop()钩子,这是重启后和重启前默认调用的钩子,因此我们无法区分 Actor 内部是第一次启动还是重启。这通常是正确的做法,重新启动的目的是将 Actor 设置为已知的良好状态,这通常意味着一个干净的开始阶段。实际上,在重新启动时,调用的是preRestart()postRestart()方法,但如果不重写这两个方法,则默认分别委托给postStop()preStart()。你可以尝试重写这些附加方法,并查看输出是如何变化的。

无论如何,我们还是建议查看「监督和监控」,以了解更深入的细节。

总结

我们已经了解了 Akka 是如何管理层级结构中的 Actor 的,在层级结构中,父 Actor 会监督他们的子 Actor 并处理异常情况。我们看到了如何创造一个非常简单的 Actor 和其子 Actor。接下来,我们将会把这些知识应该到我们的示例中,获取设备 Actor 的信息。稍后,我们将讨论如何管理小组中的 Actor。


英文原文链接Part 1: Actor Architecture.