事件总线

注:本节未经校验,如有问题欢迎提issue

最初设想是为了提供一种向多个actor群发消息的方法,之后EventBus被一般化为一组实现一个简单接口的可组合的特质:

  1. /**
  2. * Attempts to register the subscriber to the specified Classifier
  3. * @return true if successful and false if not (because it was already
  4. * subscribed to that Classifier, or otherwise)
  5. */
  6. def subscribe(subscriber: Subscriber, to: Classifier): Boolean
  7. /**
  8. * Attempts to deregister the subscriber from the specified Classifier
  9. * @return true if successful and false if not (because it wasn't subscribed
  10. * to that Classifier, or otherwise)
  11. */
  12. def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean
  13. /**
  14. * Attempts to deregister the subscriber from all Classifiers it may be subscribed to
  15. */
  16. def unsubscribe(subscriber: Subscriber): Unit
  17. /**
  18. * Publishes the specified Event to this bus
  19. */
  20. def publish(event: Event): Unit

注意

请注意 EventBus 不会保留发布消息的发件人。如果你需要原始发件人的引用必须在消息内部提供。

这个机制在Akka的多个地方用到,例如事件流. 具体实现可以使用下面列出的特定构建工具块。

一个事件总线必须定义以下三种抽象类型:

  • Event 所有发布到该总线上的事件的类型
  • Subscriber 允许注册到该总线上的订阅者的类型
  • Classifier 定义用来派发消息时选择订阅者的分类器

下面的 trait 在这些类型中仍然是泛化的,但它们必须在任何具体的实现中被定义。

类别(Classifiers)

这里提到的类别是Akka发布包的一部分,如果没找到合适的就实现一个自己的类别,并不困难,到 [github](http://github.com/akka/akka/tree/v2.3.6/akka-actor/src/main/scala/akka/event/EventBus.scala) 了解已有类别的实现。

查找分类法

最简单的分类法是给每个事件提取一个随机的类别,并为每一种类别维护一组订阅者。这可以用在收音机上选台来类比。 LookupClassification trait 仍然是泛化的,抽象了如何比较订阅者,以及具体分类的方法。

需要实现的方法如下:

  1. import akka.event.EventBus
  2. import akka.event.LookupClassification
  3. case class MsgEnvelope(topic: String, payload: Any)
  4. /**
  5. * Publishes the payload of the MsgEnvelope when the topic of the
  6. * MsgEnvelope equals the String specified when subscribing.
  7. */
  8. class LookupBusImpl extends EventBus with LookupClassification {
  9. type Event = MsgEnvelope
  10. type Classifier = String
  11. type Subscriber = ActorRef
  12. // is used for extracting the classifier from the incoming events
  13. override protected def classify(event: Event): Classifier = event.topic
  14. // will be invoked for each event for all subscribers which registered themselves
  15. // for the event’s classifier
  16. override protected def publish(event: Event, subscriber: Subscriber): Unit = {
  17. subscriber ! event.payload
  18. }
  19. // must define a full order over the subscribers, expressed as expected from
  20. // `java.lang.Comparable.compare`
  21. override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
  22. a.compareTo(b)
  23. // determines the initial size of the index data structure
  24. // used internally (i.e. the expected number of different classifiers)
  25. override protected def mapSize: Int = 128
  26. }

对该实现的测试看上去像这样:

  1. val lookupBus = new LookupBusImpl
  2. lookupBus.subscribe(testActor, "greetings")
  3. lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis()))
  4. lookupBus.publish(MsgEnvelope("greetings", "hello"))
  5. expectMsg("hello")

这种分类法在对某个特定事件没有任何订阅者时是高效的。

子频道分类法

如果类别构成一个树形结构而且订阅可以不仅针对叶子结点,这种分类法可能是最合适的。 这可以类比成在安风格划分过的(多个)收音机频道中进行调台。 开发这种分类法是为了用在分类器正好是事件的JVM类,并且订阅者可能对订阅某个特定类的所有子类感兴趣的场合,但是它可以用在任何树形类别体系。

需要实现的方法如下:

  1. import akka.util.Subclassification
  2. class StartsWithSubclassification extends Subclassification[String] {
  3. override def isEqual(x: String, y: String): Boolean =
  4. x == y
  5. override def isSubclass(x: String, y: String): Boolean =
  6. x.startsWith(y)
  7. }
  8. import akka.event.SubchannelClassification
  9. /**
  10. * Publishes the payload of the MsgEnvelope when the topic of the
  11. * MsgEnvelope starts with the String specified when subscribing.
  12. */
  13. class SubchannelBusImpl extends EventBus with SubchannelClassification {
  14. type Event = MsgEnvelope
  15. type Classifier = String
  16. type Subscriber = ActorRef
  17. // Subclassification is an object providing `isEqual` and `isSubclass`
  18. // to be consumed by the other methods of this classifier
  19. override protected val subclassification: Subclassification[Classifier] =
  20. new StartsWithSubclassification
  21. // is used for extracting the classifier from the incoming events
  22. override protected def classify(event: Event): Classifier = event.topic
  23. // will be invoked for each event for all subscribers which registered
  24. // themselves for the event’s classifier
  25. override protected def publish(event: Event, subscriber: Subscriber): Unit = {
  26. subscriber ! event.payload
  27. }
  28. }

对该实现的测试看上去像这样:

  1. val subchannelBus = new SubchannelBusImpl
  2. subchannelBus.subscribe(testActor, "abc")
  3. subchannelBus.publish(MsgEnvelope("xyzabc", "x"))
  4. subchannelBus.publish(MsgEnvelope("bcdef", "b"))
  5. subchannelBus.publish(MsgEnvelope("abc", "c"))
  6. expectMsg("c")
  7. subchannelBus.publish(MsgEnvelope("abcdef", "d"))
  8. expectMsg("d")

