Stash

依赖

为了使用 Akka Actor 类型,你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-actor-typed_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.5.23"

简介

Stash使 Actor 能够临时缓冲所有或某些不能或不应该使用 Actor 当前行为处理的消息。

当这一点很有用,一个典型的例子是,如果 Actor 在接受第一条真实消息之前加载了一些初始状态或者初始化了一些资源;另一个例子是,Actor 在处理下一条消息之前正在等待完成某件事情。

让我们用一个例子来说明这两个问题。它是一个 Actor,就像对存储在数据库中的值的单点访问一样使用。启动后,它从数据库中加载当前状态,并在等待该初始值的同时,所有传入的消息都被存储起来。

当一个新的状态保存在数据库中时,它也会存储传入的消息,以使处理连续进行,一个接一个,没有多个挂起的写入。

  1. import akka.actor.typed.javadsl.StashBuffer;
  2. interface DB {
  3. CompletionStage<Done> save(String id, String value);
  4. CompletionStage<String> load(String id);
  5. }
  6. public static class DataAccess {
  7. static interface Command {}
  8. public static class Save implements Command {
  9. public final String payload;
  10. public final ActorRef<Done> replyTo;
  11. public Save(String payload, ActorRef<Done> replyTo) {
  12. this.payload = payload;
  13. this.replyTo = replyTo;
  14. }
  15. }
  16. public static class Get implements Command {
  17. public final ActorRef<String> replyTo;
  18. public Get(ActorRef<String> replyTo) {
  19. this.replyTo = replyTo;
  20. }
  21. }
  22. static class InitialState implements Command {
  23. public final String value;
  24. InitialState(String value) {
  25. this.value = value;
  26. }
  27. }
  28. static class SaveSuccess implements Command {
  29. public static final SaveSuccess instance = new SaveSuccess();
  30. private SaveSuccess() {}
  31. }
  32. static class DBError implements Command {
  33. public final RuntimeException cause;
  34. public DBError(RuntimeException cause) {
  35. this.cause = cause;
  36. }
  37. }
  38. private final StashBuffer<Command> buffer = StashBuffer.create(100);
  39. private final String id;
  40. private final DB db;
  41. public DataAccess(String id, DB db) {
  42. this.id = id;
  43. this.db = db;
  44. }
  45. Behavior<Command> behavior() {
  46. return Behaviors.setup(
  47. context -> {
  48. context.pipeToSelf(
  49. db.load(id),
  50. (value, cause) -> {
  51. if (cause == null) return new InitialState(value);
  52. else return new DBError(asRuntimeException(cause));
  53. });
  54. return init();
  55. });
  56. }
  57. private Behavior<Command> init() {
  58. return Behaviors.receive(Command.class)
  59. .onMessage(
  60. InitialState.class,
  61. (context, message) -> {
  62. // now we are ready to handle stashed messages if any
  63. return buffer.unstashAll(context, active(message.value));
  64. })
  65. .onMessage(
  66. DBError.class,
  67. (context, message) -> {
  68. throw message.cause;
  69. })
  70. .onMessage(
  71. Command.class,
  72. (context, message) -> {
  73. // stash all other messages for later processing
  74. buffer.stash(message);
  75. return Behaviors.same();
  76. })
  77. .build();
  78. }
  79. private Behavior<Command> active(String state) {
  80. return Behaviors.receive(Command.class)
  81. .onMessage(
  82. Get.class,
  83. (context, message) -> {
  84. message.replyTo.tell(state);
  85. return Behaviors.same();
  86. })
  87. .onMessage(
  88. Save.class,
  89. (context, message) -> {
  90. context.pipeToSelf(
  91. db.save(id, message.payload),
  92. (value, cause) -> {
  93. if (cause == null) return SaveSuccess.instance;
  94. else return new DBError(asRuntimeException(cause));
  95. });
  96. return saving(message.payload, message.replyTo);
  97. })
  98. .build();
  99. }
  100. private Behavior<Command> saving(String state, ActorRef<Done> replyTo) {
  101. return Behaviors.receive(Command.class)
  102. .onMessageEquals(
  103. SaveSuccess.instance,
  104. context -> {
  105. replyTo.tell(Done.getInstance());
  106. return buffer.unstashAll(context, active(state));
  107. })
  108. .onMessage(
  109. DBError.class,
  110. (context, message) -> {
  111. throw message.cause;
  112. })
  113. .onMessage(
  114. Command.class,
  115. (context, message) -> {
  116. buffer.stash(message);
  117. return Behaviors.same();
  118. })
  119. .build();
  120. }
  121. private static RuntimeException asRuntimeException(Throwable t) {
  122. // can't throw Throwable in lambdas
  123. if (t instanceof RuntimeException) {
  124. return (RuntimeException) t;
  125. } else {
  126. return new RuntimeException(t);
  127. }
  128. }
  129. }

需要注意的一件重要的事情是,StashBuffer是一个缓冲区,存储的消息将保存在内存中,直到它们被取消显示(或者 Actor 被停止并被垃圾回收)。建议避免存储过多的消息,以避免过多的内存使用,如果许多 Actor 存储过多的消息,甚至会有OutOfMemoryError的风险。因此,StashBuffer是有界的,在创建时必须指定它可以保存多少消息的capacity

如果你试图存储的消息超过了capacity的容量,那么将抛出一个StashOverflowException。你可以在存储消息之前使用StashBuffer.isFull,以避免出现这种情况并采取其他操作,例如删除消息。

当通过调用unstashAll取消缓冲消息的显示时,所有消息将按添加它们的顺序依次处理,并且除非引发异常,否则将全部处理。在unstashAll完成之前,Actor 对其他新消息没有响应。这也是保持隐藏信息数量低的另一个原因。占用消息处理线程太长时间的 Actor 可能会导致其他 Actor 发生饥饿现象。

这可以通过使用StashBuffer.unstashnumberOfMessages参数来减轻消息负载,然后在继续取消更多的显示之前向context.getSelf发送一条消息。这意味着其他新消息可能会在这期间到达,并且必须将它们存储起来,以保持消息的原始顺序。不过这将使它变得更加复杂,所以最好将隐藏消息的数量保持在较低的水平。


英文原文链接Stash.