【Redisson 加锁源码解析】

发布于:2025-08-31 ⋅ 阅读:(24) ⋅ 点赞:(0)

Redisson 源码解析 —— 分布式锁实现过程

在分布式系统中,分布式锁 是非常常见的需求,用来保证多个节点之间的互斥操作。Redisson 是 Redis 的一个 Java 客户端,它提供了对分布式锁的良好封装。本文将从源码角度剖析 Redisson 的分布式锁实现过程。


一、分布式锁的基本需求

一个健壮的分布式锁需要满足以下条件:

  1. 互斥性:同一时间只能有一个客户端持有锁。
  2. 死锁避免:客户端宕机后,锁不会永久被占用。
  3. 可重入性:同一线程可多次获取同一把锁。
  4. 高可用性:在 Redis 集群模式下仍能正常工作。
  5. 超时释放:设置持有锁时间,时间超过锁释放,避免死锁。
  6. 锁时间续约:看门狗机制,避免业务未执行完毕锁释放,导致并发问题。

二、Redisson 分布式锁的核心实现类以及加锁方法

在源码中,Redisson 提供了多种锁的实现,最核心的是:

  • RedissonLock —— 基于 Redis 的可重入锁实现
  • RedissonReadWriteLock —— 读写锁
  • RedissonFairLock —— 公平锁

我们主要关注 RedissonLock 的实现。


 RLock lock = redissonClient.getLock("32r");
 lock.方法名()

常用加锁方法:
在这里插入图片描述

  1. lock():获取锁,获取不到会一致阻塞直到获取。通过看门狗机制续期,默认持有锁是30s,每隔10s续期一次。
  2. lock(long l, TimeUnit timeUnit):获取锁,获取不到会一致阻塞直到获取。持有锁时间是手动入参的timeUnit,到期释放锁。
  3. tryLock(long waite, long l1, TimeUnit timeUnit) :获取锁失败后,自旋,等待 waite 秒,获取不到返回false,获取到,持有锁时间是 l1,单位 timeUnit。
  4. tryLock():尝试获取一次锁,如果获取不到,立即返回 false,获取锁成功,触发 看门狗续期机制(和 lock() 一样)。
  5. tryLock(long waitTime, TimeUnit unit):在 waitTime 时间窗口内,不断尝试执行,范围内获取锁失败,返回false。获取成功,启动看门狗机制。
 RLock lock = redissonClient.getLock("32r");

我们可以看到 redissonClient 调用这个方法时候,客户端返回的是RedissonLock这个类
在这里插入图片描述

所以对应的我们主要关注 RedissonLock 子类和父类RedissonBaseLock
在这里插入图片描述

这里我主要分析 lock() 方法的调用,其他锁的逻辑都是参考这个去完善的。

三、加锁流程解析

1. 调用入口

当我们执行:

RLock lock = redisson.getLock("myLock");
lock.lock();

进入RedissonLock#lock方法:
在这里插入图片描述
可以看到调用lock方法其实都是调用的另外一个lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法。
对应真正调用的lock()方法:

/**
 * 获取分布式锁的核心方法
 * @param leaseTime 锁的租约时间
 * @param unit 时间单位
 * @param interruptibly 是否允许中断
 * @throws InterruptedException 当线程被中断时抛出
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // 获取当前线程ID,用于标识锁的持有者
    long threadId = Thread.currentThread().getId();
    
    // 尝试获取锁,返回剩余的TTL(生存时间)
    // 如果返回null表示获取锁成功,否则返回锁的剩余过期时间
    Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
    
    // 如果ttl不为null,说明锁获取失败,需要等待
    if (ttl != null) {
        // 订阅锁释放的通知,返回一个Future对象
        CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);
        
        // 设置订阅操作的超时时间
        this.pubSub.timeout(future);
        
        // 根据是否允许中断来获取订阅结果
        RedissonLockEntry entry;
        if (interruptibly) {
            // 允许中断的方式获取结果
            entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);
        } else {
            // 不允许中断的方式获取结果
            entry = (RedissonLockEntry)this.commandExecutor.get(future);
        }
        
        try {
            // 自旋等待锁释放
            while(true) {
                // 再次尝试获取锁
                ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
                
                // 如果获取锁成功(ttl为null),则退出循环
                if (ttl == null) {
                    return;
                }
                
                // 如果ttl大于等于0,说明锁还存在,需要等待指定的时间
                if (ttl >= 0L) {
                    try {
                        // 使用信号量等待指定的ttl时间
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // 如果允许中断,直接抛出异常
                        if (interruptibly) {
                            throw e;
                        }
                        // 如果不允许中断,继续等待
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    // 如果ttl小于0,表示需要无限等待
                    if (interruptibly) {
                        // 允许中断的无限等待
                        entry.getLatch().acquire();
                    } else {
                        // 不允许中断的无限等待
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            // 无论成功与否,都要取消订阅,释放资源
            this.unsubscribe(entry, threadId);
        }
    }
}

这时候我们只需要重点关注对应的this.tryAcquire(-1L, leaseTime, unit, threadId);这个方法。
源码图如下:
在这里插入图片描述
对应的Java代码解释:

/**
 * 异步尝试获取锁
 * @param waitTime 等待时间
 * @param leaseTime 锁的租约时间
 * @param unit 时间单位
 * @param threadId 线程ID
 * @return 返回锁的剩余TTL时间,null表示获取锁成功
 */
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 声明TTL剩余时间的Future对象
    RFuture<Long> ttlRemainingFuture;
    
    // 判断是否指定了租约时间
    if (leaseTime > 0L) {
        // 使用指定的租约时间尝试获取锁
        ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 使用默认的内部锁租约时间尝试获取锁
        ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }

    // 对获取锁的结果进行后续处理
    CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
        // 如果ttlRemaining为null,说明成功获取到锁
        if (ttlRemaining == null) {
            // 判断是否指定了租约时间
            if (leaseTime > 0L) {
                // 将指定的租约时间转换为毫秒并存储到内部锁租约时间
                this.internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 如果没有指定租约时间,启动锁的自动续期机制
                // 防止锁因过期而被误释放
                this.scheduleExpirationRenewal(threadId);
            }
        }

        // 返回TTL剩余时间(null表示获取锁成功,非null表示需要等待的时间)
        return ttlRemaining;
    });
    
    // 将CompletionStage包装成RFuture并返回
    return new CompletableFutureWrapper(f);
}

