目录
〇、简介
读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。读锁是一个共享锁,写锁是一个互斥锁(排他锁)。
ReentrantReadWriteLock就是JUC中提供的读写锁工具类。之前我们学的ReentrantLock是一把互斥锁,而这个ReentrantReadWriteLock就可以实现共享锁(读锁)的功能。它是基于AQS提供的共享锁实现方法。
一、特性
1.1 互斥关系
读写锁具有以下特性:
是否互斥 |
读 |
写 |
读 |
否 |
是 |
写 |
是 |
是 |
可以看到,读写锁除了读读不互斥,其他的读写、写读、写写都是互斥的。
需要注意的是,上面都是基于不同线程对同一资源的操作来说的,因为相同的线程一定不互斥,ReentrantReadWriteLock是可重入的锁,相同的线程重复获取同一个锁是没有问题的,当然同一个线程的锁升级是不被允许的。
1.2 锁升级和锁降级
锁升级和锁降级主要是对同一个线程获取锁的规则限制,如果是不同线程获取锁的互斥规则就按照上一节讲的特性来执行。
读写锁允许锁降级,不允许锁升级,比如当一个线程持有读写锁,它先去获取了一个写锁,然后该线程再去获取读锁,这属于锁降级,是允许的,还有就是这个线程先获取了一个写锁,再去获取一个写锁,或者该线程先获取了一个读锁,再获取一个读锁,这种锁是平级变化的也是允许的。但是如果该线程获取了读锁,然后该线程再去获取写锁,这属于锁升级,是不允许的。
在讲解完源码之后,我们会在文章的最后基于源码,来举几个例子说明为什么ReentrantReadWriteLock不允许锁升级,只允许锁降级。
二、ReentrantReadWriteLock的使用案例
ReentrantReadWriteLock不同线程之间只允许读读并发,其他情况都是互斥的。
2.1 关于读读并发需要注意的事项
我们先用一个案例来文字说明一下ReentrantReadWriteLock的执行流程:
比如先有一个t1线程获取到写锁,等待队列中有一些其他线程的读锁或写锁在阻塞等待。
当t1释放锁之后会按照FIFO的原则去唤醒在等待队列中等待的线程;如果第一个被唤醒的线程t2是尝试获取写锁,则无可厚非,可以让t2获取写锁,因为现在已经没有线程持有读写锁了;但不会再跟着唤醒t3,只有等t2执行完成之后才会去唤醒t3;假设被唤醒的t3是尝试获取读锁,那么t3会去判断它在等待队列中的下一个t4线程是不是要获取读锁,如果是读锁则把t4唤醒;t4唤醒之后会判断后面的t5是不是读锁;如果t5也是则唤醒t5;依次类推;但是假设判断到了t6时,发现t6是要尝试获取写锁则就不会唤醒t6了;即使后面的t7是读锁也不会唤醒t7,唤醒过程就终止在t6了,t6和t6后面的线程继续在等待队列中等待;下面这个代码说明了这个现象
public class LockDemo {
// 创建ReentrantReadWriteLock读写锁对象
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
// 通过读写锁对象获取读锁
static Lock r = rwl.readLock();
// 通过读写锁对象获取写锁
static Lock w = rwl.writeLock();
public static void main(String[] args) throws InterruptedException {
/**
* t1 最先拿到写(W)锁,然后睡眠了5s
* 等t1释放了锁之后才会叫醒别人
*/
Thread t1 = new Thread(() -> {
w.lock();
try {
log.debug("t1 +");
TimeUnit.SECONDS.sleep(5);
log.debug("5s 之后");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
w.unlock();
}
}, "t1");
t1.start();
TimeUnit.SECONDS.sleep(1);
/**
* t1在睡眠的过程中,t2不能拿到锁读锁,因为读写互斥
* t2就会一直阻塞等待t1释放写锁
*/
Thread t2 = new Thread(() -> {
try {
r.lock();
log.debug("t2----+-------");
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
log.debug("t2-----jian-------");
r.unlock();
}
}, "t2");
t2.start();
TimeUnit.SECONDS.sleep(1);
/**
* t1在睡眠的过程中,t3也不能拿到读锁,因为读写互斥
* t3也一直阻塞等待
*
* 当t1释放写锁之后,t3和t2能同时拿到读锁,因为读读并发
*/
Thread t3 = new Thread(() -> {
try {
r.lock();
log.debug("t3----+-------");
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
log.debug("t3----释放-------");
r.unlock();
}
}, "t3");
t3.start();
/**
* t4尝试获取写锁
* t1睡眠的时候,t4也阻塞,在同步队列中的顺序应该 t2 t3 t4
* 当t1将写锁释放之后,就会开始唤醒同步队列的锁,唤醒t2后因为t2是读锁,所以继续向后判断t3,
* 发现t3也是读锁就也将t3唤醒,然后再去判断t4,发现t4是要获取写锁,则不再唤醒t4,也不再向后判断了,唤醒操作中止在t4线程
*/
Thread t4 = new Thread(() -> {
try {
w.lock();
log.debug("t4--------+---");
TimeUnit.SECONDS.sleep(10);
log.debug("t4--------醒来---");
} catch (Exception e) {
e.printStackTrace();
} finally {
log.debug("t4--------jian---");
w.unlock();
}
}, "t4");
t4.start();
/**
* t5 是读锁
* 因为唤醒操作中止在t4了,t5也不会被唤醒,继续在等待队列中等待
*/
Thread t5 = new Thread(() -> {
try {
r.lock();
log.debug("t5--------+---");
} catch (Exception e) {
e.printStackTrace();
} finally {
log.debug("t5--------jian---");
r.unlock();
}
}, "t5");
t5.start();
}
}
在了解了ReentrantReadWriteLock使用案例后,我们再来看是如何实现读写锁的。
三、继承关系
我们先来看一下ReentrantReadWriteLock这个类的主要结构。
ReentrantReadWriteLock中的类分成三个部分:
- ReentrantReadWriteLock本身实现了ReadWriteLock接口,这个接口只提供了两个方法readLock()和writeLock();
- 同步器,包含一个继承了AQS的Sync内部类,以及其两个子类FairSync和NonfairSync;
- ReadLock和WriteLock两个内部类实现了Lock接口,它们具有锁的一些特性。
读写锁的数据结构:
四、源码分析
4.1 读写状态的设计
在我们详细分析ReentrantReadWriteLock源码之前,需要先了解一下该类的读写锁设计,因为这里和以前其他的锁不太一样,需要单独拿出来讲一下。
ReentrantReadWriteLock仍然是基于AQS,也是依旧使用AQS的state字段来表示读写锁被持有的次数(同步状态),高16位用来标记读锁的同步状态,低16位用来标记写锁的同步状态。
// ReentrantReadWriteLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
// 划分的边界线,用16位来划分
static final int SHARED_SHIFT = 16;
// 读锁的基本单位,也就是读锁加1或者减1的基本单位(1左移16位后的值)
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁的最大值(在计算读锁的时候需要先右移16位)
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁的掩码,state值与掩码做与运算后得到写锁的真实值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 获取读锁被占用的次数
static int sharedCount(int c){
return c >>> SHARED_SHIFT;
}
// 获取写锁被占用的次数
static int exclusiveCount(int c){
return c & EXCLUSIVE_MASK;
}
}
结构如下图所示:
举个例子,比如state当前为
0010 1011 0001 1010 1110 1000 1011 0101
这个二进制数的前16位就标识读锁的同步状态,后16位就是写锁的同步状态
4.1.1 使用sharedCount()计算读锁被占用的次数
将state向右无符号移动16位,这个操作就会将后16位移除,然后前16位就移动到了后16位的位置上,并用0来补齐,结果如下:
0010 1011 0001 1010 1110 1000 1011 0101
无符号位右移16位
0000 0000 0000 0000 0010 1011 0001 1010
4.1.2 使用exclusiveCount()计算写锁被占用的次数
将state和写锁掩码按位取与,这样就能将前16位全部变为0,只保留后16位的数,计算过程如下:
SHARED_SHIFT = 16
将1左移16位,低位用0补齐
SHARED_UNIT = (1 << SHARED_SHIFT) = 1 << 16 = 0000 0000 0000 0001 0000 0000 0000 0000
将SHARED_UNIT减1得出写锁掩码
EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 = 0000 0000 0000 0001 0000 0000 0000 0000 - 1 = 0000 0000 0000 0000 1111 1111 1111 1111
将state和写锁掩码按位取与
0010 1011 0001 1010 1110 1000 1011 0101
0000 0000 0000 0000 1111 1111 1111 1111
结果得
0000 0000 0000 0000 1110 1000 1011 0101
4.1.3 记录每个线程持有读锁的重入次数
4.1.3.1 实现方法
在统计读锁被每个线程持有重入的次数时,ReentrantReadWriteLock采用的是HoldCounter来实现的,具体如下:
// ReentrantReadWriteLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
// 持有读锁的线程重入的次数。这个类的对象就是要存储到TreadLocal中的value
// 初始化HoldCounter时,count默认设置为0,这个在后面的源码中是有用处的
static final class HoldCounter {
// 重入的次数
int count = 0;
// 持有读锁线程的线程id
final long tid = getThreadId(Thread.currentThread());
}
/**
* 采用ThreadLocal机制,做到线程之间的隔离
* 每个线程都持有自己独有的ThreadLock,用来存储自己持有读锁的重入次数
*
* 这个ThreadLocalHoldCounter就是存储在ThreadLocalMap中ThreadLocal的key,ThreadLocal的value就是HoldCounter
* 可以理解为每一个线程都持有一个自己的ThreadLocalHoldCounter,里面存储着自己持有读锁的重入次数
*/
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
// 重写ThreadLocal的初始化方法,这样在第一次调用ThreadLocal的get()方法是就可以自动将其初始化
public HoldCounter initialValue() {
// 初始化一个HoldCounter对象,并存入ThreadLocal中
return new HoldCounter();
}
}
/**
* 线程持有可重入读锁的次数,这里就是记录下当前线程的ThreadLocalHoldCounter
* readHolds就可以理解为绑定在当前线程上的ThreadLocal
*/
private transient ThreadLocalHoldCounter readHolds;
/**
* 缓存最后一个成功获取读锁的线程的重入次数,这样做的好处:
* 避免了通过访问ThreadLocal来获取读锁的信息,相对于直接从本地缓存中获取,从ThreadLocal中获取数据效率更低。
* 这个优化的前提是假设多数情况下,一个获取读锁的线程,使用完以后就会释放读锁,也就是说最后获取读锁的线程和最先释放读锁的线程大多数情况下是同一个线程,获取了读锁用完之后马上就释放。
* 这样就能保证一个线程获取了读锁,后续如果再重入获取读锁,就不需要去从ThreadLocal中获取持有次数了,直接就能从本地缓存cachedHoldCounter中获取
* 而且用完了锁之后就会马上释放,这样在执行释放锁方法的时候,当前要释放读锁的线程仍然是最后一个获取读锁的线程,所以修改持有次数就可以直接操作cachedHoldCounter变量而不用使用ThreadLocal了,提高了执行效率
*
*/
private transient HoldCounter cachedHoldCounter;
/**
* 第一个获取读锁的线程,这里直接用的是Thread线程对象记录。单独记录第一个获取读锁线程的作用:
* 1、记录将共享数量从0变成1的线程
* 2、对于无竞争的读锁来说进行线程重入次数数据的追踪的成本是比较低的,省去了去ThreadLocal中查找而性能高效
* 3、在一定程度上确实可以减少ThreadLocal的使用,降低一些内存消耗
* 4、获取当前线程的读锁重入次数时,若是当前线程时第一个获取读锁的线程则可以免去ThreadLocal中查找,也能提高一点性能
*
* 虽然看起来性能提高不是很多的,但是JUC正是在这一点点的代码优化中将性能提高到极致的,能优化一点是一点
*/
private transient Thread firstReader = null;
/**
* 第一个获取读锁线程的重入次数,直接用int来记录重入次数
*/
private transient int firstReaderHoldCount;
Sync() {
// 在构造函数中就初始化创建了一个ThreadLocalHoldCounter对象
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
}
通过源码我们就可以看出,记录每个线程持有读锁的重入次数,主要应用到三个数据结构
- ThreadLocalHoldCounter readHolds:这个对象就是绑定在每个线程上的ThreadLocal,也就是说readHolds在每个线程上都存有一份,并且在线程间独立,由绑定的线程独享。存在该ThreadLocal中的value就是HoldCounter类对象,通过HoldCounter对象来记录当前线程持有读锁的重入次数。
- HoldCounter cachedHoldCounter:记录最后一个成功获取读锁的线程及其获取读锁的重入次数,需要注意的是最后一个成功获取读锁是指在所有成功获取到过该读锁的线程当中,不管线程是否已经中止,只要是在时间上最后一个获取读锁的线程,就会被记录在cachedHoldCounter中。比如此时有t1、t2、t3依次成功获取读锁,当前cachedHoldCounter指向的是t3,然后t3将读锁全部释放了,后面t2不再尝试获取读锁,那么此时cachedHoldCounter中记录的ThreadId就还是t3的,而不会修改为指向t2,因为在时间上t2获取锁的时间就是比当时t3晚,但是如果t2再次尝试获取读锁,那么cachedHoldCounter就会更新指向t2,因为t2成了在时间上最后一次成功获取锁的线程。
- Thread firstReader 和 int firstReaderHoldCount:分别用来记录第一个成功获取读锁的线程和该线程持有读锁的重入次数。这里的第一个成功获取读锁是指当一个线程成功持有读锁后,该读锁总的被线程持有的次数从0变为1,那么这个线程就被记录位该读锁的firstReader。比如此时有t1、t2、t3依次成功获取读锁,t1是被记录为firstReader,然后t1将读锁释放了,firstReader就会被设置为null,而不会更新指向为t2,因为t2获取读锁的时候,该读锁总的被线程持有数并不是从0变为1的,所以就不算是firstReader。只有当t2和t3将自己的读锁全部释放,然后再来一个t4成功获取读锁,这个t4才会被认为是firstReader,因为t4持有读锁时该读锁总的被线程持有的次数从0变为1。
第一个获取读锁的线程的重入次数信息会被记录在firstReader和firstReaderHoldCount中,并不会使用这个线程的ThreadLocal来记录重入次数信息。但是最后一个获取读锁的线程的重入次数信息仍然会存储在该线程的ThreadLocal中,并且会单独拿出一个本地缓存变量cachedHoldCounter来只想存储在ThreadLocal中的重入次数计数器,这样就可以直接通过本地缓存变量来获得HoldCounter,不需要通过ThreadLocal来查询获取了,提高了效率。
至于在使用过程中,这三个数据结构的具体变化和使用流程,我们在后面讲解获取释放读锁源码的时候会举例讲解,这里我们来讲解一下为什么使用三个数据结构来分别记录不同线程锁持有读锁的重入次数,这样设计到底有什么作用和好处。
4.1.3.2 为什么这样设计
其实最开始读写锁只是使用了ThreadLocalHoldCounter来记录每个线程持有读锁的重入次数,因为这个方法是最简单方便的,直接将每个线程持有读锁的重入次数存入到该线程独享的ThreadLocal中,非常方便存取。但是在后续的时候过程当中,他们就逐渐发现了问题:
- 当读锁的数量很大,持有读锁的线程数量很多的时候,就会导致需要使用大量的ThreadLocal来存储线程锁持有读锁的重入次数,比方说m是锁的个数,n是每个锁持有的线程数,每个线程有一个内部有一个ThreadLocal来记录锁重入,这样就会有m*n个内存占用,这就造成了内存占用率过高。而且在jdk1.6的时候,ReentrantReadWriteLocl还有一个小bug就是在ThreadLocal使用完之后没有调用remove(),这样就会造成ThreadLocal内存泄露的问题,更增加了内存占用率。
- 当大量使用ThreadLocal的时候,效率就会变慢,因为相对于直接从本地缓存对象中取出数据,从ThreadLocal中查找获取数据性能开销更大,如果能尽量避免使用ThreadLocal有助于提高性能。
基于以上两点原因,所以在后续的版本中就添加了cachedHoldCounter、firstReader、firstReader。并且也修复了使用完ThreadLocal后不调用remove()的bug。
添加cachedHoldCounter的原因:
在一些情况下,线程在拿到读锁使用完之后会马上释放锁,这就可能释放锁的线程就是最后一个获取到读锁的线程,这样我们单独创建一个对象来存储最后一个拿到读锁的线程的重入次数信息,每次释放锁的时候就可以直接从本地缓存对象中获取数据,不需要通过线程的ThreadLocal了,提高了效率。而且在最后一个获取读锁的线程马上又重入读锁时,也可以直接操作cachedHoldCounter,避免使用ThreadLocal,并且也能保证cachedHoldCounter和ThreadLocal数据是同步变化的,因为cachedHoldCounter指向的重入次数计数器和最后一个获取到读锁线程的ThreadLocal存储的重入次数计数器实际上是同一个对象。
添加firstReader 和 firstReaderHoldCount的原因:
- 如果第一次获取读锁的线程来执行获取锁和释放锁,就可以直接使用firstReader和firstReaderHoldCount来修改线程的重入次数信息。在一定程度上确实减少ThreadLocal的使用,免去了去ThreadLocal中查找,能提高一些性能。并且也可以降低一些内存的使用,假设一个线程需要获取50000个不同的读锁,就可以减少50000个ThreadLocal的内存使用。
- 对于无竞争时,只有一个线程获取读锁的计数省去了去ThreadLocal中查找而性能高效。但是一般情况都是有竞争的,所以实际收效甚微。
4.2 内部类
ReentrantReadWriteLock 一共有5个内部类,具体如下:
Sync:公平锁和非公平锁的抽象类
NonfairSync:非公平锁的具体实现
FairSync:公平锁的具体实现
ReadLock:读锁的具体实现
WriteLock:写锁的具体实现
4.3 主要属性
// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;
维护了读锁、写锁和同步器。
属性中的读锁和写锁是私有属性,需要通过这两个方法来获取读锁和写锁:
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
4.4 主要构造方法
// 默认构造方法
public ReentrantReadWriteLock() {
this(false);
}
// 是否使用公平锁的构造方法
public ReentrantReadWriteLock(boolean fair) {
// 设置同步器实现公平锁还是非公平锁
sync = fair ? new FairSync() : new NonfairSync();
// 初始化读锁和线索对象
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
它提供了两个构造方法,默认构造方法使用的是非公平锁模式,在构造方法中初始化了读锁和写锁。
我们从读锁ReadLock和写锁WriteLock的源码开始分析,然后顺着这个思路将整个ReentrantReadWriteLock中所有的核心源码(所有的包括内部类)进行分析。
4.5 ReadLock类源码解析
// ReentrantReadWriteLock.ReadLock
public static class ReadLock implements Lock, java.io.Serializable {
// 同步器
private final Sync sync;
/**
* 通过ReentrantReadWriteLock中的公平锁或非公平锁来初始化sync变量
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* 阻塞的方式获取锁,因为读锁是共享锁,所以调用acquireShared方法
*/
public void lock() {
sync.acquireShared(1);
}
/**
* 可中断且阻塞的方式获取锁
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 超时尝试获取锁,非阻塞的方式
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 尝试获取读锁,非阻塞的方式
*/
public boolean tryLock() {
return sync.tryReadLock();
}
/**
* 释放锁
*/
public void unlock() {
sync.releaseShared(1);
}
}
接下来,我们按基于非公平模式读锁(共享锁)的加锁和解锁操作来分别分析。
4.5.1 读锁获取锁
// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {
// 获取读锁(共享锁)
sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
// AQS中提供了获取共享锁的模板方法,供用户调用
public final void acquireShared(int arg) {
// 该方法是在ReentrantReadWriteLock的内部类Sync中实现的
// 尝试获取共享锁(返回1表示成功,返回-1表示失败)
if (tryAcquireShared(arg) < 0)
// 获取读锁失败了就可能要排队
doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 状态变量的值
// 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数
int c = getState();
/**
* 条件1:获取互斥锁(读锁)的被持有的次数,判断是否不等于0
* 条件2:判断当前线程是否持有互斥锁(写锁)
*
* 如果条件1和条件2同时成立,表明其它线程获得了写锁,因为读写和写读互斥,直接返回-1,该线程获取读锁失败
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁被获取的次数
int r = sharedCount(c);
// 执行到这里说明此时还没有线程获取写锁,尝试去更新state的值获取读锁
/**
* 条件1:读者是否需要排队(是否是公平模式),readerShouldBlock()返回true说明需要排队
* 条件2:读锁被持有次数是否小于上限
* 条件3:尝试CAS更新读锁state值是否成功
*
* 如果三个条件均成立,则说明当前线程尝试获取读锁成功,就可以执行后续的修改当前线程持有读锁重入次数的操作了
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 获取读锁成功
// 判断之前是否有线程持有该读锁
if (r == 0) {
// 如果之前还没有线程获取读锁,记录第一个读者为当前线程,直接使用firstReader和firstReaderHoldCount进行记录,不适用ThreadLocal
firstReader = current;
// 第一个读者重入的次数为1
firstReaderHoldCount = 1;
// 如果有线程获取了读锁且是当前线程就是第一个读者
} else if (firstReader == current) {
// 则把其重入次数加1
firstReaderHoldCount++;
} else {
// 如果执行到了这里,说明有线程获取了读锁且当前线程不是第一个读者
// 这种情况我们先从本地缓存中获取最后一个获取该读锁线程的重入次数保存器
HoldCounter rh = cachedHoldCounter;
/**
* 条件1:本地缓存cachedHoldCounter是否存有数据
* 条件2:当前线程是否为记录在cachedHoldCounter中最后一次获取该读锁的线程
*
* 条件1和条件2只要有任意一个成立,就说明此时cachedHoldCounter还未记录数据或者当前线程并不是最后一个获取读锁的线程
* 则进入到该if分支中去从readHolds中获取当前线程的重入次数保存器,并且更新cachedHoldCounter的记录,因为当前线程已经成了最新的(也就是最后一次)获取读锁的线程了
* readHolds本身是一个ThreadLocal,它和当前线程绑定在一起,里面存储的是HoldCounter
*/
if (rh == null || rh.tid != getThreadId(current))
/**
* 如果当前线程的readHolds是第一次调用get(),就会给该线程创建一个ThreadLocalMap,然后初始化一个ThreadLocalHoldCounter存入该线程的ThreadLocalMap中,并将这个ThreadLocalHoldCounter返回
* 如果不是第一调用get(),就会直接将与该线程绑定的ThreadLocalHoldCounter返回
*
* 这里就将rh引用指向该线程的ThreadLocalHoldCounter,同时更新cachedHoldCounter指向该线程的cachedHoldCounter,也就是说最后一个获取读锁的线程的重入保存器会存储在本地缓存cachedHoldCounter变量中一份,还会存储在该线程的ThreadLocalHoldCounter一份,一共存量份。这九个firstReader有所不同,firstReader的重入信息只会存储在本地缓存变量中,不会存储在ThreadLocal中
* 这里需要注意一下rh是一个引用地址,这个引用指向当前线程的ThreadLocalHoldCounter对象,也就是说修改rh,就相当于将该线程的ThreadLocalHoldCounter对象也修改了
*/
cachedHoldCounter = rh = readHolds.get();
// 如果没有进入上面的if分支,说明当前线程就是存储在cachedHoldCounter中的线程,也就是最后一个成功获取到该读锁的线程
// 判断该线程重入该读锁的次数是否为0
else if (rh.count == 0)
// 如果rh的次数为0,把它放到ThreadLocal中去
/**
* 如果进入到这个分支,说明当前cachedHoldCounter中存储的线程id就是当前线程
* 但是这个线程已经将之前自己持有的该读锁全部释放了,我们知道在最后一个成功获取到该读锁的线程完全释放掉持有的读锁后,
* cachedHoldCounter并不会跟着清空,只是会其中存储的该线程的读锁冲入次数清零,但是记录的线程id还是以前的线程
* 只有当以后又来了另一个线程成功获取到读锁,才会更新cachedHoldCounter中的数据,存储新线程的重入次数
*
* 这样的机制下就会出现这样一种情况,就是原有的最后一次获取读锁的线程t将锁全部释放以后,没有其他线程再来获取读锁
* 因为t已经将所有的锁释放了,所以在执行释放锁操作之后,就会调用remove()方法,将该线程的ThreadLocalHoldCounter从该线程的ThreadLocalMap中删除,注意,并没有删除ThreadLocalMap
* 然后t又一次获取了读锁,这个时候正好cachedHoldCounter记录的线程id还是t线程的,但是重入次数count已经是0了,所以就会进入到这个分支中
*
* 因为此时当前线程的ThreadLocalMap中已经没有了ThreadLocalHoldCounter,所以重新向readHold中set进去HoldCounter对象来创建ThreadLocalHoldCounter
* 这里就不能通过get()来初始化创建ThreadLocalHoldCounter了,具体原因可以见讲解ThreadLocal源码的文章
*/
readHolds.set(rh);
// 重入的次数加1。这里需要注意,rh是指向一个HoldCounter对象的引用,对rh的修改操作,其实就是修改了指向的那个HoldCounter对象
// 也就是说修改了rh,也就同步修改了cachedHoldCounter和readHolds中的HoldCounter对象的数据
rh.count++;
}
// 获取读锁成功,返回1
return 1;
}
// 如果CAS失败,则通过这个方法再去尝试获取读锁(如果此时其它线程仍在持有写锁,一样返回-1表示失败,因为写读,读写互斥)
return fullTryAcquireShared(current);
}
// ReentrantReadWriteLock.Sync.exclusiveCount()
abstract static class Sync extends AbstractQueuedSynchronizer {
// 划分的边界线,用16位来划分
static final int SHARED_SHIFT = 16;
// 写锁的掩码,state值与掩码做与运算后得到写锁的真实值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 获取写锁被占用的次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 获取读锁被占用的次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
}
// ReentrantReadWriteLock.Sync.fullTryAcquireShared()
final int fullTryAcquireShared(Thread current) {
/**
* 调用该方法的线程都是希望获取读锁的线程,有3种情况:
* 1、在尝试通过CAS操作修改state时由于有多个竞争读锁的线程导致CAS操作失败
* 2、需要排队等待获取读锁的线程(公平锁)
* 3、超过读锁限制的最大申请次数的线程
*/
HoldCounter rh = null;
// 自旋尝试获取锁
for (;;) {
// 获取读写锁的状态变量
int c = getState();
// 判断当前写锁被持有的次数是否为0,如果不为0说明已经有线程持有写锁了,直接返回-1,因为写读,读写互斥
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 当前线程是否需要被阻塞(公平锁)
} else if (readerShouldBlock()) {
// 判断当前线程是不是第一个持有读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 当前线程不是第一个持有读锁的线程
} else {
// 判断rh是否为null,进而判断是不是刚刚进行第一轮循环
if (rh == null) {
// 获取最后一个获取读锁的线程数据
rh = cachedHoldCounter;
// 判断该线程是不是最后一个获取读锁的线程
if (rh == null || rh.tid != getThreadId(current)) {
// 如果不是,则通过ThreadLocal获取该线程的重入次数保存器
rh = readHolds.get();
// 如果当前线程对该读锁的重入次数为0,则调用ThreadLocal的remove方法,将该ThreadLocal移除,避免ThreadLocal出现内存泄漏
// 只有从ThreadLocal中获取重入次数保存器时才需要remove,如果是cachedHoldCounter中的,就不会进到这个if分支中,不需要去做额外的删除,因为不会有内存泄漏的问题
if (rh.count == 0)
readHolds.remove();
}
}
// 当前线程获取锁失败
if (rh.count == 0)
return -1;
}
}
// 判断是否超过读锁的最大值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试CAS修改读锁的state值
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 最新获取到读锁的线程设置相关的信息,这里就和tryAcquireShared中修改线程的重入次数信息基本一样了
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程重复获取锁(重入)
firstReaderHoldCount++;
} else {
// 在readHolds中记录获取锁的线程的信息
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 最后一下获取锁的线程将读锁全部释放之后,再次获取读锁就会进入到这个分支
else if (rh.count == 0)
// 重新将重入保存期添加到readHolds(ThreadLocal)中
readHolds.set(rh);
rh.count++;
// 更新最后一次成功获取到锁的线程
cachedHoldCounter = rh; // cache for release
}
// 进入到这个if分支就是成功获取到读锁了,返回1
return 1;
}
}
}
// AbstractQueuedSynchronizer.doAcquireShared()
// 详细讲解可以见AQS源码文章,这个方法相当于互斥锁的acquireQueued()方法
private void doAcquireShared(int arg) {
// 将线程添加到AQS同步等待队列中
final Node node = addWaiter(Node.SHARED);
// 阻塞线程是否失败
boolean failed = true;
try {
// 中断标记,是否要将当前线程中断
boolean interrupted = false;
// 自旋不断尝试获取锁
for (;;) {
// 获取当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点是头节点(说明是第一个排队的节点)
if (p == head) {
// 再次尝试获取读锁
int r = tryAcquireShared(arg);
// 如果成功了
if (r >= 0) {
// 头节点后移并传播
// 传播即唤醒后面连续的读节点,这也是我们在本章节最开始举的例子,当一个在等待队列中等待获取读锁的线程被唤醒并成功获取读锁后,就会开始将它后面连续的也是在等待申请读锁的线程一并唤醒
// 注意这里必须得是连续的,如果中间出现了一个等待申请写锁的线程,则唤醒操作到此为止
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 判断该线程是否被中断
if (interrupted)
selfInterrupt();
// 将阻塞标志设置为false,说明没有阻塞节点线程
failed = false;
return;
}
}
// 如果没获取到读锁,判断是否可以将node节点阻塞,如果可以则将其阻塞,等待被唤醒
if (shouldParkAfterFailedAcquire(p, node) && // 判断node节点能否被唤醒
parkAndCheckInterrupt()) // 真正对node节点进行阻塞的方法,阻塞之后这个线程就会停在这个方法内
// 如果parkAndCheckInterrupt()执行了阻塞并且返回当前线程的中断信号是true,则将这个方法中设置的中断信号变量设置为true,用来向上一层返回当前线程的中断标志
interrupted = true;
}
} finally {
// 如果阻塞线程失败了
if (failed)
// 取消获取锁
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
// 将等待队列头节点后移,并向后传播唤醒连续的申请读锁的等待线程
private void setHeadAndPropagate(Node node, int propagate) {
// h为旧的头节点
Node h = head;
// 设置当前节点为新头节点
setHead(node);
// 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 需要传播
// 取下一个节点
Node s = node.next;
// 如果下一个节点为空,或者是需要获取读锁的节点
if (s == null || s.isShared())
// 唤醒下一个节点
doReleaseShared();
}
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 这个方法只会唤醒一个节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头节点状态为SIGNAL,说明要唤醒下一个节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒下一个节点
unparkSuccessor(h);
}
else if (ws == 0 &&
// 把头节点的状态改为PROPAGATE成功才会跳到下面的if
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果唤醒后head没变,则跳出循环
if (h == head) // loop if head changed
break;
}
}
这里面很多方法流程和ReentrantLock类似,如果有基础的话看这些源码会比较容易。
我们来看看大致的逻辑:
- 先尝试获取读锁;
- 如果成功了直接结束;
- 如果失败了,进入doAcquireShared()方法;
- doAcquireShared()方法中首先会生成一个新节点并进入AQS队列中;
- 如果头节点正好是当前节点的上一个节点,再次尝试获取锁;
- 如果成功了,则设置头节点为新节点,并传播;
- 传播即唤醒下一个读节点(如果下一个节点是读节点的话);
- 如果头节点不是当前节点的上一个节点或者(5)失败,则阻塞当前线程等待被唤醒;
- 唤醒之后继续走(5)的逻辑;
在整个逻辑中是在哪里连续唤醒读节点的呢?
答案是在doAcquireShared()方法中,在这里一个节点A获取了读锁后,会唤醒下一个读节点B,这时候B也会获取读锁,然后B继续唤醒C,依次往复,也就是说这里的节点是一个唤醒一个这样的形式,必须得是在队列中连续的读节点才会被唤醒,而不是一个节点获取了读锁后一次性唤醒后面所有的读节点。
下面再讲一下非阻塞获取读锁的操作,相对就要简单很多了:
// ReentrantReadWriteLock.ReadLock.tryLock()
// 非阻塞的尝试获取读锁
public boolean tryLock() {
return sync.tryReadLock();
}
// ReentrantReadWriteLock.Sync.tryReadLock()
// 非阻塞尝试获取读锁
final boolean tryReadLock() {
// 获取当前线程
Thread current = Thread.currentThread();
// 自旋尝试获取读锁
for (;;) {
// 获取读写锁状态变量的值
int c = getState();
/**
* 条件1:获取互斥锁(读锁)的被持有的次数,判断是否不等于0
* 条件2:判断当前线程是否持有互斥锁(写锁)
*
* 如果条件1和条件2同时成立,表明其它线程获得了写锁,因为读写和写读互斥,直接返回-1,该线程获取读锁失败
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
// 读锁被获取的次数
int r = sharedCount(c);
// 读锁的总重入次数是否超过最大次数限制
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
/**
* 通过CAS操作设置state的值,如果成功表示尝试获取读锁成功,需要做以下几件事情:
* 1、如果是第一获取读锁要记录第一个获取读锁的线程信息
* 2、如果是当前获取锁的线程和第一次获取锁的线程相同,需要更新第一获取线程的重入次数
* 3、更新获取读锁线程相关的信息
*
* 这个if就只有一个CAS操作,没有阻塞操作了,如果CAS失败就继续自旋尝试获取锁
*/
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 下面的操作和之前讲过的一样
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
// 成功获取读锁,返回true
return true;
}
}
}
4.5.2 读锁释放锁
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {
// 释放读锁(共享锁)
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {
// 如果尝试释放成功了,就唤醒下一个节点
if (tryReleaseShared(arg)) {
// 这个方法实际是唤醒下一个节点,在上一小节已经讲过了
doReleaseShared();
// 释放锁成功
return true;
}
// 释放锁失败
return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
// 只有所有持有该读锁的线程全部都完全释放了该锁时,才会返回true,其他情况一律返回false
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 当前线程是否为第一个获取读锁的线程
if (firstReader == current) {
// 如果第一个读者(读线程)是当前线程,就把它重入的次数减1。如果减到0了就把firstReader置为空。firstReader可以被置为空,cachedHoldCounter只要是写入数据之后就不会被置为空了
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 如果第一个读者不是当前线程,从记录最后一个获取读锁的本地缓存变量中获取重入次数保存期
HoldCounter rh = cachedHoldCounter;
/**
* 条件1:cachedHoldCounter是否为空
* 条件2:当前线程是否是最后一个获取读锁的线程
*
* 条件1或条件2只要有成立的,就会进入到该if分支中,说明cachedHoldCounter还没有记录最后一个获取锁的线程或者当前线程并不是最后一个获取读锁的线程
*/
if (rh == null || rh.tid != getThreadId(current))
// 该线程不是最后获取读锁的线程,则从其ThreadLocal中获取重入次数保存器
rh = readHolds.get();
// 获取当前线程对读锁的重入次数
int count = rh.count;
// 如果当前线程的重入次数已经小于等于1了,说明这次释放了读锁之后该线程就完全释放了该读锁,需要将该线程对存储该读锁重入次数的ThreadLocal删除掉,否则可能会导致ThreadLocal内存泄漏
if (count <= 1) {
// 调用ThreadLocal的remove()
readHolds.remove();
// 如果还没有释放锁这个线程的读锁重入次数就小于等于0了,说明出现了错误,抛出异常
if (count <= 0)
throw unmatchedUnlockException();
}
// 将重入次数减1
--rh.count;
}
// 再去更新读写锁的state变量
for (;;) {
// 共享锁获取的次数减1
int c = getState();
int nextc = c - SHARED_UNIT;
// 通过CAS更新state
if (compareAndSetState(c, nextc))
// 如果减为0了说明已经没有线程持有该读锁了,这种情况才返回true。否则返回false
return nextc == 0;
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行为跟方法名有点不符,实际是唤醒下一个节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头节点状态为SIGNAL,说明要唤醒下一个节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒下一个节点
unparkSuccessor(h);
}
else if (ws == 0 &&
// 把头节点的状态改为PROPAGATE成功才会跳到下面的if
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果唤醒后head没变,则跳出循环
if (h == head) // loop if head changed
break;
}
}
解锁的大致流程如下:
- 将当前线程重入的次数减1;
- 将共享锁总共被获取的次数减1;
- 如果共享锁获取的次数减为0了,说明共享锁被完全释放了,那就唤醒下一个节点;
举个例子,t1、t2、t3三个节点依次获取了一次共享锁,三者释放的顺序分别为t1、t3、t2,那么最后t2释放共享锁的时候tryReleaseShared()才会返回true,因为此时读锁才被所有线程完全释放了。这样才会去唤醒等待队列中的下一个读节点。
至此我们就将读锁的主要源码分析完了,下面我们就举几个例子让大家能更直观的了解ReentrantReadWriteLock的执行流程。也能更深刻理解cachedHoldCounter、firstReader、firstReaderHoldCount的作用。
例子1:第一个线程t1进来拿读锁,然后t1就一直不释放,第二个线程t2进来拿读锁,此时这个t2就是最后一个拿到读锁的线程,t2的读锁重入次数会记录在cachedHoldCounter中,并且也会存储到t2线程的ThreadLocal中一份。然后t2再去释放锁,此时就会将cachedHoldCounter的重入次数减为0,但是cachedHoldCounter中记录的线程id仍然是t2的,并不会因为t2完全释放了读锁就将cachedHoldCounter置为空,因为从时间角度上t2仍然是最后一个成功获取读锁的线程。同时t2绑定的存储重入次数计数器的ThreadLocal会被remove掉。如果t2释放了读锁之后又再一次获取读锁,这个时候它就会直接进入到获取读锁源码的else if (rh.count == 0)这个分支中,重新将t2线程的HoldCounter存储到ThreadLocal中,具体注释在上面源码中已经写好了。
例子2:第一个线程t1进来拿读锁,这个时候t1是第一个拿到读锁的线程,所以它的读锁重入次数记录会被记录在firstReader和firstReaderHoldCount中,但是并不会使用ThreadLocal来记录t1的重入次数。然后t1不释放,第二个线程t2进来拿读锁,然后t2不释放,t1先释放,t1完全释放了读锁,这个时候因为t1是第一个获取所得读锁的线程,所以直接操作firstReader,将其置为空,并不会将firstReader指向t2。然后再来一个t3获取读锁,这是t3成了最后一次获取读锁的线程,将cachedHoldCounter保存t3的重入次数计数器,但是firstReader和firstReaderHoldCount不会有任何变化。最后t2、t3全部彻底释放读锁后,再来一个线程t4获取读锁,此时就会将firstReader和firstReaderHoldCount转而记录t4的重入信息,因为t4使该读锁被占有次数从0变为1,属于第一个成功获取读锁的线程。
例子3:第一个线程t1拿读锁,然后t1不释放,第二个线程t2进来拿读锁,也先不释放,第三个线程t3进来拿读锁,也先不释放。t1获取到读锁后,它的重入次数信息被保存在firstReader和firstReaderHoldCount中,并不会记录到t1线程的ThreadLocal中,t2线程拿锁后,它的重入次数计数器就会存储到cachedHoldCounter本地缓存变量中一份,存储到t2绑定的ThreadLocal中一份,最后t3拿到读锁之后,cachedHoldCounter就会转而存储t3的重入次数信息,并且也会在t3绑定的ThreadLocal中存储一份重入次数计数器,这里需要注意,cachedHoldCounter指向的重入次数计数器和ThreadLocal中存储的重入次数计数器其实是同一个对象,改一个就全都改了。然后t3将读锁完全释放,cachedHoldCounter中记录的重入次数变为0,线程id仍然是记录的t3的,并不会转而记录t2线程的重入信息。后面再来一个t4获取读锁,这个时候cachedHoldCounter就会转而记录t4的重入信息,此时t4就成为了最后一个获取读锁的线程。
4.6 WriteLock类源码解析
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
/**
* 通过ReentrantReadWriteLock中的公平锁或非公平锁来初始化sync变量
*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* 阻塞的方式获取写锁
*/
public void lock() {
sync.acquire(1);
}
/**
* 中断的方式获取写锁
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 尝试获取写锁
*/
public boolean tryLock( ) {
return sync.tryWriteLock();
}
/**
* 超时尝试获取写锁
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
/**
* 释放写锁
*/
public void unlock() {
sync.release(1);
}
}
接下来,我们按基于非公平模式写锁(互斥锁)的加锁和解锁操作来分别分析。
4.6.1 写锁获取锁
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {
// 获取互斥锁(写锁)
sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
// 先尝试获取锁
// 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一模一样了
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 状态变量state的值
int c = getState();
// 互斥锁被获取的次数
int w = exclusiveCount(c);
// c != 0朔说明此时读写锁已经被线程占有了
if (c != 0) {
/**
* 条件1:如果c!=0且w==0,说明共享锁被获取的次数不为0
* 条件2:当前线程是不是持有该写锁的线程
*
* 条件1和条件2只要有任意一个成立,就会进入该if分支中返回false
* 条件1成立,说明此时已经有线程获取读锁了,如果获取读锁的线程是其他线程,那么读写互斥,不能够获取到写锁。如果获取读锁的线程就是当前线程,因为不允许锁升级,所以也不能获取到写锁,直接返回false
* 条件2成立,说明该写锁已经被其他线程获取了,所以当前线程不能获取到该写锁,返回false
*/
if (w == 0 || current != getExclusiveOwnerThread())
// 返回false,获取写锁失败
return false;
// 溢出检测,写锁持有次数是否超过了最大限制
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可
// 注意这里直接用state+acquires就行了,因为state后16为本来就是记录写锁占有次数的,直接在state基础上做加法即可
setState(c + acquires);
// 获取写锁成功
return true;
}
// 如果c等于0,说明此时还没有线程持有该读写锁,那就尝试更新state的值(非公平模式writerShouldBlock()返回false,不会阻塞排队,直接去尝试CAS修改state)
// 如果失败了,说明CAS获取写锁失败,返回false
// 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 获取写锁失败了后面的逻辑跟ReentrantLock是一致的,进入队列排队,这里就不列源码了
写锁获取的过程大致如下:
- 尝试获取锁;
- 如果有读者占有着读锁,尝试获取写锁失败;
- 如果有其它线程占有着写锁,尝试获取写锁失败;
- 如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;
- 如果没有线程占有着锁(state==0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;
- 尝试获取锁失败以后,进入队列排队,等待被唤醒;
- 后续逻辑跟ReentrantLock是一致;
下面再讲一下非阻塞获取读锁的操作,相对就要简单很多了:
// ReentrantReadWriteLock.WriteLock.tryLock()
// 非阻塞的尝试获取写锁
public boolean tryLock( ) {
return sync.tryWriteLock();
}
// ReentrantReadWriteLock.Sync.tryWriteLock()
final boolean tryWriteLock() {
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState();
// 读锁或者写锁已经被线程持有
if (c != 0) {
int w = exclusiveCount(c);
// 写锁第一次获取锁或者当前线程不是第一次获取写锁的线程(也就是不是owner),直接失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 超出写锁的最大次数,直接失败
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 竞争写锁的线程修改state,
// 如果成功将自己设置成锁的owner,
// 如果失败直接返回
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current); // 设置当前线程持有锁
return true;
}
4.6.2 写锁释放锁
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {
sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
// 如果尝试释放锁成功(完全释放锁)
// 就尝试唤醒下一个节点
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {
// 如果写锁不是当前线程占有着,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 状态变量的值减1
int nextc = getState() - releases;
// 是否完全释放了该写锁
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 设置状态变量的值
setState(nextc);
// 如果完全释放了写锁,返回true
return free;
}
写锁释放的过程大致为:
- 先尝试释放锁,即状态变量state的值减1;
- 如果减为0了,说明完全释放了锁;
- 完全释放了锁才唤醒下一个等待的节点;
4.7 基于源码讨论一下锁升级和锁降级的问题
4.7.1 如果同一个线程先获取读锁,再获取写锁会怎样?
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 状态变量state的值
int c = getState();
// 互斥锁被获取的次数
int w = exclusiveCount(c);
// c != 0朔说明此时读写锁已经被线程占有了
if (c != 0) {
/**
* 条件1:如果c!=0且w==0,说明共享锁被获取的次数不为0
* 条件2:当前线程是不是持有该写锁的线程
*
* 条件1和条件2只要有任意一个成立,就会进入该if分支中返回false
* 条件1成立,说明此时已经有线程获取读锁了,如果获取读锁的线程是其他线程,那么读写互斥,不能够获取到写锁。如果获取读锁的线程就是当前线程,因为不允许锁升级,所以也不能获取到写锁,直接返回false
* 条件2成立,说明该写锁已经被其他线程获取了,所以当前线程不能获取到该写锁,返回false
*/
if (w == 0 || current != getExclusiveOwnerThread())
// 返回false,获取写锁失败
return false;
// 溢出检测,写锁持有次数是否超过了最大限制
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可
// 注意这里直接用state+acquires就行了,因为state后16为本来就是记录写锁占有次数的,直接在state基础上做加法即可
setState(c + acquires);
// 获取写锁成功
return true;
}
......
}
分析上面的源码,在tryAcquire()方法中,如果此时该线程已经获取了读锁了,那么该读写锁的state肯定不等于0(c != 0),然后就有两种可能,一个是此时还有有任何线程获取写锁(w == 0),这种情况下就会进入到if (w == 0 || current != getExclusiveOwnerThread())这个分支里,返回false获取写锁失败。返回之后外层方法会让当前线程阻塞等待。
这里例子说明了ReentrantReadWriteLock不允许锁升级。
可以通过下面的方法验证:
readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();
4.7.2 如果同一个线程先获取写锁,再获取读锁会怎样?
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 状态变量的值
// 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数
int c = getState();
/**
* 条件1:获取互斥锁(读锁)的被持有的次数,判断是否不等于0
* 条件2:判断当前线程是否持有互斥锁(写锁)
*
* 如果条件1和条件2同时成立,表明其它线程获得了写锁,因为读写和写读互斥,直接返回-1,该线程获取读锁失败
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁被获取的次数
int r = sharedCount(c);
// 执行到这里说明此时还没有线程获取写锁,尝试去更新state的值获取读锁
/**
* 条件1:读者是否需要排队(是否是公平模式),readerShouldBlock()返回true说明需要排队
* 条件2:读锁被持有次数是否小于上限
* 条件3:尝试CAS更新读锁state值是否成功
*
* 如果三个条件均成立,则说明当前线程尝试获取读锁成功,就可以执行后续的修改当前线程持有读锁重入次数的操作了
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 获取读锁成功
// 判断之前是否有线程持有该读锁
if (r == 0) {
// 如果之前还没有线程获取读锁,记录第一个读者为当前线程,直接使用firstReader和firstReaderHoldCount进行记录,不适用ThreadLocal
firstReader = current;
// 第一个读者重入的次数为1
firstReaderHoldCount = 1;
// 如果有线程获取了读锁且是当前线程就是第一个读者
} else if (firstReader == current) {
// 则把其重入次数加1
firstReaderHoldCount++;
} else {
......
}
// 获取读锁成功,返回1
return 1;
}
// 如果CAS失败,则通过这个方法再去尝试获取读锁(如果此时其它线程仍在持有写锁,一样返回-1表示失败,因为写读,读写互斥)
return fullTryAcquireShared(current);
}
分析上面的源码,在tryAcquireShared()方法中,在if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)分支处并不会返回,因为是当前线程获取的写锁,不满足getExclusiveOwnerThread() != current;如果在if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) 这个分支上原子更新成功就说明获取了读锁,然后就会执行后续的代码把其读锁重入次数+1。
这里例子说明了ReentrantReadWriteLock允许锁降级。
可以通过下面的方法验证:
writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();
4.7.3 为什么不能锁升级
通过上面的两个例子,我们可以感受到同一个线程先读后写和先写后读是完全不一样的,为什么不一样呢?
先读后写,一个线程占有读锁后,其它线程还是可以占有读锁的,这时候如果在其它线程占有读锁之前让自己占有了写锁,其它线程又不能占有读锁了,这段程序会非常难实现,逻辑也很奇怪,所以,设计成只要一个线程占有了读锁,其它线程包括它自己都不能再获取写锁。
先写后读,一个线程占有写锁后,其它线程是不能占有任何锁的,这时候,即使自己占有一个读锁,对程序的逻辑也不会有任何影响,所以,一个线程占有写锁后是可以再占有读锁的,只是这个时候其它线程依然无法获取读锁。
我们假设几个例子就能更好地理解为什么不能锁升级。
假设有 A,B 和 C 三个线程,它们都已持有读锁。假设线程 A 尝试从读锁升级到写锁。那么它必须等待 B 和 C 释放掉已经获取到的读锁。如果随着时间推移,B 和 C 逐渐释放了它们的读锁,此时线程 A 确实是可以成功升级并获取写锁。
但是我们考虑一种特殊情况。
假设线程 A 和 B 都想升级到写锁,那么对于线程 A 而言,它需要等待其他所有线程,包括线程 B 在内释放读锁。而线程 B 也需要等待所有的线程,包括线程 A 释放读锁。这就是一种非常典型的死锁的情况。谁都愿不愿意率先释放掉自己手中的锁。由此可见,是不能够允许锁升级的。
五、总结
- ReentrantReadWriteLock采用读写锁的思想,能提高并发的吞吐量;
- 读锁和写锁的占用(重入)次数都是共用state字段,高位记录读锁,地位记录写锁,所以读锁和写锁的最大占用次数为2^16;
- 读锁和写锁都是可重入的;
- 读锁是共享锁,允许多个线程获取,互相不会影响,即读读不互斥;
- 写锁是排他锁(互斥锁),只允许一个线程获取;
- 读写、写读和写写是会互斥的,前者占有着锁,后者需要进入AQS队列中排队;
- ReentrantReadWriteLock中的NonfairSync和FairSync分别实现了两种阻塞的策略,writerShouldBlock和readerShouldBlock。
- 一个线程获取了读锁,在非公平锁的情况下,其他等待获取读锁的线程都可以尝试获取读锁,在公平锁的情况下,按照AQS同步队列的顺利来获取,如果队列前面有一个等待写锁的线程在排队,则后面所有等待获取读锁的线程都将无法获取读锁,也就是说多个连续的读线程是一个接着一个被唤醒的,而不是一次性唤醒所有读线程;
- 只有多个读锁都完全释放了才会唤醒下一个写线程;
- 只有写锁完全释放了才会唤醒下一个等待者,这个等待者有可能是读线程,也可能是写线程;
- 获取读锁的线程,不能再去申请获取写锁,因为不允许锁升级;
- 读写锁都是悲观锁,在读多写少的情况下,可能会出现写线程“饿死”的情况,即写线程一直获取不到锁。
相关文章:【并发编程】爆肝1个月,一篇文章带你彻底搞懂ReentrantLock底层原理
【并发编程】Lock接口
【并发编程】JUC中locks包简介