源码分析AbstractQueuedSynchronizer 加锁操作

发布于:2022-12-27 ⋅ 阅读:(426) ⋅ 点赞:(0)

源码分析AbstractQueuedSynchronizer 加锁操作

简介

在分析 Java 并发包 java.util.concurrent 源码的时候,少不了需要了解 AbstractQueuedSynchronizer(以下简写AQS),首先我们看字面意思,翻译过来就是“抽象的队列同步器”,可以知道他是一个抽象类,而且可能实现方式是队列实现,它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。

位置

在这里插入图片描述

我们可以找到AbstractQueuedSynchronizer他所在的位置,可以看到和他相邻位置有也有两个类,分别为:AbstractOwnableSynchronizer AbstractQueuedLongSynchronizer,但是通常我们叫AbstractQueuedSynchronizer 为AQS。

在这里插入图片描述

可以看到AQS是继承了AbstractOwnableSynchronizer (我们也叫AOS), 他也是AbstractQueuedLongSynchronizer的父类

官网解释:整体就是一个抽象的FIFO队列来完成资源获取线程排队工作,并通过一个int类变量表示持有锁的状态。

在这里插入图片描述

在这里插入图片描述

​ 大概的工作流程就是,在资源state处只有一个线程进行工作,抢占资源state的其他的线程就会进入到队列中等待state资源的释放,一旦资源state释放了,那么队列里面的线程就会去争抢上边的资源state,没有抢到资源的线程继续回到队列中等待,往复循环,这里流程介绍大概混个眼熟,后边会详细分析。

AQS结构

我们可以先看看AQS这个抽象类里边到底有什么属性

// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

这个就是AQS的等待队列的示意图,其中每个节点都是node,线程就是被包装在node中的。

在这里插入图片描述

我们可以看一下node节点里边包含那些属性,从图上来看就是一个链表的数据结构,下边看一下源码:

static final class Node {
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的几个int常量是给waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    // =====================================================


    // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
    // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
    //    ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
    volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是线程本尊
    volatile Thread thread;

}

可以看到Node节点的数据结构其实比较简单, 就是 thread + waitStatus + pre + next 四个属性而已,大家先要有这个概念在心里。

本次使用ReentrantLock 来讲解,基本结构如下:

在这里插入图片描述

首先,我们看 ReentrantLock 的基本使用方式:

// 我用个web开发中的service概念吧
public class OrderService {
    // 使用static,这样每个线程拿到的是同一把锁,当然,spring mvc中service默认就是单例,别纠结这个
    private static ReentrantLock reentrantLock = new ReentrantLock();

    public void createOrder() {
        // 比如我们同一时间,只允许一个线程创建订单
        reentrantLock.lock();
        // 通常,lock 之后紧跟着 try 语句
        try {
            // 这块代码同一时间只能有一个线程进来(获取到锁的线程),
            // 其他的线程在lock()方法上阻塞,等待获取到锁,再进来
            // 执行代码...
            // 执行代码...
            // 执行代码...
        } finally {
            // 释放锁
            reentrantLock.unlock();
        }
    }
}

点开ReentrantLock看一下源码:

//默认就是一个非公平锁
public ReentrantLock() {
    sync = new NonfairSync(); // 非公平锁
}

//通过传参来决定使用公平锁还是非公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

点开lock()方法看源码:

public void lock() {
    sync.lock();  //使用了一个sync的变量
}

可以看到使用了一个sync的变量,继续翻看源码,sync是个什么东西?

在这里插入图片描述

可以很清楚的看到,Sync是继承了AQS,本质上就是操作AQS中的相关方法。

线程如何去抢锁?

当我们第一个线程占用资源的时候,后续线程进来我们该如何让这些线程进入队列进行等待呢?代码是怎样实现这个过程的呢?我们一步一步来进行分析。

首先我们进入lock()方法查看,可以看到最终是调用了sync的lock()方法,而Sync又是继承了AQS。

在这里插入图片描述

继续,我们查看lock()的实现,可以发现只有两个实现方法,一个是公平锁,一个是非公平锁,本次案例讲解非公平锁。

在这里插入图片描述

进入实现我们可以看到非公平锁(NonfairSync)的实现:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        //可以看到这个是个cas操作,当第一个线程进来的时候,这个时候资源是没有被占用的,此时进行cas操作,cas一定能够成功并且将state设置为1,这个很重要,是用来判断资源是否被线程占用的标志,最后返回true
        //然后会执行setExclusiveOwnerThread()方法,参数为当前的线程,点开这个方法,其实就是把当前线程赋值给exclusiveOwnerThread
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //当第一个线程得到资源还没有释放的时候,此时第二个或者后边的线程进来的时候就会走这个分支调用acquire(1)方法,后续会详细讲解这个方法
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

