第10章 Java并发包中线程同步器原理剖析

CountDownLatch原理剖析

日常开发中经常遇到一个线程需要等待一些线程都结束后才能继续向下运行的场景,在CountDownLatch出现之前通常使用join方法来实现,但join方法不够灵活,所以开发了CountDownLatch。

示例

  1. public static void main(String[] args) throws InterruptedException {
  2. CountDownLatch countDownLatch = new CountDownLatch(2);
  3. ExecutorService executorService = Executors.newFixedThreadPool(2);
  4. // 添加任务
  5. executorService.execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. try {
  9. // 模拟运行时间
  10. Thread.sleep(1000);
  11. System.out.println("thread one over...");
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }finally {
  15. // 递减计数器
  16. countDownLatch.countDown();
  17. }
  18. }
  19. });
  20. // 同上
  21. executorService.execute(new Runnable() {
  22. @Override
  23. public void run() {
  24. try {
  25. Thread.sleep(1000);
  26. System.out.println("thread two over...");
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }finally {
  30. countDownLatch.countDown();
  31. }
  32. }
  33. });
  34. System.out.println("wait all child thread over!");
  35. // 阻塞直到被interrupt或计数器递减至0
  36. countDownLatch.await();
  37. System.out.println("all child thread over!");
  38. executorService.shutdown();
  39. }

输出为:

  1. wait all child thread over!
  2. thread one over...
  3. thread two over...
  4. all child thread over!

CountDownLatch相对于join方法的优点大致有两点:

  • 调用一个子线程的join方法后,该线程会一直阻塞直到子线程运行完毕,而CountDownLatch允许子线程运行完毕或在运行过程中递减计数器,也就是说await方法不一定要等到子线程运行结束才返回。
  • 使用线程池来管理线程一般都是直接添加Runnable到线程池,这时就没有办法再调用线程的join方法了,而仍可在子线程中递减计数器,也就是说CountDownLatch相比join方法可以更灵活地控制线程的同步。

类图结构

第10章 Java并发包中线程同步器原理剖析 - 图1

由图可知,CountDownLatch是基于AQS实现的。

由下面的代码可知,CountDownLatch的计数器值就是AQS的state值

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }
  5. Sync(int count) {
  6. setState(count);
  7. }

源码解析

void await()

当线程调用CountDownLatch的await方法后,当前线程会被阻塞,直到CountDownLatch的计数器值递减至0或者其他线程调用了当前线程的interrupt方法。

  1. public void await() throws InterruptedException {
  2. // 允许中断(中断时抛出异常)
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. // AQS的方法
  6. public final void acquireSharedInterruptibly(int arg)
  7. throws InterruptedException {
  8. if (Thread.interrupted())
  9. throw new InterruptedException();
  10. // state=0时tryAcquireShared方法返回1,直接返回
  11. // 否则执行doAcquireSharedInterruptibly方法
  12. if (tryAcquireShared(arg) < 0)
  13. // state不为0,调用该方法使await方法阻塞
  14. doAcquireSharedInterruptibly(arg);
  15. }
  16. // Sync的方法(重写了AQS中的该方法)
  17. protected int tryAcquireShared(int acquires) {
  18. return (getState() == 0) ? 1 : -1;
  19. }
  20. // AQS的方法
  21. private void doAcquireSharedInterruptibly(int arg)
  22. throws InterruptedException {
  23. final Node node = addWaiter(Node.SHARED);
  24. boolean failed = true;
  25. try {
  26. for (;;) {
  27. final Node p = node.predecessor();
  28. if (p == head) {
  29. // 获取state值,state=0时r=1,直接返回,不再阻塞
  30. int r = tryAcquireShared(arg);
  31. if (r >= 0) {
  32. setHeadAndPropagate(node, r);
  33. p.next = null; // help GC
  34. failed = false;
  35. return;
  36. }
  37. }
  38. // 若state不为0则阻塞调用await方法的线程
  39. // 等到其他线程执行countDown方法使计数器递减至0
  40. // (state变为0)或该线程被interrupt时
  41. // 该线程才能继续向下运行
  42. if (shouldParkAfterFailedAcquire(p, node) &&
  43. parkAndCheckInterrupt())
  44. throw new InterruptedException();
  45. }
  46. } finally {
  47. if (failed)
  48. cancelAcquire(node);
  49. }
  50. }

boolean await(long timeout, TimeUnit unit)

相较于上面的await方法,调用此方法后调用线程最多被阻塞timeout时间(单位由unit指定),即使计数器没有递减至0或调用线程没有被interrupt,调用线程也会继续向下运行。

  1. public boolean await(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  4. }

void countDown()

递减计数器,当计数器的值为0(即state=0)时会唤醒所有因调用await方法而被阻塞的线程。

  1. public void countDown() {
  2. // 将计数器减1
  3. sync.releaseShared(1);
  4. }
  5. // AQS的方法
  6. public final boolean releaseShared(int arg) {
  7. // 当state被递减至0时tryReleaseShared返回true
  8. // 会执行doReleaseShared方法唤醒因调用await方法阻塞的线程
  9. // 否则如果state不是0的话什么也不做
  10. if (tryReleaseShared(arg)) {
  11. doReleaseShared();
  12. return true;
  13. }
  14. return false;
  15. }
  16. // Sync重写的AQS中的方法
  17. protected boolean tryReleaseShared(int releases) {
  18. for (;;) {
  19. int c = getState();
  20. // 如果state已经为0,没有递减必要,直接返回
  21. // 否则会使state变成负数
  22. if (c == 0)
  23. return false;
  24. int nextc = c-1;
  25. // 通过CAS递减state的值
  26. if (compareAndSetState(c, nextc))
  27. // 如果state被递减至0,返回true以进行后续唤醒工作
  28. return nextc == 0;
  29. }
  30. }

CyclicBarrier原理探究

CountDownLatch的计数器时一次性的,也就是说当计数器至变为0后,再调用await和countDown方法会直接返回。而CyclicBarrier则解决了此问题。CyclicBarrier是回环屏障的意思,它可以使一组线程全部达到一个状态后再全部同时执行,然后重置自身状态又可用于下一次的状态同步。

示例

假设一个任务由阶段1、阶段2、阶段3组成,每个线程要串行地执行阶段1、阶段2、阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1都执行完毕后才能进入阶段2,当所有线程的阶段2都执行完毕后才能进入阶段3,可用下面的代码实现:

  1. public static void main(String[] args) {
  2. // 等待两个线程同步
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
  4. ExecutorService executorService = Executors.newFixedThreadPool(2);
  5. // 运行两个子线程,当两个子线程的step1都执行完毕后才会执行step2
  6. // 当两个子线程的step2都执行完毕后才会执行step3
  7. for(int i = 0; i < 2; i++) {
  8. executorService.execute(new Runnable() {
  9. @Override
  10. public void run() {
  11. try{
  12. System.out.println(Thread.currentThread() + " step1");
  13. cyclicBarrier.await();
  14. System.out.println(Thread.currentThread() + " step2");
  15. cyclicBarrier.await();
  16. System.out.println(Thread.currentThread() + " step3");
  17. }catch (Exception e){
  18. e.printStackTrace();
  19. }
  20. }
  21. });
  22. }
  23. executorService.shutdown();
  24. }

输出如下:

  1. Thread[pool-1-thread-1,5,main] step1
  2. Thread[pool-1-thread-2,5,main] step1
  3. Thread[pool-1-thread-1,5,main] step2
  4. Thread[pool-1-thread-2,5,main] step2
  5. Thread[pool-1-thread-2,5,main] step3
  6. Thread[pool-1-thread-1,5,main] step3

类图结构

第10章 Java并发包中线程同步器原理剖析 - 图2

CyclicBarrier基于ReentrantLock实现,本质上还是基于AQS的。parties用于记录线程个数,表示多少个线程调用await方法后,所有线程才会冲破屏障往下运行。count一开始等于parties,当由线程调用await方法时会递减1,当count变成0时到达屏障点,所有调用await的线程会一起往下执行,此时要重置CyclicBarrier,再次令count=parties。

lock用于保证更新计数器count的原子性。lock的条件变量trip用于支持线程间使用await和signalAll进行通信。

以下是CyclicBarrier的构造函数:

  1. public CyclicBarrier(int parties) {
  2. this(parties, null);
  3. }
  4. // barrierAction为达到屏障点(parties个线程调用了await方法)时执行的任务
  5. public CyclicBarrier(int parties, Runnable barrierAction) {
  6. if (parties <= 0) throw new IllegalArgumentException();
  7. this.parties = parties;
  8. this.count = parties;
  9. this.barrierCommand = barrierAction;
  10. }

Generation的定义如下:

  1. private static class Generation {
  2. // 记录当前屏障是否可以被打破
  3. boolean broken = false;
  4. }

源码分析

int await()

当前线程调用该方法时会阻塞,直到满足以下条件之一才会返回:

  • parties个线程调用了await方法,也就是到达屏障点
  • 其他线程调用了当前线程的interrupt方法
  • Generation对象的broken标志被设置为true,抛出BrokenBarrierExecption
  1. public int await() throws InterruptedException, BrokenBarrierException {
  2. try {
  3. // false表示不设置超时时间,此时后面参数无意义
  4. // dowait稍后具体分析
  5. return dowait(false, 0L);
  6. } catch (TimeoutException toe) {
  7. throw new Error(toe); // cannot happen
  8. }
  9. }

boolean await(long timeout, TimeUnit unit)

相比于await(),等待超时会返回false。

  1. public int await(long timeout, TimeUnit unit)
  2. throws InterruptedException,
  3. BrokenBarrierException,
  4. TimeoutException {
  5. // 设置了超时时间
  6. // dowait稍后分析
  7. return dowait(true, unit.toNanos(timeout));
  8. }

int dowait(boolean timed, long nanos)

  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. try {
  7. final Generation g = generation;
  8. // 屏障已被打破则抛出异常
  9. if (g.broken)
  10. throw new BrokenBarrierException();
  11. // 线程中断则抛出异常
  12. if (Thread.interrupted()) {
  13. // 打破屏障
  14. // 会做三件事
  15. // 1. 设置generation的broken为true
  16. // 2. 重置count为parites
  17. // 3. 调用signalAll激活所有等待线程
  18. breakBarrier();
  19. throw new InterruptedException();
  20. }
  21. int index = --count;
  22. // 到达了屏障点
  23. if (index == 0) {
  24. boolean ranAction = false;
  25. try {
  26. final Runnable command = barrierCommand;
  27. if (command != null)
  28. // 执行每一次到达屏障点所需要执行的任务
  29. command.run();
  30. ranAction = true;
  31. // 重置状态,进入下一次屏障
  32. nextGeneration();
  33. return 0;
  34. } finally {
  35. if (!ranAction)
  36. breakBarrier();
  37. }
  38. }
  39. // 如果index不为0
  40. // loop until tripped, broken, interrupted, or timed out
  41. for (;;) {
  42. try {
  43. if (!timed)
  44. trip.await();
  45. else if (nanos > 0L)
  46. nanos = trip.awaitNanos(nanos);
  47. } catch (InterruptedException ie) {
  48. // 执行此处时,有可能其他线程已经调用了nextGeneration方法
  49. // 此时应该使当前线程正常执行下去
  50. // 否则打破屏障
  51. if (g == generation && ! g.broken) {
  52. breakBarrier();
  53. throw ie;
  54. } else {
  55. // We're about to finish waiting even if we had not
  56. // been interrupted, so this interrupt is deemed to
  57. // "belong" to subsequent execution.
  58. Thread.currentThread().interrupt();
  59. }
  60. }
  61. if (g.broken)
  62. throw new BrokenBarrierException();
  63. // 如果此次屏障已经结束,则正常返回
  64. if (g != generation)
  65. return index;
  66. // 如果是因为超时,则打破屏障并抛出异常
  67. if (timed && nanos <= 0L) {
  68. breakBarrier();
  69. throw new TimeoutException();
  70. }
  71. }
  72. } finally {
  73. lock.unlock();
  74. }
  75. }
  76. // 打破屏障
  77. private void breakBarrier() {
  78. // 设置打破标志
  79. generation.broken = true;
  80. // 重置count
  81. count = parties;
  82. // 唤醒所有等待的线程
  83. trip.signalAll();
  84. }
  85. private void nextGeneration() {
  86. // 唤醒当前屏障下所有被阻塞的线程
  87. trip.signalAll();
  88. // 重置状态,进入下一次屏障
  89. count = parties;
  90. generation = new Generation();
  91. }

Semaphore原理探究

Semaphore信号量也是一个同步器,与CountDownLatch和CyclicBarrier不同的是,它内部的计数器是递增的,并且在初始化时可以指定计数器的初始值(通常为0),但不必知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。

示例

  1. public static void main(String[] args) throws InterruptedException {
  2. final int THREAD_COUNT = 2;
  3. // 初始信号量为0
  4. Semaphore semaphore = new Semaphore(0);
  5. ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
  6. for (int i = 0; i < THREAD_COUNT; i++){
  7. executorService.execute(new Runnable() {
  8. @Override
  9. public void run() {
  10. System.out.println(Thread.currentThread() + " over");
  11. // 信号量+1
  12. semaphore.release();
  13. }
  14. });
  15. }
  16. // 当信号量达到2时才停止阻塞
  17. semaphore.acquire(2);
  18. System.out.println("all child thread over!");
  19. executorService.shutdown();
  20. }

类图结构

第10章 Java并发包中线程同步器原理剖析 - 图3

由图可知,Semaphore还是使用AQS实现的,并且可以选取公平性策略(默认为非公平性的)。

源码解析

void acquire()

表示当前线程希望获取一个信号量资源,如果当前信号量大于0,则当前信号量的计数减1,然后该方法直接返回。否则如果当前信号量等于0,则被阻塞。

  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  4. public final void acquireSharedInterruptibly(int arg)
  5. throws InterruptedException {
  6. // 可以被中断
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. // 调用Sync子类方法尝试获取,这里根据构造函数决定公平策略
  10. if (tryAcquireShared(arg) < 0)
  11. // 将当前线程放入阻塞队列,然后再次尝试
  12. // 如果失败则挂起当前线程
  13. doAcquireSharedInterruptibly(arg);
  14. }

tryAcquireShared由Sync的子类实现以根据公平性采取相应的行为。

以下是非公平策略NofairSync的实现:

  1. protected int tryAcquireShared(int acquires) {
  2. return nonfairTryAcquireShared(acquires);
  3. }
  4. final int nonfairTryAcquireShared(int acquires) {
  5. for (;;) {
  6. int available = getState();
  7. int remaining = available - acquires;
  8. // 如果剩余信号量小于0直接返回
  9. // 否则如果更新信号量成功则返回
  10. if (remaining < 0 ||
  11. compareAndSetState(available, remaining))
  12. return remaining;
  13. }
  14. }

假设线程A调用了acquire方法尝试获取信号量但因信号量不足被阻塞,这时线程B通过release增加了信号量,此时线程C完全可以调用acquire方法成功获取到信号量(如果信号量足够的话),这就是非公平性的体现。

下面是公平性的实现:

  1. protected int tryAcquireShared(int acquires) {
  2. for (;;) {
  3. // 关键在于先判断AQS队列中是否已经有元素要获取信号量
  4. if (hasQueuedPredecessors())
  5. return -1;
  6. int available = getState();
  7. int remaining = available - acquires;
  8. if (remaining < 0 ||
  9. compareAndSetState(available, remaining))
  10. return remaining;
  11. }
  12. }

hasQueuedPredecessors方法(可参看第6章 Java并发包中锁原理剖析)用于判断当前线程的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS中,否则尝试去获取。

void acquire(int permits)

可获取多个信号量。

  1. public void acquire(int permits) throws InterruptedException {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.acquireSharedInterruptibly(permits);
  4. }

void acquireUninterruptibly()

不对中断进行响应。

  1. public void acquireUninterruptibly() {
  2. sync.acquireShared(1);
  3. }

void acquireUninterruptibly(int permits)

不对中断进行相应并且可获取多个信号量。

  1. public void acquireUninterruptibly(int permits) {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.acquireShared(permits);
  4. }

void release()

使信号量加1,如果当前有线程因为调用acquire方法被阻塞而被放入AQS中的话,会根据公平性策略选择一个信号量个数能被满足的线程进行激活。

  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. // 尝试释放资源(增加信号量)
  6. if (tryReleaseShared(arg)) {
  7. // 释放资源成功则根据公平性策略唤醒AQS中阻塞的线程
  8. doReleaseShared();
  9. return true;
  10. }
  11. return false;
  12. }
  13. protected final boolean tryReleaseShared(int releases) {
  14. for (;;) {
  15. int current = getState();
  16. int next = current + releases;
  17. if (next < current) // overflow
  18. throw new Error("Maximum permit count exceeded");
  19. if (compareAndSetState(current, next))
  20. return true;
  21. }
  22. }

void release(int permits)

可增加多个信号量。

  1. public void release(int permits) {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.releaseShared(permits);
  4. }

更多

相关笔记:《Java并发编程之美》阅读笔记