深入剖析 ReentrantLock 公平锁与非公平锁源码实现



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

原文地址: https://blog.csdn.net/lsgqjh/article/details/63685058

ReentrantLock 是 JUC 包中重要的并发工具之一,支持中断和超时、还支持尝试机制获取锁, 并且是一种通过编程控制的可重入锁,尽可能减少死锁问题。本文以公平与非公平锁的加锁释放锁过程成为主线,分析 ReentrantLock 的重要内容。

准备知识简介

ReentrantLock 类图:
7d17b337ea46429eba0af2e00689b787.png

c63badf948644efcb16eeaefeff6ac6f.png

Node 结点:作为获取锁失败线程的包装类, 组合了 Thread 引用, 实现为 FIFO 双向队列。 下图为 Node 结点的属性描述
26c6cfea16224251bab82653a2b80c77.png

7ada27dcd65d46238904a50f91222bd6.png

锁的创建

非公平锁(默认)

final ReentrantLock lock = new ReentrantLock();
final ReentrantLock lock = new ReentrantLock(false);

公平锁

final ReentrantLock lock = new ReentrantLock(true);

非公平锁加锁过程

lock()方法的逻辑: 多个线程调用 lock()方法, 如果当前state为 0, 说明当前没有线程占有锁, 那么只有一个线程会 CAS 获得锁, 并设置此线程为独占锁线程。那么其它线程会调用 acquire 方法来竞争锁(后续会全部加入同步队列中自旋或挂起)。当有其它线程 A 又进来想要获取锁时, 恰好此前的某一线程恰好释放锁, 那么 A 会恰好在同步队列中所有等待获取锁的线程之前抢先获取锁。也就是说所有已经在同步队列中的尚未被 取消获取锁 的线程是绝对保证串行获取锁,而其它新来的却可能抢先获取锁。后面代码解释。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
	/**
	* 此为AQS的protected方法,允许子类重写, 在这里被NonfairSync类重写
	*/
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

先看 acquire 方法:
逻辑:tryAcquire方法仍然尝试获取锁(快速获取锁机制),成功返回 false,如果没有成功, 那么就将此线程包装成 Node 加入同步队列尾部。。Node.EXCLUSIVE 为 null 表示这是独占锁,如果为读写锁,那就是 共享模式 (shared)。

    public final void acquire(int arg) {
    // tryAcquire()方法也是让新来的线程进行第二次插队的机会!!
	//如果再次获取锁还不成功才会放到队列
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

再看addWaiter 逻辑:

  1. Node 包装当前线程
  2. pred 尾指针不为 null,即队列不为空, 则快速 CAS 添加到尾结点
  3. 如果队列为空, 或 CAS 设置失败, 则调用 enq 强制入队
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

下面是enq方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 判断队列是否为空,空队列应该先初始化头节点
                if (compareAndSetHead(new Node()))
                    tail = head;//头尾共同指向头结点
            } else {//CAS添加并允许失败, 走for(;;)
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;//返回新插入节点的前置节点
                }
            }
        }
    }

入队完成后看下acquireQueued 逻辑:

failed 标记最终 acquire 是否成功, interrupted标记是否曾被挂起过。注意到 for(;;) 跳出的唯一条件就是if (p == head && tryAcquire(arg)) 即当前线程结点是头结点且获取锁成功。从这里我们应该看到,这是一个线程第三次又想着尝试快速获取锁:虽然此时该节点已被加入等待队列,在进行睡眠之前又通过p == head && tryAcquire(arg)方法看看能否获取锁。也就是说只有该线程结点的所有 有效的前置结点都拿到过锁了,当前结点才有机会争夺锁,如果失败了那就通过shouldParkAfterFailedAcquire方法判断是否可以挂起当前结点,等待响应中断。观察 每次调用tryAcquire方法的时机,可以看出作者优化意图:
1. 尽量在没入队的时候拿到锁,避免过多队列操作维护成本
2. 尽量在睡眠前拿到锁,避免过多上下文切换

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 获取前置结点
                final Node p = node.predecessor();
                //如果第一次循环就获取成功那么返回的interrupted是false,不需要自我中断。
	    //否则 说明在获取到同步状态之前经历过挂起(返回true)那么就需要自我中断,可以 看acquire方法中的代码。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果当前node线程是可挂起的,就调用parkAndCheckInterrupt挂起 ,interrupted设置为true,表示曾经被中断过
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)//如果tryAcquire出现异常那么取消当前结点的获取
                cancelAcquire(node);
        }
    }

