关于并发编程AQS的学习

发布于:2025-05-15 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

1、AQS作用

2、核心结构

2.1、组成

2.2、关键概念

2.3、流程

2.4、消息通知逻辑

2.5、挂起线程

3、常用方法

4、应用示例

4.1、ReentrantLock的实现

4.2、CountDownLatch的实现

4.3、Semaphore的实现

5、AQS的优势

6、对比其他同步机制


前言 

       AQS(AbstractQueuedSynchronizer) 是Java并发编程中一个核心的同步器框架,位于 java.util.concurrent.locks 包下,由Doug Lea设计。它是构建锁(如 ReentrantLock)和其他同步工具(如 CountDownLatchSemaphore)的基础抽象类。

如下图所示:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Creates a new {@code AbstractQueuedSynchronizer} instance
     * with initial synchronization state of zero.
     */
    protected AbstractQueuedSynchronizer() { }

关于更多CountDownLatch、ReentrantLock、Semaphore可参考本人文章:

1、线程和线程池详细介绍_线程池和线程的区别-CSDN博客

2、多线程并发控制工具Semaphore的学习-CSDN博客


1、AQS作用

  • 提供模板:通过抽象方法(如 tryAcquiretryRelease)定义同步器的核心逻辑,开发者只需实现这些方法即可自定义锁或同步组件。

  • 管理线程排队:内部通过 CLH队列(一种FIFO的双向链表)实现线程的阻塞和唤醒机制,处理竞争资源的线程排队问题。

  • 支持独占/共享模式

    • 独占模式(Exclusive):同一时刻只有一个线程能获取资源(如 ReentrantLock)。

    • 共享模式(Shared):多个线程可同时获取资源(如 Semaphore)。


2、核心结构

2.1、组成

由cas、state、clh队列三部分组成。

如下图所示:

  • 状态变量(state
    通过 volatile int state 表示资源状态(如锁的重入次数、信号量的许可证数量)。

如下图所示:

 private volatile long state;
  • CLH队列
    存储等待线程的双向队列链表,节点(Node)封装线程及等待状态(如 CANCELLEDSIGNAL)。

如下图所示:

代码所示:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;

    }

2.2、关键概念

