
EventBus最初被认为是一种向 Actor 组发送消息的方法,它被概括为一组实现简单接口的抽象基类:

  1. /**
  2. * Attempts to register the subscriber to the specified Classifier
  3. *
  4. * @return true if successful and false if not (because it was already subscribed to that
  5. * Classifier, or otherwise)
  6. */
  7. public boolean subscribe(Subscriber subscriber, Classifier to);
  8. /**
  9. * Attempts to deregister the subscriber from the specified Classifier
  10. *
  11. * @return true if successful and false if not (because it wasn't subscribed to that Classifier,
  12. * or otherwise)
  13. */
  14. public boolean unsubscribe(Subscriber subscriber, Classifier from);
  15. /** Attempts to deregister the subscriber from all Classifiers it may be subscribed to */
  16. public void unsubscribe(Subscriber subscriber);
  17. /** Publishes the specified Event to this bus */
  18. public void publish(Event event);
  • 注释:请注意,EventBus不保留已发布消息的发送者。如果你需要原始发件人的引用,则必须在消息中提供。

此机制在 Akka 内的不同地方使用,例如「事件流」。实现可以使用下面介绍的特定构建基块。

事件总线(event bus)必须定义以下三个类型参数:

  • Event是在该总线上发布的所有事件的类型
  • Subscriber是允许在该事件总线上注册的订阅者类型
  • Classifier定义用于选择用于调度事件的订阅者的分类器



这里介绍的分类器(classifiers)是 Akka 发行版的一部分,但是如果你没有找到完美的匹配,那么滚动你自己的分类器并不困难,请检查「Github」上现有分类器的实现情况。




  1. import akka.event.japi.LookupEventBus;
  2. static class MsgEnvelope {
  3. public final String topic;
  4. public final Object payload;
  5. public MsgEnvelope(String topic, Object payload) {
  6. this.topic = topic;
  7. this.payload = payload;
  8. }
  9. }
  10. /**
  11. * Publishes the payload of the MsgEnvelope when the topic of the MsgEnvelope equals the String
  12. * specified when subscribing.
  13. */
  14. static class LookupBusImpl extends LookupEventBus<MsgEnvelope, ActorRef, String> {
  15. // is used for extracting the classifier from the incoming events
  16. @Override
  17. public String classify(MsgEnvelope event) {
  18. return event.topic;
  19. }
  20. // will be invoked for each event for all subscribers which registered themselves
  21. // for the event’s classifier
  22. @Override
  23. public void publish(MsgEnvelope event, ActorRef subscriber) {
  24. subscriber.tell(event.payload, ActorRef.noSender());
  25. }
  26. // must define a full order over the subscribers, expressed as expected from
  27. // `java.lang.Comparable.compare`
  28. @Override
  29. public int compareSubscribers(ActorRef a, ActorRef b) {
  30. return a.compareTo(b);
  31. }
  32. // determines the initial size of the index data structure
  33. // used internally (i.e. the expected number of different classifiers)
  34. @Override
  35. public int mapSize() {
  36. return 128;
  37. }
  38. }


  1. LookupBusImpl lookupBus = new LookupBusImpl();
  2. lookupBus.subscribe(getTestActor(), "greetings");
  3. lookupBus.publish(new MsgEnvelope("time", System.currentTimeMillis()));
  4. lookupBus.publish(new MsgEnvelope("greetings", "hello"));
  5. expectMsgEquals("hello");



如果分类器形成了一个层次结构,并且希望不仅可以在叶节点上订阅,那么这个分类可能就是正确的分类。这种分类是为分类器只是事件的 JVM 类而开发的,订阅者可能对订阅某个类的所有子类感兴趣,但它可以与任何分类器层次结构一起使用。


  1. import akka.event.japi.SubchannelEventBus;
  2. static class StartsWithSubclassification implements Subclassification<String> {
  3. @Override
  4. public boolean isEqual(String x, String y) {
  5. return x.equals(y);
  6. }
  7. @Override
  8. public boolean isSubclass(String x, String y) {
  9. return x.startsWith(y);
  10. }
  11. }
  12. /**
  13. * Publishes the payload of the MsgEnvelope when the topic of the MsgEnvelope starts with the
  14. * String specified when subscribing.
  15. */
  16. static class SubchannelBusImpl extends SubchannelEventBus<MsgEnvelope, ActorRef, String> {
  17. // Subclassification is an object providing `isEqual` and `isSubclass`
  18. // to be consumed by the other methods of this classifier
  19. @Override
  20. public Subclassification<String> subclassification() {
  21. return new StartsWithSubclassification();
  22. }
  23. // is used for extracting the classifier from the incoming events
  24. @Override
  25. public String classify(MsgEnvelope event) {
  26. return event.topic;
  27. }
  28. // will be invoked for each event for all subscribers which registered themselves
  29. // for the event’s classifier
  30. @Override
  31. public void publish(MsgEnvelope event, ActorRef subscriber) {
  32. subscriber.tell(event.payload, ActorRef.noSender());
  33. }
  34. }


  1. SubchannelBusImpl subchannelBus = new SubchannelBusImpl();
  2. subchannelBus.subscribe(getTestActor(), "abc");
  3. subchannelBus.publish(new MsgEnvelope("xyzabc", "x"));
  4. subchannelBus.publish(new MsgEnvelope("bcdef", "b"));
  5. subchannelBus.publish(new MsgEnvelope("abc", "c"));
  6. expectMsgEquals("c");
  7. subchannelBus.publish(new MsgEnvelope("abcdef", "d"));
  8. expectMsgEquals("d");





  1. import akka.event.japi.ScanningEventBus;
  2. /**
  3. * Publishes String messages with length less than or equal to the length specified when
  4. * subscribing.
  5. */
  6. static class ScanningBusImpl extends ScanningEventBus<String, ActorRef, Integer> {
  7. // is needed for determining matching classifiers and storing them in an
  8. // ordered collection
  9. @Override
  10. public int compareClassifiers(Integer a, Integer b) {
  11. return a.compareTo(b);
  12. }
  13. // is needed for storing subscribers in an ordered collection
  14. @Override
  15. public int compareSubscribers(ActorRef a, ActorRef b) {
  16. return a.compareTo(b);
  17. }
  18. // determines whether a given classifier shall match a given event; it is invoked
  19. // for each subscription for all received events, hence the name of the classifier
  20. @Override
  21. public boolean matches(Integer classifier, String event) {
  22. return event.length() <= classifier;
  23. }
  24. // will be invoked for each event for all subscribers which registered themselves
  25. // for the event’s classifier
  26. @Override
  27. public void publish(String event, ActorRef subscriber) {
  28. subscriber.tell(event, ActorRef.noSender());
  29. }
  30. }


  1. ScanningBusImpl scanningBus = new ScanningBusImpl();
  2. scanningBus.subscribe(getTestActor(), 3);
  3. scanningBus.publish("xyzabc");
  4. scanningBus.publish("ab");
  5. expectMsgEquals("ab");
  6. scanningBus.publish("abc");
  7. expectMsgEquals("abc");


Actor 分类


这种分类需要一个ActorSystem来执行与作为 Actor 的订阅者相关的簿记操作,而订阅者可以在不首先从EventBus取消订阅的情况下终止。ManagedActorClassification维护一个系统 Actor,自动处理取消订阅终止的 Actor。


  1. import akka.event.japi.ManagedActorEventBus;
  2. static class Notification {
  3. public final ActorRef ref;
  4. public final int id;
  5. public Notification(ActorRef ref, int id) {
  6. this.ref = ref;
  7. this.id = id;
  8. }
  9. }
  10. static class ActorBusImpl extends ManagedActorEventBus<Notification> {
  11. // the ActorSystem will be used for book-keeping operations, such as subscribers terminating
  12. public ActorBusImpl(ActorSystem system) {
  13. super(system);
  14. }
  15. // is used for extracting the classifier from the incoming events
  16. @Override
  17. public ActorRef classify(Notification event) {
  18. return event.ref;
  19. }
  20. // determines the initial size of the index data structure
  21. // used internally (i.e. the expected number of different classifiers)
  22. @Override
  23. public int mapSize() {
  24. return 128;
  25. }
  26. }


  1. ActorRef observer1 = new TestKit(system).getRef();
  2. ActorRef observer2 = new TestKit(system).getRef();
  3. TestKit probe1 = new TestKit(system);
  4. TestKit probe2 = new TestKit(system);
  5. ActorRef subscriber1 = probe1.getRef();
  6. ActorRef subscriber2 = probe2.getRef();
  7. ActorBusImpl actorBus = new ActorBusImpl(system);
  8. actorBus.subscribe(subscriber1, observer1);
  9. actorBus.subscribe(subscriber2, observer1);
  10. actorBus.subscribe(subscriber2, observer2);
  11. Notification n1 = new Notification(observer1, 100);
  12. actorBus.publish(n1);
  13. probe1.expectMsgEquals(n1);
  14. probe2.expectMsgEquals(n1);
  15. Notification n2 = new Notification(observer2, 101);
  16. actorBus.publish(n2);
  17. probe2.expectMsgEquals(n2);
  18. probe1.expectNoMessage(Duration.ofMillis(500));