看下shouldParkAfterFailedAcquire 方法的逻辑:

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
      /*
      如果前置结点是SIGNAL状态, 那么当前置结点是执行完成可以唤醒后续结点的,
	  此时可以安全的挂起当前结点, 不需要进行不必要的for(;;),前置结点自然会通知。
      */
            return true;
        if (ws > 0) {
      //   如果ws>0说明前置结点是被自己取消获取同步的结点(只有线程本身可以取消自己)。那么do while循环一直往头结点方向找waitStatus < 0;含义就是去除了FIFO队列中的已经自我取消申请同步状态的线程。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
          
             //如果是其它状态waitStatus要不是0或着PROPAGATE,意味着当前结点需要一个signal但是尚未park,所以调用者必须确保在挂起之前不能获取同步状态, 所以return false。
			 并强行设置前置结点为SIGNAL。之所以CAS设置是因为,pred线程也会操作cancelAcquire来取消自己和node线程对pred的操作产生竞争条件。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
	

看 ReentrantLock 中的 NonfairSync 对同步器方法 tryAcquire 的重写:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

下面是nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { //如果没有线程获取锁,则当前线程CAS获取锁。并设置自己为当前锁的独占线程
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
			// 如果存在锁竞争,判断获取锁的线程是否是当前线程, 因为ReentrantLock是可重入锁,
			//用state变量表示重入次数,即:使当前state+1;
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
			//如果不是当前线程,则不能获取同步状态
            return false;
        }

公平锁的加锁过程

之前实习面美团金融的时候,被问到ReentrantLock中公平锁和 非公平锁在哪里体现的,问的一脸懵逼,挂电话后打开代码开始自我检讨。我们对比下两者对tryAcquire 的实现:

lock方法对比非公平锁, 没有了 if else 也就意味着新来的线程没有插队的机会, 所有来的线程必须扔到队列尾部, acquire 方法也会像非公平锁一样首先调用 tryAcquire 插队试试,但是只有队列为空或着本身就是 head,那么才可能成功,如果 队列非空那么肯定被扔到队列尾部去了, 插个毛线。

 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
      //只有队列为空或着当前tryAcquire的node线程是head
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();//拿到当前的同步状态, 如果是无锁状态, 则进行hasQueuedPredecessors方法逻辑
			//逻辑含义是:当前队列为空或本身是同步队列中的头结点。如果满足条件CAS获取同步状态,并设置当前独占线程。
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //重入锁逻辑 和非公平锁一样 不解释
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

hasQueuedPredecessors()
和非公平锁对比多了这个方法逻辑, 也就意味着没有了新来线程插队的情况,保证了公平锁的获取串行化。

 public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

公平锁和非公平锁的释放逻辑

公平锁和非公平锁的释放逻辑是一致的 都是通过sync.release(1);
ReentrantLock中的unlock()

public void unlock() {
        sync.release(1);
    }

AbstractQueuedSynchronizer中的release方法:
release 之后还要调用unparkSuccessor() 方法唤醒后继结点

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;// 当前这个头结点是同步队列中的头结点
            //并且不一定当前线程是要释放锁的执行线程, 因为可能是非公平锁的释放,压根就没插入队列就特么插队获得同步了, 总之就是拿到头结点了。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

释放锁的逻辑tryRelease
只有拥有锁的线程才有资格tryRelease,所以这个方法内不需要任何同步机制

  protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //  只有获得锁的线程自己才能释放锁
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {//c==0说明已经无锁
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);//这里就没有CAS,好像reactJs里的 state状态管理~
            //否则更新state为state-1, 也就是加锁多少次,就得释放多少次, lock unlock配对使用。
         
            return free;
        }

unparkSuccessor 方法逻辑:
某线程释放锁后,要唤醒队列中的首结点或者后继结点来偷锁,当然唤醒后可能会参与和一些新来的线程 pk 下。会发现其可能会存在一个从 tail 向前查找的流程,假如刚好这时执行这个流程,从 tail 向 head 查找节点显然就会存在问题,所以 1 和 2 对调的流程在并发下也是存在问题的。unparkSuccessor 在查找 head 的下一个有效节点的时候,没有从 head 到 tail 方向查找,而是反方向从 tail 向 head 查找,正常逻辑肯定是从 head 向 tail 方向查找速度更快

   private void unparkSuccessor(Node node) {
    
        int ws = node.waitStatus;
        if (ws < 0)//同样是因为node线程和当前线程有竞争,node本身也可以修改自身状态嘛,因此CAS
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //发现这里竟然从tail结点往前找最靠近head结点的且处于非取消状态的结点?这不增加了遍历复杂度么? 留个疑问下面解释!
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //s是后继结点 因为后继结点很可能是满足条件的, 如果满足即s!=NULL 直接unpark后继结点。
        if (s != null)
            LockSupport.unpark(s.thread);
    }

