Redisson 分布式锁原理解析

发布于:2025-07-10 ⋅ 阅读:(19) ⋅ 点赞:(0)

Redisson 分布式锁原理解析

分布锁有很多种形式,今天就来研究下Redisson 分布式锁的实现

Redisson 分布式锁使用

在讲实现前,先来说下Redisson分布式锁的使用。首先先引入Redisson 得依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.47.0</version>
</dependency>

如果是Spring项目则引入starter

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.47.0</version>
</dependency>

(这里使用spring项目作为基础使用)

public class RedissionClientTest {
    public void RedissonTest(){
       // 获取锁实例
      RLock lock = redissonClient.getLock(key);
       try {
          // 尝试加锁
          lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
       } finally {
          // 释放锁
          if (lock.isLocked() && lock.isHeldByCurrentThread()) {
              lock.unlock();
          }
        }
    }
}

上面是对于常规分布式锁的使用,一般会使用tryLock进行加锁,以下是对锁参数的解释:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);

waitTime:获取锁的等待时间,现场会尝试获取锁,超过这个时间,获取锁失败

leaseTime:锁的过期时间,超过时间,自动释放锁

Redisson源码实现

WatchDog 看门狗机制原理

如果说我们不设置过期时间,则redisson会有锁的续约机制,也就是常说的看门狗机制,watchDog。

代码的实现如下:

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

internalLockLeaseTime 默认为 30 * 1000 也就是30秒,redisson开了个task每过1/3得时间也就是10秒会自动续约一次,直到现场手动释放锁。

Redisson 各个类型锁的实现

非公平锁

先来说下非公平锁的实现,我们在使用的时候如果没有指定,则默认走的都是非公平锁,他的加锁是用的redis的Hash表,通过lua脚本来实现。具体源码如下:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

通过上面的lua脚本我们可以看出来,其实有两步:

  1. 先判断是否存在key了,如果不存在,也就是是否已经加过锁了,如果没有加过,则先hincrby计数+1然后设置过期时间。
  2. 如果存在,则说明已经获取过一次锁了,通过hincrby计数+1然后设置过期时间,此时就是实现了锁的重入功能,非常的巧妙。
公平锁实现

然后再来说下公平锁,公平锁的实现是在非公平锁的基础上加入了队列,而他队列的实现是通过ZSET结构来实现。按照时间作为score进行先后顺序排列。

lua脚本源码如下:

-- 清理等待队列中超时的线程
while true do
    local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
    if firstThreadId2 == false then
        break;
    end;

    local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
    if timeout <= tonumber(ARGV[4]) then
        redis.call('zrem', KEYS[3], firstThreadId2);
        redis.call('lpop', KEYS[2]);
    else
        break;
    end;
end;

-- 如果锁未被占用,并且(等待队列为空 或者 队列头是自己)
if (redis.call('exists', KEYS[1]) == 0) and 
   ((redis.call('exists', KEYS[2]) == 0) or 
    (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then

    -- 移除自己
    redis.call('lpop', KEYS[2]);
    redis.call('zrem', KEYS[3], ARGV[2]);

    -- 队列中的其他线程等待时间递减(防止饥饿)
    local keys = redis.call('zrange', KEYS[3], 0, -1);
    for i = 1, #keys, 1 do
        redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
    end;

    -- 设置锁(hash 结构保存线程 ID + 重入次数)
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

-- 如果锁已存在,并且当前线程是持有者:重入逻辑
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

参数说明:

KEYS 参数(由 Java 代码传入)

KEYS[i] 描述
KEYS[1] 锁标识,如:lock:{key},类型:Hash(用于重入)
KEYS[2] 等待线程的 FIFO 队列,类型:List
KEYS[3] 等待线程的超时集合,类型:ZSet(score=过期时间戳)

ARGV 参数(由 Java 代码传入)

ARGV[i] 描述
ARGV[1] 锁 TTL(毫秒)
ARGV[2] 当前线程唯一标识(客户端 + UUID + threadId)
ARGV[3] 单线程最大等待时间(毫秒)
ARGV[4] 当前时间戳(毫秒)

流程图:

线程请求锁

├─▶ 判断锁是否空闲 + 当前线程是否队头
│ └─是:加锁 + 设置 TTL
│ └─否:
│ ├─是否重入 → 是:重入 + TTL 延期
│ └─是否已在队列 → 是:返回预计等待时间
│ 否:加入队列,记录超时点

读锁

todo

写锁

todo


网站公告

今日签到

点亮在社区的每一天
去签到