ReentranLock手写

发布于:2025-03-22 ⋅ 阅读:(26) ⋅ 点赞:(0)

ReentranLock手写

整体概述

MiniLock 是一个自定义的锁实现,模拟了 Java ReentrantLock 的公平锁机制。公平锁的核心思想是“先来后到”,即线程按照请求锁的顺序依次获取锁,避免线程饥饿。代码使用了以下关键组件:

  1. state: 表示锁的状态,0 表示未被锁定,>0 表示已被锁定。
  2. exclusiveOwerThread: 记录当前持有锁的线程(独占模式)。
  3. Node 队列: 使用双向链表维护等待锁的线程队列,head 是持有锁的线程节点,tail 是队列尾部。
  4. LockSupport: 用于线程的阻塞(park)和唤醒(unpark)。

代码实现了 Lock 接口的 lock()unlock() 方法,并通过 tryAcquiretryRelease 等辅助方法实现锁的获取和释放逻辑。


关键字段和类

  1. state

    • 类型:volatile int
    • 作用:表示锁的状态。
      • 0:未被锁定。
      • >0:已被锁定,值表示重入次数。
    • 使用 volatile 确保多线程下的可见性。
  2. exclusiveOwerThread

    • 类型:Thread
    • 作用:记录当前持有锁的线程,确保锁的独占性。
  3. Node

    • 内部静态类,用于表示等待队列中的节点。
    • 字段:
      • prev:前一个节点。
      • next:后一个节点。
      • thread:当前节点对应的线程。
    • 作用:构建一个双向链表队列,保存等待获取锁的线程。
  4. headtail

    • 类型:Node
    • 作用:
      • head:队列头部,总是指向当前持有锁的线程节点。
      • tail:队列尾部,指向最后一个等待的线程节点。

核心方法解析

1. lock()
public void lock() {
    acquire(1);
}
  • 作用:获取锁的入口方法。
  • 参数 1 表示获取锁的次数(支持重入时会累加)。
  • 逻辑:调用 acquire(1) 开始抢占锁。
2. acquire(int arg)
private void acquire(int arg) {
    if (!tryAcquire(arg)) {
        Node node = addWaiter();
        acquireQueued(node, arg);
    }
}
  • 作用:尝试获取锁,如果失败则将线程加入等待队列并阻塞。
  • 步骤:
    1. 调用 tryAcquire(arg) 尝试抢占锁。
      • 如果成功,直接返回。
      • 如果失败,继续执行。
    2. 调用 addWaiter() 将当前线程封装成 Node 并加入队列。
    3. 调用 acquireQueued(node, arg) 阻塞线程并等待获取锁。
