Redisson 分布式锁原理解析
分布锁有很多种形式,今天就来研究下Redisson 分布式锁的实现
Redisson 分布式锁使用
在讲实现前,先来说下Redisson分布式锁的使用。首先先引入Redisson 得依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.47.0</version>
</dependency>
如果是Spring项目则引入starter
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.47.0</version>
</dependency>
(这里使用spring项目作为基础使用)
public class RedissionClientTest {
public void RedissonTest(){
// 获取锁实例
RLock lock = redissonClient.getLock(key);
try {
// 尝试加锁
lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
} finally {
// 释放锁
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
上面是对于常规分布式锁的使用,一般会使用tryLock进行加锁,以下是对锁参数的解释:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
waitTime:获取锁的等待时间,现场会尝试获取锁,超过这个时间,获取锁失败
leaseTime:锁的过期时间,超过时间,自动释放锁
Redisson源码实现
WatchDog 看门狗机制原理
如果说我们不设置过期时间,则redisson会有锁的续约机制,也就是常说的看门狗机制,watchDog。
代码的实现如下:
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
internalLockLeaseTime 默认为 30 * 1000 也就是30秒,redisson开了个task每过1/3得时间也就是10秒会自动续约一次,直到现场手动释放锁。
Redisson 各个类型锁的实现
非公平锁
先来说下非公平锁的实现,我们在使用的时候如果没有指定,则默认走的都是非公平锁,他的加锁是用的redis的Hash表,通过lua脚本来实现。具体源码如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
通过上面的lua脚本我们可以看出来,其实有两步:
- 先判断是否存在key了,如果不存在,也就是是否已经加过锁了,如果没有加过,则先hincrby计数+1然后设置过期时间。
- 如果存在,则说明已经获取过一次锁了,通过hincrby计数+1然后设置过期时间,此时就是实现了锁的重入功能,非常的巧妙。
公平锁实现
然后再来说下公平锁,公平锁的实现是在非公平锁的基础上加入了队列,而他队列的实现是通过ZSET结构来实现。按照时间作为score进行先后顺序排列。
lua脚本源码如下:
-- 清理等待队列中超时的线程
while true do
local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
if firstThreadId2 == false then
break;
end;
local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
if timeout <= tonumber(ARGV[4]) then
redis.call('zrem', KEYS[3], firstThreadId2);
redis.call('lpop', KEYS[2]);
else
break;
end;
end;
-- 如果锁未被占用,并且(等待队列为空 或者 队列头是自己)
if (redis.call('exists', KEYS[1]) == 0) and
((redis.call('exists', KEYS[2]) == 0) or
(redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
-- 移除自己
redis.call('lpop', KEYS[2]);
redis.call('zrem', KEYS[3], ARGV[2]);
-- 队列中的其他线程等待时间递减(防止饥饿)
local keys = redis.call('zrange', KEYS[3], 0, -1);
for i = 1, #keys, 1 do
redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
end;
-- 设置锁(hash 结构保存线程 ID + 重入次数)
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 如果锁已存在,并且当前线程是持有者:重入逻辑
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
参数说明:
KEYS 参数(由 Java 代码传入)
KEYS[i] | 描述 |
---|---|
KEYS[1] |
锁标识,如:lock:{key} ,类型:Hash(用于重入) |
KEYS[2] |
等待线程的 FIFO 队列,类型:List |
KEYS[3] |
等待线程的超时集合,类型:ZSet(score=过期时间戳) |
ARGV 参数(由 Java 代码传入)
ARGV[i] | 描述 |
---|---|
ARGV[1] |
锁 TTL(毫秒) |
ARGV[2] |
当前线程唯一标识(客户端 + UUID + threadId) |
ARGV[3] |
单线程最大等待时间(毫秒) |
ARGV[4] |
当前时间戳(毫秒) |
流程图:
线程请求锁
│
├─▶ 判断锁是否空闲 + 当前线程是否队头
│ └─是:加锁 + 设置 TTL
│ └─否:
│ ├─是否重入 → 是:重入 + TTL 延期
│ └─是否已在队列 → 是:返回预计等待时间
│ 否:加入队列,记录超时点
读锁
todo
写锁
todo