路由

依赖

为了使用 Akka Actor 类型,你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-actor-typed_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.5.23"

简介

在某些情况下,将同一类型的消息分发到一组 Actor 上是很有用的,这样消息就可以并行处理,单个 Actor 一次只处理一条消息。

路由(router)本身就是一种行为,它衍生到一个正在运行的 Actor 身上,然后将发送给它的任何消息转发到路由集之外的最后一个收件人。

Akka 型路由包括两种类型的路由:池路由和组路由。

池路由

池路由是用一个路由Behavior创建的,并产生许多具有该行为的子代,然后将消息转发给这些子代。

如果停止子进程,池路由会将其从其一组路由中删除。当最后一个子节点停止时,路由本身停止。为了制造一个处理故障的弹性路由,必须监视路由的Behavior

  1. import akka.actor.typed.ActorRef;
  2. import akka.actor.typed.Behavior;
  3. import akka.actor.typed.SupervisorStrategy;
  4. import akka.actor.typed.javadsl.Behaviors;
  5. import akka.actor.typed.javadsl.GroupRouter;
  6. import akka.actor.typed.javadsl.PoolRouter;
  7. import akka.actor.typed.javadsl.Routers;
  8. import akka.actor.typed.receptionist.Receptionist;
  9. import akka.actor.typed.receptionist.ServiceKey;
  10. class Worker {
  11. interface Command {}
  12. static class DoLog implements Command {
  13. public final String text;
  14. public DoLog(String text) {
  15. this.text = text;
  16. }
  17. }
  18. static final Behavior<Command> behavior =
  19. Behaviors.setup(
  20. context -> {
  21. context.getLog().info("Starting worker");
  22. return Behaviors.receive(Command.class)
  23. .onMessage(
  24. DoLog.class,
  25. (notUsed, doLog) -> {
  26. context.getLog().info("Got message {}", doLog.text);
  27. return Behaviors.same();
  28. })
  29. .build();
  30. });
  31. }
  32. // make sure the workers are restarted if they fail
  33. Behavior<Worker.Command> supervisedWorker =
  34. Behaviors.supervise(Worker.behavior).onFailure(SupervisorStrategy.restart());
  35. PoolRouter<Worker.Command> pool = Routers.pool(4, supervisedWorker);
  36. ActorRef<Worker.Command> router = context.spawn(pool, "worker-pool");
  37. for (int i = 0; i < 10; i++) {
  38. router.tell(new Worker.DoLog("msg " + i));
  39. }

组路由

组路由是用ServiceKey创建的,它使用Receptionist来发现该键的可用 Actor,并将消息路由到当前已知的已注册 Actor 之一。

由于使用了Receptionist,这意味着组路由可以随时识别集群,并将接收集群中任何节点上注册的路由,目前没有逻辑来避免路由到不可访问的节点,请参见「#26355」。

这也意味着路由的集合最终是一致的,并且当组路由器启动时,它知道的路由集合立即是空的。当路由器集为空时,发送到路由的消息将被转发到死信。

  1. // this would likely happen elsewhere - if we create it locally we
  2. // can just as well use a pool
  3. ActorRef<Worker.Command> worker = context.spawn(Worker.behavior, "worker");
  4. context.getSystem().receptionist().tell(Receptionist.register(serviceKey, worker));
  5. GroupRouter<Worker.Command> group = Routers.group(serviceKey);
  6. ActorRef<Worker.Command> router = context.spawn(group, "worker-group");
  7. // note that since registration of workers goes through the receptionist there is no
  8. // guarantee the router has seen any workers yet if we hit it directly like this and
  9. // these messages may end up in dead letters - in a real application you would not use
  10. // a group router like this - it is to keep the sample simple
  11. for (int i = 0; i < 10; i++) {
  12. router.tell(new Worker.DoLog("msg " + i));
  13. }

路由策略

有两种不同的策略用于选择转发消息的路由,在生成消息之前可以从路由中选择该路由:

  1. PoolRouter<Worker.Command> alternativePool = pool.withPoolSize(2).withRoundRobinRouting();

Round Robin

在一组路由上旋转,确保如果有n个路由,那么对于通过路由发送的n条消息,每个 Actor 都被转发一条消息。

这是池路由的默认路由。

Random

通过路由发送消息时随机选择路由。

这是组路由的默认值路由。

路由和性能

注意,如果路由共享一个资源,那么资源将决定增加 Actor 的数量是否会实际提供更高的吞吐量或更快的答案。例如,如果路由是 CPU 绑定的 Actor,那么创建更多的路由的性能不会比执行 Actor 的线程更好。

由于路由本身是一个 Actor 并且有一个邮箱,这意味着消息按顺序路由到路由器,在那里可以并行处理(取决于调度程序中可用的线程)。在高吞吐量的用例中,顺序路由可能是瓶颈。Akka 类型没有为此提供优化的工具。


英文原文链接Routers.