深入理解 AQS 之 Condition 源码

前言

很久之前分享过 ReetrantLock 的实现深入剖析 ReentrantLock 公平锁与非公平锁源码实现,而今再回头去看,对 AQS 也有了更深刻准确的理解,随即更新了下之前的文章。今天分享利用 AQS 实现的另一个重要的 JUC 工具类 Condition。如果上篇理解到位,这个 Condition 的学习就没啥难度的 ~

我们应该都了解过 Object 的一些监视器方法: wait(), notify()notifyAll()。场景是某线程 A 需要对资源操作时,需要满足一定的条件,不满足就调用 wait(),进入等待队列等待条件满足,而创造这个条件的是另外一个线程 B,线程 B 操作这份资源让其满足条件,一旦条件满足,线程 B 就会 signal 等待队列的线程。这个过程中会发现涉及了多线程操作共享数据,所以这就是为什么调用监视器方法需要首先获取一把锁。另外,Object 自带的一套监视器方法,只能包含一个同步队列,一个条件等待队列。Condition 是对上述模型的另一种实现,支持的功能特性更加丰富,如:

  • 一个同步队列可以有多个等待队列

  • 可以在 wait 过程中不响应中断退出等待

  • 可以指定等待满足条件获取锁的等待时间

Condition 使用

Condition 属于多线程间通信的一种机制,我们常用的 BlockingQueue 就是基于 Condition 实现的。我们在异步 RPC 通信框架中也经常会见到使用的 BlockingQueue,比如我们在 rpc client 端使用 Netty IO,因为 Netty 本身 write 操作非阻塞,而业务调用要求同步阻塞获取结果,所以可以实现上使所有业务线程 write 数据后,阻塞在 BlockingQueue 上,等 Netty client 收到响应数据后,填充到 BlockingQueue,并唤醒当时阻塞的请求线程。

JDK 的文档中给出的 Condition 使用例子就是如何实现一个简单的 BlockingQueue:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // 创建Condition一定依赖Lock实例
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];

    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
		// 生产者线程发现队列满了,无法继续生产,只能在notFull条件上排队等待
		// while循环为为了防止假唤醒
            while (count == items.length)
                notFull.await(); 
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
			// 生产成功,通知等待notEmpty条件的线程来消费
            notEmpty.signal(); 
        } finally {
            lock.unlock();

        }

    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {

		// 消费者线程如果发现没有数据可消费,只能排队等待在notEmpty条件上
            while (count == 0)
                notEmpty.await(); 
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
			//每次被消费者线程消费一个都会发个通知,告诉等待notFull条件的线程
            notFull.signal(); 
            return x;
        } finally {
            lock.unlock();
        }
    }
}

Condition 实现

整体结构

26014d69c40148b88b15aa63cc2a9f00-image.png

