ReentranLock手写
整体概述
MiniLock
是一个自定义的锁实现,模拟了 Java ReentrantLock
的公平锁机制。公平锁的核心思想是“先来后到”,即线程按照请求锁的顺序依次获取锁,避免线程饥饿。代码使用了以下关键组件:
state
: 表示锁的状态,0
表示未被锁定,>0
表示已被锁定。exclusiveOwerThread
: 记录当前持有锁的线程(独占模式)。Node
队列: 使用双向链表维护等待锁的线程队列,head
是持有锁的线程节点,tail
是队列尾部。LockSupport
: 用于线程的阻塞(park
)和唤醒(unpark
)。
代码实现了 Lock
接口的 lock()
和 unlock()
方法,并通过 tryAcquire
和 tryRelease
等辅助方法实现锁的获取和释放逻辑。
关键字段和类
state
- 类型:
volatile int
- 作用:表示锁的状态。
0
:未被锁定。>0
:已被锁定,值表示重入次数。
- 使用
volatile
确保多线程下的可见性。
- 类型:
exclusiveOwerThread
- 类型:
Thread
- 作用:记录当前持有锁的线程,确保锁的独占性。
- 类型:
Node
类- 内部静态类,用于表示等待队列中的节点。
- 字段:
prev
:前一个节点。next
:后一个节点。thread
:当前节点对应的线程。
- 作用:构建一个双向链表队列,保存等待获取锁的线程。
head
和tail
- 类型:
Node
- 作用:
head
:队列头部,总是指向当前持有锁的线程节点。tail
:队列尾部,指向最后一个等待的线程节点。
- 类型:
核心方法解析
1. lock()
public void lock() {
acquire(1);
}
- 作用:获取锁的入口方法。
- 参数
1
表示获取锁的次数(支持重入时会累加)。 - 逻辑:调用
acquire(1)
开始抢占锁。
2. acquire(int arg)
private void acquire(int arg) {
if (!tryAcquire(arg)) {
Node node = addWaiter();
acquireQueued(node, arg);
}
}
- 作用:尝试获取锁,如果失败则将线程加入等待队列并阻塞。
- 步骤:
- 调用
tryAcquire(arg)
尝试抢占锁。- 如果成功,直接返回。
- 如果失败,继续执行。
- 调用
addWaiter()
将当前线程封装成Node
并加入队列。 - 调用
acquireQueued(node, arg)
阻塞线程并等待获取锁。
- 调用
3. tryAcquire(int arg)
private boolean tryAcquire(int arg) {
if (state == 0) {
if (!hasQueuedPredecessor() && casState(0, arg)) {
exclusiveOwerThread = Thread.currentThread();
return true;
}
} else if (Thread.currentThread().equals(exclusiveOwerThread)) {
int c = getState();
c += arg;
state = c;
}
return false;
}
- 作用:尝试获取锁(非阻塞)。
- 返回值:
true
:抢占成功。false
:抢占失败。
- 逻辑:
- 锁未被占用(
state == 0
)- 检查队列中是否有等待线程(
hasQueuedPredecessor()
)。 - 如果没有等待线程且
casState(0, arg)
成功(原子地将state
从0
改为arg
),则:- 设置
exclusiveOwerThread
为当前线程。 - 返回
true
。
- 设置
- 检查队列中是否有等待线程(
- 锁已被占用且当前线程是持有者
- 如果当前线程已经持有锁(支持重入),则:
- 将
state
增加arg
(重入计数)。 - 返回
true
。
- 将
- 如果当前线程已经持有锁(支持重入),则:
- 其他情况
- 返回
false
,表示抢锁失败。
- 返回
- 锁未被占用(
4. hasQueuedPredecessor()
private boolean hasQueuedPredecessor() {
Node h = head;
Node t = tail;
Node s;
return h != t && ((s = h.next) == null || s.thread == Thread.currentThread());
}
- 作用:判断当前线程是否需要排队。
- 返回值:
true
:队列中有等待线程,当前线程需要排队。false
:队列为空或当前线程是head.next
(有资格抢锁)。
- 逻辑:
h != t
:队列不为空。(s = h.next) == null
:队列刚初始化或正在调整。s.thread == Thread.currentThread()
:当前线程是队列中的下一个线程。
5. addWaiter()
private Node addWaiter() {
Node newNode = new Node(Thread.currentThread());
Node pred = tail;
while (pred != null) {
newNode.prev = pred;
if (casTail(pred, newNode)) {
pred.next = newNode;
return newNode;
}
}
enq(newNode);
return newNode;
}
- 作用:将当前线程加入等待队列。
- 逻辑:
- 创建一个新节点
newNode
,绑定当前线程。 - 如果
tail != null
,尝试将新节点加入队列尾部:- 设置
newNode.prev = pred
。 - 使用
casTail
原子更新tail
。 - 如果成功,更新
pred.next
并返回。
- 设置
- 如果
tail == null
或 CAS 失败,调用enq(newNode)
自旋入队。
- 创建一个新节点
6. enq(Node node)
private void enq(Node node) {
for (;;) {
if (tail == null) {
if (casHead(new Node())) {
tail = head;
}
} else {
Node pred = tail;
node.prev = pred;
if (casTail(pred, node)) {
pred.next = node;
return;
}
}
}
}
- 作用:自旋方式将节点加入队列,确保入队成功。
- 逻辑:
- 队列为空(
tail == null
)- 创建一个空的
head
节点并尝试通过 CAS 设置。 - 设置
tail = head
。
- 创建一个空的
- 队列非空
- 将新节点加入
tail
后,通过 CAS 更新tail
。 - 更新链表指针后返回。
- 将新节点加入
- 队列为空(
7. acquireQueued(Node node, int arg)
private void acquireQueued(Node node, int arg) {
for (;;) {
Node pred = node.prev;
if (pred == head && tryAcquire(arg)) {
setHead(node);
pred.next = null;
return;
}
LockSupport.park();
}
}
- 作用:阻塞当前线程,直到成功获取锁。
- 逻辑:
- 自旋检查:
- 如果当前节点的前驱是
head
且tryAcquire
成功:- 将当前节点设为
head
。 - 断开原
head
的next
指针。 - 返回。
- 将当前节点设为
- 否则,调用
LockSupport.park()
阻塞线程。
- 如果当前节点的前驱是
- 自旋检查:
8. unlock()
public void unlock() {
release(1);
}
- 作用:释放锁。
- 逻辑:调用
release(1)
释放锁。
9. release(int arg)
private void release(int arg) {
if (tryRelease(arg)) {
Node head = getHead();
if (head.next != null) {
unparkSuccessor(head);
}
}
}
- 作用:释放锁并唤醒队列中的下一个线程。
- 逻辑:
- 调用
tryRelease(arg)
尝试释放锁。 - 如果完全释放成功(
state == 0
)且队列中有等待线程:- 调用
unparkSuccessor(head)
唤醒head.next
的线程。
- 调用
- 调用
10. tryRelease(int arg)
private boolean tryRelease(int arg) {
int c = getState() - arg;
if (getExclusiveOwerThread() != Thread.currentThread()) {
throw new RuntimeException("fuck error");
}
if (c == 0) {
exclusiveOwerThread = null;
state = 0;
return true;
}
state = c;
return false;
}
- 作用:尝试释放锁。
- 返回值:
true
:锁完全释放。false
:锁部分释放(重入计数减少)。
- 逻辑:
- 检查当前线程是否是锁持有者,否则抛出异常。
- 计算新状态
c = state - arg
。 - 如果
c == 0
:- 清空
exclusiveOwerThread
。 - 设置
state = 0
。 - 返回
true
。
- 清空
- 否则:
- 更新
state = c
。 - 返回
false
。
- 更新
11. unparkSuccessor(Node node)
private void unparkSuccessor(Node node) {
Node next = node.next;
if (next != null && next.thread != null) {
LockSupport.unpark(next.thread);
}
}
- 作用:唤醒队列中的下一个线程。
- 逻辑:
- 获取
node.next
。 - 如果存在且绑定了线程,调用
LockSupport.unpark()
唤醒。
- 获取
公平锁的实现要点
- 先来后到:
- 通过
hasQueuedPredecessor()
确保只有队列为空或当前线程是head.next
时才能抢锁。
- 通过
- 可重入性:
- 如果当前线程已持有锁,
tryAcquire
会增加state
值,而不是阻塞。
- 如果当前线程已持有锁,
- 线程安全:
- 使用 CAS 操作(
casState
、casHead
、casTail
)保证并发下的原子性。 volatile
修饰state
确保可见性。
- 使用 CAS 操作(
- 阻塞与唤醒:
- 使用
LockSupport.park()
和unpark()
实现线程的挂起和唤醒。
- 使用
注意事项
- CAS 实现缺失:
- 代码中的
casState
、casHead
和casTail
方法仅返回true
,没有实现真正的原子操作。实际中需要使用Unsafe
或AtomicInteger
。
- 代码中的
- 边界情况:
tryRelease
中存在错误:state = 0
应为state = c
,否则非完全释放时会错误清零。
- 性能:
- 自旋操作(
enq
和acquireQueued
)可能在高并发下消耗 CPU。
- 自旋操作(
总结
这段代码通过双向链表队列和状态管理实现了一个公平、可重入的独占锁。核心逻辑是:
- 获取锁:尝试抢占 -> 失败则入队 -> 阻塞等待。
- 释放锁:减少重入计数 -> 完全释放则唤醒队列中的下一个线程。
尽管实现简化和部分功能未完善(如 CAS),但它很好地展示了ReentrantLock
公平锁的核心原理。
完整代码:
import java.util.concurrent.locks.LockSupport;
/**
* 可以重入的、独占的公平锁
*
* */
public class MiniLock implements Lock {
/**
* 锁的是资源
* 0 -> 未加锁状态
* !0 -> 加锁状态
*/
private volatile int state;
// 记录当前获取锁的线程 独占模式 在同一时刻只有一个线程可以持有锁,其他线程未获取到就会被阻塞
public Thread exclusiveOwerThread;
/**
* 需要有两个引用来维护被阻塞线程的队列
* 1. head:队列头结点,头节点就是当前占有锁的线程
* 2. tail:队列的尾节点
*/
private Node head;
private Node tail;
public Node getHead() {
return head;
}
public int getState() {
return state;
}
public Node getTail() {
return tail;
}
public void setHead(Node head) {
this.head = head;
}
public Thread getExclusiveOwerThread() {
return exclusiveOwerThread;
}
// 用一个链表队列保存要维护的线程
static final class Node {
Node prev; // 前置节点
Node next; // 后置节点
Thread thread; // 当前node节点的线程
public Node(Thread thread) {
this.thread = thread;
}
public Node() {
}
}
/**
* 目的:获取锁,假设当前锁被占用,会阻塞调用者的线程,直到抢占到锁为止
*
* 模拟公平锁:先来后到
*
* lock过程:
* 情景1:线程进入 发现 state == 0,直接抢锁
* 情景2:线程进入 发现 state > 0,需要进入锁的阻塞队列中
* */
@Override
public void lock() {
acquire(1);
}
/**
* 争资源
* 1. 尝试抢占锁,成功占到 就返回
* 2. 抢占失败 放入队列 将线程阻塞
*/
private void acquire(int arg){
if(!tryAcquire(arg)){
// 1. 如果抢锁失败 把自己打包成一个node 进入队列
Node node = addWaiter();
// 2. 然后阻塞
acquireQueued(node, arg);
}
}
private void acquireQueued(Node node, int arg){
// 只有当前node成功获取到锁 才会跳出自旋
for(;;){
// 什么时候会被唤醒获取到锁 head.next 先来后到
Node pred = node.prev;
if(pred == head/*是否有资格来抢锁*/ && tryAcquire(arg)){
// 竞争成功
// 1. 把自己设置成head节点
// 2. 原先的head出队
setHead(node);
pred.next = null;
return;
}
// 枪锁失败
LockSupport.park();
}
}
/**
* 尝试抢占锁失败后:
* 1. 把当前的线程封装成Node 进入阻塞队列
* 2. 将当前线程park掉 处于挂起状态
*
* 唤醒之后需要:
* 1. 先检测一下自己是否为 head.next 节点(只有head.next有权限抢占锁)
* 2. 枪锁
* 成功:成功后先将自己设置为head节点 并且 之前的head出队列
* 失败:失败后继续 park 等待下一次唤醒
*/
/**
* 将当前线程入队列 并且返回当前线程节点
*
* addWaiter 保证入队成功
* @return
*/
private Node addWaiter(){
Node newNode = new Node(Thread.currentThread());
// 进入阻塞队列
// 1. 先找到前置节点 pred 去更新前置节点 2. cas更新tail节点 3. pred.next = newNode
Node pred = tail;
while(pred != null){
newNode.prev = pred;
if (casTail(pred, newNode)) { // if cas 成功
pred.next = newNode;
return newNode;
}
}
// 情况讨论:
// 1. tail == null
// 2. cas tail失败
// 自旋入队
enq(newNode);
return newNode;
}
/**
* 自旋入队, 只有成功后才可以返回
* tail == null
* cas tail 失败
* @param node
*/
private void enq(Node node){
for(;;){
// 队列为空 当前持有锁的线程并没有入队 并且没设置任何node
// 作为后驱节点,需要为第一个线程完成node入队的情况,保持(head 节点任何时候都是占用锁的线程)这个事情
if (tail == null) {
// 说明成功了 当前持有锁的线程补充head线程成功了
if(casHead(new Node())){
tail = head;
// 这里不返回
}
} else {
Node pred = tail;
node.prev = pred;
if(casTail(pred, node)){
pred.next = node;
return;
}
}
}
}
/**
* 尝试获取锁 不会阻塞线程
* true - 抢占成功
* false - 抢占失败
*/
private boolean tryAcquire(int arg){
if (state == 0) {
// 当前 state == 0 的话 是否能直接抢锁 肯定不行
// 公平锁是先来后到 条件1:当前锁中阻塞队列没有线程等待 条件2:cas方式去设置state避免线程并发下同时进行state = 1
// 抢占成功
if(!hasQueuedPredecessor() && casState(0,arg)) {
exclusiveOwerThread = Thread.currentThread(); // 设置其独占线程
return true;
}
// 当前锁被占用 并且 自己正在占用线程
} else if(Thread.currentThread().equals(exclusiveOwerThread)) {
// 锁被重入
// 当前不会出现并发,条件限制只可能有一个线程进来
int c = getState();
c += arg;
state = c;
} else {
}
return false;
}
/**
* 判断队列中当前是否有线程等待
* true - 有等待
* false - 无等待
* hasQueuedPredecessor 调用锁 lock -> acquire -> tryAcquire -> hasQueuedPredecessor
*
* 什么时候返回 false
* 1. 队列为空
* 2. 抢锁的线程是head.next线程的时候 (head.next是唯一在队中拥有枪锁资格的线程)
*/
private boolean hasQueuedPredecessor() {
Node h = head;
Node t = tail;
Node s;
// h != t : 1.head和tail都为空 2. head和tail指向同一个节点
// (s = h.next) == null :head和tail指向同一节点 队列在释放head的时候 有一瞬间会处于 head.next为空的状态
// s.thread == Thread.currentThread() : 当前线程是不是next节点
return h != t &&((s = h.next) == null || s.thread == Thread.currentThread());
}
@Override
public void unlock() {
release(1);
}
/**
* 释放锁
* @param arg
*/
private void release(int arg){
if(tryRelease(arg)){
// 说明完全释放成功了 队列中还有node睡觉中 需要唤醒
Node head = getHead();
if(head.next != null){
unparkSuccessor(head);
}
}
}
private void unparkSuccessor(Node node){
Node next = node.next;
if (next != null && next.thread != null) {
LockSupport.unpark(next.thread);
}
}
/**
* true - 完全释放锁
* false - 部分释放锁
* @param arg
* @return
*/
private boolean tryRelease(int arg){
int c = getState() - arg;
if(getExclusiveOwerThread() != Thread.currentThread()){
throw new RuntimeException("fuck error");
}
// 执行到这里 不会有并发
if(c == 0){
// 完全释放锁的状态
// 要做:
// 1. ownerThread = null
// 2. state = 0
exclusiveOwerThread = null;
state = 0;
return true;
}
state = 0;
return false;
}
private final boolean casState(int expect, int update){
return true;
}
private final boolean casHead(Node update){
return true;
}
private final boolean casTail(Node expect, Node update){
return true;
}
}