为什么在unparkSuccessor 方法中对于需要unpark的线程结点进行从tail 往前查找?再来看一下enq 方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

将上面代码梳理一下,大致分为三个步骤:
​ 1、首先node.prev = t ,将当前 node 的 prev 指向 tail 节点;
​ 2、CAS 将tail 指向新 node 结点;
​ 3、将之前 tail 节点的 next 指向当前节点
示意图如下:

假如将追加节点的三个步骤顺序调换下,先将 tail 节点的 next 指向当前节点,然后 cas 原子修改 tail 指向,最后再来修改当前节点的 prev 指向,即将上面的 1 和 3 对调一下,会出现上面情况呢?

将 tail 节点的 next 指向当前节点操作后,紧接着会执行 cas 操作修改 tail 指向当前节点,由于存在多线程并发问题,即可能会存在多个线程同时申请锁资源,假如现在 t1、t2 两个线程都同时做上面两个步骤:
​ 1、t1 线程修改 next 后,紧接着 t2 线程也修改 next 指向,导致会把 t1 修改的指向覆盖;
​ 2、这时 t1 线程做 cas 替换 tail 指向成功后,t2 也来做 cas 操作就会失败;
​ 3、t1 由于 cas 操作成功,最后修改 prev 指向

可以发现,由于并发导致追加的 t1 节点是存在问题的,正常情况下 Node1 的 next 应该指向 t1 节点,但是却被 t2 节点覆盖了。所以,1 和 3 对调是在并发下是存在问题的。

假如 1 和 2 对调,先进行 cas 操作,然后修改 prev,最后再来修改 next 又会怎么样呢?首先通过 cas 原子操作将 tail 指向当前节点,示意图如下:
99e2d009efc94c6f89a37adaf705fb8c.png

tail 节点这时还是孤立的节点,prev 和 next 都还没有指向,tail 节点和其它节点之间没有关联了,这时如果其它线程需要遍历这个双向链表就比较危险了

现在我们来分析下为什么源码中这个顺序执行在并发下就不会存在问题。现在假设两个线程同一时间都没有获取到锁,都需要追加到 Sync Queue 队列尾部,大致流程如下:

  1. 线程 t1 的节点设置 prev 指向 tail,线程 t2 节点同时也设置 prev 指向 tail,这时就不会出现上面如果先设置 next 就会导致后设置把之前设置覆盖情况,因为如果先设置 next 是对 Node1 进行操作,存在多个线程对 Node1 同时操作导致状态不一致问题,而如果这里先设置 prev,操作对象时线程本身的节点,是不存在多线程并发问题,示意图如下:

  1. 这时 t1 和 t2 都进行 cas 原子操作,反正会有一个线程会操作成功,假如是 t1 线程操作成功,然后就可以顺利的设置 Node1 节点的 next 指向 t1,因为只会存在一个线程操作成功,所以对 Node1 的操作此时也不会存在并发问题,由于 t1 的 cas 操作成功导致 t2 线程进行 cas 操作必然失败,此刻示意图如下:

  1. 由于 t2 线程 cas 操作失败,因此不再继续操作 Node1 的 next 指向自己,而是进入 enq() 方法中,其源码如下,enq 方法中通过 cas+ 无限循环方式保证 t2 节点一定会被追加到 Sync Queue 尾部的,每次循环都是重新获取最新的 tail,然后将 t2 的 prev 指向这个最新的 tail,然后通过 cas 操作将 tail 指向自己,最后在将之前 tail 节点的 next 指向 t2 节点,这个案例中获取的最新 tail 就是 t1 节点了,所以 t2 节点会被追加到 t1 节点后,这样就能保证即使在高并发下依然可以实现节点正常添加,而不会像之前出现状态不一致情况,示意图如下:
    2e203a8612f84291aadb314afb2db7b9.png

  2. 上面分析 unparkSuccessor()在查找 head 的下一个有效节点的时候,没有从 head 到 tail 方向查找,而是反方向从 tail 向 head 查找,如果你对我刚才分析得到逻辑理解透彻的话,就比较好解释了。比如:t1 设置 prev 指向 Node1,然后 cas 操作将 tail 指向了 t1,这时 Queue 的结构如下,假如这时候执行 unparkSuccessor(),Node0 查找它的后驱节点为 Node1,假如 Node1 是无效节点,Node1 需要继续查找它的后驱节点,但是这时 Node1 的 next 并没有设置,是无法查找到的,所以必须从 tail 向 head 方向查找才行。

取消获取锁cancelAcquire() 方法

