多线程学习笔记-5.AQS

发布于:2022-12-14 ⋅ 阅读:(308) ⋅ 点赞:(0)

@TOC

1. AQS的使用原因

在这里插入图片描述在这里插入图片描述

2. Semaphore和AQS的关系

在这里插入图片描述
不止Samaphore,其他协助类和锁,例如CountDownLatch, ReentrantLock都是内部引入Sync类,这个类继承AQS,因此可以实现AQS的功能
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3. AQS的理解和作用

3.1 AQS的地位

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2 AQS的作用

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

4. AQS内部原理解析

在这里插入图片描述

4.1 state状态

在这里插入图片描述
在这里插入图片描述

在可重入锁中的使用原理

在这里插入图片描述

4.2 控制线程枪锁和配合的FIFO队列

在这里插入图片描述
在这里插入图片描述

4.3 获取/释放等重要方法

在这里插入图片描述

获取方法

在这里插入图片描述

释放方法

在这里插入图片描述

不同的协作类有不同的实现

在这里插入图片描述

5. AQS应用实例、源码解析

5.1 AQS用法

在这里插入图片描述

5.2 CountDownLatch源码分析

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

构造方法

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

count传入到Sync类进行处理

在这里插入图片描述

getCount

 public long getCount() {
        return sync.getCount();
    }

最终会调用AQS的获取状态方法

在这里插入图片描述

countDown

 public void countDown() {
        sync.releaseShared(1);
    }
 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 倒数到0
            doReleaseShared(); // 将等待的线程全部唤醒
            return true;
        }
        return false;
    }

tryReleaseShared的实现

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) { // CAS自旋
                int c = getState();
                if (c == 0) // 已经释放过,无需再释放
                    return false;
                int nextc = c-1; // 减1
                if (compareAndSetState(c, nextc)) // cas更新
                    return nextc == 0; // 倒数完了,返回true
            }
        }

await

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

在这里插入图片描述
在这里插入图片描述

  private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//包装的node节点放线程
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 将线程挂起, 进行阻塞
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

5.3 AQS在Semaphore的应用

在这里插入图片描述
在这里插入图片描述

public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

  • 会调用AQS类的tryAcquireShared
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) // 剩下的许可证-要获取的数量,负数,表示不能获取,需要等待
            doAcquireSharedInterruptibly(arg);
    }
  • tryAcquireShared方法的实现在Semaphore中的实现有公平和非公平两种

  • 非公平

 protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
 final int nonfairTryAcquireShared(int acquires) {
            for (;;) { // cas自旋
                int available = getState(); // 获取当前剩余许可证的数量
                int remaining = available - acquires;// 为负数,表示获取不到
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

5.4 AQS在ReentrantLock的应用

释放锁

在这里插入图片描述

public void unlock() {
        sync.release(1);
    }
 public final boolean release(int arg) {
        if (tryRelease(arg)) { // 返回true,锁被释放
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 唤醒线程
            return true;
        }
        return false;
    }
 protected final boolean tryRelease(int releases) {
            int c = getState() - releases; // 状态减1
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { // 状态为0时,才释放
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

加锁方法

  • 先判断state是不是为0, 然后判断当前线程是否持有锁,如果持有,可以重入,如果都不是,当前线程不能拿到锁,只能放到等待队列中
public void lock() {
        sync.lock();
    }
abstract void lock();

   final void lock() {
            if (compareAndSetState(0, 1)) // cas判断
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1); // 上不了锁,会执行此方法
        }

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 放到队列
            selfInterrupt();
    }
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 锁重入
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false; // 被其他线程持有,不能上锁
        }
本文含有隐藏内容,请 开通VIP 后查看