前言

java5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁的功能,它提供了与synchronized关键字类似的同步功能。既然有了synchronized这种内置的锁功能,为何要新增Lock接口?先来想象一个场景:手把手的进行锁获取和释放,先获得锁A,然后再获取锁B,当获取锁B后释放锁A同时获取锁C,当锁C获取后,再释放锁B同时获取锁D,以此类推,这种场景下,synchronized关键字就不那么容易实现了,而使用Lock却显得容易许多。

定义

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. private final Sync sync;
  3. abstract static class Sync extends AbstractQueuedSynchronizer {
  4. /**
  5. * Performs {@link Lock#lock}. The main reason for subclassing
  6. * is to allow fast path for nonfair version.
  7. */
  8. abstract void lock();
  9. /**
  10. * Performs non-fair tryLock. tryAcquire is implemented in
  11. * subclasses, but both need nonfair try for trylock method.
  12. */
  13. final boolean nonfairTryAcquire(int acquires) {
  14. final Thread current = Thread.currentThread();
  15. int c = getState();
  16. if (c == 0) {
  17. if (compareAndSetState(0, acquires)) {
  18. setExclusiveOwnerThread(current);
  19. return true;
  20. }
  21. }
  22. else if (current == getExclusiveOwnerThread()) {
  23. int nextc = c + acquires;
  24. if (nextc < 0) // overflow
  25. throw new Error("Maximum lock count exceeded");
  26. setState(nextc);
  27. return true;
  28. }
  29. return false;
  30. }
  31. protected final boolean tryRelease(int releases) {
  32. int c = getState() - releases;
  33. if (Thread.currentThread() != getExclusiveOwnerThread())
  34. throw new IllegalMonitorStateException();
  35. boolean free = false;
  36. if (c == 0) {
  37. free = true;
  38. setExclusiveOwnerThread(null);
  39. }
  40. setState(c);
  41. return free;
  42. }
  43. }
  44. //默认非公平锁
  45. public ReentrantLock() {
  46. sync = new NonfairSync();
  47. }
  48. //fair为false时,采用公平锁策略
  49. public ReentrantLock(boolean fair) {
  50. sync = fair ? new FairSync() : new NonfairSync();
  51. }
  52. public void lock() {
  53. sync.lock();
  54. }
  55. public void unlock() { sync.release(1);}
  56. public Condition newCondition() {
  57. return sync.newCondition();
  58. }
  59. ...
  60. }

从源代码可以Doug lea巧妙的采用组合模式把lock和unlock方法委托给同步器完成。

使用方式

  1. Lock lock = new ReentrantLock();
  2. Condition condition = lock.newCondition();
  3. lock.lock();
  4. try {
  5. while(条件判断表达式) {
  6. condition.wait();
  7. }
  8. // 处理逻辑
  9. } finally {
  10. lock.unlock();
  11. }

需要显示的获取锁,并在finally块中显示的释放锁,目的是保证在获取到锁之后,最终能够被释放。

非公平锁实现

在非公平锁中,每当线程执行lock方法时,都尝试利用CAS把state从0设置为1。

那么Doug lea是如何实现锁的非公平性呢?
我们假设这样一个场景:

  1. 持有锁的线程A正在running,队列中有线程BCDEF被挂起并等待被唤醒;
  2. 在某一个时间点,线程A执行unlock,唤醒线程B;
  3. 同时线程G执行lock,这个时候会发生什么?线程B和G拥有相同的优先级,这里讲的优先级是指获取锁的优先级,同时执行CAS指令竞争锁。如果恰好线程G成功了,线程B就得重新挂起等待被唤醒。

