事件总线

EventBus最初被认为是一种向 Actor 组发送消息的方法,它被概括为一组实现简单接口的抽象基类:

  1. /**
  2. * Attempts to register the subscriber to the specified Classifier
  3. *
  4. * @return true if successful and false if not (because it was already subscribed to that
  5. * Classifier, or otherwise)
  6. */
  7. public boolean subscribe(Subscriber subscriber, Classifier to);
  8. /**
  9. * Attempts to deregister the subscriber from the specified Classifier
  10. *
  11. * @return true if successful and false if not (because it wasn't subscribed to that Classifier,
  12. * or otherwise)
  13. */
  14. public boolean unsubscribe(Subscriber subscriber, Classifier from);
  15. /** Attempts to deregister the subscriber from all Classifiers it may be subscribed to */
  16. public void unsubscribe(Subscriber subscriber);
  17. /** Publishes the specified Event to this bus */
  18. public void publish(Event event);
  • 注释:请注意,EventBus不保留已发布消息的发送者。如果你需要原始发件人的引用,则必须在消息中提供。

此机制在 Akka 内的不同地方使用,例如「事件流」。实现可以使用下面介绍的特定构建基块。

事件总线(event bus)必须定义以下三个类型参数:

  • Event是在该总线上发布的所有事件的类型
  • Subscriber是允许在该事件总线上注册的订阅者类型
  • Classifier定义用于选择用于调度事件的订阅者的分类器

下面的特性在这些类型中仍然是通用的,但是需要为任何具体的实现定义它们。

分类器

这里介绍的分类器(classifiers)是 Akka 发行版的一部分,但是如果你没有找到完美的匹配,那么滚动你自己的分类器并不困难,请检查「Github」上现有分类器的实现情况。

查找分类

最简单的分类就是从每个事件中提取一个任意的分类器,并为每个可能的分类器维护一组订阅者。特征LookupClassification仍然是通用的,因为它抽象了如何比较订阅者以及如何准确地分类。

以下示例说明了需要实现的必要方法:

  1. import akka.event.japi.LookupEventBus;
  2. static class MsgEnvelope {
  3. public final String topic;
  4. public final Object payload;
  5. public MsgEnvelope(String topic, Object payload) {
  6. this.topic = topic;
  7. this.payload = payload;
  8. }
  9. }
  10. /**
  11. * Publishes the payload of the MsgEnvelope when the topic of the MsgEnvelope equals the String
  12. * specified when subscribing.
  13. */
  14. static class LookupBusImpl extends LookupEventBus<MsgEnvelope, ActorRef, String> {
  15. // is used for extracting the classifier from the incoming events
  16. @Override
  17. public String classify(MsgEnvelope event) {
  18. return event.topic;
  19. }
  20. // will be invoked for each event for all subscribers which registered themselves
  21. // for the event’s classifier
  22. @Override
  23. public void publish(MsgEnvelope event, ActorRef subscriber) {
  24. subscriber.tell(event.payload, ActorRef.noSender());
  25. }
  26. // must define a full order over the subscribers, expressed as expected from
  27. // `java.lang.Comparable.compare`
  28. @Override
  29. public int compareSubscribers(ActorRef a, ActorRef b) {
  30. return a.compareTo(b);
  31. }
  32. // determines the initial size of the index data structure
  33. // used internally (i.e. the expected number of different classifiers)
  34. @Override
  35. public int mapSize() {
  36. return 128;
  37. }
  38. }

此实现的测试可能如下所示:

  1. LookupBusImpl lookupBus = new LookupBusImpl();
  2. lookupBus.subscribe(getTestActor(), "greetings");
  3. lookupBus.publish(new MsgEnvelope("time", System.currentTimeMillis()));
  4. lookupBus.publish(new MsgEnvelope("greetings", "hello"));
  5. expectMsgEquals("hello");

如果不存在特定事件的订阅者,则此分类器是有效的。

子通道分类

如果分类器形成了一个层次结构,并且希望不仅可以在叶节点上订阅,那么这个分类可能就是正确的分类。这种分类是为分类器只是事件的 JVM 类而开发的,订阅者可能对订阅某个类的所有子类感兴趣,但它可以与任何分类器层次结构一起使用。

以下示例说明了需要实现的必要方法:

  1. import akka.event.japi.SubchannelEventBus;
  2. static class StartsWithSubclassification implements Subclassification<String> {
  3. @Override
  4. public boolean isEqual(String x, String y) {
  5. return x.equals(y);
  6. }
  7. @Override
  8. public boolean isSubclass(String x, String y) {
  9. return x.startsWith(y);
  10. }
  11. }
  12. /**
  13. * Publishes the payload of the MsgEnvelope when the topic of the MsgEnvelope starts with the
  14. * String specified when subscribing.
  15. */
  16. static class SubchannelBusImpl extends SubchannelEventBus<MsgEnvelope, ActorRef, String> {
  17. // Subclassification is an object providing `isEqual` and `isSubclass`
  18. // to be consumed by the other methods of this classifier
  19. @Override
  20. public Subclassification<String> subclassification() {
  21. return new StartsWithSubclassification();
  22. }
  23. // is used for extracting the classifier from the incoming events
  24. @Override
  25. public String classify(MsgEnvelope event) {
  26. return event.topic;
  27. }
  28. // will be invoked for each event for all subscribers which registered themselves
  29. // for the event’s classifier
  30. @Override
  31. public void publish(MsgEnvelope event, ActorRef subscriber) {
  32. subscriber.tell(event.payload, ActorRef.noSender());
  33. }
  34. }