这里最重要的是调用对应的tryAcquire里面的tryLockInnerAsync方法,方法详解如下:

 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(this.getRawName(), 
        LongCodec.INSTANCE, 
        command,
         "if (redis.call('exists', KEYS[1]) == 0) then 
         redis.call('hincrby', KEYS[1], ARGV[2], 1);
          redis.call('pexpire', KEYS[1], ARGV[1]); 
          return nil; 
          end; 
          if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
          then
           redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); 
           return nil; 
           end; 
           return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    }

这个tryLockInnerAsync方法主要是执行对应的脚本,然后返回剩余的时间,如果获取锁成功返回 nil ,获取锁失败会返回 持有锁的锁过期时间

核心 Lua 脚本详解如下:

Redisson 并不是简单地 SETNX,而是使用 Lua 脚本 来保证操作的原子性
加锁脚本大致逻辑如下:

if (redis.call('exists', KEYS[1]) == 0) then
    -- 锁不存在,设置锁并绑定到线程
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

-- 锁已存在,判断是否是当前线程重入
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

return redis.call('pttl', KEYS[1]);

解释:

  • KEYS[1]: 锁的 key (如 myLock)
  • ARGV[1]: 锁的过期时间(默认 30s)
  • ARGV[2]: 当前线程标识(由 UUID + 线程 ID 组成)

执行流程:

  1. 如果锁不存在,设置 hash,key = 线程标识,value = 1。
  2. 如果锁存在且是自己线程,则递增重入次数。
  3. 否则返回锁的剩余过期时间。

问题延伸:
Redis不是单线程吗,高并发线程下不是线程安全吗?为什么还需要使用Lua脚本保证原子性
想想为什么使用lua脚本,你可以想象一下高并发场景下,Redis执行命令是单线程的,Redis只能保证对应的单条命令是原子性的,不能保证多条命令的原子性,假设线程A执行:redis.call('exists', KEYS[1]) == 0结束后,线程B抢到执行权,然后线程B也执行:redis.call('exists', KEYS[1]) == 0,然后后续大家都会进行对应的锁设置,导致线程A上锁可能会被覆盖,不过可以用hsetnx解决,但是后续可能判断还是会有并发问题。使用 lua 脚本可以将多条命令整合成类似一条命令,redis执行,从而保证原子性

WatchDog 自动续期机制

Redisson 的一大亮点是 锁续期机制

  • 当线程获取锁后,会启动一个 看门狗定时任务,默认每隔 lockWatchdogTimeout / 3 秒续期一次(默认 30s → 10s)。
  • 如果业务逻辑执行很久,不用担心锁被提前释放。
  • 如果线程宕机,定时任务不再执行,锁会在超时后自动释放。

判断对应的leasetime有没有指定,然后执行对应的续期或不续期的方法
源码关键点在:scheduleExpirationRenewal() 方法。
关键代码

   CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
            if (ttlRemaining == null) {
                if (leaseTime > 0L) {
                    this.internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    this.scheduleExpirationRenewal(threadId);
                }
            }

            return ttlRemaining;
        });

根据对应的没指定leaseTime ,然后执行对应的RedissonBaseLock#scheduleExpirationRenewal对应的方法逻辑如下:

  /**
 * 调度锁的过期时间续期任务
 * 为指定线程启动自动续期机制,防止锁因过期而被误释放
 * @param threadId 需要续期的线程ID
 */
