深入剖析 ReentrantLock 公平锁与非公平锁源码实现


本文地址:http://www.6aiq.com/article/1547289187814
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

原文地址: https://blog.csdn.net/lsgqjh/article/details/63685058

ReentrantLock 是 JUC 包中重要的并发工具之一,支持中断和超时、还支持尝试机制获取锁, 并且是一种通过编程控制的可重入锁,尽可能减少死锁问题。本文以公平与非公平锁的加锁释放锁过程成为主线,分析 ReentrantLock 的重要内容。有问题可以加我微信。

目标

  1. 掌握 AQS 基本原理,理解 ReentrantLock 的具体实现
  2. 挖掘体会 AQS 精妙设计

总之,学习 AQS 过程,要多思考为什么要这样实现,善于带着问题主动探索答案,不断的回过头去看,去思考,我们对并发会有更深的理解。在学习过程中沉淀的“pattern”认知和学习能力能够轻松的复制到我们工作实践和其它优秀代码的学习中。工作后时不时会遇到业界通用方案不匹配自身业务,到造轮子的时候可能才会后悔之前没有多多沉淀吧~。同时感谢我参考学习过的大神资料。

准备知识

ReentrantLock 类图:
7d17b337ea46429eba0af2e00689b787.png

c63badf948644efcb16eeaefeff6ac6f.png

理想情况下,线程 A 拿到锁,执行完后释放锁,线程 B 恰好到来顺手接下这把锁,一切都那么完美的话,也没必要加锁了,问题就在于在 A 没释放锁时,后续线程也想得到这把锁,所以只好让这些等待的线程进行排队,进而需要一套数据结构来组织这个队伍:

Node 结点:作为获取锁失败线程的包装类, 组合了 Thread 引用, 实现为 FIFO 双向队列。 下图为 Node 结点的属性描述
26c6cfea16224251bab82653a2b80c77.png

下图为用 Node 节点构成的双向链表图示:
7ada27dcd65d46238904a50f91222bd6.png

CLH queues need a dummy header node to get started. But we don’t create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention.

链表初始化的头节点其实是一个虚拟节点,英文名称之为dummy header, 因为它不会像后继节点一样真实的存放线程,并且这个节点只会在真正产生竞争排队情况下才会延迟初始化,避免性能浪费,下面看代码的时候,我会再次提到。
AbstractQueuedSynchronizer 类是一个模版类,维护了着一个同步队列(双向链表),提供着同步队列一些操作的公共方法,JUC 并发包里基于此类实现了很多常用的并发工具类,如 Semaphore, CountDownLatch等。

   /**
     * The synchronization state.
     */
    private volatile int state;

AbstractQueuedSynchronizer维护了一个state变量,来表示同步器的状态,state可以称为 AQS 的灵魂,基于 AQS 实现的好多 JUC 工具,都是通过操作state来实现的,state为 0 表示没有任何线程持有锁;state为 1 表示某一个线程拿到了一次锁,state为 n(n > 1),表示这个线程获取了 n 次这把锁,用来表达所谓的“可重入锁”的概念。

锁的创建

非公平锁(默认)

final ReentrantLock lock = new ReentrantLock();
final ReentrantLock lock = new ReentrantLock(false);

公平锁

final ReentrantLock lock = new ReentrantLock(true);

非公平锁加锁过程

lock()方法的逻辑: 多个线程调用 lock()方法, 如果当前state为 0, 说明当前没有线程占有锁, 那么只有一个线程会 CAS 获得锁, 并设置此线程为独占锁线程。那么其它线程会调用 acquire 方法来竞争锁(后续会全部加入同步队列中自旋或挂起)。当有其它线程 A 又进来想要获取锁时, 恰好此前的某一线程恰好释放锁, 那么 A 会恰好在同步队列中所有等待获取锁的线程之前抢先获取锁。也就是说所有已经在同步队列中的尚未被 取消获取锁 的线程是绝对保证串行获取锁,而其它新来的却可能抢先获取锁。后面代码解释。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
	/**
	* 此为AQS的protected方法,允许子类重写, 在这里被NonfairSync类重写
	*/
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

