调度程序

依赖

为了使用调度程序(Scheduler),你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-actor_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-actor_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.23"

简介

有时候需要让事情在未来发生,那你去哪里看呢?除了ActorSystem,不需要看其他的!在这里,你可以找到返回akka.actor.Scheduler实例的scheduler方法,该实例对于每个ActorSystem都是唯一的,并且在内部用于调度在特定时间点发生的事情。

你可以安排向 Actor 发送消息和执行任务(函数或Runnable)。你将得到一个Cancellable返回,你也可以调用cancel来取消计划操作的执行。

当对 Actor 中的定期或单个消息进行自我调度时,建议使用「Actor Timers」,而不是直接使用Scheduler

Akka 中的调度程序设计用于高吞吐量的数千到数百万个触发器。主要的用例是触发器 Actor 接收超时、Future的超时、断路器(circuit breakers)和其他与时间相关的事件,这些事件总是同时发生在许多情况下。该实现基于哈希轮计时器(Hashed Wheel Timer),这是处理此类用例的已知数据结构和算法,如果你想了解其内部工作原理,请参阅 Varghese 和 Lauck 的「Hashed and Hierarchical Timing Wheels」白皮书。

Akka 调度程序不是为长期调度而设计的(请参阅「akka-quartz-scheduler」,而不是这个用例),也不用于高精度的事件触发。在未来,你可以安排触发事件的最长时间约为 8 个月,实际上,这太多了,无法发挥作用,因为这将假定系统在这段时间内从未停止。如果你需要长期的调度,我们强烈建议你寻找替代的调度程序,因为这不是实现 Akka 调度程序的用例。

  • 警告:Akka 使用的调度程序的默认实现基于作业桶(job buckets),作业桶根据固定的计划清空。它不会在准确的时间执行任务,但在每一个时间点上,它将运行所有到期(超过)的内容。默认计划程序的准确性可以由akka.scheduler.tick-duration配置属性修改。

一些示例

  1. import akka.actor.Props;
  2. import jdocs.AbstractJavaTest;
  3. import java.time.Duration;

计划在 50 毫秒后向testActor发送foo消息:

  1. system
  2. .scheduler()
  3. .scheduleOnce(Duration.ofMillis(50), testActor, "foo", system.dispatcher(), null);

调度一个Runnable,它将当前时间发送给testActor,在 50 毫秒后执行:

  1. system
  2. .scheduler()
  3. .scheduleOnce(
  4. Duration.ofMillis(50),
  5. new Runnable() {
  6. @Override
  7. public void run() {
  8. testActor.tell(System.currentTimeMillis(), ActorRef.noSender());
  9. }
  10. },
  11. system.dispatcher());

在 0 毫秒后向tickActor发送Tick消息,并于每 50 毫秒后重复发生此消息:

  1. class Ticker extends AbstractActor {
  2. @Override
  3. public Receive createReceive() {
  4. return receiveBuilder()
  5. .matchEquals(
  6. "Tick",
  7. m -> {
  8. // Do someting
  9. })
  10. .build();
  11. }
  12. }
  13. ActorRef tickActor = system.actorOf(Props.create(Ticker.class, this));
  14. // This will schedule to send the Tick-message
  15. // to the tickActor after 0ms repeating every 50ms
  16. Cancellable cancellable =
  17. system
  18. .scheduler()
  19. .schedule(
  20. Duration.ZERO, Duration.ofMillis(50), tickActor, "Tick", system.dispatcher(), null);
  21. // This cancels further Ticks to be sent
  22. cancellable.cancel();
  • 警告:如果调度函数或Runnable实例,则应特别注意不要关闭不稳定的引用。在实践中,这意味着不要在 Actor 实例的范围内的闭包中使用this,不要直接访问sender(),也不要直接调用 Actor 实例的方法。如果需要调度调用,请将消息调度为self(包含必要的参数),然后在收到消息时调用该方法。

来自 akka.actor.ActorSystem

  1. /**
  2. * Light-weight scheduler for running asynchronous tasks after some deadline
  3. * in the future. Not terribly precise but cheap.
  4. */
  5. def scheduler: Scheduler
  • 警告:当ActorSystem终止时,所有计划的任务都将被执行,即任务可以在超时之前执行。

