请实现一个定时任务调度器,有很多任务,每个任务都有一个时间戳,任务会在该时间点开始执行。

定时执行任务是一个很常见的需求,例如Uber打车48小时后自动好评,淘宝购物15天后默认好评,等等。

方案1: PriorityBlockingQueue + Polling

我们很快可以想到第一个办法:

  • 用一个java.util.concurrent.PriorityBlockingQueue来作为优先队列。因为我们需要一个优先队列,又需要线程安全,用PriorityBlockingQueue再合适不过了。你也可以手工实现一个自己的PriorityBlockingQueue,用java.util.PriorityQueue + ReentrantLock,用一把锁把这个队列保护起来,就是线程安全的啦
  • 对于生产者,可以用一个while(true),造一些随机任务塞进去
  • 对于消费者,起一个线程,在 while(true)里每隔几秒检查一下队列,如果有任务,则取出来执行。

这个方案的确可行,总结起来就是轮询(polling)。轮询通常有个很大的缺点,就是时间间隔不好设置,间隔太长,任务无法及时处理,间隔太短,会很耗CPU。

方案2: PriorityBlockingQueue + 时间差

可以把方案1改进一下,while(true)里的逻辑变成:

  • 偷看一下堆顶的元素,但并不取出来,如果该任务过期了,则取出来
  • 如果没过期,则计算一下时间差,然后 sleep()该时间差

不再是 sleep() 一个固定间隔了,消除了轮询的缺点。

稍等!这个方案其实有个致命的缺陷,导致它比 PiorityBlockingQueue + Polling 更加不可用,这个缺点是什么呢?。。。假设当前堆顶的任务在100秒后执行,消费者线程peek()偷看到了后,开始sleep 100秒,这时候一个新的任务插了进来,该任务在10秒后应该执行,但是由于消费者线程要睡眠100秒,这个新任务无法及时处理。

方案3: DelayQueue

方案2虽然已经不错了,但是还可以优化一下,Java里有一个DelayQueue,完全符合题目的要求。DelayQueue 设计得非常巧妙,可以看做是一个特化版的PriorityBlockingQueue,它把计算时间差并让消费者等待该时间差的功能集成进了队列,消费者不需要关心时间差的事情了,直接在while(true)里不断take()就行了。

DelayQueue的实现原理见下面的代码。

  1. import java.util.PriorityQueue;
  2. import java.util.concurrent.Delayed;
  3. import java.util.concurrent.locks.Condition;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. import static java.util.concurrent.TimeUnit.NANOSECONDS;
  6. public class DelayQueue<E extends Delayed> {
  7. private final transient ReentrantLock lock = new ReentrantLock();
  8. private final PriorityQueue<E> q = new PriorityQueue<E>();
  9. private final Condition available = lock.newCondition();
  10. private Thread leader = null;
  11. public DelayQueue() {}
  12. /**
  13. * Inserts the specified element into this delay queue.
  14. *
  15. * @param e the element to add
  16. * @return {@code true}
  17. * @throws NullPointerException if the specified element is null
  18. */
  19. public boolean put(E e) {
  20. final ReentrantLock lock = this.lock;
  21. lock.lock();
  22. try {
  23. q.offer(e);
  24. if (q.peek() == e) {
  25. leader = null;
  26. available.signal();
  27. }
  28. return true;
  29. } finally {
  30. lock.unlock();
  31. }
  32. }
  33. /**
  34. * Retrieves and removes the head of this queue, waiting if necessary
  35. * until an element with an expired delay is available on this queue.
  36. *
  37. * @return the head of this queue
  38. * @throws InterruptedException {@inheritDoc}
  39. */
  40. public E take() throws InterruptedException {
  41. final ReentrantLock lock = this.lock;
  42. lock.lockInterruptibly();
  43. try {
  44. for (;;) {
  45. E first = q.peek();
  46. if (first == null)
  47. available.await();
  48. else {
  49. long delay = first.getDelay(NANOSECONDS);
  50. if (delay <= 0)
  51. return q.poll();
  52. first = null; // don't retain ref while waiting
  53. if (leader != null)
  54. available.await();
  55. else {
  56. Thread thisThread = Thread.currentThread();
  57. leader = thisThread;
  58. try {
  59. available.awaitNanos(delay);
  60. } finally {
  61. if (leader == thisThread)
  62. leader = null;
  63. }
  64. }
  65. }
  66. }
  67. } finally {
  68. if (leader == null && q.peek() != null)
  69. available.signal();
  70. lock.unlock();
  71. }
  72. }
  73. }

这个代码中有几个要点要注意一下。

1. put()方法

  1. if (q.peek() == e) {
  2. leader = null;
  3. available.signal();
  4. }

如果第一个元素等于刚刚插入进去的元素,说明刚才队列是空的。现在队列里有了一个任务,那么就应该唤醒所有在等待的消费者线程,避免了方案2的缺点。将leader重置为null,这些消费者之间互相竞争,自然有一个会被选为leader。

2. 线程leader的作用

leader这个成员有啥作用?DelayQueue的设计其实是一个Leader/Follower模式,leader就是指向Leader线程的。该模式可以减少不必要的等待时间,当一个线程是Leader时,它只需要一个时间差;其他Follower线程则无限等待。比如头节点任务还有5秒就要开始了,那么Leader线程会sleep 5秒,不需要傻傻地等待固定时间间隔。

想象一下有个多个消费者线程用take方法去取任务,内部先加锁,然后每个线程都去peek头节点。如果leader不为空说明已经有线程在取了,让当前消费者无限等待。

  1. if (leader != null)
  2. available.await();