protected void scheduleExpirationRenewal(long threadId) {
    // 创建新的过期时间管理条目
    ExpirationEntry entry = new ExpirationEntry();
    
    // 尝试将新条目放入续期映射表中,如果已存在则返回旧条目
    // 使用putIfAbsent确保原子性操作,避免并发问题
    ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    
    // 判断是否已经存在续期任务
    if (oldEntry != null) {
        // 如果已存在续期任务,只需将当前线程ID添加到现有条目中
        // 这种情况发生在同一个锁被多个线程(可重入锁)或同一线程多次获取时
        oldEntry.addThreadId(threadId);
    } else {
        // 如果是首次为这个锁创建续期任务
        
        // 将当前线程ID添加到新创建的条目中
        entry.addThreadId(threadId);
        
        try {
            // 启动实际的续期任务
            // 这会创建定时任务,定期延长锁的过期时间
            this.renewExpiration();
        } finally {
            // 检查当前线程是否被中断
            if (Thread.currentThread().isInterrupted()) {
                // 如果线程被中断,取消刚刚启动的续期任务
                // 防止资源泄漏和无效的续期操作
                this.cancelExpirationRenewal(threadId);
            }
        }
    }
}

这个通过一个创建一个ExpirationEntry 然后通过EXPIRATION_RENEWAL_MAP判断是否存在,如果条目不存在就启动对应的自动续期机制任务 renewExpiration()

RedissonBaseLock#renewExpiration()方法如下:

/**
 * 启动锁的自动续期机制
 * 创建定时任务,定期延长锁的过期时间,防止锁因超时而被释放
 */
private void renewExpiration() {
    // 从续期映射表中获取当前锁的过期时间管理条目
    ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    
    // 如果条目存在,说明需要为这个锁设置续期任务
    if (ee != null) {
        // 创建定时任务,在锁租约时间的1/3处执行续期操作
        // 选择1/3时间点是为了在锁过期前有足够的时间进行续期
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                // 定时任务执行时,重新获取续期条目(防止在延迟期间被移除)
                ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
                
                // 双重检查:确保续期条目仍然存在
                if (ent != null) {
                    // 获取需要续期的第一个线程ID
                    // 对于可重入锁,可能有多个线程ID,取第一个进行续期
                    Long threadId = ent.getFirstThreadId();
                    
                    // 如果线程ID有效,执行续期操作
                    if (threadId != null) {
                        // 异步执行锁的续期操作
                        CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
                        
                        // 处理续期结果
                        future.whenComplete((res, e) -> {
                            // 如果续期过程中发生异常
                            if (e != null) {
                                // 记录错误日志
                                RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);
                                // 从续期映射表中移除条目,停止续期
                                RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
                            } else {
                                // 续期操作成功完成
                                if (res) {
                                    // 如果续期成功(返回true),递归调用继续下一轮续期
                                    // 这样就形成了持续的自动续期循环
                                    RedissonBaseLock.this.renewExpiration();
                                } else {
                                    // 如果续期失败(返回false),说明锁已经不存在或不属于当前线程
                                    // 取消续期任务,清理资源
                                    RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
                                }
                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 在租约时间的1/3处执行续期
        
        // 将定时任务保存到条目中,用于后续的取消操作
        ee.setTimeout(task);
    }
}

最后完美结束对应的获取锁的过程,返回一个对应的时间值 ttl
在这里插入图片描述
如果返回的是null代表加锁成功,否则是加锁失败,此时会进行订阅持有锁者this.subscribe(threadId),如果释放锁会通知这个获取锁失败的线程,会将这个线程唤醒。

四、解锁流程解析

解锁的流程

解锁时同样使用 Lua 脚本,保证原子性:

if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then
    return nil;
end;

local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1);

if (counter > 0) then
    return 0;
else
    redis.call('del', KEYS[1]);
    return 1;
end;

解释:

  1. 检查当前线程是否持有锁。
  2. 如果是可重入锁,计数 -1。
  3. 如果计数为 0,则删除锁。

六、源码设计亮点

  1. Lua 脚本保证原子性,避免分布式并发问题。
  2. 可重入性设计:使用 hash 结构存储线程标识和重入次数。
  3. 锁超时释放设计:避免死锁问题。
  4. 看门狗机制:保证长时间任务也能安全持有锁。
  5. 异步化设计:Redisson 提供 lockAsync() 等方法,方便高并发场景。

七、总结

  • Redisson 的分布式锁实现基于 Redis + Lua 脚本,解决了互斥、可重入和死锁问题。
  • 看门狗续期机制 是 Redisson 的亮点,保证了业务执行时间不可预测的情况下的安全性。
  • 在生产环境中,Redisson 的分布式锁相较于 SETNX + EXPIRE 的手写版本,更加健壮和可靠。

网站公告

今日签到

点亮在社区的每一天
去签到