整个 Condition 整体的队列模型如上,包含一个同步队列和多个条件队列,如果线程执行时条件不满足,调用 await()方法,会将该线程封装成 Node 节点添加到 condition 的条件队列中,一旦满足条件,条件队列便会被其他线程 signal()通知头节点 并请到同步队列去抢锁,抢到锁后便会从 wait()方法退出,继续执行。每个 condition 实例都对应一个条件队列,条件队列的实现类是 ConditionObject,内部维护了一个单向链表,每个节点也都是一个 Node 实例,毕竟因为将来是要被从条件队列转移到阻塞队列中的。


 public class ConditionObject implements Condition, java.io.Serializable {

        /** First node of condition queue. */

        private transient Node firstWaiter;

        /** Last node of condition queue. */

        private transient Node lastWaiter;

因为每次调用 condition 的方法操作前一定是获取了锁的,所以对条件队列的操作是线程安全的

await() 方法

await()方法有三种不同的实现:

  1. awaitUninterruptibly : await 期间不响应中断,非得等到条件满足被唤醒

  2. await() throws InterruptedException : await 期间响应中断,如果阻塞太久可以随时中断唤醒

  3. await(long time, TimeUnit unit) throws InterruptedException : 可以设置等待超时时间,并可以响应中断



对于中断的处理使用 interruptMode final 常量来表示,取值为 1,表示当前线程后续需要重新中断,-1 表示后续需要抛出 InterruptedException。总之这个常量是用来标记将来怎么处理这个中断的。
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
我们以第三种实现为例来具体分析:

 public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
// 因为本身await期间要响应中断,在await前先判断是否已被中断了,已中断就抛InterruptedException
            if (Thread.interrupted())
                throw new InterruptedException();
				//将当前线程封装成Node节点并加入等待队列的尾部
            Node node = addConditionWaiter();
						// 释放当前线程所占有的锁,如果是可重入锁,也要把state值归为0
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
// while只要不退出,就说明还在等待队列中进行await
            while (!isOnSyncQueue(node)) {
// 如果到了超时时间,会将节点从等待队列转移到同步队列,返回true,说明等待真的超时。返回false,说明当正准备取消等待前,已经被signal了,只是还没有完成转移到同步队列而已
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
//如果等待剩余时间少于1000纳秒就没必要park了,不如自旋,毕竟很快就要退出while循环了
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
// != 0意味着 waiting期间被中断,因为要响应中断,所以break,没必要再await; 等于0,意味着waiting期间没有被中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
// 退出上面while后,说明已经在同步队列了,此线程开始抢锁(试图恢复await前的state值),如果acquireQueued 返回false,说明在同步队列里获取锁的过程中没有被中断过,返回true则表示曾发生过中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
// 统一处理上述过程产生的中断状态
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

奇怪的一点在于为什么线程用这个常量来标志未来要如何处理? 而不是立刻处理,个人理解是为了复用方法,如 acquireQueued, 因为对外接口有的需要响应中断,有的不需要,意味着对中断处理上有着不同的方式,acquireQueued 只需要返回 是否中断过,而不会在内部做实际的中断处理,实际处理交给上层。

下面我们详细拆解 await() 中一些重要的方法。

isOnSyncQueue()

此方法用于判断条件队列某节点是否已经被转移到同步队列。

 final boolean isOnSyncQueue(Node node) {
 // 如果节点的waitStatus 依然为Node.CONDITION,说明还在条件队列,否则如果已被转移到同步队列中时waitStatus应为0或-1
 // node.prev 是在同步队列才会用的属性,==null 依然意味着没有进入同步队列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
 // 如果node.prev为非空,依然不能确定其已在同步队列中,因为同步队列的节点入队是两步操作,先设置node.prev,然后CAS设置自己为tail,第二步操作可能CAS失败。
 //从同步队列尾节点往前找
        return findNodeFromTail(node);
    }

signal()

此方法将条件队列头节点转移到同步队列中

 public final void signal() {
 // 调用signal的线程必须持有独占锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
				
				
private void doSignal(Node first) {
            do {
	// 因为first马上就要被转移到同步队列了,所以将first.nextWaiter,作为新的firstWatier。
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
					//切断和等待队列的关联
                first.nextWaiter = null;
								// 如果转移不成功且还有后续节点,那么继续后续节点的转移
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
 final boolean transferForSignal(Node node) {
 // 进行CAS,毕竟因为当前发起signal的是另一个线程,而node本身可能自己取消等待,所以需要CAS
//如果CAS失败 说明此节点已取消等待,此节点接下来将不会被转移到同步队列, 如果CAS成功,waitStatus将会被置为0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

// 将node 加入同步队列后返回其前置节点
        Node p = enq(node);
        int ws = p.waitStatus;
// ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。ws < 0时CAS设置node前置节点的waitStatus为SIGNAL,之前文章说过,新节点入同步队列需要设置前置节点waitStatus为SIGNAL,肩负起唤醒后继节点的责任

// 所以如果 node进入同步队列后的前置节点取消或者 CAS设置SIGNAL失败,直接唤醒该node
// 但是在绝大多数情况下 应该是ws<0,并且CAS成功的,并不会直接unpark,而是等到在同步队列中成功拿到锁后被unpark
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
	

await() 方法中的代码 ,一旦 unpark 后,继续往后执行 。
有以下三种情况会让 LockSupport.park(this); 这句返回继续往下执行:

  1. 常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
  2. 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
  3. signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的 CAS 操作失败了
  4. 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题

走到这里,该 node 一定是从 park 中返回了,返回后检查中断状态,如果 不为 0,说明发生过中断。 为 0,没有被中断过

 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
						// transferAfterCancelledWait 方法会判断被unpark的节点曾被中断的时机,如果返回true,意味着在条件队列中等待的时候被中断过(未被signal之前),false意味着中断发生在被signal之后
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

只有在发生过中断时,才会调用这个 transferAfterCancelledWait 方法。可以理解为出现了这样一个场景:
某已 park 的节点还在条件队列中静静的等待满足条件后被转移到同步队列中时,被其它线程进行了中断

final boolean transferAfterCancelledWait(Node node) {
// 此处CAS设置成功意味着node还在条件队列中
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
				// 所以在条件队列中即在signal之前被中断,那么将node加入到同步队列,并且返回true
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
// 走到这里大概率是已经在同步队列了,也可能是正在加入同步队列的过程中,自旋等待入队完成。总之中断是发生在已经被signal之后了。
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

在检查中断状态的过程中我们会发现,被 unpark 后,无论是否被中断过,无论是否是响应中断的 await 方法,都会被加入同步队列中,看起来并没有对中断有啥特殊的处理的地方。是不是有点和我们预期的不符?

所以退出 awaitwhile () 循环的条件有两个:

  1. 节点被之前所说的几种条件之一唤醒后发现已经在同步队列了
  2. 节点被之前所说的几种条件之一唤醒后发现是经由中断导致的被 unpark,直接 break;跳出 while 循环

我们继续看 await() 方法,跳出 while 循环后

// 执行到这里,该node一定是已经进入同步队列了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 能进入if ,一定是在被signal之后发生的中断,标志下接下来的处理中,需要重新进行中断
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

acquireQueued 返回值为 true 说明在同步队列中获取锁的过程中被中断过,而在上面 while 没退出之前是否中断过 由当前 interruptMode 变量标志,因为在 checkInterruptWhileWaitingThread.interrupted() 方法调用时,已经把中断状态标志位清除了。上面如果 interruptMode != THROW_IE 成立,说明在 while 阶段没有发生过中断或者发生过中断,但中断是在对条件队列中某 node 进行 signal 之后发生的。

总结一下就是说只要能够退出 while 循环,不管是否被中断过,那么 node 一定在同步队列了,当执行完 acquireQueued,意味着拿到锁并返回了,返回值代表在同步队列获取锁的过程中是否被中断过,所以可以看到,对线程进行中断,并不会影响其进入同步队列并成功拿到锁,而是把整个过程中是否被中断过先记录下来,然后“事后说事儿”,上层统一处理——使用 reportInterruptAfterWait() 方法。可以看到如果是响应中断的 await, 如果在条件队列期间,被 signal 之前被中断的话 interruptMode == THROW_IE,会抛出异常:

 private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

总结

JUC 中的实现确实存在很多设计技巧,每次看一遍都会有新的感悟,会发现一些之前没有发现的“奇技淫巧”,很多时候看不懂,就隔段时间不断的去看,我记得大学那会儿,看《Java 并发编程的艺术》看了两三遍,每看一遍就多加深一层理解。在 JUC 的学习上这回也算是“雄关漫道真如铁,而今迈步从头越”了 ~,不过现在回顾基础轻松得很,,,


本文地址:https://www.6aiq.com/article/1576409694128
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出