前言

在Java面试的时候,多线程相关的知识是躲不掉的,肯定会被问。我就被问到了AQS的知识,就直接了当的问,AQS知道是什么吧,来讲讲它是怎么实现的,以及哪些地方用到了它。当时自己确实没有讲好,所以这次来总结一下这个知识点。

aqs是什么意思(你来讲讲AQS是什么吧)(1)

什么是AQS

AQS全称是 AbstractQueuedSynchronizer ,形如其名,抽象队列同步器。

AQS定义了两种资源共享模式:

它维护了一个 volatile 的 state 变量和一个FIFO(先进先出)的队列。

其中 state 变量代表的是竞争资源标识,而队列代表的是竞争资源失败的线程排队时存放的容器。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... /** * The synchronization state. */ private volatile int state; /** * Wait queue node class. **/ static final class Node { ... } ... }

AQS中提供了操作state的方法:

protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

因为 AbstractQueuedSynchronizer 是一个抽象类,他采用 模板方法 的设计模式,规定了 独占共享 模式需要实现的方法,并且将一些通用的功能已经进行了实现,所以不同模式的使用方式,只需要自己定义好实现共享资源的获取与释放即可,至于具体线程在等待队列中的维护(获取资源入队列、唤醒出队列等),AQS已经实现好了。

所以根据共享资源的模式一般实现的方法有如下几个:

isHeldExclusively(); boolean tryAcquire(int arg); boolean tryRelease(int arg) ; int tryAcquireShared(int arg); boolean tryReleaseShared(int arg) ;

上面的这几个方法在 AbstractQueuedSynchronizer 这个抽象类中,都没有被定义为abstract的,说明这些方法都是可以按需实现的,共享模式下可以只实现共享模式的方法(例如 CountDownLatch ),独占模式下可以只实现独占模式的方法(例如 ReentrantLock ),也支持两种都实现,两种模式都使用(例如 ReentrantReadWriteLock )。

AQS源码分析

我们先简单介绍AQS的两种模式的实现类的代表 ReentrantLock ( 独占模式 )和 CountDownLatch ( 共享模式 ),是如何来共享资源的一个过程,然后再详细通过AQS的源码来分析整个实现过程。

独占模式分析

在AbstractQueuedSynchronizer的类里面有一个静态内部类Node。它代表的是队列中的每一个节点。

其中Node节点有如下几个属性:

// 节点的状态 volatile int waitStatus; // 当前节点的前一个节点 volatile Node prev; // 当前节点的后一个节点 volatile Node next; // 当前节点中所包含的线程对象 volatile Thread thread; // 等待队列中的下一个节点 Node nextWaiter;

每个属性代表的什么,已经写在代码注释中了。其中Node类中还有几个常量,代表了几个节点的状态( waitStatus )值。

/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;

首先节点的状态值waitStatus默认是0,然后下面几个常量有自己具体的含义。

CANCELLED = 1; 代表的是当前节点从同步队列中取消,当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。

SIGNAL = -1; 代表后继节点处于等待状态。后继结点入队时,会将前继结点的状态更新为SIGNAL。

CONDITION = -2; 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了 signal() 方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。

PROPAGATE = -3; 表示在共享模式下,前继节点在释放资源后会唤醒后继节点,并将这种共享模式传播下去。

通过上面几个固定的常量值,我们可以看出节点状态中通常负数值通常表示节点处于有效的等待状态,而正数值代表节点已经被取消了。

所以AQS源码中有很多地方都用 waitStatus>0 或 waitStatus<0 这种方式来判断队列中节点的是否正常。

独占模式下,只能有一个线程占有锁资源,其他竞争资源的线程,在竞争失败后都会进入到等待队列中,等待占有锁资源的线程释放锁,然后再重新被唤醒竞争资源。

ReentrantLock加锁过程

ReentrantLock 默认是非公平锁,就是说,线程在竞争锁的时候并不是按照先来后到的顺序来获取锁的,但是 ReentrantLock 也是支持公平锁的,在创建的时候传入一个参数值即可。

下面我们以 ReentrantLock 默认情况下的加锁来分析AQS的源码。

ReentrantLock 并没有直接继承AQS类,而是通过内部类来继承AQS类的,这样自己的实现功能,自己用。

我们在用 ReentrantLock 加锁的时候都是调用的用 lock() 方法,那么我们来看看默认非公平锁下, lock() 方法的源码:

/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

通过源码可以看到, lock() 方法,首先是通过CAS的方式抢占锁,如果抢占成功则将state的值设置为1。然后将对象独占线程设置为当前线程。

protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }

如果抢占锁失败,就会调用 acquire() 方法,这个 acquire() 方法的实现就是在AQS类中了,说明具体抢占锁失败后的逻辑,AQS已经规定好了模板。

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

上面已经介绍了,独占模式是需要实现 tryAcquire() 方法的,这里首先就是通过 tryAcquire() 方法抢占锁,如果成功返回true,失败返回false。 tryAcquire() 方法的具体实现,是在 ReentrantLock 里面的,AQS类中默认是直接抛出异常的。

protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();// 获取state值 if (c == 0) { // 如果state值为0,说明无锁,那么就通过cas方式,尝试加锁,成功后将独占线程设置为当前线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果是同一个线程再次来获取锁,那么就将state的值进行加1处理(可重入锁的,重入次数)。 int nextc = c acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

我们继续来看 acquire() 方法,在执行完 tryAcquire() 方法后,如果加锁失败那么就会执行 addWaiter() 方法和 acquireQueued() ,这两个方法的作用是将竞争锁失败的线程放入到等待队列中。

aqs是什么意思(你来讲讲AQS是什么吧)(2)

addWaiter() 方法的源码如下:

private Node addWaiter(Node mode) { // 用参数指定的模式将当前线程封装成队列中的节点(EXCLUSIVE【独占】,SHARED【共享】) Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // tail是队列的尾部节点,初始时队列为空,尾部节点为null,直接调用enq将节点插入队列 if (pred != null) { // 将当前线程节点的前级节点指向队列的尾部节点。 node.prev = pred; // 通过CAS方式将节点插入到队列中 if (compareAndSetTail(pred, node)) { // 插入成功后,将原先的尾部节点的后级节点指向新的尾部节点 pred.next = node; return node; } } // 如果尾部节点为空或通过CAS插入队列失败则通过enq方法插入节点 enq(node); return node; }

addWaiter() 中主要做了三件事:

enq()

那么enq()方法是又是怎么插入节点的呢?

enq()方法源码如下:

private Node enq(final Node node) { // 看到死循环,就明白是通过自旋咯 for (;;) { // 当tail节点为空时直接将当前节点设置成尾部节点,并插入到队列中,以及设置它为head节点。 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 若是因为在addWaiter()方法中插入失败或第二次进入循环,那么将当前线程的前级节点指向尾部节点,并通过CAS方式将尾部节点指向当前线程的节点。 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

其实enq()方法主要就是通过自旋将数据插入到队列中的操作:

这样addWaiter()方法就构造了一个队列,并将当前线程添加到了队列中了。

我们再回到 acquire() 方法中。

aqs是什么意思(你来讲讲AQS是什么吧)(3)

现在就剩下 acquireQueued() 方法没看了,这个方法中的操作挺多的。

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前级节点,如果未null,则抛出异常 final Node p = node.predecessor(); // 如果前级节点为head,并且执行抢占锁成功。 if (p == head && tryAcquire(arg)) { // 抢占锁成功,当前节点成功新的head节点 setHead(node); // 然后将原先的head节点指向null,方便垃圾回收进行回收 p.next = null; // help GC failed = false; return interrupted; } // 如果当前节点不为head,或者抢占锁失败。就根据节点的状态决定是否需要挂起线程。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 如果获取锁异常,则出取消获取锁操作。 cancelAcquire(node); } }

final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }

waitStatus cancelAcquire()

下面看一下 shouldParkAfterFailedAcquire() 和 parkAndCheckInterrupt() 这两个方法是如何挂起线程的。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;// 获取前级节点 if (ws == Node.SIGNAL)// 如果前级节点的waitStatus值为SIGNAL(-1),说明当前节点也已经在等待唤醒了,直接返回true。 return true; // 如果前级节点的waitStatus值大于0说明前级节点已经取消了。 if (ws > 0) { // 如果前级节点已经是CANCEL状态了,那么会继续向前找,直到找到的节点不是CANCEL(waitStatue>0)状态的节点,然后将其设置为当前节点的前级节点。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前级节点为0或者其他不为-1的小于0的值,则将当前节点的前级节点设置为 SIGNAL(-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

parkAndCheckInterrupt() 方法的作用就是挂起线程,如果 shouldParkAfterFailedAcquire() 方法成功,会执行 parkAndCheckInterrupt() 方法,它通过 LockSupport的park() 方法,将当前线程挂起(WAITING),它需要 unpark() 方法唤醒它,通过这样一种FIFO机制的等待,来实现Lock操作。

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

LockSupport 是JDK从1.6开始提供的一个线程同步源语工具类,在这里主要用到了它的两个方法,挂起线程和唤醒线程:

public static void park() { UNSAFE.park(false, 0L); } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }

LockSupport 的挂起和唤醒线程都是不可重入的,它由一个许可标志,当调用park()时就会将许可设置为0,挂起线程,如果再调用一次 park() ,会阻塞线程。当调用 unpark() 时才会将许可标志设置成1。

ReentrantLock释放锁过程

ReentrantLock 释放锁的过程主要有两个阶段:

public void unlock() { sync.release(1); }

释放锁的方法是写在父类, AbstractQueuedSynchronizer 类中的。

源码如下:

// 释放独占模式下的锁资源 public final boolean release(int arg) { if (tryRelease(arg)) { // 尝试释放资源 Node h = head; //释放成功后,判断头节点的状态是否为无锁状态,如果不为无锁状态就将头节点中的线程唤醒。 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; // 释放资源失败,直接返回false }

我们首先释放资源来看 tryRelease() 方法的源码,看看释放资源是怎样的过程。

protected final boolean tryRelease(int releases) { // 从state中减去传入参数的相应值(一般为1) int c = getState() - releases; // 当释放资源的线程与独占锁现有线程不一致时,非法线程释放,直接抛出异常。 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 这里是处理重入锁的机制,因为可重入机制,所以每次都重入state值都加1, //所以在释放的时候也要相应的减1,直到state的值为0才算完全的释放锁资源。 if (c == 0) { free = true; // 完全释放资源后,将独占线程设置为null,这样后面的竞争线程才有可能抢占。 setExclusiveOwnerThread(null); } // 重新赋值state setState(c); return free; }

tryRelease() 方法在释放锁资源时,可以单纯的理解为是修改独占模式的状态值和置空占有线程的操作。将 state 的值减掉相应的参数值(一般是1),如果计算结果为0,就将他的独占线程设置为 null ,其他线程才有机会抢占成功。

在加锁时,同一线程加一次锁, state 状态值就会加1,在解锁的时候没解锁一次就会减1,同一个锁可重入,只有 lock 次数与 unlock 次数相同才会释放资源,将独占线程设置为null。

释放了资源后,我们再看唤醒挂起线程时的过程。这个过程就在 unparkSuccessor() 方法中。

private void unparkSuccessor(Node node) { /* 获取当前节点的等待状态,一般是头节点,占有锁的节点是在头节点上。 */ int ws = node.waitStatus; // 将当前节点的线程的状态值设为0,成为无锁状态。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next;// 获取下一个需要唤醒的节点线程。 if (s == null || s.waitStatus > 0) {// 如果获取到的节点线程为空或已经取消 s = null; // 就从队列的后面向前找,直到找到一个未取消的节点。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 如果获得的下一个可以唤醒的节点线程不为空,那么就唤醒它。 LockSupport.unpark(s.thread); }

这个释放过程就是将需要释放的线程节点设置成无锁状态,然后去队列中找到可以唤醒的节点,进行唤醒线程。

有一点需要解释一下,就是在寻找可以唤醒的节点时,为什么要从后向前找?

在上面 unparkSuccessor() 方法的源码里面有一段英文注释(7行~12行),我保留了下来了。

/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */

这段英文注释翻译过来的大概意思就是:线 程唤醒的时候,通常是从当前线程的下个节点线程开始寻找,但是下个节点有可能已经取消了或者为null了,所以从后向前找,直到找到一个非 取消状态的节点线程。

由于文章篇幅太长了,我这次就先将独占模式的加锁和解锁的过程总结到这里,下一篇通过CountDownLatch的加锁和解锁过程来总结AQS在共享模式下过程。

AQS共享模式分析

其实AQS的共享模式总结起来感觉比独占模式稍微容易一些,只是说总结起来稍微容易哦。

CountDownLatch的获取共享资源的过程

在使用CountDownLatch的时候,是先创建CountDownLatch对象,然后在每次执行完一个任务后,就执行一次countDown()方法。直到通过getCount()获取到的值为0时才算执行完,如果count值不为0可通过await()方法让主线程进行等待,知道所有任务都执行完成,count的值被设为0。

那么我们先来看创建CountDownLatch的方法。

public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } }

我们看到创建CountDownLatch的过程,其实就是将count值赋值给state的过程。

再来看await()方法的源码:

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);// 等待可中断的获取共享资源的方法 }

public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 如果线程已经中断,直接抛出异常结束。 throw new InterruptedException(); if (tryAcquireShared(arg) < 0)// 尝试获取共享资源,获取失败后,自旋入队列 doAcquireSharedInterruptibly(arg);// 可中断的入队列过程 }