此时我们进入acquire(1)方法查看源码,这块就是线程争抢和入队列的核心方法,主要包含tryAcquire(arg) addWaiter(Node.EXCLUSIVE)

以及 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) selfInterrupt()这四大方法,他们分别是干什么的呢?下边会一个一个讲解。

public final void acquire(int arg) {  //参数arg为1
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg) 方法

这个方法的作用:

  1. 尝试直接获取锁,返回值是boolean值,来表示是否获取锁。
  2. 返回true: 1、没有线程在等待锁。2、重入锁,线程本来就持有锁,也可以理所当然获取锁。

我们点开tryAcquire(arg)方法查看,来到了AQS这个抽象类里边,这里边tryAcquire(arg)方法并没有直接实现,而是交由子类来实现,我们找到子类实现的方法。

在这里插入图片描述

找到子类实现的tryAcquire(arg)方法:

在这里插入图片描述

继续点进实现方法nonfairTryAcquire(acquires)查看源码:

final boolean nonfairTryAcquire(int acquires) { //源码可知参数acquires = 1
           //获取到当前线程
            final Thread current = Thread.currentThread();
           //由上边文章分析可知,state原本是0,由于第一个线程进入后进行了cas操作,将state设置为1
            int c = getState();
    
            if (c == 0) {
                //如果进入该分支,代表state = 0 ,说明资源还没有线程占用,此时线程就去争抢资源,还是进行cas操作
                //如果不成功的话,只能说明资源被其它线程占用了。
                //如果成功的话,进入分支
                if (compareAndSetState(0, acquires)) {
                    //这里就获取到锁,然后标识一下是当前线程获取到锁,点开方法可以看到,也就是给exclusiveOwnerThread						变量赋值为当前线程,最后返回true
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
    		//这里说明state不等于0 但是线程是当前线程,只能说明锁重入了
            else if (current == getExclusiveOwnerThread()) {
                //记录一下重入次数
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
    		//如果到这里,就说明线程尝试去抢占资源,但是没有成功,也就是没有获取到锁,最终返回false.
    		//可以回到刚刚那四大核心方法继续看:
    		//if (!tryAcquire(arg) &&
            //       acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        	//			selfInterrupt();
            return false;
        }

addWaiter(Node.EXCLUSIVE) 方法

这个方法的作用:

  1. 将线程包装成node,同时进入队列中
  2. 参数为 Node.EXCLUSIVE ,查看源码注释知道是独占模式。

源码中可以看到node中定义了各种模式:

在这里插入图片描述

可以简单的看一下具体含义:

在这里插入图片描述

然后进入到addWaiter(Node mode)这个方法,查看源码:

private Node addWaiter(Node mode) {
    //将当前线程放进node中
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //下边几行代码就是想把当前这个node加到链表的最后边去,也就是进入到阻塞队列的最后
    Node pred = tail;
    //tail != null ---->队列不为空才走这个分支,当tail == head的时候,也就是当线程第一次要准备加入到阻塞队列的时候,这个时候队列就	   //为空,不会进入到该分支
    if (pred != null) {
        //将当前节点的前驱指针指向pred,也就是队列的队尾节点
        node.prev = pred;
        //用cas设置当前节点为队尾节点,如果成功了,那么tail == node了,也就是当前节点node成为了阻塞队列的尾节点
        if (compareAndSetTail(pred, node)) {
            //进入到这个分支说明cas成功了,当前节点node == tail,成为了阻塞队列的尾节点
            //上边设置了node.prev = pred,加上下边这句,就实现了当前节点和前一个节点的双向连接
            pred.next = node;
            //线程包装的node已经入队了,可以返回了
            return node;
        }
    }
    //走到这里说明pred == null 或者cas失败(可能有其他线程在竞争入队,导致本次cas失败)
    enq(node);
    return node;
}

假设node3进行插入,插入结果就如下图所示:

在这里插入图片描述

enq(node)方法解读

方法的作用

  1. 采用自旋的方式入队
  2. 之前提过,进入这个方法的前提是等待队列为空或者cas失败(可能有其他线程在竞争入队,导致本次cas失败)
  3. 自旋在这边的意思就是:在cas设置当前node为tail(尾节点的时候)假设当时没有竞争成功,我就自旋多次竞争,总会排队等到竞争成功

我们查看enq(node)的源码:

private Node enq(final Node node) {
    //可以看出来是一个自旋的写法
        for (;;) {
            Node t = tail;
            //t == null 说明队列为空,会进入这个分支
            if (t == null) { // Must initialize
                //可以看到源码的注释,必须要初始化
                //还是使cas 将head设置一个新的node(),仅仅是新new了个node
                if (compareAndSetHead(new Node()))
                    //进入这个分支说明cas成功了,head节点设置为new Node()了
                    //这里提一个node节点里边的属性 waitStatus,初始化的时候waitStatus == 0 
                    //因为这时候tail == null ,所以这里是把尾节点设置尾head
                    //注意这里没有返回return 而且还是在for循环里边,所以,下一次循环会进入下边else分支里
                    tail = head;
            } else {
                //下边这几行代码,还是和addWaiter()方法一样,将当前node加入到队尾
                //因为在for循环里边,cas一次不成功就再试几次,总会将node加入到队尾
                //加入成功后就返回尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

第一次初始化阻塞队列后,形成的结构如下:

在这里插入图片描述

addWaiter(Node mode)方法结束以后,又会回到之前说的那四大方法

public final void acquire(int arg) {  //参数arg为1
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquireQueued(final Node node, int arg) 方法

该方法的作用为:

  1. 参数为node,而且经过了addWaiter(Node.EXCLUSIVE)这个方法,设置模式为独占模式,此时node已经进入了队列
  2. node里边的线程被挂起,然后被唤醒去获取锁,都是在这个方法里边实现的。

我们来看一下acquireQueued(final Node node, int arg)方法的源码:

//整体来看里边也是用了自旋的操作
final boolean acquireQueued(final Node node, int arg) {
    	// 设置一个标志位,来表示成功或者失败
        boolean failed = true;
        try {
            //设置一个中断标识
            boolean interrupted = false;
            for (;;) {
                //获取到当前节点的前一个节点,可以自行查看源码,比较简单
                final Node p = node.predecessor();
                // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
                // 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
                // 所以当前节点可以去试抢一下锁
                // 这里我们说一下,为什么可以去试试:
                // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
                // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
                // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
                // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //什么情况下failed会为true
            //当tryAcquire(arg) 方法抛出异常的时候
            //也就是在队列中的node不想等待了,想取消竞争锁了,进行出队操作
            if (failed)
                cancelAcquire(node);
        }
    }

注意一下,如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,那么就会执行selfInterrupt()方法,看名字或者点开源码看,就是中断当前线程,所以正常情况下不返回true

static void selfInterrupt() {
    //中断当前线程
        Thread.currentThread().interrupt();
    }
shouldParkAfterFailedAcquire(p, node) 方法

该方法作用:

  1. 从上边源码可以得知,p为node的前置节点,node为当前节点
  2. 如果走到这个方法,说明上一个if没有成功,说明没有抢到锁
  3. 当前节点(也就是当前线程)没有抢到锁,是否需要挂起当前线程

这里为了方便查看每个状态对应的含义,我把图贴过来:

在这里插入图片描述

查看源码:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //定义可知,一个新的node1节点里边waitStatus被初始化为0
        int ws = pred.waitStatus;
    //查看源码可知 Node.SIGNAL == -1 
    //假设前置节点的 waitStatus == -1 说明前置节点是正常的,当前节点需要挂起,所以返回true.忘记状态代表什么含义的话可以        查看文章前边的截图
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //前置节点 waitStatus > 0的话,说明前置节点取消排队,也就不参与竞争锁了
        // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
        // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
        // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
        // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 仔细想想,如果进入到这个分支意味着什么
            // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
            // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
            // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
            // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 这个方法返回 false,那么会再走一次 for 循序,
        // 然后再次进来此方法,此时会从第一个分支返回 true
        return false;
    }
parkAndCheckInterrupt() 方法

该方法作用:

  1. 负责线程挂起

查看源码:

// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 这个方法结束根据返回值我们简单分析下:
// 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒
//        我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了
// 如果返回false, 说明当前不需要被挂起,为什么呢?往后看

// 跳回到前面是这个方法
// if (shouldParkAfterFailedAcquire(p, node) &&
//                parkAndCheckInterrupt())
//                interrupted = true;

// 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
// 那么需要执行parkAndCheckInterrupt():

// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

// 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况

// 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。

// 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程:
// => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。

说到这里,也就明白了,多看几遍 final boolean acquireQueued(final Node node, int arg) 这个方法吧。自己推演下各个分支怎么走,哪种情况下会发生什么,走到哪里。
最后推荐一下我的个人博客,欢迎各位小伙伴访问。

本文含有隐藏内容,请 开通VIP 后查看