目录
前言
AQS(AbstractQueuedSynchronizer) 是Java并发编程中一个核心的同步器框架,位于 java.util.concurrent.locks
包下,由Doug Lea设计。它是构建锁(如 ReentrantLock
)和其他同步工具(如 CountDownLatch
、Semaphore
)的基础抽象类。
如下图所示:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
/**
* Creates a new {@code AbstractQueuedSynchronizer} instance
* with initial synchronization state of zero.
*/
protected AbstractQueuedSynchronizer() { }
关于更多CountDownLatch、ReentrantLock、Semaphore可参考本人文章:
2、多线程并发控制工具Semaphore的学习-CSDN博客
1、AQS作用
提供模板:通过抽象方法(如
tryAcquire
、tryRelease
)定义同步器的核心逻辑,开发者只需实现这些方法即可自定义锁或同步组件。管理线程排队:内部通过 CLH队列(一种FIFO的双向链表)实现线程的阻塞和唤醒机制,处理竞争资源的线程排队问题。
支持独占/共享模式:
独占模式(Exclusive):同一时刻只有一个线程能获取资源(如
ReentrantLock
)。共享模式(Shared):多个线程可同时获取资源(如
Semaphore
)。
2、核心结构
2.1、组成
由cas、state、clh队列三部分组成。
如下图所示:
状态变量(
state
):
通过volatile int state
表示资源状态(如锁的重入次数、信号量的许可证数量)。
如下图所示:
private volatile long state;
CLH队列:
存储等待线程的双向队列链表,节点(Node
)封装线程及等待状态(如CANCELLED
、SIGNAL
)。
如下图所示:
代码所示:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
2.2、关键概念
1.State:
AQS 用一个整数(
int
)来表示同步状态(state)。这个状态可以表示锁是否被持有、信号量的剩余许可数等。
2.FIFO 队列:
当一个线程尝试获取锁而失败时,AQS 会将线程加入到一个 FIFO 队列中,这个队列用于管理竞争资源的线程等待顺序。
3.独占模式和共享模式:
AQS 支持两种模式:独占模式和共享模式。
独占模式:例如
ReentrantLock
,只允许一个线程独占访问。共享模式:例如
CountDownLatch
和Semaphore
,多个线程可以共享访问。
4.Node 节点:
每个等待线程在内部队列中都有一个 Node 节点,节点包含线程引用、等待状态等信息。
2.3、流程
假设有两个线程 ThreadA
和 ThreadB
,他们尝试获取一个基于 AQS 的独占锁(例如 ReentrantLock
),此时锁的初始状态是空闲的(未被任何线程持有)。
如下图所示:
1. 初始状态
- 同步状态:在锁处于空闲状态时,AQS 的状态初始为 0。
- 同步队列:队列为空,表示没有线程排队等待锁。
2. ThreadA 获取锁(允许可重入)
尝试获取锁:
ThreadA 尝试通过调用 lock()
方法来获取锁。
AQS 通过调用 tryAcquire(int arg)
尝试获取锁。实现中常用 CAS (Compare-And-Swap
) 操作将状态从 0 更改为 1。
如果可以成功更改状态(CAS 成功),ThreadA 获得锁。
3. ThreadB 尝试获取锁
锁已被 ThreadA 持有:
当 ThreadB 调用 lock()
方法时,发现当前同步状态为 1,表示锁已被 ThreadA 持有。
tryAcquire(int arg)
尝试进行 CAS 操作以更改状态,但由于 ThreadA 持有锁,操作失败。
排队等待:
ThreadB 进入等待队列,成为一个 Node 节点并挂起。
AQS 将 ThreadB
的 Node 插入到同步队列中,以 FIFO 顺序排队。
4. ThreadA 释放锁
释放锁:
ThreadA 对资源操作完成后,调用 unlock()
方法。
AQS 通过 tryRelease(int arg)
方法实现释放锁,通常 CAS 将状态更改回 0。
通知下一个线程:
通过 unparkSuccessor(Node node)
唤醒队列中的后继节点(ThreadB)。
ThreadB 被唤醒后,重新尝试获取锁。
5. ThreadB 获取锁
成功获取锁:
ThreadB 在被唤醒后重新调用 tryAcquire(int arg)
以获取锁。
如果 CAS 操作成功(状态成功更改为 1),ThreadB 获得锁。
2.4、消息通知逻辑
当持有锁的 ThreadA
释放锁时,它需要唤醒等待中的线程。
这过程通常涉及:
状态更新:
ThreadA
在释放锁时会进行 tryRelease(int arg)
操作,它会检查同步状态并尝试将其更新为非锁定状态(通常将状态置为0或按可重入次数减少)。
队列唤醒:
ThreadA
通过调用 LockSupport.unpark(node.thread)
来唤醒队列中的后继节点,也就是当前挂起的 ThreadB
。
重新获取锁:
被唤醒的 ThreadB
会重新尝试通过 tryAcquire(int arg)
方法获取锁。若成功,它便进入正常工作状态;否则,它将返回到队列中继续等待。
2.5、挂起线程
实际的挂起操作通过调用 LockSupport.park()
方法来实现。这个方法会阻塞当前线程直到它被另一个线程唤醒。
LockSupport.park()
会将 ThreadB
挂起,使得它处于等待状态,从而不消耗 CPU 资源。
3、常用方法
资源获取:
acquire(int arg)
:独占模式下获取资源,内部调用tryAcquire
(需子类实现)。acquireShared(int arg)
:共享模式下获取资源,内部调用tryAcquireShared
。
资源释放:
release(int arg)
:独占模式释放资源,调用tryRelease
。releaseShared(int arg)
:共享模式释放资源,调用tryReleaseShared
。
模板方法(需子类实现):
protected boolean tryAcquire(int arg); // 独占模式尝试获取资源
protected boolean tryRelease(int arg); // 独占模式尝试释放资源
protected int tryAcquireShared(int arg); // 共享模式尝试获取资源
protected boolean tryReleaseShared(int arg); // 共享模式尝试释放资源
boolean compareAndSetState(int expect, int update)//使用 CAS 更新同步状态。是原子操作,用于确保线程安全。
protected void setExclusiveOwnerThread(Thread thread)//设置当前线程为独占资源的拥有者。
protected Thread getExclusiveOwnerThread()//返回独占资源的当前拥有者。
线程挂起和唤醒
LockSupport.park/unpark 实现对线程的挂起和唤醒。
4、应用示例
以下是关于ReentrantLock、Semaphore、CountDownLatch三种实现类对于AQS(AbstractQueuedLongSynchronizer)抽象类的实现。
4.1、ReentrantLock的实现
state
表示锁的重入次数:线程首次获取锁时,
state
从0变为1。同一线程重入时,
state
递增。
非公平锁直接尝试CAS修改
state
,失败后进入队列。
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
protected final boolean tryRelease(int releases) {****}
}
}
示例1:自定义独占锁(类似ReentrantLock)
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyLock {
private final Sync sync = new Sync();
// 自定义同步器,继承AQS
private static class Sync extends AbstractQueuedSynchronizer {
// 尝试获取锁(state=0表示未锁定,1表示已锁定)
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) { // CAS操作抢锁
setExclusiveOwnerThread(Thread.currentThread()); // 设置当前线程为锁持有者
return true;
}
return false;
}
// 尝试释放锁
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); // 清空持有线程
setState(0); // 状态置为0
return true;
}
// 是否被当前线程独占
@Override
protected boolean isHeldExclusively() {
return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
}
}
// 对外暴露的加锁方法
public void lock() {
sync.acquire(1); // 调用AQS的acquire模板方法
}
// 对外暴露的解锁方法
public void unlock() {
sync.release(1); // 调用AQS的release模板方法
}
}
// 测试代码
public class TestMyLock {
private static int count = 0;
private static MyLock lock = new MyLock();
public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final count: " + count); // 正确输出20000
}
}
关键点:
tryAcquire
和tryRelease
是AQS要求子类实现的模板方法。通过CAS操作(
compareAndSetState
)保证原子性。acquire
和release
是AQS提供的模板方法,内部会处理线程排队和唤醒。
4.2、CountDownLatch的实现
state
初始化为计数器的值(如N)。await()
调用acquireShared
,当state=0
时唤醒所有等待线程。countDown()
调用releaseShared
,递减state
。
示例2:自定义CountDownLatch
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyCountDownLatch {
private final Sync sync;
public MyCountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException();
this.sync = new Sync(count);
}
// 自定义同步器(共享模式)
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // 初始化状态值
}
// 尝试获取共享资源(state=0时返回1,否则返回-1表示需要阻塞)
@Override
protected int tryAcquireShared(int acquires) {
return getState() == 0 ? 1 : -1;
}
// 尝试释放共享资源(每次countDown减1)
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
if (current == 0) return false; // 已经为0,无需操作
int next = current - 1;
if (compareAndSetState(current, next)) {
return next == 0; // 返回true表示所有线程已释放
}
}
}
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 调用AQS共享模式获取
}
public void countDown() {
sync.releaseShared(1); // 调用AQS共享模式释放
}
}
// 测试代码
public class TestMyLatch {
public static void main(String[] args) throws InterruptedException {
MyCountDownLatch latch = new MyCountDownLatch(2);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName() + " 完成任务");
latch.countDown();
};
new Thread(task, "线程1").start();
new Thread(task, "线程2").start();
latch.await(); // 主线程等待
System.out.println("所有任务完成!");
}
}
关键点:
tryAcquireShared
返回1表示成功(state=0
时唤醒所有等待线程),-1表示需要阻塞。tryReleaseShared
通过CAS循环递减state
,直到为0时返回true触发唤醒。
4.3、Semaphore的实现
旨在控制对共享资源的访问,通常限制最多同时能够访问资源的线程数量。Semaphore
的实现基于 AbstractQueuedSynchronizer
(AQS)的共享模式。
1.Semaphore 的工作原理
Semaphore
维护一组许可(permits),这些许可表示可访问资源的数量。- 线程在访问资源前需要获取许可,如果没有许可,则线程将会被阻塞直到有许可可用。
- 当一个线程完成对资源的访问后,可以释放许可,其他被阻塞的线程会因为许可的回收而被唤醒继续执行。
代码示例如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private static final int THREAD_COUNT = 5; // 定义总线程数
private static final int MAX_AVAILABLE = 2; // 定义最大可并发访问的许可数
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
Semaphore semaphore = new Semaphore(MAX_AVAILABLE); // 创建信号量,最多允许2个线程同时访问
for (int i = 0; i < THREAD_COUNT; i++) {
executor.execute(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " acquired semaphore.");
// 模拟执行访问的操作
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " finished work.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " released semaphore.");
}
});
}
executor.shutdown();
}
}
代码解析
初始化 Semaphore:
- 用
Semaphore semaphore = new Semaphore(MAX_AVAILABLE);
初始化信号量,指定最大可用许可数。此处MAX_AVAILABLE
是 2,代表最多允许两个线程同时访问资源。
- 用
申请许可:
- 使用
semaphore.acquire();
来申请一个许可。如果当前信号量为零,线程会在此等待,直到有其他线程释放许可。
- 使用
模拟业务逻辑:
- 在成功获得许可后,线程执行模拟任务(用
Thread.sleep()
模拟工作时间)。
- 在成功获得许可后,线程执行模拟任务(用
释放许可:
- 在任务完成后,通过
semaphore.release();
释放许可,以便为其他等待线程提供机会。
- 在任务完成后,通过
线程池使用:
- 使用
ExecutorService
创建固定线程池,管理线程的生命周期。executor.shutdown();
用于优雅地关闭线程池。
- 使用
限制对资源的并发访问和控制同时执行业务逻辑的线程数量。这在需要保护有限资源、控制吞吐量的场合非常有用。
5、AQS的优势
复用性:开发者无需从头实现线程排队、阻塞/唤醒机制。
灵活性:通过重写模板方法,可快速构建自定义同步器。
高性能:基于CAS和volatile变量,减少锁竞争开销。
6、对比其他同步机制
总结
AQS是Java并发包的基石,理解其原理能更高效地使用
ReentrantLock
、Semaphore
等工具,或在需要时开发自定义同步器。其核心思想是 “模板方法模式 + 资源状态管理 + 线程排队”。
参考文章: