引言
上篇文章讲解了CountDownLatch源码,底层是继承了AQS基类调用父类和重写父类方法实现的,本文将简介AQS源码和架构设计,帮助我们更深入理解多线程实战。
源码架构
1. 状态变量 state
AQS 使用一个 int 类型的变量 state 来表示同步状态。这个变量是 volatile 修饰的,保证了多线程之间的可见性。不同的同步器对 state 有不同的定义和使用方式,例如在 ReentrantLock 中,state 表示锁的重入次数;在 CountDownLatch 中,state 表示计数器的值。
private volatile int state;
2. 双向队列
AQS 内部维护了一个双向链表队列,用于存储等待获取锁的线程。这个队列是 CLH(Craig, Landin, and Hagersten)锁队列的变种。每个节点(Node)代表一个等待线程,包含线程引用、等待状态等信息。
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 线程已取消
static final int CANCELLED = 1;
// 后继节点需要被唤醒
static final int SIGNAL = -1;
// 线程在等待条件
static final int CONDITION = -2;
// 表示下一次共享式同步状态获取将会无条件地传播下去
static final int PROPAGATE = -3;
// 等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点所包装的线程
volatile Thread thread;
// 指向下一个处于 CONDITION 状态的节点
Node nextWaiter;
...
}
而双向队列细分为条件队列和同步队列
同步队列
同步队列是 AQS 的核心队列,用于存放等待获取锁的线程节点。当线程尝试获取锁失败时,会被封装成节点加入到这个队列中等待。
在 AQS 类中有两个关键属性用于表示同步队列的头节点和尾节点:
private transient volatile Node head;
private transient volatile Node tail;
head 指向队列的头节点,tail 指向队列的尾节点,这两个属性都是 volatile 修饰的,保证了多线程环境下的可见性。
节点加入同步队列(addWaiter 方法)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速将节点插入到队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速插入失败,使用 enq 方法插入
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 队列还未初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
条件队列
条件队列用于实现线程的等待 - 通知机制,一个 AQS 可以有多个条件队列,每个 Condition 对象对应一个条件队列。
关键类 ConditionObject
ConditionObject 是 AQS 中实现条件队列的内部类,它包含一个指向条件队列头节点和尾节点的引用
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
...
}
线程进入条件队列(await 方法)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点已取消,清除队列中所有已取消的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
唤醒条件队列中的线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 将节点的等待状态从 CONDITION 改为 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将节点从条件队列转移到同步队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}