第 4 部分: 使用设备组

依赖

在你的项目中添加如下依赖:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-actor_2.11</artifactId>
  5. <version>2.5.19</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.19'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.19"

简介

让我们仔细看看用例所需的主要功能。在用于监测家庭温度的完整物联网系统中,将设备传感器连接到系统的步骤可能如下:

  1. 家庭中的传感器设备通过某种协议进行连接。
  2. 管理网络连接的组件接受连接。
  3. 传感器提供其组和设备 ID,以便在系统的设备管理器组件中注册。
  4. 设备管理器组件通过查找或创建负责保持传感器状态的 Actor 来处理注册。
  5. Actor 以一种确认(acknowledgement)回应,暴露其ActorRef
  6. 网络组件现在使用ActorRef在传感器和设备 Actor 之间进行通信,而不需要经过设备管理器。

步骤 1 和 2 发生在教程系统的边界之外。在本章中,我们将开始处理步骤 3 - 6,并创建传感器在系统中注册和与 Actor 通信的方法。但首先,我们有另一个体系结构决策——我们应该使用多少个层次的 Actor 来表示设备组和设备传感器?

Akka 程序员面临的主要设计挑战之一是为 Actor 选择最佳的粒度。在实践中,根据 Actor 之间交互的特点,通常有几种有效的方法来组织系统。例如,在我们的用例中,可能有一个 Actor 维护所有的组和设备——或许可以使用哈希表(hash maps)。对于每个跟踪同一个家中所有设备状态的组来说,有一个 Actor 也是合理的。

以下指导原则可以帮助我们选择最合适的 Actor 层次结构:

  • 一般来说,更倾向于更大的粒度。引入比需要更多的细粒度 Actor 会导致比它解决的问题更多的问题。
  • 当系统需要时添加更细的粒度:
    • 更高的并发性。
    • 有许多状态的 Actor 之间的复杂交互。在下一章中,我们将看到一个很好的例子。
    • 足够多的状态,划分为较小的 Actor 是有意义地。
    • 多重无关责任。使用不同的 Actor 可以使单个 Actor 失败并恢复,而对其他的 Actor 影响很小。

设备管理器层次结构

考虑到上一节中概述的原则,我们将设备管理器组件建模为具有三个级别的 Actor 树:

  • 顶级监督者 Actor 表示设备的系统组件。它也是查找和创建设备组和设备 Actor 的入口点。
  • 在下一个级别,每个组 Actor 都监督设备 Actor 使用同一个组 ID。它们还提供服务,例如查询组中所有可用设备的温度读数。
  • 设备 Actor 管理与实际设备传感器的所有交互,例如存储温度读数。

device-manager

我们选择这三层架构的原因如下:

  • 划分组为单独的 Actor:
    • 隔离组中发生的故障。如果一个 Actor 管理所有设备组,则一个组中导致重新启动的错误将清除组的状态,否则这些组不会出现故障。
    • 简化了查询属于一个组的所有设备的问题。每个组 Actor 只包含与其组相关的状态。
    • 提高系统的并行性。因为每个组都有一个专用的 Actor,所以它们可以并发运行,我们可以并发查询多个组。
  • 将传感器建模为单个设备 Actor:
    • 将一个设备 Actor 的故障与组中的其他设备隔离开来。
    • 增加收集温度读数的平行度。来自不同传感器的网络连接直接与各自的设备 Actor 通信,从而减少了竞争点。

定义了设备体系结构后,我们就可以开始研究注册传感器的协议了。

注册协议

作为第一步,我们需要设计协议来注册一个设备,以及创建负责它的组和设备 Actor。此协议将由DeviceManager组件本身提供,因为它是唯一已知且预先可用的 Actor:设备组和设备 Actor 是按需创建的。

更详细地看一下注册,我们可以概述必要的功能:

  • DeviceManager接收到具有组和设备 ID 的请求时:
    • 如果管理器已经有了设备组的 Actor,那么它会将请求转发给它。
    • 否则,它会创建一个新的设备组 Actor,然后转发请求。
  • DeviceGroup Actor 接收为给定设备注册 Actor 的请求:
    • 如果组已经有设备的 Actor,则组 Actor 将请求转发给设备 Actor。
    • 否则,设备组 Actor 首先创建设备 Actor,然后转发请求。
  • 设备 Actor 接收请求并向原始发送者发送确认。由于设备 Actor 确认接收(而不是组 Actor),传感器现在将有ActorRef,可以直接向其 Actor 发送消息。

我们将用来传递注册请求及其确认的消息有一个简单的定义:

  1. public static final class RequestTrackDevice {
  2. public final String groupId;
  3. public final String deviceId;
  4. public RequestTrackDevice(String groupId, String deviceId) {
  5. this.groupId = groupId;
  6. this.deviceId = deviceId;
  7. }
  8. }
  9. public static final class DeviceRegistered {
  10. }

在这种情况下,我们在消息中没有包含请求 ID 字段。由于注册只发生一次,当组件将系统连接到某个网络协议时,ID 并不重要。但是,包含请求 ID 通常是一种最佳实践。

现在,我们将从头开始实现该协议。在实践中,自上向下和自下而上的方法都很有效,但是在我们的例子中,我们实使用自下而上的方法,因为它允许我们立即为新特性编写测试,而不需要模拟出稍后需要构建的部分。

向设备 Actor 添加注册支持

在我们的层次结构的底部是Device Actor。他们在注册过程中的工作很简单:回复注册请求并向发送者确认。对于带有不匹配的组或设备 ID 的请求,添加一个保护措施也是明智的。

我们假设注册消息发送者的 ID 保留在上层。我们将在下一节向你展示如何实现这一点。

设备 Actor 的注册代码如下所示:

  1. import akka.actor.AbstractActor;
  2. import akka.actor.Props;
  3. import akka.event.Logging;
  4. import akka.event.LoggingAdapter;
  5. import jdocs.tutorial_4.DeviceManager.DeviceRegistered;
  6. import jdocs.tutorial_4.DeviceManager.RequestTrackDevice;
  7. import java.util.Optional;
  8. public class Device extends AbstractActor {
  9. private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  10. final String groupId;
  11. final String deviceId;
  12. public Device(String groupId, String deviceId) {
  13. this.groupId = groupId;
  14. this.deviceId = deviceId;
  15. }
  16. public static Props props(String groupId, String deviceId) {
  17. return Props.create(Device.class, () -> new Device(groupId, deviceId));
  18. }
  19. public static final class RecordTemperature {
  20. final long requestId;
  21. final double value;
  22. public RecordTemperature(long requestId, double value) {
  23. this.requestId = requestId;
  24. this.value = value;
  25. }
  26. }
  27. public static final class TemperatureRecorded {
  28. final long requestId;
  29. public TemperatureRecorded(long requestId) {
  30. this.requestId = requestId;
  31. }
  32. }
  33. public static final class ReadTemperature {
  34. final long requestId;
  35. public ReadTemperature(long requestId) {
  36. this.requestId = requestId;
  37. }
  38. }
  39. public static final class RespondTemperature {
  40. final long requestId;
  41. final Optional<Double> value;
  42. public RespondTemperature(long requestId, Optional<Double> value) {
  43. this.requestId = requestId;
  44. this.value = value;
  45. }
  46. }
  47. Optional<Double> lastTemperatureReading = Optional.empty();
  48. @Override
  49. public void preStart() {
  50. log.info("Device actor {}-{} started", groupId, deviceId);
  51. }
  52. @Override
  53. public void postStop() {
  54. log.info("Device actor {}-{} stopped", groupId, deviceId);
  55. }
  56. @Override
  57. public Receive createReceive() {
  58. return receiveBuilder()
  59. .match(RequestTrackDevice.class, r -> {
  60. if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
  61. getSender().tell(new DeviceRegistered(), getSelf());
  62. } else {
  63. log.warning(
  64. "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
  65. r.groupId, r.deviceId, this.groupId, this.deviceId
  66. );
  67. }
  68. })
  69. .match(RecordTemperature.class, r -> {
  70. log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
  71. lastTemperatureReading = Optional.of(r.value);
  72. getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
  73. })
  74. .match(ReadTemperature.class, r -> {
  75. getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
  76. })
  77. .build();
  78. }
  79. }

