@TOC
1. AQS的使用原因
2. Semaphore和AQS的关系
不止Samaphore
,其他协助类和锁,例如CountDownLatch
, ReentrantLock
都是内部引入Sync
类,这个类继承AQS
,因此可以实现AQS
的功能
3. AQS的理解和作用
3.1 AQS的地位
3.2 AQS的作用
4. AQS内部原理解析
4.1 state状态
在可重入锁中的使用原理
4.2 控制线程枪锁和配合的FIFO队列
4.3 获取/释放等重要方法
获取方法
释放方法
不同的协作类有不同的实现
5. AQS应用实例、源码解析
5.1 AQS用法
5.2 CountDownLatch源码分析
构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
将count
传入到Sync
类进行处理
getCount
public long getCount() {
return sync.getCount();
}
最终会调用AQS的获取状态方法
countDown
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 倒数到0
doReleaseShared(); // 将等待的线程全部唤醒
return true;
}
return false;
}
tryReleaseShared
的实现
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) { // CAS自旋
int c = getState();
if (c == 0) // 已经释放过,无需再释放
return false;
int nextc = c-1; // 减1
if (compareAndSetState(c, nextc)) // cas更新
return nextc == 0; // 倒数完了,返回true
}
}
await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//包装的node节点放线程
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 将线程挂起, 进行阻塞
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5.3 AQS在Semaphore的应用
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
- 会调用AQS类的
tryAcquireShared
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 剩下的许可证-要获取的数量,负数,表示不能获取,需要等待
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
方法的实现在Semaphore
中的实现有公平和非公平两种非公平
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // cas自旋
int available = getState(); // 获取当前剩余许可证的数量
int remaining = available - acquires;// 为负数,表示获取不到
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
5.4 AQS在ReentrantLock的应用
释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { // 返回true,锁被释放
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒线程
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 状态减1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 状态为0时,才释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
加锁方法
- 先判断
state
是不是为0
, 然后判断当前线程是否持有锁,如果持有,可以重入,如果都不是,当前线程不能拿到锁,只能放到等待队列中
public void lock() {
sync.lock();
}
abstract void lock();
final void lock() {
if (compareAndSetState(0, 1)) // cas判断
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 上不了锁,会执行此方法
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 放到队列
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 锁重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; // 被其他线程持有,不能上锁
}
本文含有隐藏内容,请 开通VIP 后查看