通过上述场景描述,我们可以看书,即使线程B等了很长时间也得和新来的线程G同时竞争锁,如此的不公平。

  1. static final class NonfairSync extends Sync {
  2. /**
  3. * Performs lock. Try immediate barge, backing up to normal
  4. * acquire on failure.
  5. */
  6. final void lock() {
  7. if (compareAndSetState(0, 1))
  8. setExclusiveOwnerThread(Thread.currentThread());
  9. else
  10. acquire(1);
  11. }
  12. public final void acquire(int arg) {
  13. if (!tryAcquire(arg) &&
  14. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  15. selfInterrupt();
  16. }
  17. protected final boolean tryAcquire(int acquires) {
  18. return nonfairTryAcquire(acquires);
  19. }
  20. }

下面我们用线程A和线程B来描述非公平锁的竞争过程。

  1. 线程A和B同时执行CAS指令,假设线程A成功,线程B失败,则表明线程A成功获取锁,并把同步器中的exclusiveOwnerThread设置为线程A。

  2. 竞争失败的线程B,在nonfairTryAcquire方法中,会再次尝试获取锁,

    Doug lea会在多处尝试重新获取锁,应该是在这段时间如果线程A释放锁,线程B就可以直接获取锁而不用挂起

    。完整的执行流程如下:

    深入理解ReentrantLock - 图1

公平锁实现

在公平锁中,每当线程执行lock方法时,如果同步器的队列中有线程在等待,则直接加入到队列中。
场景分析:

  1. 持有锁的线程A正在running,对列中有线程BCDEF被挂起并等待被唤醒;
  2. 线程G执行lock,队列中有线程BCDEF在等待,线程G直接加入到队列的对尾。

所以每个线程获取锁的过程是公平的,等待时间最长的会最先被唤醒获取锁。

  1. static final class FairSync extends Sync {
  2. private static final long serialVersionUID = -3000897897090466540L;
  3. final void lock() {
  4. acquire(1);
  5. }
  6. /**
  7. * Fair version of tryAcquire. Don't grant access unless
  8. * recursive call or no waiters or is first.
  9. */
  10. protected final boolean tryAcquire(int acquires) {
  11. final Thread current = Thread.currentThread();
  12. int c = getState();
  13. if (c == 0) {
  14. if (!hasQueuedPredecessors() &&
  15. compareAndSetState(0, acquires)) {
  16. setExclusiveOwnerThread(current);
  17. return true;
  18. }
  19. }
  20. else if (current == getExclusiveOwnerThread()) {
  21. int nextc = c + acquires;
  22. if (nextc < 0)
  23. throw new Error("Maximum lock count exceeded");
  24. setState(nextc);
  25. return true;
  26. }
  27. return false;
  28. }
  29. }

重入锁实现

重入锁,即线程可以重复获取已经持有的锁。在非公平和公平锁中,都对重入锁进行了实现。

  1. if (current == getExclusiveOwnerThread()) {
  2. int nextc = c + acquires;
  3. if (nextc < 0)
  4. throw new Error("Maximum lock count exceeded");
  5. setState(nextc);
  6. return true;
  7. }

条件变量Condition

条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. /** First node of condition queue. */
  3. private transient Node firstWaiter;
  4. /** Last node of condition queue. */
  5. private transient Node lastWaiter;
  6. public final void signal() {}
  7. public final void signalAll() {}
  8. public final void awaitUninterruptibly() {}
  9. public final void await() throws InterruptedException {}
  10. }
  1. Synchronized中,所有的线程都在同一个object的条件队列上等待。而ReentrantLock中,每个condition都维护了一个条件队列。
  2. 每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。
  3. Condition接口定义的方法,await对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆。

先看一个condition在生产者消费者的应用场景:

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. /**
  5. * Created by j_zhan on 2016/7/13.
  6. */
  7. public class Queue<T> {
  8. private final T[] items;
  9. private final Lock lock = new ReentrantLock();
  10. private Condition notFull = lock.newCondition();
  11. private Condition notEmpty = lock.newCondition();
  12. private int head, tail, count;
  13. public Queue(int maxSize) {
  14. items = (T[]) new Object[maxSize];
  15. }
  16. public Queue() {
  17. this(10);
  18. }
  19. public void put(T t) throws InterruptedException {
  20. lock.lock();
  21. try {
  22. while (count == items.length) {
  23. //数组满时,线程进入等待队列挂起。线程被唤醒时,从这里返回。
  24. notFull.await();
  25. }
  26. items[tail] = t;
  27. if (++tail == items.length) {
  28. tail = 0;
  29. }
  30. ++count;
  31. notEmpty.signal();
  32. } finally {
  33. lock.unlock();
  34. }
  35. }
  36. public T take() throws InterruptedException {
  37. lock.lock();
  38. try {
  39. while (count == 0) {
  40. notEmpty.await();
  41. }
  42. T o = items[head];
  43. items[head] = null;//GC
  44. if (++head == items.length) {
  45. head = 0;
  46. }
  47. --count;
  48. notFull.signal();
  49. return o;
  50. } finally {
  51. lock.unlock();
  52. }
  53. }
  54. }