这种分类法在某事件没有任何订阅者时也很高效,但它使用一个保守锁来对共内部的类别缓存进行同步,所以不适合订阅关系以很高的频率变化的场合(记住通过发送第一个消息来“打开”一个类别,也将需要重新检查所有之前的订阅)。

扫描分类法

上一种分类法是为严格的树形多类别订阅设计的,而这个分类法是用在覆盖事件空间中可能互相重叠的非树形结构类别上的。 这可以比喻为有地理限制的(比如老旧的收音机信号传播方式)的(可能有多个)收音机频道之间进行调台。

需要实现的方法如下:

  1. import akka.event.ScanningClassification
  2. /**
  3. * Publishes String messages with length less than or equal to the length
  4. * specified when subscribing.
  5. */
  6. class ScanningBusImpl extends EventBus with ScanningClassification {
  7. type Event = String
  8. type Classifier = Int
  9. type Subscriber = ActorRef
  10. // is needed for determining matching classifiers and storing them in an
  11. // ordered collection
  12. override protected def compareClassifiers(a: Classifier, b: Classifier): Int =
  13. if (a < b) -1 else if (a == b) 0 else 1
  14. // is needed for storing subscribers in an ordered collection
  15. override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
  16. a.compareTo(b)
  17. // determines whether a given classifier shall match a given event; it is invoked
  18. // for each subscription for all received events, hence the name of the classifier
  19. override protected def matches(classifier: Classifier, event: Event): Boolean =
  20. event.length <= classifier
  21. // will be invoked for each event for all subscribers which registered themselves
  22. // for a classifier matching this event
  23. override protected def publish(event: Event, subscriber: Subscriber): Unit = {
  24. subscriber ! event
  25. }
  26. }

对该实现的测试看上去像这样:

  1. val scanningBus = new ScanningBusImpl
  2. scanningBus.subscribe(testActor, 3)
  3. scanningBus.publish("xyzabc")
  4. scanningBus.publish("ab")
  5. expectMsg("ab")
  6. scanningBus.publish("abc")
  7. expectMsg("abc")

这种分类法耗费的时间总是与订阅关系的数量成正比,而与实际匹配的数量无关。

Actor 分类法

这种分类法原来是专门为实现 DeathWatch开发的: 订阅者和类别都是 ActorRef 类型.

需要实现的方法如下:

  1. import akka.event.ActorEventBus
  2. import akka.event.ActorClassification
  3. import akka.event.ActorClassifier
  4. case class Notification(ref: ActorRef, id: Int)
  5. class ActorBusImpl extends ActorEventBus with ActorClassifier with ActorClassification {
  6. type Event = Notification
  7. // is used for extracting the classifier from the incoming events
  8. override protected def classify(event: Event): ActorRef = event.ref
  9. // determines the initial size of the index data structure
  10. // used internally (i.e. the expected number of different classifiers)
  11. override protected def mapSize: Int = 128
  12. }

对该实现的测试看上去像这样:

  1. val observer1 = TestProbe().ref
  2. val observer2 = TestProbe().ref
  3. val probe1 = TestProbe()
  4. val probe2 = TestProbe()
  5. val subscriber1 = probe1.ref
  6. val subscriber2 = probe2.ref
  7. val actorBus = new ActorBusImpl
  8. actorBus.subscribe(subscriber1, observer1)
  9. actorBus.subscribe(subscriber2, observer1)
  10. actorBus.subscribe(subscriber2, observer2)
  11. actorBus.publish(Notification(observer1, 100))
  12. probe1.expectMsg(Notification(observer1, 100))
  13. probe2.expectMsg(Notification(observer1, 100))
  14. actorBus.publish(Notification(observer2, 101))
  15. probe2.expectMsg(Notification(observer2, 101))
  16. probe1.expectNoMsg(500.millis)

这种分类器对事件类型仍然是泛化的,它在所有的场合下都是高效的。

事件流" class="reference-link">事件流

事件流是每个actor系统的主事件总线:它用来携带 日志消息死信,并且也可以被用户代码使用来达到其它目的。它使用 子频道分类法 使得可以向一组相关的频道进行注册 (就象 RemoteLifeCycleMessage 所用的那样). 以下例子演示了一个简单的订阅是如何工作的:

  1. import akka.actor.{ Actor, DeadLetter, Props }
  2. class Listener extends Actor {
  3. def receive = {
  4. case d: DeadLetter => println(d)
  5. }
  6. }
  7. val listener = system.actorOf(Props(classOf[Listener], this))
  8. system.eventStream.subscribe(listener, classOf[DeadLetter])
缺省的处理器

actor系统在启动时会创建一些actor,并为其在事件流上订阅日志消息:这些是缺省的处理器,可以配置在例如application.conf:

  1. akka {
  2. loggers = ["akka.event.Logging$DefaultLogger"]
  3. }

这里以全路径类名列出的处理器将订阅所有配置的日志级别以上的日志事件类,并且当日志级别在运行时被修改时这些订阅关系也会同步修改:

  1. system.eventStream.setLogLevel(Logging.DebugLevel)

这意味着一个低于日志级别的日志事件,事实上根本不会被派发(除非专门为相应的事件类进行了手工订阅)

死信" class="reference-link">死信

正如 终止actors所述, 当actor终止后其邮箱队列中的剩余消息及后续被发送的消息都将被发送到死信邮箱, 它缺省情况下会发布打包在 DeadLetter 中的消息. 这种打包动作会保留被重定向消息的原始发送者、接收者以及消息内容。

其它用处

事件流一直存在并可用,你可以向它发布你自己的事件 (它接受 AnyRef) 并对相应的JVM类添加订阅监听器。