邮箱

一个Akka Mailbox保存发往某个Actor的消息。通常每个Actor都拥有自己的邮箱,但也有例外,例如使用BalancingPool的所有路由子(routee)共享同一个邮箱实例。

邮箱选择

为actor指定一个消息队列类型

为某个特定类型的actor指定一个特定类型的消息队列是有可能的,只要通过actor扩展RequiresMessageQueue参数化特质即可。下面是一个示例:

  1. import akka.dispatch.RequiresMessageQueue
  2. import akka.dispatch.BoundedMessageQueueSemantics
  3. class MyBoundedActor extends MyActor
  4. with RequiresMessageQueue[BoundedMessageQueueSemantics]

RequiresMessageQueue特质的类型参数需要映射到配置中的邮箱,像这样:

  1. bounded-mailbox {
  2. mailbox-type = "akka.dispatch.BoundedMailbox"
  3. mailbox-capacity = 1000
  4. mailbox-push-timeout-time = 10s
  5. }
  6. akka.actor.mailbox.requirements {
  7. "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
  8. }

现在每当你创建一个类型为MyBoundedActor的actor时,它会尝试得到一个有界的邮箱。如果actor在部署中具有一个不同的邮箱配置——直接地,或是通过一个指定的邮箱类型的调度器——那么它将覆盖此映射。

注意

为actor创建的邮箱队列类型,会和特质中要求的类型进行检查,如果队列没有实现要求的类型,则actor创建会失败。

为调度器指定一个消息队列类型

调度器也可能对actor使用的邮箱类型进行限制。一个例子是 BalancingDispatcher,它要求消息队列对多个并发的消费者是线程安全的。该约束是在配置的调度器一节中像下面这样设定:

  1. my-dispatcher {
  2. mailbox-requirement = org.example.MyInterface
  3. }

给定的约束是要求指定的类或者接口,必须是消息队列的实现的超类。如果出现冲突——例如如果actor要求的邮箱类型不能满足要求——则actor创建会失败。

如何选择邮箱类型

当一个actor创建时,ActorRefProvider首先确定将执行它的调度器。然后,邮箱确定如下:

  1. 如果actor的部署配置节包含mailbox键,则其描述邮箱类型将被使用。
  2. 如果actor的Props包含邮箱选择——即它调用了withMailbox——则其描述邮箱类型将被使用。
  3. 如果调度器的配置节包含mailbox-type键,则该节内容将用于配置邮箱类型。
  4. 如果该actor需要邮箱类型,如上文所述,然后该约束的映射将用于确定使用的邮箱类型;如果不能满足调度器的约束——如果有的话——将继续替换尝试。
  5. 如果调度器需要一个邮箱类型,如上文所述,则该约束的映射将被用来确定要使用的邮箱类型。
  6. 将使用默认邮箱akka.actor.default-mailbox
默认邮箱

当未如上所述,指定使用邮箱时,将使用默认邮箱。默认情况它是无界的邮箱,由java.util.concurrent.ConcurrentLinkedQueue实现。

SingleConsumerOnlyUnboundedMailbox是一个更有效率的邮箱,而且它也可以用作默认邮箱,但它不能与 BalancingDispatcher一起使用。

SingleConsumerOnlyUnboundedMailbox作为默认邮箱的配置:

  1. akka.actor.default-mailbox {
  2. mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  3. }
哪项配置会传递给邮箱类型

每个邮箱类型由扩展MailboxType并带两个构造函数参数的类实现:一个ActorSystem.Settings对象和一个Config对象。后者通过actor系统配置的指定配置节被计算出来,重写其指定邮箱类型的配置路径的id键,并添加一个到默认邮箱配置节的回退。

内置实现

Akka自带有一些内置的邮箱实现:

  • UnboundedMailbox

    • 默认邮箱
    • 底层是一个java.util.concurrent.ConcurrentLinkedQueue
    • 阻塞: 否
    • 有界: 否
    • 配置名称:”unbounded” 或 “akka.dispatch.UnboundedMailbox”
  • SingleConsumerOnlyUnboundedMailbox

    • 底层是一个非常高效的多生产者单消费者队列,不能被用于BalancingDispatcher
    • 阻塞: 否
    • 有界: 否
    • 配置名称:”akka.dispatch.SingleConsumerOnlyUnboundedMailbox”
  • BoundedMailbox

    • 底层是一个java.util.concurrent.LinkedBlockingQueue
    • 阻塞: 是
    • 有界: 是
    • 配置名称:”bounded” 或 “akka.dispatch.BoundedMailbox”
  • UnboundedPriorityMailbox

    • 底层是一个java.util.concurrent.PriorityBlockingQueue
    • 阻塞: 是
    • 有界: 否
    • 配置名称:”akka.dispatch.UnboundedPriorityMailbox”
  • BoundedPriorityMailbox

    • 底层是一个 java.util.PriorityBlockingQueue包装为akka.util.BoundedBlockingQueue
    • 阻塞: 是
    • 有界: 是
    • 配置名称:”akka.dispatch.BoundedPriorityMailbox”

邮箱配置示例

如何创建一个PriorityMailbox:

  1. import akka.dispatch.PriorityGenerator
  2. import akka.dispatch.UnboundedPriorityMailbox
  3. import com.typesafe.config.Config
  4. // We inherit, in this case, from UnboundedPriorityMailbox
  5. // and seed it with the priority generator
  6. class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
  7. extends UnboundedPriorityMailbox(
  8. // Create a new PriorityGenerator, lower prio means more important
  9. PriorityGenerator {
  10. // 'highpriority messages should be treated first if possible
  11. case 'highpriority => 0
  12. // 'lowpriority messages should be treated last if possible
  13. case 'lowpriority => 2
  14. // PoisonPill when no other left
  15. case PoisonPill => 3
  16. // We default to 1, which is in between high and low
  17. case otherwise => 1
  18. })