我们现在可以编写两个新的测试用例,一个成功注册,另一个在 ID 不匹配时测试用例:

  1. @Test
  2. public void testReplyToRegistrationRequests() {
  3. TestKit probe = new TestKit(system);
  4. ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
  5. deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
  6. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  7. assertEquals(deviceActor, probe.getLastSender());
  8. }
  9. @Test
  10. public void testIgnoreWrongRegistrationRequests() {
  11. TestKit probe = new TestKit(system);
  12. ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
  13. deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
  14. probe.expectNoMessage();
  15. deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
  16. probe.expectNoMessage();
  17. }
  • 注释:我们使用了TestKit中的expectNoMsg()帮助者方法。此断言等待到定义的时间限制,如果在此期间收到任何消息,则会失败。如果在等待期间未收到任何消息,则断言通过。通常最好将这些超时保持在较低的水平(但不要太低),因为它们会增加大量的测试执行时间。

向设备组 Actor 添加注册支持

我们已经完成了设备级别的注册支持,现在我们必须在组级别实现它。当涉及到注册时,组 Actor 有更多的工作要做,包括:

  • 通过将注册请求转发给现有设备 Actor 或创建新 Actor 并转发消息来处理注册请求。
  • 跟踪组中存在哪些设备 Actor,并在停止时将其从组中删除。

处理注册请求

设备组 Actor 必须将请求转发给现有的子 Actor,或者应该创建一个子 Actor。要通过设备 ID 查找子 Actor,我们将使用Map<String, ActorRef>

我们还希望保留请求的原始发送者的 ID,以便设备 Actor 可以直接回复。这可以通过使用forward而不是tell运算符来实现。两者之间的唯一区别是,forward保留原始发送者,而tell将发送者设置为当前 Actor。就像我们的设备 Actor 一样,我们确保不响应错误的组 ID。将以下内容添加到你的源文件中:

  1. public class DeviceGroup extends AbstractActor {
  2. private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  3. final String groupId;
  4. public DeviceGroup(String groupId) {
  5. this.groupId = groupId;
  6. }
  7. public static Props props(String groupId) {
  8. return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  9. }
  10. final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  11. @Override
  12. public void preStart() {
  13. log.info("DeviceGroup {} started", groupId);
  14. }
  15. @Override
  16. public void postStop() {
  17. log.info("DeviceGroup {} stopped", groupId);
  18. }
  19. private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
  20. if (this.groupId.equals(trackMsg.groupId)) {
  21. ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
  22. if (deviceActor != null) {
  23. deviceActor.forward(trackMsg, getContext());
  24. } else {
  25. log.info("Creating device actor for {}", trackMsg.deviceId);
  26. deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
  27. deviceIdToActor.put(trackMsg.deviceId, deviceActor);
  28. deviceActor.forward(trackMsg, getContext());
  29. }
  30. } else {
  31. log.warning(
  32. "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
  33. groupId, this.groupId
  34. );
  35. }
  36. }
  37. @Override
  38. public Receive createReceive() {
  39. return receiveBuilder()
  40. .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
  41. .build();
  42. }
  43. }

正如我们对设备所做的那样,我们测试了这个新功能。我们还测试了两个不同 ID 返回的 Actor 实际上是不同的,我们还尝试记录每个设备的温度读数,以查看 Actor 是否有响应。

  1. @Test
  2. public void testRegisterDeviceActor() {
  3. TestKit probe = new TestKit(system);
  4. ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
  5. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  6. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  7. ActorRef deviceActor1 = probe.getLastSender();
  8. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  9. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  10. ActorRef deviceActor2 = probe.getLastSender();
  11. assertNotEquals(deviceActor1, deviceActor2);
  12. // Check that the device actors are working
  13. deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
  14. assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  15. deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
  16. assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  17. }
  18. @Test
  19. public void testIgnoreRequestsForWrongGroupId() {
  20. TestKit probe = new TestKit(system);
  21. ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
  22. groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
  23. probe.expectNoMessage();
  24. }