此实现的测试可能如下所示:

  1. SubchannelBusImpl subchannelBus = new SubchannelBusImpl();
  2. subchannelBus.subscribe(getTestActor(), "abc");
  3. subchannelBus.publish(new MsgEnvelope("xyzabc", "x"));
  4. subchannelBus.publish(new MsgEnvelope("bcdef", "b"));
  5. subchannelBus.publish(new MsgEnvelope("abc", "c"));
  6. expectMsgEquals("c");
  7. subchannelBus.publish(new MsgEnvelope("abcdef", "d"));
  8. expectMsgEquals("d");

在没有为事件找到订阅者的情况下,该分类器也很有效,但它使用常规锁来同步内部分类器缓存,因此它不适合订阅以非常高的频率更改的情况(请记住,通过发送第一条消息“打开”分类器也必须重新检查所有以前的订阅)。

扫描分类

前一个分类器是为严格分层的多分类器订阅而构建的,如果有重叠的分类器覆盖事件空间的各个部分而不形成分层结构,则此分类器非常有用。

以下示例说明了需要实现的必要方法:

  1. import akka.event.japi.ScanningEventBus;
  2. /**
  3. * Publishes String messages with length less than or equal to the length specified when
  4. * subscribing.
  5. */
  6. static class ScanningBusImpl extends ScanningEventBus<String, ActorRef, Integer> {
  7. // is needed for determining matching classifiers and storing them in an
  8. // ordered collection
  9. @Override
  10. public int compareClassifiers(Integer a, Integer b) {
  11. return a.compareTo(b);
  12. }
  13. // is needed for storing subscribers in an ordered collection
  14. @Override
  15. public int compareSubscribers(ActorRef a, ActorRef b) {
  16. return a.compareTo(b);
  17. }
  18. // determines whether a given classifier shall match a given event; it is invoked
  19. // for each subscription for all received events, hence the name of the classifier
  20. @Override
  21. public boolean matches(Integer classifier, String event) {
  22. return event.length() <= classifier;
  23. }
  24. // will be invoked for each event for all subscribers which registered themselves
  25. // for the event’s classifier
  26. @Override
  27. public void publish(String event, ActorRef subscriber) {
  28. subscriber.tell(event, ActorRef.noSender());
  29. }
  30. }

此实现的测试可能如下所示:

  1. ScanningBusImpl scanningBus = new ScanningBusImpl();
  2. scanningBus.subscribe(getTestActor(), 3);
  3. scanningBus.publish("xyzabc");
  4. scanningBus.publish("ab");
  5. expectMsgEquals("ab");
  6. scanningBus.publish("abc");
  7. expectMsgEquals("abc");

这个分类器总是需要一个与订阅数量成比例的时间,与实际匹配的数量无关。

Actor 分类

这个分类最初是专门为实现「DeathWatch」而开发的:订阅者和分类器都是ActorRef类型的。

这种分类需要一个ActorSystem来执行与作为 Actor 的订阅者相关的簿记操作,而订阅者可以在不首先从EventBus取消订阅的情况下终止。ManagedActorClassification维护一个系统 Actor,自动处理取消订阅终止的 Actor。

以下示例说明了需要实现的必要方法:

  1. import akka.event.japi.ManagedActorEventBus;
  2. static class Notification {
  3. public final ActorRef ref;
  4. public final int id;
  5. public Notification(ActorRef ref, int id) {
  6. this.ref = ref;
  7. this.id = id;
  8. }
  9. }
  10. static class ActorBusImpl extends ManagedActorEventBus<Notification> {
  11. // the ActorSystem will be used for book-keeping operations, such as subscribers terminating
  12. public ActorBusImpl(ActorSystem system) {
  13. super(system);
  14. }
  15. // is used for extracting the classifier from the incoming events
  16. @Override
  17. public ActorRef classify(Notification event) {
  18. return event.ref;
  19. }
  20. // determines the initial size of the index data structure
  21. // used internally (i.e. the expected number of different classifiers)
  22. @Override
  23. public int mapSize() {
  24. return 128;
  25. }
  26. }

此实现的测试可能如下所示:

  1. ActorRef observer1 = new TestKit(system).getRef();
  2. ActorRef observer2 = new TestKit(system).getRef();
  3. TestKit probe1 = new TestKit(system);
  4. TestKit probe2 = new TestKit(system);
  5. ActorRef subscriber1 = probe1.getRef();
  6. ActorRef subscriber2 = probe2.getRef();
  7. ActorBusImpl actorBus = new ActorBusImpl(system);
  8. actorBus.subscribe(subscriber1, observer1);
  9. actorBus.subscribe(subscriber2, observer1);
  10. actorBus.subscribe(subscriber2, observer2);
  11. Notification n1 = new Notification(observer1, 100);
  12. actorBus.publish(n1);
  13. probe1.expectMsgEquals(n1);
  14. probe2.expectMsgEquals(n1);
  15. Notification n2 = new Notification(observer2, 101);
  16. actorBus.publish(n2);
  17. probe2.expectMsgEquals(n2);
  18. probe1.expectNoMessage(Duration.ofMillis(500));

