第9章 Java并发包中ScheduledThreadPoolExecutor原理探究

类图结构

ScheduledThreadPoolExecutor时一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。

第9章 Java并发包中ScheduledThreadPoolExecutor原理探究 - 图1

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。

线程池队列是DelayedWorkQueue,与DelayedQueue一样属于延迟队列。

ScheduledFuturetask是具有返回值的任务,继承自FutureTask。FutureTask内部用一个变量state来表示任务的状态,一开始为NEW。

各状态意义如下:

  1. private static final int NEW = 0; // 初始状态
  2. private static final int COMPLETING = 1; // 执行中
  3. private static final int NORMAL = 2; // 正常运行结束
  4. private static final int EXCEPTIONAL = 3; // 运行中异常
  5. private static final int CANCELLED = 4; // 任务被取消
  6. private static final int INTERRUPTING = 5; // 任务正在被中断
  7. private static final int INTERRUPTED = 6; // 任务已经被中断

ScheduledFutureTask内部用一个变量period来表示任务的类型:

  • period=0,说明当前任务是一次性的,执行完毕后就推出了。
  • period为负数,说明当前任务为固定延迟的定时可重复执行任务(执行完一次后会停止指定时间后再次运行,若每次执行任务耗时不同,则显然相邻两次任务执行间隔不同)。
  • period为正数,说明当前任务为固定频率的定尺可重复执行任务(也即固定周期)。

以下为ScheduledThreadPoolExecutor的构造函数:

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }
  5. // 指定了线程工厂
  6. public ScheduledThreadPoolExecutor(int corePoolSize,
  7. ThreadFactory threadFactory) {
  8. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  9. new DelayedWorkQueue(), threadFactory);
  10. }
  11. // 指定了拒绝策略
  12. public ScheduledThreadPoolExecutor(int corePoolSize,
  13. RejectedExecutionHandler handler) {
  14. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  15. new DelayedWorkQueue(), handler);
  16. }
  17. // 指定了线程工厂和拒绝策略
  18. public ScheduledThreadPoolExecutor(int corePoolSize,
  19. ThreadFactory threadFactory,
  20. RejectedExecutionHandler handler) {
  21. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  22. new DelayedWorkQueue(), threadFactory, handler);
  23. }

从上面的代码中可以看到,ScheduledThreadPoolExecutor的线程池队列为DelayedWorkQueue。

源码分析

schedule(Runnable command, long delay, TimeUnit unit)

提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay后开始执行。提交的任务不是周期性任务,任务只会执行一次。

  1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  2. // 参数校验
  3. if (command == null || unit == null)
  4. throw new NullPointerException();
  5. // 将任务包装成ScheduledFutureTask
  6. // triggerTime方法用来计算触发时间(即任务开始执行的绝对时间)
  7. RunnableScheduledFuture<?> t = decorateTask(command,
  8. new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
  9. // 添加任务到延迟队列
  10. delayedExecute(t);
  11. return t;
  12. }

以下是ScheduledFutureTask的相关代码:

  1. ScheduledFutureTask(Runnable r, V result, long ns) {
  2. // 调用父类构造函数
  3. super(r, result);
  4. this.time = ns; // 等待ns纳秒后开始执行
  5. this.period = 0; // period=0说明为一次性任务
  6. // 记录任务编号
  7. this.sequenceNumber = sequencer.getAndIncrement();
  8. }
  9. public FutureTask(Runnable runnable, V result) {
  10. // 将Runnable任务转化成Callable任务
  11. this.callable = Executors.callable(runnable, result);
  12. this.state = NEW; // ensure visibility of callable
  13. }

delayedExecute的代码如下:

  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2. // 线程池关闭则执行拒绝策略
  3. if (isShutdown())
  4. reject(task);
  5. else {
  6. // 将任务添加到任务队列中
  7. // 任务队列为DelayedWorkQueue
  8. // 所加的task实现了comparable接口
  9. // 添加到任务队列中能保证队首元素为最早需要执行的
  10. super.getQueue().add(task);
  11. // 再次检查线程池是否关闭
  12. // 因为执行上面add代码过程中线程池完全有可能被关闭
  13. // 如果线程池被关闭则判断当前任务是否可以在当前状态下继续执行
  14. // 不能继续执行则移除当前任务
  15. if (isShutdown() &&
  16. !canRunInCurrentRunState(task.isPeriodic()) &&
  17. remove(task))
  18. task.cancel(false);
  19. else
  20. // 保证至少有一个线程存活可以从任务队列中获取任务处理任务
  21. ensurePrestart();
  22. }
  23. }
  24. // 判断任务是否是周期执行的
  25. public boolean isPeriodic() {
  26. return period != 0;
  27. }
  28. // 根据periodic来决定isRunningOrShutdown的参数
  29. // continueExistingPeriodicTasksAfterShutdown和
  30. // executeExistingDelayedTasksAfterShutdown的值可通过相应的setter方法来设置
  31. // 为true表示线程池关闭后当前任务会继续执行完毕
  32. // 为false则取消当前任务
  33. boolean canRunInCurrentRunState(boolean periodic) {
  34. return isRunningOrShutdown(periodic ?
  35. continueExistingPeriodicTasksAfterShutdown :
  36. executeExistingDelayedTasksAfterShutdown);
  37. }
  38. // 确保至少有一个线程存活来执行任务
  39. void ensurePrestart() {
  40. int wc = workerCountOf(ctl.get());
  41. if (wc < corePoolSize)
  42. addWorker(null, true);
  43. else if (wc == 0)
  44. addWorker(null, false);
  45. }