1.State

            AQS 用一个整数(int)来表示同步状态(state)。这个状态可以表示锁是否被持有、信号量的剩余许可数等。

    2.FIFO 队列

            当一个线程尝试获取锁而失败时,AQS 会将线程加入到一个 FIFO 队列中,这个队列用于管理竞争资源的线程等待顺序。

    3.独占模式和共享模式

      AQS 支持两种模式:独占模式和共享模式。

              独占模式:例如 ReentrantLock,只允许一个线程独占访问。

              共享模式:例如 CountDownLatch 和 Semaphore,多个线程可以共享访问。

      4.Node 节点

                每个等待线程在内部队列中都有一个 Node 节点,节点包含线程引用、等待状态等信息。

        2.3、流程

                假设有两个线程 ThreadA 和 ThreadB,他们尝试获取一个基于 AQS 的独占锁(例如 ReentrantLock),此时锁的初始状态是空闲的(未被任何线程持有)。

        如下图所示:

        1. 初始状态

        • 同步状态:在锁处于空闲状态时,AQS 的状态初始为 0。
        • 同步队列:队列为空,表示没有线程排队等待锁。

        2. ThreadA 获取锁(允许可重入)

        尝试获取锁

                  ThreadA 尝试通过调用 lock() 方法来获取锁。

                  AQS 通过调用 tryAcquire(int arg) 尝试获取锁。实现中常用 CAS (Compare-And-Swap) 操作将状态从 0 更改为 1。

                  如果可以成功更改状态(CAS 成功),ThreadA 获得锁。

          3. ThreadB 尝试获取锁

          锁已被 ThreadA 持有

                    当 ThreadB 调用 lock() 方法时,发现当前同步状态为 1,表示锁已被 ThreadA 持有。

                    tryAcquire(int arg) 尝试进行 CAS 操作以更改状态,但由于 ThreadA 持有锁,操作失败。

            排队等待

                    ThreadB 进入等待队列,成为一个 Node 节点并挂起。

                    AQS 将 ThreadB 的 Node 插入到同步队列中,以 FIFO 顺序排队。

            4. ThreadA 释放锁

            释放锁

                      ThreadA 对资源操作完成后,调用 unlock() 方法。

                      AQS 通过 tryRelease(int arg) 方法实现释放锁,通常 CAS 将状态更改回 0。

              通知下一个线程

                      通过 unparkSuccessor(Node node) 唤醒队列中的后继节点(ThreadB)。

                      ThreadB 被唤醒后,重新尝试获取锁。

              5. ThreadB 获取锁

              成功获取锁

                        ThreadB 在被唤醒后重新调用 tryAcquire(int arg) 以获取锁。

                        如果 CAS 操作成功(状态成功更改为 1),ThreadB 获得锁。

                2.4、消息通知逻辑

                当持有锁的 ThreadA释放锁时,它需要唤醒等待中的线程。

                这过程通常涉及:

                • 状态更新

                    ThreadA 在释放锁时会进行 tryRelease(int arg) 操作,它会检查同步状态并尝试将其更新为非锁定状态(通常将状态置为0或按可重入次数减少)。

                • 队列唤醒

                    ThreadA通过调用 LockSupport.unpark(node.thread) 来唤醒队列中的后继节点,也就是当前挂起的 ThreadB

                • 重新获取锁

                        被唤醒的 ThreadB 会重新尝试通过 tryAcquire(int arg) 方法获取锁。若成功,它便进入正常工作状态;否则,它将返回到队列中继续等待。

                2.5、挂起线程

                          实际的挂起操作通过调用 LockSupport.park() 方法来实现。这个方法会阻塞当前线程直到它被另一个线程唤醒。

                  LockSupport.park() 会将 ThreadB 挂起,使得它处于等待状态,从而不消耗 CPU 资源。


                  3、常用方法

                  • 资源获取

                    • acquire(int arg):独占模式下获取资源,内部调用 tryAcquire(需子类实现)。

                    • acquireShared(int arg):共享模式下获取资源,内部调用 tryAcquireShared

                  • 资源释放

                    • release(int arg):独占模式释放资源,调用 tryRelease

                    • releaseShared(int arg):共享模式释放资源,调用 tryReleaseShared

                  • 模板方法(需子类实现):

                  protected boolean tryAcquire(int arg);  // 独占模式尝试获取资源
                  protected boolean tryRelease(int arg);  // 独占模式尝试释放资源
                  protected int tryAcquireShared(int arg); // 共享模式尝试获取资源
                  protected boolean tryReleaseShared(int arg); // 共享模式尝试释放资源
                  boolean compareAndSetState(int expect, int update)//使用 CAS 更新同步状态。是原子操作,用于确保线程安全。
                  protected void setExclusiveOwnerThread(Thread thread)//设置当前线程为独占资源的拥有者。
                  protected Thread getExclusiveOwnerThread()//返回独占资源的当前拥有者。
                  
                  
                  • 线程挂起和唤醒

                  LockSupport.park/unpark 实现对线程的挂起和唤醒。

                  4、应用示例

                          以下是关于ReentrantLock、Semaphore、CountDownLatch三种实现类对于AQS(AbstractQueuedLongSynchronizer)抽象类的实现。

                  4.1、ReentrantLock的实现

                  • state 表示锁的重入次数:

                    • 线程首次获取锁时,state 从0变为1。

                    • 同一线程重入时,state 递增。

                  • 非公平锁直接尝试CAS修改 state,失败后进入队列。

                  public class ReentrantLock implements Lock, java.io.Serializable {
                    
                      private final Sync sync;
                  
                     
                      abstract static class Sync extends AbstractQueuedSynchronizer {
                          abstract void lock();
                  
                          protected final boolean tryRelease(int releases) {****}
                      }
                  }

                  示例1:自定义独占锁(类似ReentrantLock)

                  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
                  
                  public class MyLock {
                      private final Sync sync = new Sync();
                  
                      // 自定义同步器,继承AQS
                      private static class Sync extends AbstractQueuedSynchronizer {
                          // 尝试获取锁(state=0表示未锁定,1表示已锁定)
                          @Override
                          protected boolean tryAcquire(int arg) {
                              if (compareAndSetState(0, 1)) { // CAS操作抢锁
                                  setExclusiveOwnerThread(Thread.currentThread()); // 设置当前线程为锁持有者
                                  return true;
                              }
                              return false;
                          }
                  
                          // 尝试释放锁
                          @Override
                          protected boolean tryRelease(int arg) {
                              if (getState() == 0) throw new IllegalMonitorStateException();
                              setExclusiveOwnerThread(null); // 清空持有线程
                              setState(0); // 状态置为0
                              return true;
                          }
                  
                          // 是否被当前线程独占
                          @Override
                          protected boolean isHeldExclusively() {
                              return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
                          }
                      }
                  
                      // 对外暴露的加锁方法
                      public void lock() {
                          sync.acquire(1); // 调用AQS的acquire模板方法
                      }
                  
                      // 对外暴露的解锁方法
                      public void unlock() {
                          sync.release(1); // 调用AQS的release模板方法
                      }
                  }
                  
                  // 测试代码
                  public class TestMyLock {
                      private static int count = 0;
                      private static MyLock lock = new MyLock();
                  
                      public static void main(String[] args) throws InterruptedException {
                          Runnable task = () -> {
                              for (int i = 0; i < 10000; i++) {
                                  lock.lock();
                                  try {
                                      count++;
                                  } finally {
                                      lock.unlock();
                                  }
                              }
                          };
                  
                          Thread t1 = new Thread(task);
                          Thread t2 = new Thread(task);
                          t1.start();
                          t2.start();
                          t1.join();
                          t2.join();
                          System.out.println("Final count: " + count); // 正确输出20000
                      }
                  }

                  关键点

                  1. tryAcquire 和 tryRelease 是AQS要求子类实现的模板方法。

                  2. 通过CAS操作(compareAndSetState)保证原子性。

                  3. acquire 和 release 是AQS提供的模板方法,内部会处理线程排队和唤醒。

                  4.2、CountDownLatch的实现

                  • state 初始化为计数器的值(如N)。

                  • await() 调用 acquireShared,当 state=0 时唤醒所有等待线程。

                  • countDown() 调用 releaseShared,递减 state

                  示例2:自定义CountDownLatch

                  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
                  
                  public class MyCountDownLatch {
                      private final Sync sync;
                  
                      public MyCountDownLatch(int count) {
                          if (count < 0) throw new IllegalArgumentException();
                          this.sync = new Sync(count);
                      }
                  
                      // 自定义同步器(共享模式)
                      private static class Sync extends AbstractQueuedSynchronizer {
                          Sync(int count) {
                              setState(count); // 初始化状态值
                          }
                  
                          // 尝试获取共享资源(state=0时返回1,否则返回-1表示需要阻塞)
                          @Override
                          protected int tryAcquireShared(int acquires) {
                              return getState() == 0 ? 1 : -1;
                          }
                  
                          // 尝试释放共享资源(每次countDown减1)
                          @Override
                          protected boolean tryReleaseShared(int releases) {
                              for (;;) {
                                  int current = getState();
                                  if (current == 0) return false; // 已经为0,无需操作
                                  int next = current - 1;
                                  if (compareAndSetState(current, next)) {
                                      return next == 0; // 返回true表示所有线程已释放
                                  }
                              }
                          }
                      }
                  
                      public void await() throws InterruptedException {
                          sync.acquireSharedInterruptibly(1); // 调用AQS共享模式获取
                      }
                  
                      public void countDown() {
                          sync.releaseShared(1); // 调用AQS共享模式释放
                      }
                  }
                  
                  // 测试代码
                  public class TestMyLatch {
                      public static void main(String[] args) throws InterruptedException {
                          MyCountDownLatch latch = new MyCountDownLatch(2);
                  
                          Runnable task = () -> {
                              System.out.println(Thread.currentThread().getName() + " 完成任务");
                              latch.countDown();
                          };
                  
                          new Thread(task, "线程1").start();
                          new Thread(task, "线程2").start();
                  
                          latch.await(); // 主线程等待
                          System.out.println("所有任务完成!");
                      }
                  }

                  关键点

                  1. tryAcquireShared 返回1表示成功(state=0时唤醒所有等待线程),-1表示需要阻塞。

                  2. tryReleaseShared 通过CAS循环递减 state,直到为0时返回true触发唤醒。

                  4.3、Semaphore的实现

                  旨在控制对共享资源的访问,通常限制最多同时能够访问资源的线程数量。Semaphore 的实现基于 AbstractQueuedSynchronizer(AQS)的共享模式。

                  1.Semaphore 的工作原理

                  • Semaphore 维护一组许可(permits),这些许可表示可访问资源的数量。
                  • 线程在访问资源前需要获取许可,如果没有许可,则线程将会被阻塞直到有许可可用。
                  • 当一个线程完成对资源的访问后,可以释放许可,其他被阻塞的线程会因为许可的回收而被唤醒继续执行。

                  代码示例如下:

                  import java.util.concurrent.ExecutorService;
                  import java.util.concurrent.Executors;
                  import java.util.concurrent.Semaphore;
                  
                  public class SemaphoreExample {
                      private static final int THREAD_COUNT = 5; // 定义总线程数
                      private static final int MAX_AVAILABLE = 2; // 定义最大可并发访问的许可数
                  
                      public static void main(String[] args) {
                          ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
                          Semaphore semaphore = new Semaphore(MAX_AVAILABLE); // 创建信号量,最多允许2个线程同时访问
                  
                          for (int i = 0; i < THREAD_COUNT; i++) {
                              executor.execute(() -> {
                                  try {
                                      semaphore.acquire(); // 获取许可
                                      System.out.println(Thread.currentThread().getName() + " acquired semaphore.");
                                      // 模拟执行访问的操作
                                      Thread.sleep((long) (Math.random() * 1000));
                                      System.out.println(Thread.currentThread().getName() + " finished work.");
                  
                                  } catch (InterruptedException e) {
                                      Thread.currentThread().interrupt();
                                  } finally {
                                      semaphore.release(); // 释放许可
                                      System.out.println(Thread.currentThread().getName() + " released semaphore.");
                                  }
                              });
                          }
                  
                          executor.shutdown();
                      }
                  }
                  

                  代码解析

                  1. 初始化 Semaphore

                    • 用 Semaphore semaphore = new Semaphore(MAX_AVAILABLE); 初始化信号量,指定最大可用许可数。此处 MAX_AVAILABLE 是 2,代表最多允许两个线程同时访问资源。
                  2. 申请许可

                    • 使用 semaphore.acquire(); 来申请一个许可。如果当前信号量为零,线程会在此等待,直到有其他线程释放许可。
                  3. 模拟业务逻辑

                    • 在成功获得许可后,线程执行模拟任务(用 Thread.sleep() 模拟工作时间)。
                  4. 释放许可

                    • 在任务完成后,通过 semaphore.release(); 释放许可,以便为其他等待线程提供机会。
                  5. 线程池使用

                    • 使用 ExecutorService 创建固定线程池,管理线程的生命周期。executor.shutdown(); 用于优雅地关闭线程池。

                          限制对资源的并发访问和控制同时执行业务逻辑的线程数量。这在需要保护有限资源、控制吞吐量的场合非常有用。


                  5、AQS的优势

                  • 复用性:开发者无需从头实现线程排队、阻塞/唤醒机制。

                  • 灵活性:通过重写模板方法,可快速构建自定义同步器。

                  • 高性能:基于CAS和volatile变量,减少锁竞争开销。


                  6、对比其他同步机制


                  总结

                          AQS是Java并发包的基石,理解其原理能更高效地使用 ReentrantLockSemaphore 等工具,或在需要时开发自定义同步器。其核心思想是 “模板方法模式 + 资源状态管理 + 线程排队”


                  参考文章:

                  1、谈谈Java多线程离不开的AQS_java aqs-CSDN博客


                  网站公告

                  今日签到

                  点亮在社区的每一天
                  去签到