假设线程AB在并发的往items中插入数据,当items中元素存满时。如果线程A获取到锁,继续添加数据,满足count == items.length条件,导致线程A执行await方法。
ReentrantLock是独占锁,同一时刻只有一个线程能获取到锁,所以在lock.lock()和lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。在Queue类中,不管take还是put,在线程持有锁之后只有await()方法有可能释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁。具体实现如下:

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }
  19. private Node addConditionWaiter() {
  20. Node t = lastWaiter;
  21. // If lastWaiter is cancelled, clean out.
  22. if (t != null && t.waitStatus != Node.CONDITION) {
  23. unlinkCancelledWaiters();
  24. t = lastWaiter;
  25. }
  26. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  27. if (t == null)
  28. firstWaiter = node;
  29. else
  30. t.nextWaiter = node;
  31. lastWaiter = node;
  32. return node;
  33. }

await实现逻辑:

  1. 将线程A加入到条件等待队列中,如果最后一个节点是取消状态,则从对列中删除。
  2. 线程A释放锁,实质上是线程A修改AQS的状态state为0,并唤醒AQS等待队列中的线程B,线程B被唤醒后,尝试获取锁,接下去的过程就不重复说明了。
  3. 线程A释放锁并唤醒线程B之后,如果线程A不在AQS的同步队列中,线程A将通过LockSupport.park进行挂起操作。
  4. 随后,线程A等待被唤醒,当线程A被唤醒时,会通过acquireQueued方法竞争锁,如果失败,继续挂起。如果成功,线程A从await位置恢复。

假设线程B获取锁之后,执行了take操作和条件变量的signal,signal通过某种实现唤醒了线程A,具体实现如下:

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. doSignal(first);
  7. }
  8. private void doSignal(Node first) {
  9. do {
  10. if ((firstWaiter = first.nextWaiter) == null)
  11. lastWaiter = null;
  12. first.nextWaiter = null;
  13. } while (!transferForSignal(first) &&
  14. (first = firstWaiter) != null);
  15. }
  16. final boolean transferForSignal(Node node) {
  17. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  18. return false;
  19. Node p = enq(node); //线程A插入到AQS的等待队列中
  20. int ws = p.waitStatus;
  21. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  22. LockSupport.unpark(node.thread);
  23. return true;
  24. }

signal实现逻辑:

  1. 接着上述场景,线程B执行了signal方法,取出条件队列中的第一个非CANCELLED节点线程,即线程A。另外,signalAll就是唤醒条件队列中所有非CANCELLED节点线程。遇到CANCELLED线程就需要将其从队列中删除。
  2. 通过CAS修改线程A的waitStatus,表示该节点已经不是等待条件状态,并将线程A插入到AQS的等待队列中。
  3. 唤醒线程A,线程A和别的线程进行锁的竞争。

总结

  1. ReentrantLock提供了内置锁类似的功能和内存语义。
  2. 此外,ReetrantLock还提供了其它功能,包括定时的锁等待、可中断的锁等待、公平性、以及实现非块结构的加锁、Condition,对线程的等待和唤醒等操作更加灵活,一个ReentrantLock可以有多个Condition实例,所以更有扩展性,不过ReetrantLock需要显示的获取锁,并在finally中释放锁,否则后果很严重。
  3. ReentrantLock在性能上似乎优于Synchronized,其中在jdk1.6中略有胜出,在1.5中是远远胜出。那么为什么不放弃内置锁,并在新代码中都使用ReetrantLock?
  4. 在java1.5中, 内置锁与ReentrantLock相比有例外一个优点:在线程转储中能给出在哪些调用帧中获得了哪些锁,并能够检测和识别发生死锁的线程。Reentrant的非块状特性任然意味着,获取锁的操作不能与特定的栈帧关联起来,而内置锁却可以。
  5. 因为内置锁时JVM的内置属性,所以未来更可能提升synchronized而不是ReentrantLock的性能。例如对线程封闭的锁对象消除优化,通过增加锁粒度来消除内置锁的同步。