这个分类器在事件类型中仍然是通用的,对于所有用例都是有效的。

事件流

事件流(event stream)是每个 Actor 系统的主要事件总线:它用于承载「日志消息」和「死信」,用户代码也可以将其用于其他目的。它使用子通道分类,允许注册到相关的信道集(用于RemotingLifecycleEvent)。下面的示例演示简单订阅的工作原理。给定一个简单的 Actor:

  1. import akka.actor.ActorRef;
  2. import akka.actor.ActorSystem;
  1. static class DeadLetterActor extends AbstractActor {
  2. @Override
  3. public Receive createReceive() {
  4. return receiveBuilder()
  5. .match(
  6. DeadLetter.class,
  7. msg -> {
  8. System.out.println(msg);
  9. })
  10. .build();
  11. }
  12. }

可以这样订阅:

  1. final ActorSystem system = ActorSystem.create("DeadLetters");
  2. final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
  3. system.getEventStream().subscribe(actor, DeadLetter.class);

值得指出的是,由于在事件流中实现子通道分类的方式,可以订阅一组事件,方法是订阅它们的公共超类,如下例所示:

  1. interface AllKindsOfMusic {}
  2. class Jazz implements AllKindsOfMusic {
  3. public final String artist;
  4. public Jazz(String artist) {
  5. this.artist = artist;
  6. }
  7. }
  8. class Electronic implements AllKindsOfMusic {
  9. public final String artist;
  10. public Electronic(String artist) {
  11. this.artist = artist;
  12. }
  13. }
  14. static class Listener extends AbstractActor {
  15. @Override
  16. public Receive createReceive() {
  17. return receiveBuilder()
  18. .match(
  19. Jazz.class,
  20. msg -> System.out.printf("%s is listening to: %s%n", getSelf().path().name(), msg))
  21. .match(
  22. Electronic.class,
  23. msg -> System.out.printf("%s is listening to: %s%n", getSelf().path().name(), msg))
  24. .build();
  25. }
  26. }
  27. final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
  28. system.getEventStream().subscribe(actor, DeadLetter.class);
  29. final ActorRef jazzListener = system.actorOf(Props.create(Listener.class));
  30. final ActorRef musicListener = system.actorOf(Props.create(Listener.class));
  31. system.getEventStream().subscribe(jazzListener, Jazz.class);
  32. system.getEventStream().subscribe(musicListener, AllKindsOfMusic.class);
  33. // only musicListener gets this message, since it listens to *all* kinds of music:
  34. system.getEventStream().publish(new Electronic("Parov Stelar"));
  35. // jazzListener and musicListener will be notified about Jazz:
  36. system.getEventStream().publish(new Jazz("Sonny Rollins"));

与 Actor 分类类似,EventStream将在订阅者终止时自动删除订阅者。

  • 注释:事件流是一个本地设施,这意味着它不会将事件分发到集群环境中的其他节点(除非你明确地向流订阅远程 Actor)。如果你需要在 Akka 集群中广播事件,而不明确地知道你的收件人(即获取他们的ActorRefs),你可能需要查看:「集群中的分布式发布订阅」。

默认处理程序

启动后,Actor 系统创建并订阅事件流的 Actor 以进行日志记录:这些是在application.conf中配置的处理程序:

  1. akka {
  2. loggers = ["akka.event.Logging$DefaultLogger"]
  3. }

此处按完全限定类名列出的处理程序将订阅优先级高于或等于配置的日志级别的所有日志事件类,并且在运行时更改日志级别时,它们的订阅将保持同步:

  1. system.eventStream.setLogLevel(Logging.DebugLevel());

这意味着对于一个不会被记录的级别,日志事件通常根本不会被调度(除非已经完成了对相应事件类的手动订阅)。

死信

如「停止 Actor」所述,Actor 在其死亡后终止或发送时排队的消息将重新路由到死信邮箱,默认情况下,死信邮箱将发布用死信包装的消息。此包装包含已重定向信封的原始发件人、收件人和消息。

一些内部消息(用死信抑制特性标记)不会像普通消息一样变成死信。这些是设计安全的,并且预期有时会到达一个终止的 Actor,因为它们不需要担心,所以它们被默认的死信记录机制抑制。

但是,如果你发现自己需要调试这些低级抑制死信(low level suppressed dead letters),仍然可以明确订阅它们:

  1. system.getEventStream().subscribe(actor, SuppressedDeadLetter.class);

或所有死信(包括被压制的):

  1. system.getEventStream().subscribe(actor, AllDeadLetters.class);

其他用途

事件流总是在那里并且随时可以使用,你可以发布自己的事件(它接受Object)并向监听器订阅相应的 JVM 类。


英文原文链接Event Bus.