Scheduler 接口

实际的调度程序实现是在ActorSystem启动时反射加载的,这意味着可以使用akka.scheduler.implementation配置属性提供不同的实现。引用的类必须实现以下接口:

  1. /**
  2. * An Akka scheduler service. This one needs one special behavior: if Closeable, it MUST execute all
  3. * outstanding tasks upon .close() in order to properly shutdown all dispatchers.
  4. *
  5. * <p>Furthermore, this timer service MUST throw IllegalStateException if it cannot schedule a task.
  6. * Once scheduled, the task MUST be executed. If executed upon close(), the task may execute before
  7. * its timeout.
  8. *
  9. * <p>Scheduler implementation are loaded reflectively at ActorSystem start-up with the following
  10. * constructor arguments: 1) the system’s com.typesafe.config.Config (from system.settings.config)
  11. * 2) a akka.event.LoggingAdapter 3) a java.util.concurrent.ThreadFactory
  12. */
  13. public abstract class AbstractScheduler extends AbstractSchedulerBase {
  14. /**
  15. * Schedules a function to be run repeatedly with an initial delay and a frequency. E.g. if you
  16. * would like the function to be run after 2 seconds and thereafter every 100ms you would set
  17. * delay = Duration(2, TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS)
  18. */
  19. @Override
  20. public abstract Cancellable schedule(
  21. FiniteDuration initialDelay,
  22. FiniteDuration interval,
  23. Runnable runnable,
  24. ExecutionContext executor);
  25. /**
  26. * Schedules a function to be run repeatedly with an initial delay and a frequency. E.g. if you
  27. * would like the function to be run after 2 seconds and thereafter every 100ms you would set
  28. * delay = Duration(2, TimeUnit.SECONDS) and interval = Duration.ofMillis(100)
  29. */
  30. public Cancellable schedule(
  31. final java.time.Duration initialDelay,
  32. final java.time.Duration interval,
  33. final Runnable runnable,
  34. final ExecutionContext executor) {
  35. return schedule(
  36. JavaDurationConverters.asFiniteDuration(initialDelay),
  37. JavaDurationConverters.asFiniteDuration(interval),
  38. runnable,
  39. executor);
  40. }
  41. /**
  42. * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before
  43. * the runnable is executed.
  44. */
  45. @Override
  46. public abstract Cancellable scheduleOnce(
  47. FiniteDuration delay, Runnable runnable, ExecutionContext executor);
  48. /**
  49. * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before
  50. * the runnable is executed.
  51. */
  52. public Cancellable scheduleOnce(
  53. final java.time.Duration delay, final Runnable runnable, final ExecutionContext executor) {
  54. return scheduleOnce(JavaDurationConverters.asFiniteDuration(delay), runnable, executor);
  55. }
  56. /**
  57. * The maximum supported task frequency of this scheduler, i.e. the inverse of the minimum time
  58. * interval between executions of a recurring task, in Hz.
  59. */
  60. @Override
  61. public abstract double maxFrequency();
  62. }

Cancellable 接口

调度任务将导致Cancellable(或在调度程序关闭后尝试时引发IllegalStateException)。这允许你取消已计划执行的某些操作。

  • 警告:如果任务已经启动,则不会中止执行。检查cancel的返回值以检测调度任务是已取消还是(最终)将运行。
  1. /**
  2. * Signifies something that can be cancelled
  3. * There is no strict guarantee that the implementation is thread-safe,
  4. * but it should be good practice to make it so.
  5. */
  6. trait Cancellable {
  7. /**
  8. * Cancels this Cancellable and returns true if that was successful.
  9. * If this cancellable was (concurrently) cancelled already, then this method
  10. * will return false although isCancelled will return true.
  11. *
  12. * Java & Scala API
  13. */
  14. def cancel(): Boolean
  15. /**
  16. * Returns true if and only if this Cancellable has been successfully cancelled
  17. *
  18. * Java & Scala API
  19. */
  20. def isCancelled: Boolean
  21. }

英文原文链接Scheduler.