先看 acquire 方法:
逻辑:tryAcquire方法仍然尝试获取锁(快速获取锁机制),成功返回 false,如果没有成功, 那么就将此线程包装成 Node 加入同步队列尾部。。Node.EXCLUSIVE 为 null 表示这是独占锁,如果为读写锁,那就是 共享模式 (shared)。

    public final void acquire(int arg) {
    // tryAcquire()方法也是让新来的线程进行第二次插队的机会!!
	//如果再次获取锁还不成功才会放到队列
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

再看addWaiter 逻辑:

  1. Node 包装当前线程
  2. pred 尾指针不为 null,即队列不为空, 则快速 CAS 将自己设为新的 tail
  3. 如果队列为空, 则调用 enq 强制入队
  4. 如果 CAS 设置失败,说明在其它线程入队节点争抢了 tail,则此线程只能调用 enq 强制入队

注意这里在链表尾部添加节点时,先设置的prev,我们先记住这个事情。

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

 Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

看 Node 的构造方法,属性 nextWaiter 值在此被赋予EXCLUSIVE表示独占模式。

下面是enq方法,方法内是一个for(;;),看来退出的条件只能是当前线程入队成功。之前也提到过,只有在产生锁竞争了,才会去初始化链表头节点。如果队列为空,初始化头尾节点,然后后续循环会走到 else,else 的逻辑和上线 CAS 入队的逻辑一样,只不过这里套在 for 循环里,直到入队成功才退出循环。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail; 
            if (t == null) { // 判断队列是否为空,空队列应该先初始化头节点
                if (compareAndSetHead(new Node()))
                    tail = head;//头尾共同指向头结点
            } else {//CAS添加并允许失败, 走for(;;)
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;//返回新插入节点的前置节点
                }
            }
        }
    }

入队完成后看下acquireQueued 逻辑:

failed 标记最终 acquire 是否成功, interrupted标记是否曾被挂起过。注意到 for(;;) 跳出的唯一条件就是if (p == head && tryAcquire(arg)) 即当前线程结点是头结点且获取锁成功。从这里我们应该看到,这是一个线程第三次又想着尝试快速获取锁:虽然此时该节点已被加入等待队列,在进行睡眠之前又通过p == head && tryAcquire(arg)方法看看能否获取锁。也就是说只有该线程结点的所有 有效的前置结点都拿到过锁了,当前结点才有机会争夺锁,如果失败了那就通过shouldParkAfterFailedAcquire方法判断是否应该挂起当前结点,等待响应中断。观察 每次调用tryAcquire方法的时机,可以看出作者优化意图:
1. 尽量在没入队的时候拿到锁,避免过多队列操作维护成本
2. 尽量在睡眠前拿到锁,避免过多上下文切换

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
			   //如果第一次循环就获取成功那么返回的interrupted是false,不需要自我中断。
	    //否则 说明在获取到同步状态之前经历过挂起(返回true)。
            for (;;) {// 获取前置结点
                final Node p = node.predecessor();
             //如果刚入队的尚未被挂起的节点的前置节点是头节点,那么此节点线程有必要尝试一下获取锁,因为head很可能是 刚初始化的 dummy head,或者 会预设head很快释放锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node); //设置新的head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; // fase 因为刚入队还没挂起就拿到锁了
                }
				//走到这里说明前置节点不为head或者抢锁失败了
                //判断当前node线程是否可挂起,是,就调用parkAndCheckInterrupt挂起 ,interrupted设置为true,表示曾经被中断过
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
			//如果tryAcquire出现异常那么取消当前结点的获取,毕竟tryAcquire是留给子类实现的,谁知道弄出啥幺蛾子
                cancelAcquire(node);
        }
    }

