Redisson框架

发布于:2024-07-04 ⋅ 阅读:(8) ⋅ 点赞:(0)

1. Redisson锁与Redis订阅与发布模式的联系:

Redisson锁中,使用订阅发布模式去通知等待锁的客户端:锁已经释放,可以进行抢锁。

  • publish channel_name message:将消息发送到指定频道
    • 解锁时,在Lua解锁脚本中使用:场景1:锁不存在时,发送消息到解锁频道;场景2:锁重入记数hash等于0时,发送消息到解锁频道。
  • subscribe channel_name ... :客户端订阅一个或多个频道消息
    • 加锁时,没有争抢锁成功的客户端,需要订阅解锁频道 -> RedissonLock#tryLock中订阅频道后,会用await()阻塞等待锁释放。直到获取推送的频道解锁消息,才取消等待。

  • unsubsribe channel_name ... :客户端退订一个或多个频道消息
    • 加锁时,客户端最后都会去退订解锁频道,不管加锁成功与失败。

2. RedissonLock锁:

数据结构:

  • Hash结构:key -> name 、field -> lockName (id+":"+threadId -> UUID:threadId) 、 value -> 可重入次数

加锁伪脚本:

// 如果不存在锁:则新增锁,并设置锁重入计数为1、设置锁过期时间,返回nil结束
exists name == 0                      ->  name:加锁的key名称
hset name lockName  1                 -> lockName:当前客户端标识
pexpire name  internalLockLeaseTime   -> internalLockLeaseTime:锁租期
return nil

// 如果存在锁,且唯一标识也匹配,表示当前锁可重入,重入计数+1,并重新设置锁过期时间,返回nil结束
hexists  name lockName = 1
hincrby  name lockName  1 
pexpire name  internalLockLeaseTime 
return nil

// 如果存在锁,且唯一标识不匹配,表示锁是被其他线程占用,返回剩余时间结束
return pttl name

解锁伪脚本:

// 如果锁不存在:则直接广播解锁消息,返回1,结束
exists name == 0 
publish channelName  0   -> channelName:频道名称 -> redisson_lock__channel:{name}
return 1

// 如果锁存在,但是但唯一标识不匹配:表示锁被其他线程占用,当前线程不允许解锁,返回nil,结束
hexists name lockName == 0 
return nil

// 如果锁存在,并且唯一标识匹配:则先将锁重入计数 -1
counter = hincrby name lockName -1
// 如果锁重入计数-1后,还大于0,重新设置有效期,返回0,结束
pexpire name  internalLockLeaseTime
return 0

// 如果锁重入计数-1后,等于0,直接删除key,并广播解锁消息(即唤醒其它争抢锁的被阻塞的线程),返回1,结束
del name 
publish channelName  0
return 1

参考文档:https://www.cnblogs.com/huangwentian/p/14622441.html

3. RedissonFairLock锁:

数据结构:

  • Hash结构:key -> name 、field -> lockName (UUID:threadId)、 value -> 可重入次数
  • List结构:key -> redisson_lock_queue:{lockName} 、vaule -> lockName (UUID:threadId)
    • 作用:实现公平锁,按顺序记录线程争抢锁的顺序。
  • Zset结构:key -> redisson_lock_timeout:{lockName}、 score -> timeout(过期时间,它是时间戳) member -> lockName (id+":"+threadId)
    • 作用1:利用Zset分值的功能,清除等待队列中,等待加锁,但是已经过期的线程。
    • 作用2:利用Zset分值的功能,在加锁不成功后,计算需要等待的时间(ttl)和更新Zset分值(timeout)

加锁伪脚本:

/ 删除列表中第一个位置是过期的线程
// 死循环
while true do 
  // list中获取第一元素,如果firstThreadId2为空,直接结束循环
  firstThreadId2 = lindex  redisson_lock_queue:{lockName}  0
  if firstThreadId2 == false
      break;
  // 如果firstThreadId2不为空,从Zset中获取分值timeout
  timeout = zscore redisson_lock_timeout:{lockName} firstThreadId2
  // 如果 timeout <= 当前时间戳,从Zset中删除该成员,从List弹出该元素
  if timeout <= currentTime
      zrem redisson_lock_timeout:{lockName}  firstThreadId2
      lpop  redisson_lock_queue:{lockName} 
  // 如果 timeout > 当前时间戳,结束循环
      break;
      
// 如果不存在name的key,并且不存队列或者队列第一个元素是lockName时,当前线程可以获得锁
if exists name == 0  && (exists redisson_lock_timeout:{lockName} == 0  ||   lindex  redisson_lock_queue:{lockName}  0 ==  lockName)  
    lpop redisson_lock_queue:{lockName}  
    zrem redisson_lock_timeout:{lockName}  lockName
    hset name  lockName  1
    pexpire name internalLockLeaseTime
    return nil
    