如果为空说明没有其他消费者去取任务,设置leader为当前消费者,并让改消费者等待指定的时间,

  1. else {
  2. Thread thisThread = Thread.currentThread();
  3. leader = thisThread;
  4. try {
  5. available.awaitNanos(delay);
  6. } finally {
  7. if (leader == thisThread)
  8. leader = null;
  9. }
  10. }

下次循环会走如下分支,取到任务结束,

  1. if (delay <= 0)
  2. return q.poll();

3. take()方法中为什么释放first

  1. first = null; // don't retain ref while waiting

我们可以看到 Doug Lea 后面写的注释,那么这行代码有什么用呢?

如果删除这行代码,会发生什么呢?假设现在有3个消费者线程,

  • 线程A进来获取first,然后进入 else 的 else ,设置了leader为当前线程A,并让A等待一段时间
  • 线程B进来获取first, 进入else的阻塞操作,然后无限期等待,这时线程B是持有first引用的
  • 线程A等待指定时间后被唤醒,获取对象成功,出队,这个对象理应被GC回收,但是它还被线程B持有着,GC链可达,所以不能回收这个first
  • 只要线程B无限期的睡眠,那么这个本该被回收的对象就不能被GC销毁掉,那么就会造成内存泄露

Task对象

  1. import java.util.concurrent.Delayed;
  2. import java.util.concurrent.TimeUnit;
  3. public class Task implements Delayed {
  4. private String name;
  5. private long startTime; // milliseconds
  6. public Task(String name, long delay) {
  7. this.name = name;
  8. this.startTime = System.currentTimeMillis() + delay;
  9. }
  10. @Override
  11. public long getDelay(TimeUnit unit) {
  12. long diff = startTime - System.currentTimeMillis();
  13. return unit.convert(diff, TimeUnit.MILLISECONDS);
  14. }
  15. @Override
  16. public int compareTo(Delayed o) {
  17. return (int)(this.startTime - ((Task) o).startTime);
  18. }
  19. @Override
  20. public String toString() {
  21. return "task " + name + " at " + startTime;
  22. }
  23. }

JDK中有一个接口java.util.concurrent.Delayed,可以用于表示具有过期时间的元素,刚好可以拿来表示任务这个概念。

生产者

  1. import java.util.Random;
  2. import java.util.UUID;
  3. public class TaskProducer implements Runnable {
  4. private final Random random = new Random();
  5. private DelayQueue<Task> q;
  6. public TaskProducer(DelayQueue<Task> q) {
  7. this.q = q;
  8. }
  9. @Override
  10. public void run() {
  11. while (true) {
  12. try {
  13. int delay = random.nextInt(10000);
  14. Task task = new Task(UUID.randomUUID().toString(), delay);
  15. System.out.println("Put " + task);
  16. q.put(task);
  17. Thread.sleep(3000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. }

生产者很简单,就是一个死循环,不断地产生一些是时间随机的任务。

消费者

  1. public class TaskConsumer implements Runnable {
  2. private DelayQueue<Task> q;
  3. public TaskConsumer(DelayQueue<Task> q) {
  4. this.q = q;
  5. }
  6. @Override
  7. public void run() {
  8. while (true) {
  9. try {
  10. Task task = q.take();
  11. System.out.println("Take " + task);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }
  17. }

当 DelayQueue 里没有任务时,TaskConsumer会无限等待,直到被唤醒,因此它不会消耗CPU。

定时任务调度器

  1. public class TaskScheduler {
  2. public static void main(String[] args) {
  3. DelayQueue<Task> queue = new DelayQueue<>();
  4. new Thread(new TaskProducer(queue), "Producer Thread").start();
  5. new Thread(new TaskConsumer(queue), "Consumer Thread").start();
  6. }
  7. }

DelayQueue这个方案,每个消费者线程只需要等待所需要的时间差,因此响应速度更快。它内部用了一个优先队列,所以插入和删除的时间复杂度都是\log n

JDK里还有一个ScheduledThreadPoolExecutor,原理跟DelayQueue类似,封装的更完善,平时工作中可以用它,不过面试中,还是拿DelayQueue来讲吧,它封装得比较薄,容易讲清楚原理。

方案4: 时间轮(HashedWheelTimer)

时间轮(HashedWheelTimer)其实很简单,就是一个循环队列,如下图所示,

定时任务调度器 - 图1

上图是一个长度为8的循环队列,假设该时间轮精度为秒,即每秒走一格,像手表那样,走完一圈就是8秒。每个格子指向一个任务集合,时间轮无限循环,每转到一个格子,就扫描该格子下面的所有任务,把时间到期的任务取出来执行。

举个例子,假设指针当前正指向格子0,来了一个任务需要4秒后执行,那么这个任务就会放在格子4下面,如果来了一个任务需要20秒后执行怎么?由于这个循环队列转一圈只需要8秒,这个任务需要多转2圈,所以这个任务的位置虽然依旧在格子4(20%8+0=4)下面,不过需要多转2圈后才执行。因此每个任务需要有一个字段记录需圈数,每转一圈就减1,减到0则立刻取出来执行。

怎么实现时间轮呢?Netty中已经有了一个时间轮的实现, HashedWheelTimer.java,可以参考它的源代码。

时间轮的优点是性能高,插入和删除的时间复杂度都是O(1)。Linux 内核中的定时器采用的就是这个方案。

Follow up: 如何设计一个分布式的定时任务调度器呢?
答: Redis ZSet, RabbitMQ等

参考资料