看下shouldParkAfterFailedAcquire 方法的逻辑:

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
		/*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
      /*
      如果前置结点是SIGNAL状态, 那么当前置结点执行完成后是可以唤醒后续结点的,
	  此时可以安全的挂起当前结点, 不需要进行不必要的for(;;),前置结点自然会通知。
      */
            return true;
        if (ws > 0) {
      //   如果ws>0说明前置结点是被自己取消获取同步的结点(只有线程本身可以取消自己)。
	  //那么do while循环一直往头结点方向找waitStatus < 0的节点;
	  //含义就是去除了FIFO队列中的已经自我取消申请同步状态的线程。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node; // 终于找到一个“正常”的节点,赶紧将它设为自己的后继节点
        } else {
           /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //如果是其它状态waitStatus要不是0或着PROPAGATE,意味着当前结点需要一个signal但是尚未park,
			 // 所以调用者必须确保在挂起之前不能获取同步状态。
			// 并强行设置前置结点为SIGNAL。之所以CAS设置是因为,pred线程也会操作cancelAcquire来取消自己和node线程对pred的操作产生竞争条件。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
	

上面整个流程是线程通过 addWaiter 方法入队后,在挂起之前尝试获取锁,如果失败,通过shouldParkAfterFailedAcquire 判断是否应该挂起,因为刚入队的 node 的前置节点也是刚初始化没多久的节点,默认的 waitStatus=0, 所以第一次走shouldParkAfterFailedAcquire 方法if (ws == Node.SIGNAL)分支进不去,并在 else 逻辑里 CAS 设置前置节点 waitStatus=SIGNAL, 注意这里可能会 CAS 失败,最终方法应该会返回 false。对于shouldParkAfterFailedAcquire 方法的调用上层别忘了有个for(;;),再次进入此方法就满足了if (ws == Node.SIGNAL)分支(预设之前 CAS 设置 SIGNAL 成功,即使失败,再次进来时 do while 也会干掉 cancel 的节点),总之,总之,for(;;) 一定会控制在当前线程挂起之前设置好了前置节点能够将来 SIGNAL 自己。
这个获取锁的流程我整理了一个流程图,方便理解:
f50905017e8b497e85d1868f6c61daf6-image.png

接下来看 ReentrantLock 中的 NonfairSync 对同步器方法 tryAcquire 的重写:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

下面是nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { //如果没有线程获取锁,则当前线程CAS获取锁。并设置自己为当前锁的独占线程
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
			// 如果存在锁竞争,判断获取锁的线程是否是当前线程, 因为ReentrantLock是可重入锁,
			//用state变量表示重入次数,即:使当前state+1;
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
			//如果不是当前线程,则不能获取同步状态
            return false;
        }

公平锁的加锁过程

之前实习面美团金融的时候,被问到ReentrantLock中公平锁和 非公平锁在哪里体现的,问的一脸懵逼,挂电话后打开代码开始自我检讨。我们对比下两者对tryAcquire 的实现:

lock方法对比非公平锁, 没有了 if else 也就意味着新来的线程没有插队的机会, 所有来的线程必须扔到队列尾部, acquire 方法也会像非公平锁一样首先调用 tryAcquire 插队试试,但是只有队列为空或着本身就是 head,那么才可能成功,如果 队列非空那么肯定被扔到队列尾部去了, 插个毛线。

 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
      //只有队列为空或着当前tryAcquire的node线程是head
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();//拿到当前的同步状态, 如果是无锁状态, 则进行hasQueuedPredecessors方法逻辑
			//逻辑含义是:当前队列为空或本身是同步队列中的头结点。如果满足条件CAS获取同步状态,并设置当前独占线程。
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //重入锁逻辑 和非公平锁一样 不解释
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

hasQueuedPredecessors()
和非公平锁对比多了这个方法逻辑, 也就意味着没有了新来线程插队的情况,保证了公平锁的获取串行化。

 public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

有个有趣的地方: 既然 h != t 还能 h.next==null ? ,我这里能想到的一处地方,在于头节点初始化时先 CAS 的 head,第二步才将 tail=head,大概只有在这两步的间隙,这个条件才会成立吧!

公平锁和非公平锁的释放逻辑

公平锁和非公平锁的释放逻辑是一致的 都是通过sync.release(1);
ReentrantLock中的unlock()



public void unlock() {
        sync.release(1);
    }

AbstractQueuedSynchronizer中的release方法:
release 之后还要调用unparkSuccessor() 方法唤醒后继结点

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;// 当前这个头结点是同步队列中的头结点
            //并且不一定当前线程是要释放锁的执行线程, 因为可能是非公平锁的释放,压根就没插入队列就特么插队获得同步了, 总之就是拿到头结点了。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

释放锁的逻辑tryRelease
只有拥有锁的线程才有资格tryRelease,所以这个方法内不需要任何同步机制

  protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //  只有获得锁的线程自己才能释放锁
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {//c==0说明已经无锁
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);//这里就没有CAS,好像reactJs里的 state状态管理~
            //否则更新state为state-1, 也就是加锁多少次,就得释放多少次, lock unlock配对使用。
         
            return free;
        }