并添加到配置文件:

  1. prio-dispatcher {
  2. mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  3. //Other dispatcher configuration goes here
  4. }

以下示例演示如何使用它:

  1. // We create a new Actor that just prints out what it processes
  2. class Logger extends Actor {
  3. val log: LoggingAdapter = Logging(context.system, this)
  4. self ! 'lowpriority
  5. self ! 'lowpriority
  6. self ! 'highpriority
  7. self ! 'pigdog
  8. self ! 'pigdog2
  9. self ! 'pigdog3
  10. self ! 'highpriority
  11. self ! PoisonPill
  12. def receive = {
  13. case x => log.info(x.toString)
  14. }
  15. }
  16. val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
  17. "prio-dispatcher"))
  18. /*
  19. * Logs:
  20. * 'highpriority
  21. * 'highpriority
  22. * 'pigdog
  23. * 'pigdog2
  24. * 'pigdog3
  25. * 'lowpriority
  26. * 'lowpriority
  27. */

也可以像这样直接配置邮箱类型:

  1. prio-mailbox {
  2. mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  3. //Other mailbox configuration goes here
  4. }
  5. akka.actor.deployment {
  6. /priomailboxactor {
  7. mailbox = prio-mailbox
  8. }
  9. }

然后可以在部署环境像这样使用它:

  1. import akka.actor.Props
  2. val myActor = context.actorOf(Props[MyActor], "priomailboxactor")

或像这样在代码中:

  1. import akka.actor.Props
  2. val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))

创建自己的邮箱类型

例子比文字更有说服力:

  1. import akka.actor.ActorRef
  2. import akka.actor.ActorSystem
  3. import akka.dispatch.Envelope
  4. import akka.dispatch.MailboxType
  5. import akka.dispatch.MessageQueue
  6. import akka.dispatch.ProducesMessageQueue
  7. import com.typesafe.config.Config
  8. import java.util.concurrent.ConcurrentLinkedQueue
  9. import scala.Option
  10. // Marker trait used for mailbox requirements mapping
  11. trait MyUnboundedMessageQueueSemantics
  12. object MyUnboundedMailbox {
  13. // This is the MessageQueue implementation
  14. class MyMessageQueue extends MessageQueue
  15. with MyUnboundedMessageQueueSemantics {
  16. private final val queue = new ConcurrentLinkedQueue[Envelope]()
  17. // these should be implemented; queue used as example
  18. def enqueue(receiver: ActorRef, handle: Envelope): Unit =
  19. queue.offer(handle)
  20. def dequeue(): Envelope = queue.poll()
  21. def numberOfMessages: Int = queue.size
  22. def hasMessages: Boolean = !queue.isEmpty
  23. def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
  24. while (hasMessages) {
  25. deadLetters.enqueue(owner, dequeue())
  26. }
  27. }
  28. }
  29. }
  30. // This is the Mailbox implementation
  31. class MyUnboundedMailbox extends MailboxType
  32. with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
  33. import MyUnboundedMailbox._
  34. // This constructor signature must exist, it will be called by Akka
  35. def this(settings: ActorSystem.Settings, config: Config) = {
  36. // put your initialization code here
  37. this()
  38. }
  39. // The create method is called to create the MessageQueue
  40. final override def create(owner: Option[ActorRef],
  41. system: Option[ActorSystem]): MessageQueue =
  42. new MyMessageQueue()
  43. }

然后在派发器配置中,或邮箱配置中以你定义的邮箱类型的全称作为“mailbox-type”的值。

注意

一定要定义一个以akka.actor.ActorSystem.Settingscom.typesafe.config.Config为参数的构造函数,该构造函数将以反射的方式被调用来创建你的邮箱类型。第二个传入的配置参数是配置文件中使用这个邮箱类型的派发器或邮箱的描述;该邮箱类型会为每一个使用它的派发器或邮箱创建一个实例。

此外可以作为派发器的约束这样使用邮箱:

  1. custom-dispatcher {
  2. mailbox-requirement =
  3. "docs.dispatcher.MyUnboundedJMessageQueueSemantics"
  4. }
  5. akka.actor.mailbox.requirements {
  6. "docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
  7. custom-dispatcher-mailbox
  8. }
  9. custom-dispatcher-mailbox {
  10. mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
  11. }

或通过在你的actor类上定义约束,像这样:

  1. class MySpecialActor extends Actor
  2. with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
  3. // ...
  4. }

system.actorOf的特殊语义

为了使system.actorOf同步和非阻塞,并且返回类型保持为ActorRef(并且保持返回的ref是完全函数式的语义),这种情况下会进行特殊处理。在幕后,一种空心的actor引用会被构造,然后发送到系统的监管者,该监管者实际创建actor和它的上下文,并把它们传到引用内部。直到这些发生以后,发送到该ActorRef的消息会被本地排队,而只有当填入真实信息后,它们才会被传入真实的邮箱,因此,

  1. val props: Props = ...
  2. // this actor uses MyCustomMailbox, which is assumed to be a singleton
  3. system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
  4. assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")

可能会失败;你将不得不留出一些时间来传递,然后重新尝试检查TestKit.awaitCond