3. tryAcquire(int arg)
private boolean tryAcquire(int arg) {
    if (state == 0) {
        if (!hasQueuedPredecessor() && casState(0, arg)) {
            exclusiveOwerThread = Thread.currentThread();
            return true;
        }
    } else if (Thread.currentThread().equals(exclusiveOwerThread)) {
        int c = getState();
        c += arg;
        state = c;
    }
    return false;
}
  • 作用:尝试获取锁(非阻塞)。
  • 返回值:
    • true:抢占成功。
    • false:抢占失败。
  • 逻辑:
    1. 锁未被占用(state == 0
      • 检查队列中是否有等待线程(hasQueuedPredecessor())。
      • 如果没有等待线程且 casState(0, arg) 成功(原子地将 state0 改为 arg),则:
        • 设置 exclusiveOwerThread 为当前线程。
        • 返回 true
    2. 锁已被占用且当前线程是持有者
      • 如果当前线程已经持有锁(支持重入),则:
        • state 增加 arg(重入计数)。
        • 返回 true
    3. 其他情况
      • 返回 false,表示抢锁失败。
4. hasQueuedPredecessor()
private boolean hasQueuedPredecessor() {
    Node h = head;
    Node t = tail;
    Node s;
    return h != t && ((s = h.next) == null || s.thread == Thread.currentThread());
}
  • 作用:判断当前线程是否需要排队。
  • 返回值:
    • true:队列中有等待线程,当前线程需要排队。
    • false:队列为空或当前线程是 head.next(有资格抢锁)。
  • 逻辑:
    • h != t:队列不为空。
    • (s = h.next) == null:队列刚初始化或正在调整。
    • s.thread == Thread.currentThread():当前线程是队列中的下一个线程。
5. addWaiter()
private Node addWaiter() {
    Node newNode = new Node(Thread.currentThread());
    Node pred = tail;
    while (pred != null) {
        newNode.prev = pred;
        if (casTail(pred, newNode)) {
            pred.next = newNode;
            return newNode;
        }
    }
    enq(newNode);
    return newNode;
}
  • 作用:将当前线程加入等待队列。
  • 逻辑:
    1. 创建一个新节点 newNode,绑定当前线程。
    2. 如果 tail != null,尝试将新节点加入队列尾部:
      • 设置 newNode.prev = pred
      • 使用 casTail 原子更新 tail
      • 如果成功,更新 pred.next 并返回。
    3. 如果 tail == null 或 CAS 失败,调用 enq(newNode) 自旋入队。
6. enq(Node node)
private void enq(Node node) {
    for (;;) {
        if (tail == null) {
            if (casHead(new Node())) {
                tail = head;
            }
        } else {
            Node pred = tail;
            node.prev = pred;
            if (casTail(pred, node)) {
                pred.next = node;
                return;
            }
        }
    }
}
  • 作用:自旋方式将节点加入队列,确保入队成功。
  • 逻辑:
    1. 队列为空(tail == null
      • 创建一个空的 head 节点并尝试通过 CAS 设置。
      • 设置 tail = head
    2. 队列非空
      • 将新节点加入 tail 后,通过 CAS 更新 tail
      • 更新链表指针后返回。
7. acquireQueued(Node node, int arg)
private void acquireQueued(Node node, int arg) {
    for (;;) {
        Node pred = node.prev;
        if (pred == head && tryAcquire(arg)) {
            setHead(node);
            pred.next = null;
            return;
        }
        LockSupport.park();
    }
}
  • 作用:阻塞当前线程,直到成功获取锁。
  • 逻辑:
    • 自旋检查:
      • 如果当前节点的前驱是 headtryAcquire 成功:
        • 将当前节点设为 head
        • 断开原 headnext 指针。
        • 返回。
      • 否则,调用 LockSupport.park() 阻塞线程。
8. unlock()
public void unlock() {
    release(1);
}
  • 作用:释放锁。
  • 逻辑:调用 release(1) 释放锁。
9. release(int arg)
private void release(int arg) {
    if (tryRelease(arg)) {
        Node head = getHead();
        if (head.next != null) {
            unparkSuccessor(head);
        }
    }
}
  • 作用:释放锁并唤醒队列中的下一个线程。
  • 逻辑:
    • 调用 tryRelease(arg) 尝试释放锁。
    • 如果完全释放成功(state == 0)且队列中有等待线程:
      • 调用 unparkSuccessor(head) 唤醒 head.next 的线程。
10. tryRelease(int arg)
private boolean tryRelease(int arg) {
    int c = getState() - arg;
    if (getExclusiveOwerThread() != Thread.currentThread()) {
        throw new RuntimeException("fuck error");
    }
    if (c == 0) {
        exclusiveOwerThread = null;
        state = 0;
        return true;
    }
    state = c;
    return false;
}
  • 作用:尝试释放锁。
  • 返回值:
    • true:锁完全释放。
    • false:锁部分释放(重入计数减少)。
  • 逻辑:
    • 检查当前线程是否是锁持有者,否则抛出异常。
    • 计算新状态 c = state - arg
    • 如果 c == 0
      • 清空 exclusiveOwerThread
      • 设置 state = 0
      • 返回 true
    • 否则:
      • 更新 state = c
      • 返回 false
11. unparkSuccessor(Node node)
private void unparkSuccessor(Node node) {
    Node next = node.next;
    if (next != null && next.thread != null) {
        LockSupport.unpark(next.thread);
    }
}
  • 作用:唤醒队列中的下一个线程。
  • 逻辑:
    • 获取 node.next
    • 如果存在且绑定了线程,调用 LockSupport.unpark() 唤醒。

公平锁的实现要点

  1. 先来后到
    • 通过 hasQueuedPredecessor() 确保只有队列为空或当前线程是 head.next 时才能抢锁。
  2. 可重入性
    • 如果当前线程已持有锁,tryAcquire 会增加 state 值,而不是阻塞。
  3. 线程安全
    • 使用 CAS 操作(casStatecasHeadcasTail)保证并发下的原子性。
    • volatile 修饰 state 确保可见性。
  4. 阻塞与唤醒
    • 使用 LockSupport.park()unpark() 实现线程的挂起和唤醒。

注意事项

  1. CAS 实现缺失
    • 代码中的 casStatecasHeadcasTail 方法仅返回 true,没有实现真正的原子操作。实际中需要使用 UnsafeAtomicInteger
  2. 边界情况
    • tryRelease 中存在错误:state = 0 应为 state = c,否则非完全释放时会错误清零。
  3. 性能
    • 自旋操作(enqacquireQueued)可能在高并发下消耗 CPU。

总结

这段代码通过双向链表队列和状态管理实现了一个公平、可重入的独占锁。核心逻辑是:

  • 获取锁:尝试抢占 -> 失败则入队 -> 阻塞等待。
  • 释放锁:减少重入计数 -> 完全释放则唤醒队列中的下一个线程。
    尽管实现简化和部分功能未完善(如 CAS),但它很好地展示了 ReentrantLock 公平锁的核心原理。

完整代码:

import java.util.concurrent.locks.LockSupport;

/**
 *          可以重入的、独占的公平锁
 *
 * */

public class MiniLock implements Lock {
    /**
     *  锁的是资源
     *  0 -> 未加锁状态
     * !0 -> 加锁状态
     */
    private volatile int state;

    // 记录当前获取锁的线程  独占模式 在同一时刻只有一个线程可以持有锁,其他线程未获取到就会被阻塞
    public Thread exclusiveOwerThread;

    /**
     *  需要有两个引用来维护被阻塞线程的队列
     *  1. head:队列头结点,头节点就是当前占有锁的线程
     *  2. tail:队列的尾节点
     */
    private Node head;
    private Node tail;

    public Node getHead() {
        return head;
    }

    public int getState() {
        return state;
    }

    public Node getTail() {
        return tail;
    }

    public void setHead(Node head) {
        this.head = head;
    }

    public Thread getExclusiveOwerThread() {
        return exclusiveOwerThread;
    }

    // 用一个链表队列保存要维护的线程
    static final class Node {
        Node prev;  // 前置节点
        Node next;  // 后置节点
        Thread thread;  // 当前node节点的线程

        public Node(Thread thread) {
            this.thread = thread;
        }

        public Node() {
        }
    }
    /**
     *  目的:获取锁,假设当前锁被占用,会阻塞调用者的线程,直到抢占到锁为止
     *
     *  模拟公平锁:先来后到
     *
     *  lock过程:
     *  情景1:线程进入 发现 state == 0,直接抢锁
     *  情景2:线程进入 发现 state > 0,需要进入锁的阻塞队列中
     * */
    @Override
    public void lock() {
        acquire(1);
    }

    /**
     * 争资源
     * 1. 尝试抢占锁,成功占到 就返回
     * 2. 抢占失败 放入队列 将线程阻塞
     */
    private void acquire(int arg){
        if(!tryAcquire(arg)){
            // 1. 如果抢锁失败 把自己打包成一个node 进入队列
            Node node = addWaiter();
            // 2. 然后阻塞
            acquireQueued(node, arg);
        }
    }


    private void acquireQueued(Node node, int arg){
        // 只有当前node成功获取到锁 才会跳出自旋
        for(;;){
            // 什么时候会被唤醒获取到锁  head.next 先来后到
            Node pred = node.prev;
            if(pred == head/*是否有资格来抢锁*/ && tryAcquire(arg)){
                // 竞争成功
                // 1. 把自己设置成head节点
                // 2. 原先的head出队
                setHead(node);
                pred.next = null;
                return;
            }

            // 枪锁失败
            LockSupport.park();
        }
    }

    /**
     *  尝试抢占锁失败后:
     *  1. 把当前的线程封装成Node 进入阻塞队列
     *  2. 将当前线程park掉 处于挂起状态
     *
     *  唤醒之后需要:
     *  1. 先检测一下自己是否为 head.next 节点(只有head.next有权限抢占锁)
     *  2. 枪锁
     *   成功:成功后先将自己设置为head节点 并且 之前的head出队列
     *   失败:失败后继续 park 等待下一次唤醒
     */


    /**
     * 将当前线程入队列 并且返回当前线程节点
     *
     * addWaiter 保证入队成功
     * @return
     */
    private Node addWaiter(){
        Node newNode = new Node(Thread.currentThread());

        // 进入阻塞队列
        // 1. 先找到前置节点 pred 去更新前置节点  2. cas更新tail节点  3. pred.next = newNode
        Node pred = tail;
        while(pred != null){
            newNode.prev = pred;
            if (casTail(pred, newNode)) {       // if cas 成功
                pred.next = newNode;
                return newNode;
            }
        }
        // 情况讨论:
        // 1. tail == null
        // 2. cas tail失败

        // 自旋入队
        enq(newNode);
        return newNode;
    }

    /**
     *  自旋入队, 只有成功后才可以返回
     *  tail == null
     *  cas tail 失败
     * @param node
     */
    private void enq(Node node){
        for(;;){
            // 队列为空 当前持有锁的线程并没有入队 并且没设置任何node
            // 作为后驱节点,需要为第一个线程完成node入队的情况,保持(head 节点任何时候都是占用锁的线程)这个事情
            if (tail == null) {
                // 说明成功了 当前持有锁的线程补充head线程成功了
                if(casHead(new Node())){
                    tail = head;
                    // 这里不返回
                }
            } else {
                Node pred = tail;
                node.prev = pred;
                if(casTail(pred, node)){
                    pred.next = node;
                    return;
                }
            }
        }
    }



    /**
     * 尝试获取锁 不会阻塞线程
     * true - 抢占成功
     * false - 抢占失败
     */
    private boolean tryAcquire(int arg){
        if (state == 0) {
            // 当前 state == 0 的话 是否能直接抢锁  肯定不行
            // 公平锁是先来后到   条件1:当前锁中阻塞队列没有线程等待 条件2:cas方式去设置state避免线程并发下同时进行state = 1
            // 抢占成功
            if(!hasQueuedPredecessor() && casState(0,arg)) {
                exclusiveOwerThread = Thread.currentThread();   // 设置其独占线程
                return true;
            }
            // 当前锁被占用 并且 自己正在占用线程
        } else if(Thread.currentThread().equals(exclusiveOwerThread)) {
            // 锁被重入

            // 当前不会出现并发,条件限制只可能有一个线程进来
            int c = getState();
            c += arg;
            state = c;
        } else {

        }
        return false;
    }

    /**
     * 判断队列中当前是否有线程等待
     * true - 有等待
     * false - 无等待
     * hasQueuedPredecessor 调用锁 lock -> acquire -> tryAcquire -> hasQueuedPredecessor
     *
     * 什么时候返回 false
     * 1. 队列为空
     * 2. 抢锁的线程是head.next线程的时候 (head.next是唯一在队中拥有枪锁资格的线程)
     */
    private boolean hasQueuedPredecessor() {
        Node h = head;
        Node t = tail;
        Node s;
        // h != t : 1.head和tail都为空 2. head和tail指向同一个节点
        // (s = h.next) == null :head和tail指向同一节点  队列在释放head的时候 有一瞬间会处于 head.next为空的状态
        // s.thread == Thread.currentThread() : 当前线程是不是next节点
        return h != t &&((s = h.next) == null || s.thread == Thread.currentThread());
    }
    @Override
    public void unlock() {
        release(1);
    }

    /**
     * 释放锁
     * @param arg
     */
    private void release(int arg){
        if(tryRelease(arg)){
            // 说明完全释放成功了 队列中还有node睡觉中 需要唤醒
            Node head = getHead();
            if(head.next != null){
                unparkSuccessor(head);
            }
        }
    }

    private void unparkSuccessor(Node node){
        Node next = node.next;
        if (next != null && next.thread != null) {
            LockSupport.unpark(next.thread);
        }
    }

    /**
     * true - 完全释放锁
     * false - 部分释放锁
     * @param arg
     * @return
     */
    private boolean tryRelease(int arg){
        int c = getState() - arg;
        if(getExclusiveOwerThread() != Thread.currentThread()){
            throw new RuntimeException("fuck error");
        }
        // 执行到这里 不会有并发
        if(c == 0){
            // 完全释放锁的状态
            // 要做:
            // 1. ownerThread = null
            // 2. state = 0
            exclusiveOwerThread = null;
            state = 0;
            return true;
        }

        state = 0;
        return false;
    }

    private final boolean casState(int expect, int update){
        return true;
    }

    private final boolean casHead(Node update){
        return true;
    }

    private final boolean casTail(Node expect, Node update){
        return true;
    }
}