线程池中具体执行任务的是Worker,Worker通过调用run方法来执行。这里的任务是ScheduledFutureTask,下面来看其run方法。

  1. // ScheduledFutureTask.run
  2. public void run() {
  3. boolean periodic = isPeriodic();
  4. // 判断是否需要取消任务
  5. if (!canRunInCurrentRunState(periodic))
  6. cancel(false);
  7. // 一次性任务
  8. else if (!periodic)
  9. // 调用FutureTask的run方法
  10. ScheduledFutureTask.super.run();
  11. // 周期性任务
  12. // runAndReset为FutureTask中的方法
  13. // 用于执行当前任务但不改变future的状态
  14. else if (ScheduledFutureTask.super.runAndReset()) {
  15. // 设置下次执行的时间
  16. setNextRunTime();
  17. // 默认情况下,outerTask = this就是当前对象
  18. reExecutePeriodic(outerTask);
  19. }
  20. }
  21. // 设置周期性任务下次执行时间
  22. private void setNextRunTime() {
  23. long p = period;
  24. // p > 0表示任务执行频率一定
  25. if (p > 0)
  26. // time为此次任务(已执行完毕)刚开始执行时的时间
  27. time += p;
  28. // p < 0表示任务固定延迟时间
  29. // 即此次任务完成后会等待-p时间再执行下次任务
  30. else
  31. // 获取-p时间后的绝对时间
  32. time = triggerTime(-p);
  33. }
  34. // 周期性执行
  35. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  36. if (canRunInCurrentRunState(true)) {
  37. // 再次将task添加至任务队列中等待执行
  38. // 当轮到task执行时,又会在run中调用此方法
  39. // 再次将自身添加到任务队列中,从而达到周期性执行效果
  40. super.getQueue().add(task);
  41. if (!canRunInCurrentRunState(true) && remove(task))
  42. task.cancel(false);
  43. else
  44. ensurePrestart();
  45. }
  46. }

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

当任务执行完毕后,让其延迟固定时间后再次运行。

  1. // command:所要执行的任务
  2. // initialDelay: 提交任务后等待多少时间再开始执行任务
  3. // delay: 一次任务执行完后等待多少时间才执行下次任务
  4. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  5. long initialDelay,
  6. long delay,
  7. TimeUnit unit) {
  8. // 参数校验
  9. if (command == null || unit == null)
  10. throw new NullPointerException();
  11. if (delay <= 0)
  12. throw new IllegalArgumentException();
  13. // 将Runnable任务转换成ScheduledFutureTask
  14. // 第四个参数period为-delay < 0表示当前任务是固定延迟的
  15. ScheduledFutureTask<Void> sft =
  16. new ScheduledFutureTask<Void>(command,
  17. null,
  18. triggerTime(initialDelay, unit),
  19. unit.toNanos(-delay));
  20. // decorateTask默认直接返回第二个参数,即sft
  21. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  22. // 默认情况下t = sft
  23. // outerTask即为sft自身,用于实现周期性执行
  24. sft.outerTask = t;
  25. delayedExecute(t);
  26. return t;
  27. }
  28. // 可被子类重写拓展
  29. // 默认实现只是简单的返回task
  30. protected <V> RunnableScheduledFuture<V> decorateTask(
  31. Runnable runnable, RunnableScheduledFuture<V> task) {
  32. return task;
  33. }

主要不同在于设置了period=-delay,其他代码与schedule相同,相应的代码会判断period的取值从而决定程序不同的行为。

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

按固定频率周期性地执行任务。

  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  2. long initialDelay,
  3. long period,
  4. TimeUnit unit) {
  5. if (command == null || unit == null)
  6. throw new NullPointerException();
  7. if (period <= 0)
  8. throw new IllegalArgumentException();
  9. // 关键在于此处第四个参数period > 0
  10. ScheduledFutureTask<Void> sft =
  11. new ScheduledFutureTask<Void>(command,
  12. null,
  13. triggerTime(initialDelay, unit),
  14. unit.toNanos(period));
  15. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  16. sft.outerTask = t;
  17. delayedExecute(t);
  18. return t;
  19. }

除了设置period的值大于0外,总体与scheduleWithFixedDelay类似,不再赘述。

更多

相关笔记:《Java并发编程之美》阅读笔记