目录
大家好,我是千语。上期给大家讲了使用悲观锁来解决“一人一单”的并发场景。但上期使用的是一个本地锁,本地锁在集群模式下会失效。具体可以看一下我上一篇博客。
一、本地锁存在的问题
在集群模式下,该项目会启动多个实例,且每个实例都会有各种的jvm。我们上面使用到的锁其实都是本地锁,所以就可能会出现这样的情况:
张三在进行并发地判断自己是否满足一人一单时,第一个请求被分配到了实例A,获取锁并判断到数据库中还没有改商品的订单,可以抢购,但当还没有完全提交事务到数据库时,即使还没有释放锁。
张三发送第二个请求被分配到了实例B,那么用户尝试获取锁时,是可以获取到的。然后判断到数据库没有订单,可以抢单的操作,这样又造成了一个用户抢到了多个订单的操作。
解析:因为每个实例都会有自己的JVM,而JVM里面都会有自己的锁监视器,并且每个实例的锁都是存储在它自己的jvm里面的,所以请求分配到不同的实例,锁监视器监视到的锁都是打开的状态。也就是说我们上面应用锁的方式只是在单机的情况下适用,集群模式下就不适用了。
二、redis实现分布式锁原理
原理就是使用redis的setnx命令,这个命令是给redis里面set值,但是只有这个键不存在的时候才set,所以我们要获取锁时,setnx一个固定的键,获取锁成功;当其他线程也想要获取锁时,也使用setnx命令,这时候是set不到的,所以这个线程就获取锁失败。当业务执行完释放锁时,就把这个键删除就可以了。
图例:
三、使用示例
@Component
public class RedisLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 尝试获取分布式锁
* @param lockKey 锁的键
* @param expireTime 过期时间
* @param timeUnit 时间单位
* @return 获取锁成功与否
*/
public String tryLock(String lockKey, long expireTime, TimeUnit timeUnit) {
// 使用setIfAbsent方法尝试获取锁(对应Redis的SETNX命令)
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, timeUnit);//设置锁超时时间,避免死锁
return locked != null && locked; //set成功表明获取锁成功
}
/**
* 释放分布式锁
* @param lockKey 锁的键
* @return 是否释放成功
*/
public boolean releaseLock(String lockKey) {
return redisTemplate.delete(lockKey)
}
}
业务中实际加锁操作:
public String lockTest(){
String lockKey = "product_stock_lock";
try {
// 尝试获取锁,超时时间10秒,锁持有时间30秒
lockValue = redisLockHelper.tryLock(lockKey, 30, TimeUnit.SECONDS);
if (lockValue != null) {
// 获取锁成功,执行业务逻辑
System.out.println("获取锁成功,处理库存扣减...");
// 模拟业务处理
Thread.sleep(5000);
return "库存扣减成功";
} else {
// 获取锁失败
return "系统繁忙,请稍后重试";
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "操作被中断";
} finally {
// 释放锁(只有持有锁的线程才能释放)
if (lockValue != null) {
boolean released = redisLockHelper.releaseLock(lockKey, lockValue);
System.out.println("锁释放结果: " + released);
}
}
}
四、锁误删问题
在上述的使用示例当中,实际上会存在锁误删的问题。具体如下:
- 线程1获取锁成功,执行业务代码后阻塞,未执行到手动释放锁的操作,锁超时后自动释放了
- 由于锁超时被释放,线程2获取锁成功,执行业务
- 线程1阻塞过后,继续执行任务,执行了释放锁操作。但此时锁其实是线程2的,由于没有做判断,线程1执行了释放锁的操作。
- 由于锁已经被线程1释放,线程3可以获取锁,执行业务。
- 结果:线程2和线程3都同时在执行了只能单个线程执行的业务。
图例:
解决思路
获取锁时,判断一下标识是否一致;
在setnx时,value的值可以设置成当前线程的name或者
id。因为线程id在jvm里面是自增的,所以在集群模式下,多个jvm可能会存在id相同的线程,所以也是会冲突的,所以id不可行,往下看。
所以可以使用uuid+线程id作为锁的标识
当要释放锁时,先获取锁的值,如果是自己当前的线程id,再进行释放锁
获取锁和释放锁代码优化
@Component
public class RedisLockHelper {
@Autowired
private RedisTemplate<String, String> redisTemplate;
//生成当前锁持有者的唯一标识的uuid前缀
private static final String ID_PREFIX= UUID.randomUUID().toString(true) + "-";
/**
* 尝试获取分布式锁
* @param lockKey 锁的键
* @param expireTime 过期时间
* @param timeUnit 时间单位
* @return 锁的唯一标识,获取失败时为null
*/
public String tryLock(String lockKey, long expireTime, TimeUnit timeUnit) {
// 使用UUID前缀+当前线程id作为锁持有者的唯一标识
String lockValue = ID_PREFIX + Thread.currentThread().getid();
// 使用setIfAbsent方法尝试获取锁(对应Redis的SETNX命令)
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, timeUnit);
return locked != null && locked ? lockValue : null;
}
/**
* 释放分布式锁
* @param lockKey 锁的键
* @return 是否释放成功
*/
public boolean releaseLock(String lockKey) {
//获取当前线程的标识
String currentThreadLock = ID_PREFIX + Thread.currentThread().getid();
// 获取分布式锁内的锁标识
String lockValue = redisTemplate.opsForValue().get(lockKey)
//释放锁时,先判断该锁是不是当前线程持有的
if(currentThreadLock.equals(lockValue)) {
//如果当前线程是锁的持有者,就释放锁
return redisTemplate.delete(lockKey);
}else{
return false;
}
}
}
业务层使用锁的代码不需要修改
五、锁释放的原子性问题
上一个问题是执行业务时线程阻塞,阻塞结束后误删了锁。
所以我们在释放锁前先判断一下标识,看是否是当前线程的锁再释放就可以解决
但是,当我们判断完标识是一致后,线程1在进行释放锁之前被阻塞了(由于这两者不是原子性)
等到锁过期,其他线程成功获取锁执行业务,那么线程1又误删了锁:
图例
解决思路(Lua脚本)
使用Lua脚本,在脚本里面写一系列操作,然后使用redis客户端调用该脚本,这些操作就会一次性执行,满足原子性。
使用流程
(1)创建并填写Lua脚本文件:
注意:Lua脚本是使用lua语言来写的。具体可以去看一下语法内容,下面只给出一种解决思路和大概的解决流程。后续可以使用redission来简化这些操作
(2)读取lua脚本,形成一个RedisScript,便于后续调用api
(3)执行Lua脚本,释放锁
(4)锁使用:
业务中使用锁的方法都不需要边
总结
- 分布式锁利用set nx ex的原理。(set nx的互斥性,ex保证超时释放锁,避免死锁)
- 释放锁时要看看锁是不是该线程的持有者,避免误删
- 使用Lua脚本满足一组操作的原子性