unparkSuccessor 方法逻辑:
某线程释放锁后,要唤醒队列中的首结点或者后继结点来偷锁,当然唤醒后可能会参与和一些新来的线程 pk 下。会发现其可能会存在一个从 tail 向前查找的流程,假如刚好这时执行这个流程,从 tail 向 head 查找节点显然就会存在问题,所以 1 和 2 对调的流程在并发下也是存在问题的。unparkSuccessor 在查找 head 的下一个有效节点的时候,没有从 head 到 tail 方向查找,而是反方向从 tail 向 head 查找,正常逻辑肯定是从 head 向 tail 方向查找速度更快

   private void unparkSuccessor(Node node) {
    
        int ws = node.waitStatus;
        if (ws < 0)//同样是因为node线程和当前线程有竞争,node本身也可以修改自身状态嘛,因此CAS
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //发现这里竟然从tail结点往前找最靠近head结点的且处于非取消状态的结点?这不增加了遍历复杂度么? 留个疑问下面解释!
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //s是后继结点 因为后继结点很可能是满足条件的, 如果满足即s!=NULL 直接unpark后继结点。
        if (s != null)
            LockSupport.unpark(s.thread);
    }

为什么在unparkSuccessor 方法中对于需要unpark的线程结点进行从tail 往前查找?再来看一下enq 方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

将上面代码梳理一下,大致分为三个步骤:
​ 1、首先node.prev = t ,将当前 node 的 prev 指向 tail 节点;
​ 2、CAS 将tail 指向新 node 结点;
​ 3、将之前 tail 节点的 next 指向当前节点
示意图如下:

假如将追加节点的三个步骤顺序调换下,先将 tail 节点的 next 指向当前节点,然后 cas 原子修改 tail 指向,最后再来修改当前节点的 prev 指向,即将上面的 1 和 3 对调一下,会出现上面情况呢?

将 tail 节点的 next 指向当前节点操作后,紧接着会执行 cas 操作修改 tail 指向当前节点,由于存在多线程并发问题,即可能会存在多个线程同时申请锁资源,假如现在 t1、t2 两个线程都同时做上面两个步骤:
​ 1、t1 线程修改 next 后,紧接着 t2 线程也修改 next 指向,导致会把 t1 修改的指向覆盖;
​ 2、这时 t1 线程做 cas 替换 tail 指向成功后,t2 也来做 cas 操作就会失败;
​ 3、t1 由于 cas 操作成功,最后修改 prev 指向

可以发现,由于并发导致追加的 t1 节点是存在问题的,正常情况下 Node1 的 next 应该指向 t1 节点,但是却被 t2 节点覆盖了。所以,1 和 3 对调是在并发下是存在问题的。

