1、AQS简介
AQS是AbstractQueuedSynchronizer
的简称,也即抽象队列同步器
,从字面来理解:
- 抽象:是一个抽象类,仅实现一些主要逻辑,有些方法会交由子类来实现
- 队列:采取队列(FIFO,先进先出)这种数据结构存储数据
- 同步:实现了多线程环境下的同步操作
那AQS有什么用呢?AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们提到的ReentrantLock
,Semaphore
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等等皆是基于AQS的。
当然,我们自己也能利用AQS来定制符合我们自己需求的同步器,只要实现它的几个protected方法就可以了,在下文会有详细的介绍。
2、AQS的数据结构
AQS内部使用一个volatile
关键字修饰的变量state来作为资源的标识符。
/**
* The synchronization state.
*/
private volatile int state;
同时定义了几个获取和设置state的原子方法:
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
这三种操作均是原子操作,其中compareAndSetState()
方法是依赖于UnSafe
类的compareAndSetInt()
方法。
AQS内部使用了一个先进先出(FIFO)的双端队列,并使用两个指针head
和tail
分别代表队列的头节点和尾节点。其数据结构如下图所示:
AQS的队列不直接存储线程,而是队列中每一个节点Node来具体存储线程。
AQS源码中关于节点Node类的描述:
3、AQS的Node节点
资源有两种共享模式,或者说两种同步方式:
- 独占模式(Exclusive):资源是独占的,一次只能被一个线程访问
- 共享模式(Share):资源是共享的,可以被多个线程同时访问。具体的资源个数可以通过参数指定,如
CountDownLatch
、Semaphore
一般情况下,子类只需要根据需求实现其中一种模式,但也有两种模式都实现的同步类,比如ReadWriteLock
.
AQS中关于这两种模式的源码全部都在Node
这个内部类中,源码如下:
static final class Node {
// 标记节点,不包含实际的线程信息,主要用做标识符来区分共享同步模式
static final Node SHARED = new Node();
// 标记节点,不包含实际的线程信息,主要用做标识符来区分独占同步模式
static final Node EXCLUSIVE = null;
// waitStatus的值,表示该节点(对应的线程)已经被取消
static final int CANCELLED = 1;
// waitStatus的值,表示后继结点(对应的线程)需要被唤醒
static final int SIGNAL = -1;
// waitStatus的值,表示该节点(对应的线程)在等待某种条件
static final int CONDITION = -2;
/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
static final int PROPAGATE = -3;
// 等待状态,取值范围:-3、-2、-1、0、1
volatile int waitStatus;
volatile Node prev; // 前驱节点
volatile Node next; // 后继结点
volatile Thread thread; // 节点对应的线程
Node nextWaiter; // 等待队列里下一个等待条件的节点
// 判断共享模式的方法
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取后继节点的方法
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
注意:通过Node我们可以实现两个队列,一是通过prev和next指针实现的CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待队列(单向队列),这个Condition主要用在
ReentrantLock
类中。
4、AQS源码解析
AQS的设计是基于模板方法设计模式,一些方法不做具体实现,抛出异常,业务逻辑交由子类做具体实现。这些方法主要是:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
父类方法不做具体实现,直接抛出异常:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
这里父类使用protected来修饰而不是抽象方法,这样做的目的是避免子类要把所有方法都重写一遍,增加了很多的工作量,子类只需要重写自己需要的方法。
而AQS实现了一系列的主要逻辑,下面AQS的源码剖析获取资源和释放资源的主要逻辑。
4.1、获取资源
acquire(int arg)
方法是获取资源的入口,arg参数表示获取资源的个数,在独占模式下,arg始终为1。下面是这个方法的源码:
public final void acquire(int arg) {
// 尝试获取资源,成功返回true,失败false
if (!tryAcquire(arg) &&
// 走到这里,说明获取资源失败。调用addWaiter方法将当前线程加入到等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断当前线程
selfInterrupt();
}
首先调用tryAcquire(arg)
方法尝试获取资源,前面也提到了,这个方法的逻辑是交由子类来具体实现。如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE), arg)
方法将当前线程加入到等待队列中,采用独占模式。这个方法的具体实现如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS操作将当前节点设置为新的尾节点(可能会失败)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果快速入队失败,再走完整入队
enq(node);
return node;
}
private Node enq(final Node node) {
// 这里通过自旋的方式,确保CAS操作一定正确完成
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter
方法先尝试能否快速入队,如果失败了,再通过完整入队的方式,将当前线程加入到等待队列。这样做的目的是,保证在线程安全的情况下提高性能。
ok,上面方法介绍完了,让我们回到最初的acquire(int arg)
方法,当获取资源失败,并且将当前线程添加到等待队列的队尾。然后我们来看看AQS最后要做的事情是什么呢?我们来看看最后一个方法acquireQueued(final Node node, int arg)
,源码如下:
// node节点是当前获取资源失败的节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 当前线程阻塞是否被中断
boolean interrupted = false;
// 自旋操作
for (;;) {
// 前驱节点
final Node p = node.predecessor();
// 前驱节点是头节点,说明当前节点就是等待队列中第一个等待节点,可以尝试获取资源
if (p == head && tryAcquire(arg)) {
// 如果成功,设置当前节点为新的头节点
setHead(node);
// 回收旧的头节点
p.next = null; // help GC
// 失败标记置为false
failed = false;
// 返回是否中断标记
return interrupted;
}
// 如果当前节点不是首个等待节点,判断是否应该阻塞。
if (shouldParkAfterFailedAcquire(p, node) &&
// 且判断是否应该阻塞当前线程,如果需要阻塞,调用parkAndCheckInterrupt()方法进行阻塞
// 线程唤醒后,继续下一次循环
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 如果失败,将当前线程的状态设置为 CANCELLED,等待GC回收
cancelAcquire(node);
}
}
AQS将获取资源失败的线程成功添加到等待队列后,反复尝试获取锁,如果获取不到就阻塞(挂起),直到获取锁成功或阻塞中断。
上述流程就是独占方式获取资源的全部执行流程了。
这里parkAndCheckInterrupt方法内部使用到了LockSupport.park(this),顺便简单介绍一下park。
LockSupport类是Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:
- park(boolean isAbsolute, long time):阻塞当前线程
- unpark(Thread jthread):唤醒指定的线程
现在用一张流程图总结上述过程:
4.2、释放资源
释放资源的逻辑比较简单,源码如下:
// 释放资源的主入口
public final boolean release(int arg) {
// 尝试释放资源,具体的逻辑由子类实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 头节点的状态如果小于0,尝试设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 后继节点
Node s = node.next;
// 如果后继节点不存在或者状态大于0(大于0表示线程已被取消),从尾部向前遍历找到队列中第一个待唤醒的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果待唤醒的后继节点存在,唤醒该节点对应的线程。
if (s != null)
LockSupport.unpark(s.thread);
}
5、小结
AQS是一个用来构建锁和同步器的框架,使用AQS能够很方便的构造出我们需要定制化的同步器,而且我们耳熟能详的并发包组件ReentrantLock
,Semaphore
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等等皆是基于AQS实现的。
下面是一个示例(互斥锁,同一时刻,只允许一个线程获取):
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Mutex {
// 自定义内部类实现AQS
private static class Sync extends AbstractQueuedSynchronizer {
// 获取资源(独占模式)
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放资源(独占模式)
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 如果当前线程以独占方式获取资源,返回true
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private final Sync sync = new Sync();
// 加锁
public void lock() {
sync.acquire(1);
}
// 释放锁
public void unlock() {
sync.release(1);
}
// 资源是否被占有
public boolean isLocked() {
return sync.isHeldExclusively();
}
}