调度器

注:本节未经校验,如有问题欢迎提issue

有时需要设定将来发生的事情,这时该怎么办? ActorSystem 就能搞定一切! 在那儿你能找到 scheduler 方法,它返回一个 akka.actor.Scheduler 实例, 这个实例在每个Actor系统里是唯一的,用来在内部指定一段时间后发生的行为。

请注意定时任务是使用 ActorSystem 的 MessageDispatcher 执行的.

你可以计划向actor发送消息或执行任务 (函数或Runnable). 你会得到一个 Cancellable 类型的返回值,你可以调用 cancel 来取消定时操作的执行。

警告

Akka中使用的Scheduler的默认实现是基于根据一个固定的时间表清空的工作桶。它不是在确切的时间执行任务,而是在每个时间刻度,它将运行 (结束) 到期的一切。可以通过akka.scheduler.tick-duration配置属性修改默认Scheduler的时间精度。

示例

  1. import akka.actor.Actor
  2. import akka.actor.Props
  3. import scala.concurrent.duration._
  4. //Use the system's dispatcher as ExecutionContext
  5. import system.dispatcher
  6. //Schedules to send the "foo"-message to the testActor after 50ms
  7. system.scheduler.scheduleOnce(50 milliseconds, testActor, "foo")
  1. //Schedules a function to be executed (send a message to the testActor) after 50ms
  2. system.scheduler.scheduleOnce(50 milliseconds) {
  3. testActor ! System.currentTimeMillis
  4. }
  1. val Tick = "tick"
  2. class TickActor extends Actor {
  3. def receive = {
  4. case Tick => //Do something
  5. }
  6. }
  7. val tickActor = system.actorOf(Props(classOf[TickActor], this))
  8. //Use system's dispatcher as ExecutionContext
  9. import system.dispatcher
  10. //This will schedule to send the Tick-message
  11. //to the tickActor after 0ms repeating every 50ms
  12. val cancellable =
  13. system.scheduler.schedule(0 milliseconds,
  14. 50 milliseconds,
  15. tickActor,
  16. Tick)
  17. //This cancels further Ticks to be sent
  18. cancellable.cancel()

警告

如果你计划函数或Runnable实例时应该多加小心,不要关闭(闭合包含)不稳定的引用。在实践中这意味着在Actor实例中,不要在闭包中使用this,不直接访问 sender() 并且不要直接调用actor实例的方法。如果你需要安排一个调用,则安排一个发往self的消息 (包含必需的参数),然后在收到消息时再调用方法。

来自 akka.actor.ActorSystem

  1. /**
  2. * Light-weight scheduler for running asynchronous tasks after some deadline
  3. * in the future. Not terribly precise but cheap.
  4. */
  5. def scheduler: Scheduler

Scheduler 接口