假如 1 和 2 对调,先进行 cas 操作,然后修改 prev,最后再来修改 next 又会怎么样呢?首先通过 cas 原子操作将 tail 指向当前节点,示意图如下:
99e2d009efc94c6f89a37adaf705fb8c.png

tail 节点这时还是孤立的节点,prev 和 next 都还没有指向,tail 节点和其它节点之间没有关联了,这时如果其它线程需要遍历这个双向链表就比较危险了

现在我们来分析下为什么源码中这个顺序执行在并发下就不会存在问题。现在假设两个线程同一时间都没有获取到锁,都需要追加到 Sync Queue 队列尾部,大致流程如下:

  1. 线程 t1 的节点设置 prev 指向 tail,线程 t2 节点同时也设置 prev 指向 tail,这时就不会出现上面如果先设置 next 就会导致后设置把之前设置覆盖情况,因为如果先设置 next 是对 Node1 进行操作,存在多个线程对 Node1 同时操作导致状态不一致问题,而如果这里先设置 prev,操作对象是线程本身的节点,是不存在多线程并发问题,示意图如下:

  1. 这时 t1 和 t2 都进行 cas 原子操作,反正会有一个线程会操作成功,假如是 t1 线程操作成功,然后就可以顺利的设置 Node1 节点的 next 指向 t1,因为只会存在一个线程操作成功,所以对 Node1 的操作此时也不会存在并发问题,由于 t1 的 cas 操作成功导致 t2 线程进行 cas 操作必然失败,此刻示意图如下:

  1. 由于 t2 线程 cas 操作失败,因此不再继续操作 Node1 的 next 指向自己,而是进入 enq() 方法中,其源码逻辑是 enq 方法中通过 cas+ 无限循环方式保证 t2 节点一定会被追加到 Sync Queue 尾部的,每次循环都是重新获取最新的 tail,然后将 t2 的 prev 指向这个最新的 tail,然后通过 cas 操作将 tail 指向自己,最后在将之前 tail 节点的 next 指向 t2 节点,这个案例中获取的最新 tail 就是 t1 节点了,所以 t2 节点会被追加到 t1 节点后,这样就能保证即使在高并发下依然可以实现节点正常添加,而不会像之前出现状态不一致情况,示意图如下:
    2e203a8612f84291aadb314afb2db7b9.png

  2. 上面分析 unparkSuccessor()在查找 head 的下一个有效节点的时候,没有从 head 到 tail 方向查找,而是反方向从 tail 向 head 查找,如果你对我刚才分析得到逻辑理解透彻的话,就比较好解释了。比如:t1 设置 prev 指向 Node1,然后 cas 操作将 tail 指向了 t1,这时 Queue 的结构如下,假如这时候执行 unparkSuccessor(),Node0 查找它的后驱节点为 Node1,假如 Node1 是无效节点,Node1 需要继续查找它的后驱节点,但是这时 Node1 的 next 并没有设置,是无法查找到的,所以必须从 tail 向 head 方向查找才行。

取消获取锁cancelAcquire() 方法

取消获取锁意味着结点的出队。包括三个场景下的出队:

  1. node 是 tail
  2. node 既不是 tail,也不是 head 的后继节点
  3. node 是 head 的后继节点
 private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // 跳过所有被取消的前置结点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        // `` 如果node是tail,更新tail为pred,并使pred.next指向null``
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 如果node既不是tail,又不是head的后继节点
            int ws;
            // 如果前置结点状态是SIGNAL || 如果不是CAS设为SIGNAL 那么接下来的执行自然就会唤醒下一个结点
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
				//设置next-link 使node的通过上面while循环找到的有效的前继结点指向node的下一结点
				//之所以还是CAS,使下一个结点还是会可能取消自己的
                    compareAndSetNext(pred, predNext, next);
            } else {  // 如果node是head的后继节点,则直接唤醒node的后继节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
            // 最终把取消acquire的 node的next指针指向的它自己
        }
    }

场景 1. node 是 tail

node 出队的过程如下图所示。

cfbb15985794420d83e41b7f5bdf3eca.png

  1. cancelAcquire()调用 compareAndSetTail() 方法将 tail 指向 pred

  2. cancelAcquire()调用 compareAndSetNext() 方法将 pred 的 next 指向空

场景 2. node 既不是 tail,也不是 head 的后继节点

node 出队过程如下图所示:
7ca429ecf00c49659508d962c4d88d1b.png

cancelAcquire()调用了 compareAndSetNext() 方法将 pred 指向 successor。将 successor 指向 pred 是谁干的?是别的线程做的。当别的线程在调用 cancelAcquire()或者 shouldParkAfterFailedAcquire() 时,会根据 prev 指针跳过被 cancel 掉的前继节点,同时,会调整其遍历过的 prev 指针。代码类似这样;

cancelAcquire()中:
 // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
			
shouldParkAfterFailedAcquire中:
  do {
       node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
            pred.next = node;
			

场景 3 node 是 head 的后继节点

node 出队的过程如下图所示(图中用 node* 表示前继节点)

97aad488b40a429caad02185648f89ad.png

cancelAcquire()调用了unparkSuccessor(),不过,unparkSuccessor()中并没有对队列做任何调整呀。
比场景 2 还糟糕,这次,cancelAcquire()对于出队这件事情可以说是啥都没干。
出队操作实际上是由unparkSuccessor()唤醒的线程执行的。
unparkSuccessor()会唤醒successor结点,当被唤醒后将会实际执行出队操作。
现在需要搞清楚successor是从什么地方恢复执行的呢?这要看successor是在哪里被挂起的。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

successor线程是从parkAndCheckInterrupt被挂起的,恢复执行时,也从此处开始重新执行。successor将会重新执行 for 循环,此时,node 尚未出队,successor的前继节点依然是 node,而不是 head。所以,successor会执行到 shouldParkAfterFailedAcquire()处。而从场景 2 中可以得知,shouldParkAfterFailedAcquire() 中将会调整 successor 的 prev 指针(同时也调整 head 的 next 指针),从而完成了 node 的出队操作
fc5bcbd2274a4772ae1b1a5f16bc0f5e.png

总结

悲观锁在 Java 中就是我们所熟知的锁,实现方式主要分为两种:synchronized 和 Lock,而乐观锁的实现主要通过 CAS 操作实现。这里我们来比较下 synchronized 和 Lock 方式的大致差别:

  • synchronized 主要依赖 JVM 底层实现,而 Lock 是通过编码方式实现,其实现方式差别还是比较大,可以参考之前的一个文章https://blog.csdn.net/lsgqjh/article/details/61915074
  • synchronized 由于其简单方便,只需要声明在方法、代码块上即可,主要是不需要关心锁释放问题,在一般的编程中使用量还是比较大的,但是在真正的并发编程系统中,Lock 方式明显优于 synchronized:
    • 在高版本 JDK 中,已经对 synchronized 进行了优化,synchronized 和 Lock 方式在性能方面差别已不太明显
    • synchronized 最致命的缺陷是:synchronized 不支持中断和超时,也就是说通过 synchronized 一旦被阻塞住,如果一直无法获取到所资源就会一直被阻塞,即使中断也没用,这对并发系统的性能影响太大了;Lock 支持中断和超时、还支持尝试机制获取锁,对 synchronized 进行了很好的扩展,所以从灵活性上 Lock 是明显优于 synchronized 的

参考连接:

[1] Java 并发编程的艺术
[2] https://www.cnblogs.com/yangming1996/p/7612653.html
[3]https://blog.csdn.net/lsgqjh/article/details/61915074
[4]https://blog.reactor.top/2018/01/31/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E9%94%81%E4%B9%8BReentrantLock%E6%80%BB%E7%BB%93/
[5]https://www.jianshu.com/p/01f2046aab64


本文地址:http://www.6aiq.com/article/1547289187814
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出