Actor 发现

依赖

为了使用 Akka Actor 类型,你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-actor-typed_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.5.23"

简介

对于「非类型化的 Actor」,你将使用ActorSelection来查找 Actor。给定一个包含地址信息的 Actor 路径,你可以为任何 Actor 获取ActorRef。在 Akka 类型中不存在ActorSelection,那么如何获取 Actor 引用呢?你可以在消息中发送refs,但需要一些东西来引导交互。

Receptionist

为此,有个 Actor 叫Receptionist。你注册了应该可以从本地Receptionist实例中的其他节点发现的特定 Actor。Receptionist的 API 也基于 Actor 消息。然后,Actor 引用的注册表将自动分发到集群中的所有其他节点。你可以使用注册时使用的键查找这些 Actor。对这样一个Find请求的答复是一个Listing,其中包含为键注册的一组 Actor 引用的Set。请注意,可以将多个 Actor 注册到同一个键。

注册表是动态的。新的 Actor 可以在系统的生命周期中注册。当已注册的 Actor 停止或节点从群集中删除时,条目将被删除。为了方便这个动态方面,你还可以通过发送Receptionist.Subscribe消息来订阅更改。当某个键的条目被更改时,它将向订阅服务器发送Listing消息。

使用Receptionist的主要场景是,当一个 Actor 需要被另一个 Actor 发现,但你无法在传入消息中引用它时。

以下示例中使用了这些导入:

  1. import akka.actor.typed.ActorRef;
  2. import akka.actor.typed.Behavior;
  3. import akka.actor.typed.javadsl.ActorContext;
  4. import akka.actor.typed.javadsl.Behaviors;
  5. import akka.actor.typed.receptionist.Receptionist;
  6. import akka.actor.typed.receptionist.ServiceKey;

首先,我们创建一个PingService Actor,并根据一个ServiceKeyReceptionist注册它,该ServiceKey稍后将用于查找引用:

  1. public static class PingService {
  2. static final ServiceKey<Ping> pingServiceKey = ServiceKey.create(Ping.class, "pingService");
  3. public static class Pong {}
  4. public static class Ping {
  5. private final ActorRef<Pong> replyTo;
  6. public Ping(ActorRef<Pong> replyTo) {
  7. this.replyTo = replyTo;
  8. }
  9. }
  10. static Behavior<Ping> createBehavior() {
  11. return Behaviors.setup(
  12. context -> {
  13. context
  14. .getSystem()
  15. .receptionist()
  16. .tell(Receptionist.register(pingServiceKey, context.getSelf()));
  17. return Behaviors.receive(Ping.class).onMessage(Ping.class, PingService::onPing).build();
  18. });
  19. }
  20. private static Behavior<Ping> onPing(ActorContext<Ping> context, Ping msg) {
  21. context.getLog().info("Pinged by {}", msg.replyTo);
  22. msg.replyTo.tell(new Pong());
  23. return Behaviors.same();
  24. }
  25. }

然后我们有另一个 Actor 需要构造PingService

  1. public static class Pinger {
  2. static Behavior<PingService.Pong> createBehavior(ActorRef<PingService.Ping> pingService) {
  3. return Behaviors.setup(
  4. (ctx) -> {
  5. pingService.tell(new PingService.Ping(ctx.getSelf()));
  6. return Behaviors.receive(PingService.Pong.class)
  7. .onMessage(PingService.Pong.class, Pinger::onPong)
  8. .build();
  9. });
  10. }
  11. private static Behavior<PingService.Pong> onPong(
  12. ActorContext<PingService.Pong> context, PingService.Pong msg) {
  13. context.getLog().info("{} was ponged!!", context.getSelf());
  14. return Behaviors.stopped();
  15. }
  16. }

最后,在守护者 Actor 中,我们生成服务,并订阅针对ServiceKey注册的任何 Actor。订阅意味着守护者 Actor 将通过Listing消息被通知任何新的注册:

  1. public static Behavior<Void> createGuardianBehavior() {
  2. return Behaviors.setup(
  3. context -> {
  4. context
  5. .getSystem()
  6. .receptionist()
  7. .tell(
  8. Receptionist.subscribe(
  9. PingService.pingServiceKey, context.getSelf().narrow()));
  10. context.spawnAnonymous(PingService.createBehavior());
  11. return Behaviors.receive(Object.class)
  12. .onMessage(
  13. Receptionist.Listing.class,
  14. (c, msg) -> {
  15. msg.getServiceInstances(PingService.pingServiceKey)
  16. .forEach(
  17. pingService ->
  18. context.spawnAnonymous(Pinger.createBehavior(pingService)));
  19. return Behaviors.same();
  20. })
  21. .build();
  22. })
  23. .narrow();
  24. }

每次注册一个新的PingService(本例中只有一次)时,守护者 Actor 都为每个当前已知的PingService生成一个PingerPinger发送一个Ping消息,当接收到Pong回复时,它将停止。

集群 Receptionist

Receptionist也在集群中工作,注册到Receptionist的 Actor 将出现在集群其他节点的Receptionist中。

Receptionist的状态通过分布式数据传播,这意味着每个节点最终都将达到每个ServiceKey的同一组 Actor。

与仅本地接收的一个重要区别是串行化问题,从另一个节点上的 Actor 发送和返回的所有消息都必须是可序列化的,具体请参阅「集群」。


英文原文链接Actor discovery.