如果注册请求已经存在设备 Actor,我们希望使用现有的 Actor 而不是新的 Actor。我们尚未对此进行测试,因此需要修复此问题:

  1. @Test
  2. public void testReturnSameActorForSameDeviceId() {
  3. TestKit probe = new TestKit(system);
  4. ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
  5. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  6. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  7. ActorRef deviceActor1 = probe.getLastSender();
  8. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  9. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  10. ActorRef deviceActor2 = probe.getLastSender();
  11. assertEquals(deviceActor1, deviceActor2);
  12. }

跟踪组内的设备 Actor

到目前为止,我们已经实现了在组中注册设备 Actor 的逻辑。然而,设备增增减减(come and go),所以我们需要一种方法从Map<String, ActorRef>中删除设备 Actor。我们假设当一个设备被删除时,它对应的设备 Actor 被停止。正如我们前面讨论的,监督只处理错误场景——而不是优雅的停止。因此,当其中一个设备 Actor 停止时,我们需要通知其父 Actor。

Akka 提供了一个死亡观察功能(Death Watch feature),允许一个 Actor 观察另一个 Actor,并在另一个 Actor 被停止时得到通知。与监督者不同的是,观察(watching)并不局限于父子关系,任何 Actor 只要知道ActorRef就可以观察其他 Actor。在被观察的 Actor 停止后,观察者接收一条Terminated(actorRef)消息,该消息还包含对被观察的 Actor 的引用。观察者可以显式处理此消息,也可以失败并出现DeathPactException。如果在被观察的 Actor 被停止后,该 Actor 不能再履行自己的职责,则后者很有用。在我们的例子中,组应该在一个设备停止后继续工作,所以我们需要处理Terminated(actorRef)消息。

我们的设备组 Actor 需要包括以下功能:

  • 当新设备 Actor 被创建时开始观察(watching)。
  • 当通知指示设备已停止时,从映射Map<String, ActorRef>中删除设备 Actor。

不幸的是,Terminated的消息只包含子 Actor 的ActorRef。我们需要 Actor 的 ID 将其从现有设备到设备的 Actor 映射中删除。为了能够进行删除,我们需要引入另一个占位符Map<ActorRef, String>,它允许我们找到与给定ActorRef对应的设备 ID。

添加用于标识 Actor 的功能后,代码如下:

  1. public class DeviceGroup extends AbstractActor {
  2. private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  3. final String groupId;
  4. public DeviceGroup(String groupId) {
  5. this.groupId = groupId;
  6. }
  7. public static Props props(String groupId) {
  8. return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  9. }
  10. final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  11. final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  12. @Override
  13. public void preStart() {
  14. log.info("DeviceGroup {} started", groupId);
  15. }
  16. @Override
  17. public void postStop() {
  18. log.info("DeviceGroup {} stopped", groupId);
  19. }
  20. private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
  21. if (this.groupId.equals(trackMsg.groupId)) {
  22. ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
  23. if (deviceActor != null) {
  24. deviceActor.forward(trackMsg, getContext());
  25. } else {
  26. log.info("Creating device actor for {}", trackMsg.deviceId);
  27. deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
  28. getContext().watch(deviceActor);
  29. actorToDeviceId.put(deviceActor, trackMsg.deviceId);
  30. deviceIdToActor.put(trackMsg.deviceId, deviceActor);
  31. deviceActor.forward(trackMsg, getContext());
  32. }
  33. } else {
  34. log.warning(
  35. "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
  36. groupId, this.groupId
  37. );
  38. }
  39. }
  40. private void onTerminated(Terminated t) {
  41. ActorRef deviceActor = t.getActor();
  42. String deviceId = actorToDeviceId.get(deviceActor);
  43. log.info("Device actor for {} has been terminated", deviceId);
  44. actorToDeviceId.remove(deviceActor);
  45. deviceIdToActor.remove(deviceId);
  46. }
  47. @Override
  48. public Receive createReceive() {
  49. return receiveBuilder()
  50. .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
  51. .match(Terminated.class, this::onTerminated)
  52. .build();
  53. }
  54. }

