源码分析AbstractQueuedSynchronizer 加锁操作
简介
在分析 Java 并发包 java.util.concurrent 源码的时候,少不了需要了解 AbstractQueuedSynchronizer(以下简写AQS),首先我们看字面意思,翻译过来就是“抽象的队列同步器”,可以知道他是一个抽象类,而且可能实现方式是队列实现,它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。
位置
我们可以找到AbstractQueuedSynchronizer他所在的位置,可以看到和他相邻位置有也有两个类,分别为:AbstractOwnableSynchronizer
和 AbstractQueuedLongSynchronizer
,但是通常我们叫AbstractQueuedSynchronizer
为AQS。
可以看到AQS是继承了AbstractOwnableSynchronizer
(我们也叫AOS), 他也是AbstractQueuedLongSynchronizer
的父类
官网解释:整体就是一个抽象的FIFO队列来完成资源获取线程排队工作,并通过一个int类变量表示持有锁的状态。
大概的工作流程就是,在资源state处只有一个线程进行工作,抢占资源state的其他的线程就会进入到队列中等待state资源的释放,一旦资源state释放了,那么队列里面的线程就会去争抢上边的资源state,没有抢到资源的线程继续回到队列中等待,往复循环,这里流程介绍大概混个眼熟,后边会详细分析。
AQS结构
我们可以先看看AQS这个抽象类里边到底有什么属性
// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;
// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
这个就是AQS的等待队列的示意图,其中每个节点都是node,线程就是被包装在node中的。
我们可以看一下node节点里边包含那些属性,从图上来看就是一个链表的数据结构,下边看一下源码:
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;
// ======== 下面的几个int常量是给waitStatus用的 ===========
/** waitStatus value to indicate thread has cancelled */
// 代码此线程取消了争抢这个锁
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
// =====================================================
// 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
// ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是线程本尊
volatile Thread thread;
}
可以看到Node节点的数据结构其实比较简单, 就是 thread + waitStatus + pre + next 四个属性而已,大家先要有这个概念在心里。
本次使用ReentrantLock 来讲解,基本结构如下:
首先,我们看 ReentrantLock 的基本使用方式:
// 我用个web开发中的service概念吧
public class OrderService {
// 使用static,这样每个线程拿到的是同一把锁,当然,spring mvc中service默认就是单例,别纠结这个
private static ReentrantLock reentrantLock = new ReentrantLock();
public void createOrder() {
// 比如我们同一时间,只允许一个线程创建订单
reentrantLock.lock();
// 通常,lock 之后紧跟着 try 语句
try {
// 这块代码同一时间只能有一个线程进来(获取到锁的线程),
// 其他的线程在lock()方法上阻塞,等待获取到锁,再进来
// 执行代码...
// 执行代码...
// 执行代码...
} finally {
// 释放锁
reentrantLock.unlock();
}
}
}
点开ReentrantLock看一下源码:
//默认就是一个非公平锁
public ReentrantLock() {
sync = new NonfairSync(); // 非公平锁
}
//通过传参来决定使用公平锁还是非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
点开lock()方法看源码:
public void lock() {
sync.lock(); //使用了一个sync的变量
}
可以看到使用了一个sync的变量,继续翻看源码,sync是个什么东西?
可以很清楚的看到,Sync是继承了AQS,本质上就是操作AQS中的相关方法。
线程如何去抢锁?
当我们第一个线程占用资源的时候,后续线程进来我们该如何让这些线程进入队列进行等待呢?代码是怎样实现这个过程的呢?我们一步一步来进行分析。
首先我们进入lock()方法查看,可以看到最终是调用了sync的lock()方法,而Sync又是继承了AQS。
继续,我们查看lock()的实现,可以发现只有两个实现方法,一个是公平锁,一个是非公平锁,本次案例讲解非公平锁。
进入实现我们可以看到非公平锁(NonfairSync)的实现:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//可以看到这个是个cas操作,当第一个线程进来的时候,这个时候资源是没有被占用的,此时进行cas操作,cas一定能够成功并且将state设置为1,这个很重要,是用来判断资源是否被线程占用的标志,最后返回true
//然后会执行setExclusiveOwnerThread()方法,参数为当前的线程,点开这个方法,其实就是把当前线程赋值给exclusiveOwnerThread
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//当第一个线程得到资源还没有释放的时候,此时第二个或者后边的线程进来的时候就会走这个分支调用acquire(1)方法,后续会详细讲解这个方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
此时我们进入acquire(1)方法查看源码,这块就是线程争抢和入队列的核心方法,主要包含tryAcquire(arg)
和 addWaiter(Node.EXCLUSIVE)
以及 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
和selfInterrupt()
这四大方法,他们分别是干什么的呢?下边会一个一个讲解。
public final void acquire(int arg) { //参数arg为1
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg) 方法
这个方法的作用:
- 尝试直接获取锁,返回值是boolean值,来表示是否获取锁。
- 返回true: 1、没有线程在等待锁。2、重入锁,线程本来就持有锁,也可以理所当然获取锁。
我们点开tryAcquire(arg)方法查看,来到了AQS这个抽象类里边,这里边tryAcquire(arg)方法并没有直接实现,而是交由子类来实现,我们找到子类实现的方法。
找到子类实现的tryAcquire(arg)方法:
继续点进实现方法nonfairTryAcquire(acquires)
查看源码:
final boolean nonfairTryAcquire(int acquires) { //源码可知参数acquires = 1
//获取到当前线程
final Thread current = Thread.currentThread();
//由上边文章分析可知,state原本是0,由于第一个线程进入后进行了cas操作,将state设置为1
int c = getState();
if (c == 0) {
//如果进入该分支,代表state = 0 ,说明资源还没有线程占用,此时线程就去争抢资源,还是进行cas操作
//如果不成功的话,只能说明资源被其它线程占用了。
//如果成功的话,进入分支
if (compareAndSetState(0, acquires)) {
//这里就获取到锁,然后标识一下是当前线程获取到锁,点开方法可以看到,也就是给exclusiveOwnerThread 变量赋值为当前线程,最后返回true
setExclusiveOwnerThread(current);
return true;
}
}
//这里说明state不等于0 但是线程是当前线程,只能说明锁重入了
else if (current == getExclusiveOwnerThread()) {
//记录一下重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果到这里,就说明线程尝试去抢占资源,但是没有成功,也就是没有获取到锁,最终返回false.
//可以回到刚刚那四大核心方法继续看:
//if (!tryAcquire(arg) &&
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
return false;
}
addWaiter(Node.EXCLUSIVE) 方法
这个方法的作用:
- 将线程包装成node,同时进入队列中
- 参数为 Node.EXCLUSIVE ,查看源码注释知道是独占模式。
源码中可以看到node中定义了各种模式:
可以简单的看一下具体含义:
然后进入到addWaiter(Node mode)
这个方法,查看源码:
private Node addWaiter(Node mode) {
//将当前线程放进node中
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//下边几行代码就是想把当前这个node加到链表的最后边去,也就是进入到阻塞队列的最后
Node pred = tail;
//tail != null ---->队列不为空才走这个分支,当tail == head的时候,也就是当线程第一次要准备加入到阻塞队列的时候,这个时候队列就 //为空,不会进入到该分支
if (pred != null) {
//将当前节点的前驱指针指向pred,也就是队列的队尾节点
node.prev = pred;
//用cas设置当前节点为队尾节点,如果成功了,那么tail == node了,也就是当前节点node成为了阻塞队列的尾节点
if (compareAndSetTail(pred, node)) {
//进入到这个分支说明cas成功了,当前节点node == tail,成为了阻塞队列的尾节点
//上边设置了node.prev = pred,加上下边这句,就实现了当前节点和前一个节点的双向连接
pred.next = node;
//线程包装的node已经入队了,可以返回了
return node;
}
}
//走到这里说明pred == null 或者cas失败(可能有其他线程在竞争入队,导致本次cas失败)
enq(node);
return node;
}
假设node3进行插入,插入结果就如下图所示:
enq(node)方法解读
方法的作用
- 采用自旋的方式入队
- 之前提过,进入这个方法的前提是等待队列为空或者cas失败(可能有其他线程在竞争入队,导致本次cas失败)
- 自旋在这边的意思就是:在cas设置当前node为tail(尾节点的时候)假设当时没有竞争成功,我就自旋多次竞争,总会排队等到竞争成功
我们查看enq(node)的源码:
private Node enq(final Node node) {
//可以看出来是一个自旋的写法
for (;;) {
Node t = tail;
//t == null 说明队列为空,会进入这个分支
if (t == null) { // Must initialize
//可以看到源码的注释,必须要初始化
//还是使cas 将head设置一个新的node(),仅仅是新new了个node
if (compareAndSetHead(new Node()))
//进入这个分支说明cas成功了,head节点设置为new Node()了
//这里提一个node节点里边的属性 waitStatus,初始化的时候waitStatus == 0
//因为这时候tail == null ,所以这里是把尾节点设置尾head
//注意这里没有返回return 而且还是在for循环里边,所以,下一次循环会进入下边else分支里
tail = head;
} else {
//下边这几行代码,还是和addWaiter()方法一样,将当前node加入到队尾
//因为在for循环里边,cas一次不成功就再试几次,总会将node加入到队尾
//加入成功后就返回尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一次初始化阻塞队列后,形成的结构如下:
当addWaiter(Node mode)
方法结束以后,又会回到之前说的那四大方法
public final void acquire(int arg) { //参数arg为1
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquireQueued(final Node node, int arg) 方法
该方法的作用为:
- 参数为node,而且经过了
addWaiter(Node.EXCLUSIVE)
这个方法,设置模式为独占模式,此时node已经进入了队列 - node里边的线程被挂起,然后被唤醒去获取锁,都是在这个方法里边实现的。
我们来看一下acquireQueued(final Node node, int arg)
方法的源码:
//整体来看里边也是用了自旋的操作
final boolean acquireQueued(final Node node, int arg) {
// 设置一个标志位,来表示成功或者失败
boolean failed = true;
try {
//设置一个中断标识
boolean interrupted = false;
for (;;) {
//获取到当前节点的前一个节点,可以自行查看源码,比较简单
final Node p = node.predecessor();
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
// 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
// 所以当前节点可以去试抢一下锁
// 这里我们说一下,为什么可以去试试:
// 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
// enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
// 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
// tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//什么情况下failed会为true
//当tryAcquire(arg) 方法抛出异常的时候
//也就是在队列中的node不想等待了,想取消竞争锁了,进行出队操作
if (failed)
cancelAcquire(node);
}
}
注意一下,如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
返回true的话,那么就会执行selfInterrupt()
方法,看名字或者点开源码看,就是中断当前线程,所以正常情况下不返回true
static void selfInterrupt() {
//中断当前线程
Thread.currentThread().interrupt();
}
shouldParkAfterFailedAcquire(p, node) 方法
该方法作用:
- 从上边源码可以得知,p为node的前置节点,node为当前节点
- 如果走到这个方法,说明上一个if没有成功,说明没有抢到锁
- 当前节点(也就是当前线程)没有抢到锁,是否需要挂起当前线程
这里为了方便查看每个状态对应的含义,我把图贴过来:
查看源码:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//定义可知,一个新的node1节点里边waitStatus被初始化为0
int ws = pred.waitStatus;
//查看源码可知 Node.SIGNAL == -1
//假设前置节点的 waitStatus == -1 说明前置节点是正常的,当前节点需要挂起,所以返回true.忘记状态代表什么含义的话可以 查看文章前边的截图
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//前置节点 waitStatus > 0的话,说明前置节点取消排队,也就不参与竞争锁了
// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
// 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
// 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
// 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 仔细想想,如果进入到这个分支意味着什么
// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
// 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
// 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
// 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 这个方法返回 false,那么会再走一次 for 循序,
// 然后再次进来此方法,此时会从第一个分支返回 true
return false;
}
parkAndCheckInterrupt() 方法
该方法作用:
- 负责线程挂起
查看源码:
// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 这个方法结束根据返回值我们简单分析下:
// 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒
// 我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了
// 如果返回false, 说明当前不需要被挂起,为什么呢?往后看
// 跳回到前面是这个方法
// if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt())
// interrupted = true;
// 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
// 那么需要执行parkAndCheckInterrupt():
// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况
// 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。
// 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程:
// => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。
说到这里,也就明白了,多看几遍 final boolean acquireQueued(final Node node, int arg)
这个方法吧。自己推演下各个分支怎么走,哪种情况下会发生什么,走到哪里。
最后推荐一下我的个人博客,欢迎各位小伙伴访问。