Agents

依赖

为了使用代理(Agents),你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-agent_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-agent_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-agent" % "2.5.23"

简介

Akka 的代理受到了「Clojure 中的代理」的启发。

  • 废弃警告:代理已被弃用,并计划在下一个主要版本中删除。我们发现,他们的抽象性(他们不在网络上工作)使他们不如纯粹的 Actor,而且面对 Akka Typed 很快被包括在内的情况,我们认为维持现有的代理没有什么价值。

代理提供单个位置的异步更改。代理在其生命周期内绑定到单个存储位置(storage location),并且只允许该位置的突变(到一个新的状态)作为操作的结果。更新操作是异步应用于代理状态的函数,其返回值将成为代理的新状态。代理的状态应该是不可变的。

虽然对代理的更新是异步的,但是代理的状态始终可以立即供任何线程(使用get)读取,而不需要任何消息。

代理是反应性的。所有代理的更新操作在ExecutionContext中的线程之间交错。在任何时间点,每个代理最多执行一个send操作。从另一个线程发送到代理的操作将按发送顺序发生,可能与从其他线程发送到同一代理的操作交错出现。

  • 注释:代理是创建它们的节点的本地代理。这意味着你通常不应将它们包括在可能传递给远程 Actor 的消息中,或者作为远程 Actor 的构造函数参数;这些远程 Actor 将无法读取或更新代理。

创建代理

通过调用new Agent<ValueType>(value, executionContext)传入代理的初始值并提供要用于它的ExecutionContext来创建代理,

  1. import scala.concurrent.ExecutionContext;
  2. import akka.agent.Agent;
  3. import akka.dispatch.ExecutionContexts;
  4. ExecutionContext ec = ExecutionContexts.global();
  5. Agent<Integer> agent = Agent.create(5, ec);

读取代理的值

通过使用get()调用代理,可以引用代理(可以获取代理的值),如下所示:

  1. Integer result = agent.get();

读取代理的当前值不涉及任何消息传递,并且会立即发生。因此,虽然对代理的更新是异步的,但是读取代理的状态是同步的。

你还可以获得代理值的Future,该值将在当前入队的更新完成后完成:

  1. import scala.concurrent.Future;
  2. Future<Integer> future = agent.future();

通过「Futures」可以了解有关Futures更多的信息。

更新代理(发送 & 更改)

通过发送转换当前值的函数(akka.dispatch.Mapper)或只发送一个新值来更新代理。代理将以原子方式和异步方式应用新值或函数。更新是以fire-forget的方式完成的,你只能保证它将被应用。无法保证何时应用更新,但将按顺序从单个线程发送给代理。通过调用send函数来应用值或函数。

  1. import akka.dispatch.Mapper;
  2. // send a value, enqueues this change
  3. // of the value of the Agent
  4. agent.send(7);
  5. // send a Mapper, enqueues this change
  6. // to the value of the Agent
  7. agent.send(
  8. new Mapper<Integer, Integer>() {
  9. public Integer apply(Integer i) {
  10. return i * 2;
  11. }
  12. });

你也可以调度一个函数来更新内部状态,但它是在自己的线程上进行的。这不使用反应式线程池,可以用于长时间运行或阻塞操作。你可以使用sendOff方法来实现这一点。使用sendOffsend的调度仍将按顺序执行。

  1. import akka.dispatch.Mapper;
  2. // sendOff a function
  3. agent.sendOff(longRunningOrBlockingFunction, theExecutionContextToExecuteItIn);

所有send方法都有一个相应的alter方法,它返回一个Future。有关Futures的更多信息,请参阅「Futures」。

  1. import scala.concurrent.Future;
  2. import akka.dispatch.Mapper;
  3. // alter a value
  4. Future<Integer> f1 = agent.alter(7);
  5. // alter a function (Mapper)
  6. Future<Integer> f2 =
  7. agent.alter(
  8. new Mapper<Integer, Integer>() {
  9. public Integer apply(Integer i) {
  10. return i * 2;
  11. }
  12. });
  13. // alterOff a function (Mapper)
  14. Future<Integer> f3 =
  15. agent.alterOff(longRunningOrBlockingFunction, theExecutionContextToExecuteItIn);

配置

代理模块有几个配置属性,具体请参阅「配置」。

不推荐使用事务代理

参与封闭 STM 事务的代理是2.3版本中不推荐使用的功能。

如果代理在封闭的 Scala STM 事务中使用,那么它将参与该事务。如果在事务中发送到代理,那么发送到代理的操作将一直保持到事务提交为止,如果事务中止,则放弃。


英文原文链接Agents.