到目前为止,我们还没有办法获得组设备 Actor 跟踪的设备,因此,我们还不能测试我们的新功能。为了使其可测试,我们添加了一个新的查询功能(消息RequestDeviceList),其中列出了当前活动的设备 ID:

  1. public class DeviceGroup extends AbstractActor {
  2. private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  3. final String groupId;
  4. public DeviceGroup(String groupId) {
  5. this.groupId = groupId;
  6. }
  7. public static Props props(String groupId) {
  8. return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  9. }
  10. public static final class RequestDeviceList {
  11. final long requestId;
  12. public RequestDeviceList(long requestId) {
  13. this.requestId = requestId;
  14. }
  15. }
  16. public static final class ReplyDeviceList {
  17. final long requestId;
  18. final Set<String> ids;
  19. public ReplyDeviceList(long requestId, Set<String> ids) {
  20. this.requestId = requestId;
  21. this.ids = ids;
  22. }
  23. }
  24. final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  25. final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  26. @Override
  27. public void preStart() {
  28. log.info("DeviceGroup {} started", groupId);
  29. }
  30. @Override
  31. public void postStop() {
  32. log.info("DeviceGroup {} stopped", groupId);
  33. }
  34. private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
  35. if (this.groupId.equals(trackMsg.groupId)) {
  36. ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
  37. if (deviceActor != null) {
  38. deviceActor.forward(trackMsg, getContext());
  39. } else {
  40. log.info("Creating device actor for {}", trackMsg.deviceId);
  41. deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
  42. getContext().watch(deviceActor);
  43. actorToDeviceId.put(deviceActor, trackMsg.deviceId);
  44. deviceIdToActor.put(trackMsg.deviceId, deviceActor);
  45. deviceActor.forward(trackMsg, getContext());
  46. }
  47. } else {
  48. log.warning(
  49. "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
  50. groupId, this.groupId
  51. );
  52. }
  53. }
  54. private void onDeviceList(RequestDeviceList r) {
  55. getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
  56. }
  57. private void onTerminated(Terminated t) {
  58. ActorRef deviceActor = t.getActor();
  59. String deviceId = actorToDeviceId.get(deviceActor);
  60. log.info("Device actor for {} has been terminated", deviceId);
  61. actorToDeviceId.remove(deviceActor);
  62. deviceIdToActor.remove(deviceId);
  63. }
  64. @Override
  65. public Receive createReceive() {
  66. return receiveBuilder()
  67. .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
  68. .match(RequestDeviceList.class, this::onDeviceList)
  69. .match(Terminated.class, this::onTerminated)
  70. .build();
  71. }
  72. }

我们几乎准备好测试设备的移除功能了。但是,我们仍然需要以下功能:

  • 为了通过我们的测试用例停止一个设备 Actor。从外面看,任何 Actor 都可以通过发送一个特殊的内置消息PoisonPill来停止,该消息指示 Actor 停止。
  • 为了在设备 Actor 停止后得到通知。我们也可以使用Death Watch功能观察设备。TestKit有两条消息,我们可以很容易地使用watch()来观察指定的 Actor,使用expectTerminated来断言被观察的 Actor 已被终止。