事件流(event stream)是每个 Actor 系统的主要事件总线:它用于承载「日志消息」和「死信」,用户代码也可以将其用于其他目的。它使用子通道分类,允许注册到相关的信道集(用于RemotingLifecycleEvent)。下面的示例演示简单订阅的工作原理。给定一个简单的 Actor:

  1. import akka.actor.ActorRef;
  2. import akka.actor.ActorSystem;
  1. static class DeadLetterActor extends AbstractActor {
  2. @Override
  3. public Receive createReceive() {
  4. return receiveBuilder()
  5. .match(
  6. DeadLetter.class,
  7. msg -> {
  8. System.out.println(msg);
  9. })
  10. .build();
  11. }
  12. }


  1. final ActorSystem system = ActorSystem.create("DeadLetters");
  2. final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
  3. system.getEventStream().subscribe(actor, DeadLetter.class);


  1. interface AllKindsOfMusic {}
  2. class Jazz implements AllKindsOfMusic {
  3. public final String artist;
  4. public Jazz(String artist) {
  5. this.artist = artist;
  6. }
  7. }
  8. class Electronic implements AllKindsOfMusic {
  9. public final String artist;
  10. public Electronic(String artist) {
  11. this.artist = artist;
  12. }
  13. }
  14. static class Listener extends AbstractActor {
  15. @Override
  16. public Receive createReceive() {
  17. return receiveBuilder()
  18. .match(
  19. Jazz.class,
  20. msg -> System.out.printf("%s is listening to: %s%n", getSelf().path().name(), msg))
  21. .match(
  22. Electronic.class,
  23. msg -> System.out.printf("%s is listening to: %s%n", getSelf().path().name(), msg))
  24. .build();
  25. }
  26. }
  27. final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
  28. system.getEventStream().subscribe(actor, DeadLetter.class);
  29. final ActorRef jazzListener = system.actorOf(Props.create(Listener.class));
  30. final ActorRef musicListener = system.actorOf(Props.create(Listener.class));
  31. system.getEventStream().subscribe(jazzListener, Jazz.class);
  32. system.getEventStream().subscribe(musicListener, AllKindsOfMusic.class);
  33. // only musicListener gets this message, since it listens to *all* kinds of music:
  34. system.getEventStream().publish(new Electronic("Parov Stelar"));
  35. // jazzListener and musicListener will be notified about Jazz:
  36. system.getEventStream().publish(new Jazz("Sonny Rollins"));

与 Actor 分类类似,EventStream将在订阅者终止时自动删除订阅者。

  • 注释:事件流是一个本地设施,这意味着它不会将事件分发到集群环境中的其他节点(除非你明确地向流订阅远程 Actor)。如果你需要在 Akka 集群中广播事件,而不明确地知道你的收件人(即获取他们的ActorRefs),你可能需要查看:「集群中的分布式发布订阅」。


启动后,Actor 系统创建并订阅事件流的 Actor 以进行日志记录:这些是在application.conf中配置的处理程序:

  1. akka {
  2. loggers = ["akka.event.Logging$DefaultLogger"]
  3. }


  1. system.eventStream.setLogLevel(Logging.DebugLevel());



如「停止 Actor」所述,Actor 在其死亡后终止或发送时排队的消息将重新路由到死信邮箱,默认情况下,死信邮箱将发布用死信包装的消息。此包装包含已重定向信封的原始发件人、收件人和消息。

一些内部消息(用死信抑制特性标记)不会像普通消息一样变成死信。这些是设计安全的,并且预期有时会到达一个终止的 Actor,因为它们不需要担心,所以它们被默认的死信记录机制抑制。

但是,如果你发现自己需要调试这些低级抑制死信(low level suppressed dead letters),仍然可以明确订阅它们:

  1. system.getEventStream().subscribe(actor, SuppressedDeadLetter.class);


  1. system.getEventStream().subscribe(actor, AllDeadLetters.class);


事件流总是在那里并且随时可以使用,你可以发布自己的事件(它接受Object)并向监听器订阅相应的 JVM 类。