// 如果存在name的key,并且hash结构的field也是lockname时,表示当前线程已经获得锁,此时处理锁重入
if hexists name lockName == 1
    hincrby name lockName 1
    pexpire name internalLockLeaseTime
    return nil

//  获取列表第一个元素,计算当前的ttl,然后再重新计算timeout赋值给Zset中,Zset添加成功后,再rpush到队列,返回ttl
firstThreadId = lindex  redisson_lock_queue:{lockName}  0
ttl
// 如果列表第一元素不为空,并且不是lockName时,从Zset中获取lockName的timeout,计算ttl
if firstThreadId ~= false and firstThreadId ~= lockName
    ttl = (zscore redisson_lock_timeout:{lockName} lockName) - currentTime
    // 列表第一元素是lockName时,获取ttl
    ttl = pttl name
    // 计算 timeout
    timeout = ttl + (currentTime + threadWaitTime);
    // 如果 zadd 添加成功,则 rpush
    if (zadd  redisson_lock_timeout:{lockName}  timeout lockName) == 1
        rpush redisson_lock_queue:{lockName}  
    return ttl          

解锁伪脚本:

// 删除列表中第一个位置是过期的线程
... ...
// 1. 如果不存name时,获取列表第一个元素,并且第一元素不为空,向解锁频道发送通知
if exists name == 0
     nextThreadId = lindex  redisson_lock_queue:{lockName}  0
     if nextThreadId ~= false 
         publish channel_name .. ':' .. nextThreadId 0
     return 1
     
// 2. 如果 hexists name lockName 不存在时,直接返回nil
if hexists name lockName == 0
    return nil
    
// 3. 如果 hexists name lockName 存在时,处理锁可重入,或删除key,通知下一个元素
counter = hincrby  name  lockName  -1
// 如果counter大于0,设置过期时间
if counter > 0
    pexpire name internalLockLeaseTime
return 0
// 如果counter小于等于0,删除key
del name
// 获取列表第一个元素,如果不为空,发送解锁通知
nextThreadId = lindex  redisson_lock_queue:{lockName}  0
if nextThreadId ~= false 
    publish channel_name .. ':' .. nextThreadId 0
return 1   

4. 延迟队列:

两个API概念:

  • RBlockingQueue 就是目标队列
  • RDelayedQueue 就是中转队列

  • 我们实际操作的是RBlockingQueue队列,并不是RDelayedQueue队列,RDelayedQueue主要是提供中间转发的一个队列。
  • 我们都是基于RBlockingQueue目标队列在进行消费,而RDelayedQueue就是会把过期的消息放入到我们的目标队列中。
  • 我们只要从RBlockingQueue队列中取数据即可。

数据结构:

  • ZSet结构:key -> redisson_delay_queue_timeout:{name}、score -> System.currentTimeMillis() + delayInMs、member ->数据使用 struct.pack()包装,包含 'dLc0'、randomId、string.len(数据)、数据
    • 作用:该队列实现了获取延迟数据的功能。
  • List结构:key-> redisson_delay_queue:{name} 、value -> 数据使用 struct.pack()包装,包含 'dLc0'、randomId、string.len(数据)、数据
    • 作用:该队列记录了所有数据的加入顺序。
  • List结构:key -> 出入阻塞队列的名称,上边的{name}、value -> 数据

处理流程(展示一条主线):

Redisson延迟队列剖析_why redissondelayedqueue use list-CSDN博客

  • 实例化RedissonDelayedQueue时:
  1. 设置频道名称(redisson_delay_queue_channel:{name})、队列名称(redisson_delay_queue:{name})、延迟队列名称(redisson_delay_queue_timeout:{name})
  2. 设一个“将到期的任务推送阻塞队列”的任务(i.获取到期数据,到阻塞队列的Lua脚本;ii.设置发布订阅的主题)
  3. 启动该任务,并且添加相关监听(i.添加监听订阅发布的事件 ->pushTask() ;ii.添加监听消息的事件->scheduleTask(startTime))
  • 添加消息时:
  1. 向timeoutSet延迟队列添加数据
  2. 向queue队列添加数据
  3. 获取timeoutSet延迟队列的队头数据,如果队头数据等于刚加入的数据,则向频道发送消息
  • 触发频道监听事件->scheduleTask(startTime):
  1. 计算延迟时间 -> delay = startTime - System.currentTimeMillis()
  2. 如果延迟大于10,启动延迟任务
  3. 否则立即执行启动 -> pushTask()
  • 将到期的任务推送到阻塞队列 -> pushTask():
  1. 调用实例化时,定义的pushTaskAsync
  2. 添加监听操作完成的事件,在执行成功后,判断返回值是否为空,不为空调用scheduleTask(future.getNow())