我们现在再添加两个测试用例。在第一个测试中,我们测试在添加了一些设备之后,是否能返回正确的 ID 列表。第二个测试用例确保在设备 Actor 停止后正确删除设备 ID:

  1. @Test
  2. public void testListActiveDevices() {
  3. TestKit probe = new TestKit(system);
  4. ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
  5. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  6. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  7. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  8. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  9. groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  10. DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  11. assertEquals(0L, reply.requestId);
  12. assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
  13. }
  14. @Test
  15. public void testListActiveDevicesAfterOneShutsDown() {
  16. TestKit probe = new TestKit(system);
  17. ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
  18. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  19. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  20. ActorRef toShutDown = probe.getLastSender();
  21. groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  22. probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  23. groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  24. DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  25. assertEquals(0L, reply.requestId);
  26. assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
  27. probe.watch(toShutDown);
  28. toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
  29. probe.expectTerminated(toShutDown);
  30. // using awaitAssert to retry because it might take longer for the groupActor
  31. // to see the Terminated, that order is undefined
  32. probe.awaitAssert(() -> {
  33. groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
  34. DeviceGroup.ReplyDeviceList r =
  35. probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  36. assertEquals(1L, r.requestId);
  37. assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
  38. return null;
  39. });
  40. }

创建设备管理器 Actor

在我们的层次结构中,我们需要在DeviceManager源文件中为设备管理器组件创建入口点。此 Actor 与设备组 Actor 非常相似,但创建的是设备组 Actor 而不是设备 Actor:

  1. public class DeviceManager extends AbstractActor {
  2. private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  3. public static Props props() {
  4. return Props.create(DeviceManager.class, DeviceManager::new);
  5. }
  6. public static final class RequestTrackDevice {
  7. public final String groupId;
  8. public final String deviceId;
  9. public RequestTrackDevice(String groupId, String deviceId) {
  10. this.groupId = groupId;
  11. this.deviceId = deviceId;
  12. }
  13. }
  14. public static final class DeviceRegistered {
  15. }
  16. final Map<String, ActorRef> groupIdToActor = new HashMap<>();
  17. final Map<ActorRef, String> actorToGroupId = new HashMap<>();
  18. @Override
  19. public void preStart() {
  20. log.info("DeviceManager started");
  21. }
  22. @Override
  23. public void postStop() {
  24. log.info("DeviceManager stopped");
  25. }
  26. private void onTrackDevice(RequestTrackDevice trackMsg) {
  27. String groupId = trackMsg.groupId;
  28. ActorRef ref = groupIdToActor.get(groupId);
  29. if (ref != null) {
  30. ref.forward(trackMsg, getContext());
  31. } else {
  32. log.info("Creating device group actor for {}", groupId);
  33. ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
  34. getContext().watch(groupActor);
  35. groupActor.forward(trackMsg, getContext());
  36. groupIdToActor.put(groupId, groupActor);
  37. actorToGroupId.put(groupActor, groupId);
  38. }
  39. }
  40. private void onTerminated(Terminated t) {
  41. ActorRef groupActor = t.getActor();
  42. String groupId = actorToGroupId.get(groupActor);
  43. log.info("Device group actor for {} has been terminated", groupId);
  44. actorToGroupId.remove(groupActor);
  45. groupIdToActor.remove(groupId);
  46. }
  47. public Receive createReceive() {
  48. return receiveBuilder()
  49. .match(RequestTrackDevice.class, this::onTrackDevice)
  50. .match(Terminated.class, this::onTerminated)
  51. .build();
  52. }
  53. }

我们将设备管理器的测试留给你作为练习,因为它与我们为设备组 Actor 编写的测试非常相似。

下一步是什么?

我们现在有了一个用于注册和跟踪设备以及记录测量值的分层组件。我们已经了解了如何实现不同类型的对话模式,例如:

  • 请求响应(Request-respond),用于温度记录。
  • 代理响应(Delegate-respond),用于设备注册。
  • 创建监视终止(Create-watch-terminate),用于将组和设备 Actor 创建为子级。

在下一章中,我们将介绍组查询功能,这将建立一种新的分散收集(scatter-gather)对话模式。特别地,我们将实现允许用户查询属于一个组的所有设备的状态的功能。


英文原文链接Part 4: Working with Device Groups.