整个await()的等待过程是,先尝试获取共享资源,获取成功则执行任务,获取失败,则调用方法自旋式进入等待队列。

通过最初在介绍AQS的时候就说过 ,共享模式下是需要自己去实现tryAcquireShared()方法来获取共享资源的,那么我们看看CountDownLatch是如何实现获取共享资源的。

protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }

简单易懂,就一行代码,直接获取state值,等于0就是成功,不等于0就失败。

那么获取资源失败后,doAcquireSharedInterruptibly()方法是如何入执行的呢。

源码如下:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);// addWaiter()方法已经总结过了,这一步操作的目的就是将当前线程封装成节点加入队尾,并设置成共享模式。 boolean failed = true; try { for (;;) { final Node p = node.predecessor();// 获取前级节点 if (p == head) { // 如果前级节点是头节点,直接尝试获取共享资源。 int r = tryAcquireShared(arg); if (r >= 0) {// 如果获取共享资源成功,将head节点指向自己 setHeadAndPropagate(node, r); p.next = null; // help GC 将原head节点指向空,方便垃圾回收。 failed = false; return; } } // 如果不是前级节点不是head节点,就根据前级节点状态,判断是否需要挂起线程。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) // 如果执行失败,取消获取共享资源的操作。 cancelAcquire(node); } }

这里的方法和独占模式下acquireQueued()方法很像,只是在设置头节点唤醒新线程的时候有所不同,在setHeadAndPropagate()方法里面。

private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // 如果在唤醒完下一个节点后,资源还有剩余,并且新唤醒的节点状态不为无效状态,就继续唤醒队列中的后面节点里的线程。 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }

setHeadAndPropagate()这个方法名称翻译成中文是“设置头节点并传播”,其实就是在获取共享锁资源的时候,如果资源除了用于唤醒下一个节点后,还有剩余,就会用于唤醒后面的节点,直到资源被用完。这里充分体现共享模式的“ 共享 ”。

CountDownLatch释放资源

我们再来看countDown()方法是如何释放资源的。

源码如下:

public void countDown() { sync.releaseShared(1); }

CountDownLatch中内部类Sync的releaseShared()方法,是使用的AQS的releaseShared()方法。

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {// 尝试释放资源 doReleaseShared();// 释放资源成功后,唤醒节点。 return true; } return false; }

尝试释放资源方法tryReleaseShared()是AQS规定需要自己来实现的,CountDownLatch的实现如下:

protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) // 若state为0,说明已经不需要释放资源了,直接返回false。 return false; int nextc = c-1; if (compareAndSetState(c, nextc))// 真正的释放资源,是通过CAS的方式将state的值减1。 return nextc == 0; } }

其实主要的就是通过CAS的方式将state的值减1的操作。

释放资源成功后,就到了唤醒节点的过程了,在doReleaseShared()方法中。

private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) {// 当头节点不为空,并且不等于尾节点时,从头开始唤醒。 int ws = h.waitStatus;// 获取头节点的等待状态 if (ws == Node.SIGNAL) {// 如果头节点状态为等待唤醒,那么将头节点的状态设置为无锁状态,若CAS设置节点状态失败,就自旋。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);// 唤醒头节点 }// 如果head节点的状态已经为无锁状态了,那么将head节点状态设置为可以向下传播唤醒的状态(PROPAGATE)。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若在执行过程中,head节点发生的变化,直接跳出循环。 if (h == head) // loop if head changed break; } }

至此,AQS的独占模式和共享模式,在获取共享资源和释放共享资源的过程,就总结完了。内容有点多,需要好好消化一下,能看到最后的也都厉害的人物,因为我自己在总结这部分内容的时候也是查阅了很多资料,看了很多源码,用了好几天的时间才自己总结明白,AQS到底是个什么东西,是怎么一个执行过程。

其实AQS里面不只我上面总结的这些内容,里面比如还有Condition、以及可中断的获取资源(acquireInterruptibly【独占】、acquireSharedInterruptibly【共享】acquire()和acquireShared()在线程等待过程中都是忽略中断的),还有ReentrantLock是如何实现公平锁的(其实是在竞争资源时如果有新进入的线程,先判断队列中是否有节点,如果有直接插入队尾等待,按顺序获取资源)。

通过总结了AQS,基于AQS实现的ReentrantLock、CountDownLatch、Semaphore等的源码基本上就能看懂了,甚至再上层的CyclicBarrier、CopyOnWriteArrayList我们通过看源码也能知道大概是一个什么过程了。

最后,内容有点多,有写的不好的地方也欢迎指正(我要是能闹明白就改,不明白我也没法改:joy:)。

,