取消获取锁意味着结点的出队。包括三个场景下的出队:

  1. node 是 tail
  2. node 既不是 tail,也不是 head 的后继节点
  3. node 是 head 的后继节点
 private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // 跳过所有被取消的前置结点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        // `` 如果node是tail,更新tail为pred,并使pred.next指向null``
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 如果node既不是tail,又不是head的后继节点
            int ws;
            // 如果前置结点状态是SIGNAL || 如果不是CAS设为SIGNAL 那么接下来的执行自然就会唤醒下一个结点
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
				//设置next-link 使node的通过上面while循环找到的有效的前继结点指向node的下一结点
				//之所以还是CAS,使下一个结点还是会可能取消自己的
                    compareAndSetNext(pred, predNext, next);
            } else {  // 如果node是head的后继节点,则直接唤醒node的后继节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
            // 最终把取消acquire的 node的next指针指向的它自己
        }
    }

场景 1. node 是 tail

node 出队的过程如下图所示。

cfbb15985794420d83e41b7f5bdf3eca.png

  1. cancelAcquire()调用 compareAndSetTail() 方法将 tail 指向 pred

  2. cancelAcquire()调用 compareAndSetNext() 方法将 pred 的 next 指向空

场景 2. node 既不是 tail,也不是 head 的后继节点

node 出队过程如下图所示:
7ca429ecf00c49659508d962c4d88d1b.png

cancelAcquire()调用了 compareAndSetNext() 方法将 pred 指向 successor。将 successor 指向 pred 是谁干的?是别的线程做的。当别的线程在调用 cancelAcquire()或者 shouldParkAfterFailedAcquire() 时,会根据 prev 指针跳过被 cancel 掉的前继节点,同时,会调整其遍历过的 prev 指针。代码类似这样;

cancelAcquire()中:
 // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
			
shouldParkAfterFailedAcquire中:
  do {
       node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
            pred.next = node;
			

场景 3 node 是 head 的后继节点

node 出队的过程如下图所示(图中用 node* 表示前继节点)

97aad488b40a429caad02185648f89ad.png

cancelAcquire()调用了unparkSuccessor(),不过,unparkSuccessor()中并没有对队列做任何调整呀。
比场景 2 还糟糕,这次,cancelAcquire()对于出队这件事情可以说是啥都没干。
出队操作实际上是由unparkSuccessor()唤醒的线程执行的。
unparkSuccessor()会唤醒successor结点,当被唤醒后将会实际执行出队操作。
现在需要搞清楚successor是从什么地方恢复执行的呢?这要看successor是在哪里被挂起的。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

successor线程是从parkAndCheckInterrupt被挂起的,恢复执行时,也从此处开始重新执行。successor将会重新执行 for 循环,此时,node 尚未出队,successor的前继节点依然是 node,而不是 head。所以,successor会执行到 shouldParkAfterFailedAcquire()处。而从场景 2 中可以得知,shouldParkAfterFailedAcquire() 中将会调整 successor 的 prev 指针(同时也调整 head 的 next 指针),从而完成了 node 的出队操作
fc5bcbd2274a4772ae1b1a5f16bc0f5e.png

总结

悲观锁在 Java 中就是我们所熟知的锁,实现方式主要分为两种:synchronized 和 Lock,而乐观锁的实现主要通过 CAS 操作实现。这里我们来比较下 synchronized 和 Lock 方式的大致差别:

  • synchronized 主要依赖 JVM 底层实现,而 Lock 是通过编码方式实现,其实现方式差别还是比较大,可以参考之前的一个文章https://blog.csdn.net/lsgqjh/article/details/61915074
  • synchronized 由于其简单方便,只需要声明在方法、代码块上即可,主要是不需要关心锁释放问题,在一般的编程中使用量还是比较大的,但是在真正的并发编程系统中,Lock 方式明显优于 synchronized:
    • 在高版本 JDK 中,已经对 synchronized 进行了优化,synchronized 和 Lock 方式在性能方面差别已不太明显
    • synchronized 最致命的缺陷是:synchronized 不支持中断和超时,也就是说通过 synchronized 一旦被阻塞住,如果一直无法获取到所资源就会一直被阻塞,即使中断也没用,这对并发系统的性能影响太大了;Lock 支持中断和超时、还支持尝试机制获取锁,对 synchronized 进行了很好的扩展,所以从灵活性上 Lock 是明显优于 synchronized 的

参考连接:

[1] Java 并发编程的艺术
[2] https://www.cnblogs.com/yangming1996/p/7612653.html
[3]https://blog.csdn.net/lsgqjh/article/details/61915074
[4]https://blog.reactor.top/2018/01/31/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E9%94%81%E4%B9%8BReentrantLock%E6%80%BB%E7%BB%93/
[5]https://www.jianshu.com/p/01f2046aab64


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com