实际的调度程序实现是在 ActorSystem 启动时反射加载的,这意味着通过使用 akka.scheduler.implementation 配置属性它可能提供一个的不同实现。引用的类必须实现以下接口:

  1. /**
  2. * An Akka scheduler service. This one needs one special behavior: if
  3. * Closeable, it MUST execute all outstanding tasks upon .close() in order
  4. * to properly shutdown all dispatchers.
  5. *
  6. * Furthermore, this timer service MUST throw IllegalStateException if it
  7. * cannot schedule a task. Once scheduled, the task MUST be executed. If
  8. * executed upon close(), the task may execute before its timeout.
  9. *
  10. * Scheduler implementation are loaded reflectively at ActorSystem start-up
  11. * with the following constructor arguments:
  12. * 1) the system’s com.typesafe.config.Config (from system.settings.config)
  13. * 2) a akka.event.LoggingAdapter
  14. * 3) a java.util.concurrent.ThreadFactory
  15. */
  16. trait Scheduler {
  17. /**
  18. * Schedules a message to be sent repeatedly with an initial delay and
  19. * frequency. E.g. if you would like a message to be sent immediately and
  20. * thereafter every 500ms you would set delay=Duration.Zero and
  21. * interval=Duration(500, TimeUnit.MILLISECONDS)
  22. *
  23. * Java & Scala API
  24. */
  25. final def schedule(
  26. initialDelay: FiniteDuration,
  27. interval: FiniteDuration,
  28. receiver: ActorRef,
  29. message: Any)(implicit executor: ExecutionContext,
  30. sender: ActorRef = Actor.noSender): Cancellable =
  31. schedule(initialDelay, interval, new Runnable {
  32. def run = {
  33. receiver ! message
  34. if (receiver.isTerminated)
  35. throw new SchedulerException("timer active for terminated actor")
  36. }
  37. })
  38. /**
  39. * Schedules a function to be run repeatedly with an initial delay and a
  40. * frequency. E.g. if you would like the function to be run after 2 seconds
  41. * and thereafter every 100ms you would set delay = Duration(2, TimeUnit.SECONDS)
  42. * and interval = Duration(100, TimeUnit.MILLISECONDS)
  43. *
  44. * Scala API
  45. */
  46. final def schedule(
  47. initialDelay: FiniteDuration,
  48. interval: FiniteDuration)(f: Unit)(
  49. implicit executor: ExecutionContext): Cancellable =
  50. schedule(initialDelay, interval, new Runnable { override def run = f })
  51. /**
  52. * Schedules a function to be run repeatedly with an initial delay and
  53. * a frequency. E.g. if you would like the function to be run after 2
  54. * seconds and thereafter every 100ms you would set delay = Duration(2,
  55. * TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS)
  56. *
  57. * Java API
  58. */
  59. def schedule(
  60. initialDelay: FiniteDuration,
  61. interval: FiniteDuration,
  62. runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
  63. /**
  64. * Schedules a message to be sent once with a delay, i.e. a time period that has
  65. * to pass before the message is sent.
  66. *
  67. * Java & Scala API
  68. */
  69. final def scheduleOnce(
  70. delay: FiniteDuration,
  71. receiver: ActorRef,
  72. message: Any)(implicit executor: ExecutionContext,
  73. sender: ActorRef = Actor.noSender): Cancellable =
  74. scheduleOnce(delay, new Runnable {
  75. override def run = receiver ! message
  76. })
  77. /**
  78. * Schedules a function to be run once with a delay, i.e. a time period that has
  79. * to pass before the function is run.
  80. *
  81. * Scala API
  82. */
  83. final def scheduleOnce(delay: FiniteDuration)(f: Unit)(
  84. implicit executor: ExecutionContext): Cancellable =
  85. scheduleOnce(delay, new Runnable { override def run = f })
  86. /**
  87. * Schedules a Runnable to be run once with a delay, i.e. a time period that
  88. * has to pass before the runnable is executed.
  89. *
  90. * Java & Scala API
  91. */
  92. def scheduleOnce(
  93. delay: FiniteDuration,
  94. runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
  95. /**
  96. * The maximum supported task frequency of this scheduler, i.e. the inverse
  97. * of the minimum time interval between executions of a recurring task, in Hz.
  98. */
  99. def maxFrequency: Double
  100. }

Cancellable 接口

它使你可以 取消 计划执行的任务。

警告
它不会中止已经启动的任务的执行。

调度的任务会返回一个 Cancellable (或抛出IllegalStateException时,如果在调度器关闭后尝试使用)。这允许你取消原定执行的东西。

警告

这不会中止已经开始执行的任务。检查cancel的返回值以检测已计划的任务是被取消,还是(最终)被运行。

  1. /**
  2. * Signifies something that can be cancelled
  3. * There is no strict guarantee that the implementation is thread-safe,
  4. * but it should be good practice to make it so.
  5. */
  6. trait Cancellable {
  7. /**
  8. * Cancels this Cancellable and returns true if that was successful.
  9. * If this cancellable was (concurrently) cancelled already, then this method
  10. * will return false although isCancelled will return true.
  11. *
  12. * Java & Scala API
  13. */
  14. def cancel(): Boolean
  15. /**
  16. * Returns true if and only if this Cancellable has been successfully cancelled
  17. *
  18. * Java & Scala API
  19. */
  20. def isCancelled: Boolean
  21. }