面试切入点
锁的分类
- 单机版同一个JVM虚拟机内,synchronized或者Lock接口
- 分布式多个不同JVM虚拟机,单机的线程锁不再起作用,资源类在不同的服务器之间共享了
一个靠谱分布式锁需要具备的条件与刚需
- 独占性:onlyOne,任何时刻只能有且仅有一个线程持有
- 高可用:若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况。高并发请求下,依旧性能OK好使
- 防死锁:杜绝死锁,必须有超时控制机制或者撤销机制操作,有个兜底终止跳出方案。
- 不乱抢:防止张冠李戴,不能私下unlock别人的锁,只能自己加锁,自己释放锁,自己的锁含着泪也要自己去解
- 重入性:同一个节点的同一个线程如果获得锁后,它也可以再次获取这个锁。
分布式锁
setnx key value
set key value [EX seconds] [PX milliseconds] [NX|XX]
案例演示扣减库存
V1版本:JVM可重入锁的版本
private ReentrantLock lock = new ReentrantLock();
//V1.0 基础版本
public String saleV1(){
String retMessage="";
lock.lock();
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
}finally {
lock.unlock();
}
return retMessage+"\t"+"服务端口号"+port;
}
swagger结果
V2版本:分布式部署,将V1版本copy一份,端口为8888,同时用nginx路由转发
docker部署nginx
本地做好nginx.conf与宿主机的映射
nginx.conf的默认配置
#
#user nobody;
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';
#access_log logs/access.log main;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
# proxy the PHP scripts to Apache listening on 127.0.0.1:80
#
#location ~ \.php$ {
# proxy_pass http://127.0.0.1;
#}
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
#location ~ \.php$ {
# root html;
# fastcgi_pass 127.0.0.1:9000;
# fastcgi_index index.php;
# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
# include fastcgi_params;
#}
# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}
# another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias;
# location / {
# root html;
# index index.html index.htm;
# }
#}
# HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost;
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
# ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m;
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
# location / {
# root html;
# index index.html index.htm;
# }
#}
}
docker 启动命令
docker run -d --name nginx -p 80:80 -v /home/run/nginx/conf/nginx.conf:/etc/nginx/nginx.conf -v /home/run/nginx/html:/etc/nginx/html docker.1ms.run/library/nginx
nginx验证
修改nginx的配置加上负载均衡+反方向代理
负载均衡的效果
手工点击是OK的,模拟高并发100个请求
redis中还有多少数据
重复下单数据,出现了超卖现象
为什么加了 synchronized 或者 Lock 还是没有控制住?
分布式锁的出现
- 跨进程+跨服务
- 解决超卖
- 防止缓存击穿
redis分布式锁V1版本
//V2版本:
public String saleV2(){
String retMessage="";
String key="redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
//分布式锁的设置
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
//抢不到的线程继续重试
if(!flag){
//暂停20毫秒,递归重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
saleV2();
}else{
//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
} finally {
// 释放分布式锁
stringRedisTemplate.delete(key);
}
}
return retMessage+"\t"+"服务端口号"+port;
}
结果
扣减库存为0
存在的问题
测试手工OK,测试Jmeter压测5000OK
递归是一种思想没错,但是容易导致StackOverflowError,不太推荐,进一步完善
多线程判断想想JUC里面说过的虚假唤醒,用while替代if
用自旋替代递归重试
redis分布式锁V2版本:用while替换if,用自旋替换递归
//V3版本:
public String saleV3(){
String retMessage="";
String key="redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
//分布式锁的设置
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
// 用自旋替代递归、用while替换if
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){
//暂停20毫秒
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
} finally {
// 释放分布式锁
stringRedisTemplate.delete(key);
}
return retMessage+"\t"+"服务端口号"+port;
}
上面版本存在的问题:
部署了微服务的Java程序机器挂了,代码层面根本没有走到finally这块,
没办法保证解锁(无过期时间该key一直存在),这个key没有被删除,需要加入一个过期时间限定key
redis分布式锁版本3.0:宕机与过期+防止死锁
//V4版本:
public String saleV4(){
String retMessage="";
String key="redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
// 用自旋替代递归、用while替换if
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){
//暂停20毫秒
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//添加过期时间
stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);
//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
} finally {
// 释放分布式锁
stringRedisTemplate.delete(key);
}
return retMessage+"\t"+"服务端口号"+port;
}
存在的问题
设置key+过期时间分开了,必须要合并成一行具备原子性
redis 分布式锁版本3.1
public String saleV4(){
String retMessage="";
String key="redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
// 用自旋替代递归、用while替换if
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)){
//暂停20毫秒
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//添加过期时间
//stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);
//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
} finally {
// 释放分布式锁
stringRedisTemplate.delete(key);
}
return retMessage+"\t"+"服务端口号"+port;
}
结论:加锁与过期时间必须同一行,保证原子性
redis分布式锁版本4:防止key误删的问题
实际业务处理时间如果超过了默认设置key的过期时间??尴尬 ̄□ ̄||
张冠李戴,删除了别人的锁
解决: 只能自己删除自己的,不许动别人的
//V5版本:
public String saleV5(){
String retMessage="";
String key="redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
// 用自旋替代递归、用while替换if
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)){
//暂停20毫秒
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//添加过期时间
//stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);
//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
} finally {
// 改进点:只能删除属于自己的key,不能删除别人的
if(stringRedisTemplate.opsForValue().get(key).equals(uuidValue)){
stringRedisTemplate.delete(key);
}
}
return retMessage+"\t"+"服务端口号"+port;
}
redis分布式锁5.0版本:Lua脚本保证原子性
上个版本,finally块的判断+del删除操作不是原子性的
Lua脚本
官方脚本
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果。
eval "redis.call('set', 'k1', 'v1') redis.call('expire', 'k1', '30') return redis.call('get', 'k1')" 0
eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 k1 k2 lua1 lua2
redis get+del命令Lua脚本的原子操作
eval "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 redisLock 12345
Lua脚本条件判断分支
redis分布式锁V5版本
//V6版本:
public String saleV6(){
String retMessage="";
String key="redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
// 用自旋替代递归、用while替换if
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)){
//暂停20毫秒
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//添加过期时间
//stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);
//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
} finally {
//改进点,修改为Lua脚本的redis分布式锁调用,必须保证原子性,参考官网脚本案例
String luaScript =
"if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
stringRedisTemplate.execute(new DefaultRedisScript(luaScript,Boolean.class), Arrays.asList(key),uuidValue);
}
return retMessage+"\t"+"服务端口号"+port;
}
redis分布式锁V6:可重入锁+设计模式
上一个版本中while判断并自旋重试获取锁+setnx含自然过期+Lua脚本官网删除锁的命令。
存在的问题:如何兼顾锁的可重入性问题
写好一个锁的条件与规约
可重入锁(又名递归锁)
可重入锁又名递归锁。是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
”可重入锁“这四个字分开来解释:
可(可以)重(再次)入(进入)锁(同步锁)
进入什么?进入同步域(即同步代码块/方法或显式锁锁定的代码)
一句话:一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。自己可以获取自己的内部锁。
可重入锁的分类(隐式锁与显式锁)
隐式锁:也就是synchronized关键字使用的锁,默认是可重入锁。指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。
简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁
上面的三种情况出现的锁中锁,如果没有可重入性(持有同一把锁)就会产生死锁了
同步块
public class ReEntryLockDemo
{
public static void main(String[] args)
{
final Object objectLockA = new Object();
new Thread(() -> {
synchronized (objectLockA)
{
System.out.println("-----外层调用");
synchronized (objectLockA)
{
System.out.println("-----中层调用");
synchronized (objectLockA)
{
System.out.println("-----内层调用");
}
}
}
},"a").start();
}
}
同步方法
/**
* 在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的
*/
public class ReEntryLockDemo
{
public synchronized void m1()
{
System.out.println("-----m1");
m2();
}
public synchronized void m2()
{
System.out.println("-----m2");
m3();
}
public synchronized void m3()
{
System.out.println("-----m3");
}
public static void main(String[] args)
{
ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo();
reEntryLockDemo.m1();
}
}
Synchronized锁重入的实现原理
显式锁(即Lock) 也有ReentrantLock这样的可重入锁
/**
* 在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的
*/
public class ReEntryLockDemo
{
static Lock lock = new ReentrantLock();
public static void main(String[] args)
{
new Thread(() -> {
lock.lock();
try
{
System.out.println("----外层调用lock");
lock.lock();
try
{
System.out.println("----内层调用lock");
}finally {
// 这里故意注释,实现加锁次数和释放次数不一样
// 由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。
lock.unlock(); // 正常情况,加锁几次就要解锁几次
}
}finally {
lock.unlock();
}
},"a").start();
new Thread(() -> {
lock.lock();
try
{
System.out.println("b thread----外层调用lock");
}finally {
lock.unlock();
}
},"b").start();
}
}
切记:一般而言,lock了几次就要unlock几次
思考:上面可重入锁的计数问题,redis中的哪个数据类型可以代替?
K,K,V
Map<String,Map<Object,Object>>
案例
hincrby 加了几次1 最后再减去几次1,直到为0。也就是可重入性的lock几次再unlock几次。
小总结:
setnx只能解决无的问题,够用但不够完美。hset,不但可以解决有无,还可以解决可重入性的问题。
设计重点(两条线)
目前有2条支线,目的是保证同一个时候只能有一个线程持有锁进去redis做扣减库存的动作。
2个分分支
保证加锁、解锁(lock\unlock)
扣减库存redis命令的原子性
Lua脚本实现Lock与Unlock的操作
加锁lua脚本lock
先判断redis分布式锁这个key是否存在。
Exists Key 返回0 说明不存在,hset新建当前线程属于自己的锁BY UUID:ThreadId.
Exists Key返回1 说明已有锁,需要进一步判断是不是当前线程自己的。HEXISTS key uuid:ThreadID 返回0 说明不是自己的,返回1说明是自己的锁,自增1次表示重入。
V1版本的Lua脚本
if redis.call('exists','key') == 0 then
redis.call('hset','key','uuid:threadid',1)
redis.call('expire','key',30)
return 1
elseif redis.call('hexists','key','uuid:threadid') == 1 then
redis.call('hincrby','key','uuid:threadid',1)
redis.call('expire','key',30)
return 1
else
return 0
end
相同部分是否可以替换处理???
hincrby命令可否替代为hset命令??
V2版本
if redis.call('exists','key') == 0 or redis.call('hexists','key','uuid:threadid') == 1 then
redis.call('hincrby','key','uuid:threadid',1)
redis.call('expire','key',30)
return 1
else
return 0
end
KEYS[1]与ARGV[1]的参数化提取与处理
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
redis.call('hincrby',KEYS[1],ARGV[1],1)
redis.call('expire',KEYS[1],ARGV[2])
return 1
else
return 0
end
测试
EVAL "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end" 1 redisLock 0c90d37cb6ec42268861b3d739f8b3a8:1 30
解锁lua脚本unlock
设计思路:有锁且还是自己的锁
Hexists key uuid:ThreadId 返回0,说明根本没有锁,程序块返回nil。不是0,说明有锁且是自己的锁,直接调用HINCRBY 负1,表示每次减个1,解锁1次。直到它变为0表示可以删除该锁key,del 锁key
V1版本
if redis.call('HEXISTS',lock,uuid:threadID) == 0 then
return nil
elseif redis.call('HINCRBY',lock,uuid:threadID,-1) == 0 then
return redis.call('del',lock)
else
return 0
end
V2版本参数化处理
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then
return nil
elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then
return redis.call('del',KEYS[1])
else
return 0
end
测试
eval "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end" 1 redisLock 2f586ae740a94736894ab9d51880ed9d:1
整合到微服务代码中
RedisLock实现Lock接口
package com.atguigu.redislock.mylock;
import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自研分布式锁,实现Lock接口
*/
public class RedisLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuidValue;
private long expireTime;
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
this.expireTime = 50L;
}
@Override
public void lock() {
tryLock();
}
@Override
public void unlock() {
String script="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then" +
" return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
"return redis.call('del',KEYS[1])" +
" else " +
"return 0" +
" end";
// nil ==false 1==true 0==false
Long executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if(null==executeFlag){
throw new RuntimeException("this lock doesnt exists");
}
}
@Override
public boolean tryLock() {
try {
tryLock(-1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if(time==-1L){
String script=
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
" redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2])" +
" return 1 " +
"else" +
" return 0 end";
Boolean executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
while (!executeFlag){
//60 ms后再重试
TimeUnit.MILLISECONDS.sleep(60);
executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
}
return true;
}
return false;
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
}
调用redisLock的lock与unlock方法
private Lock redisLock = new RedisLock(stringRedisTemplate,"redisLock");
//V7版本
public String saleV7(){
String retMessage="";
redisLock.lock();
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
}finally {
redisLock.unlock();
}
return retMessage+"\t"+"服务端口号"+port;
}
利用工厂模式进行优化
package com.atguigu.redislock.mylock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.locks.Lock;
@Component
public class DistributeLockFactory {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String lockName;
public Lock getDistributeLock(String lockType) {
if(lockType==null){
return null;
}
if(lockType.equalsIgnoreCase("REDIS")){
this.lockName = "redisLock";
return new RedisLock(stringRedisTemplate,lockName);
}else if (lockType.equalsIgnoreCase("zookeeper")){
this.lockName = "zookeeperLock";
//TODO
return null;
}else if (lockType.equalsIgnoreCase("mysql")){
this.lockName = "mysqlLock";
//TODO
return null;
}
return null;
}
}
@Autowired
private DistributeLockFactory distributeLockFactory;
//V7版本
public String saleV7(){
String retMessage="";
Lock redisLock= distributeLockFactory.getDistributeLock("redis");
redisLock.lock();
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
}else{
retMessage="商品卖完了";
}
}finally {
redisLock.unlock();
}
return retMessage+"\t"+"服务端口号"+port;
}
压测+验证
redis中的库存
后台日志
可重入性验证代码
//V7版本
public String saleV7(){
String retMessage="";
Lock redisLock= distributeLockFactory.getDistributeLock("redis");
redisLock.lock();
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
testReentry();
}else{
retMessage="商品卖完了";
}
}finally {
redisLock.unlock();
}
return retMessage+"\t"+"服务端口号"+port;
}
private void testReentry() {
Lock redisLock= distributeLockFactory.getDistributeLock("redis");
redisLock.lock();
try {
System.out.println("==========测试可重入锁==================");
}finally {
redisLock.unlock();
}
}
出现的问题
线程Id一致,但是uuid不一致
问题修复
package com.atguigu.redislock.mylock;
import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.locks.Lock;
@Component
public class DistributeLockFactory {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuid;
public DistributeLockFactory()
{
this.uuid = IdUtil.simpleUUID();
}
public Lock getDistributeLock(String lockType) {
if(lockType==null){
return null;
}
if(lockType.equalsIgnoreCase("REDIS")){
this.lockName = "redisLock";
return new RedisLock(stringRedisTemplate,lockName,uuid);
}else if (lockType.equalsIgnoreCase("zookeeper")){
this.lockName = "zookeeperLock";
//TODO
return null;
}else if (lockType.equalsIgnoreCase("mysql")){
this.lockName = "mysqlLock";
//TODO
return null;
}
return null;
}
}
package com.atguigu.redislock.mylock;
import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自研分布式锁,实现Lock接口
*/
public class RedisLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuidValue;
private long expireTime;
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
this.expireTime = 25L;
}
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue= uuid;
this.expireTime = 25L;
}
@Override
public void lock() {
tryLock();
}
@Override
public void unlock() {
String script="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then" +
" return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
"return redis.call('del',KEYS[1])" +
" else " +
"return 0" +
" end";
// nil ==false 1==true 0==false
System.out.println("unlock lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");
Long executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if(null==executeFlag){
throw new RuntimeException("this lock doesnt exists");
}
}
@Override
public boolean tryLock() {
try {
tryLock(-1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if(time==-1L){
String script=
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
" redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2])" +
" return 1 " +
"else" +
" return 0 end";
System.out.println("lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");
Boolean executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
while (!executeFlag){
//60 ms后再重试
TimeUnit.MILLISECONDS.sleep(60);
executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
}
return true;
}
return false;
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
}
测试验证
自动续期
确保redisLock的过期时间大于业务执行时间的问题。redis分布式锁如何续期??
CAP
redis集群是AP
redis异步复制造成的锁丢失
比如:主节点没来的及把刚刚set进来这条数据给从节点,master就挂了,从机上位但从机上无该数据zookeeper集群是CP
故障
Eureka集群是AP
Nacos集群是AP
加个钟 Lua脚本
redis分布式锁加锁成功后续期
package com.atguigu.redislock.mylock;
import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自研分布式锁,实现Lock接口
*/
public class RedisLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuidValue;
private long expireTime;
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
this.expireTime = 25L;
}
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue= uuid;
this.expireTime = 30L;
}
@Override
public void lock() {
tryLock();
}
@Override
public void unlock() {
String script="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then" +
" return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
"return redis.call('del',KEYS[1])" +
" else " +
"return 0" +
" end";
// nil ==false 1==true 0==false
System.out.println("unlock lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");
Long executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if(null==executeFlag){
throw new RuntimeException("this lock doesnt exists");
}
}
@Override
public boolean tryLock() {
try {
tryLock(-1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if(time==-1L){
String script=
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
" redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2])" +
" return 1 " +
"else" +
" return 0 end";
System.out.println("lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");
Boolean executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
while (!executeFlag){
//60 ms后再重试
TimeUnit.MILLISECONDS.sleep(60);
executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
}
// 新建一个扫描程序,监控对应key的ttl是否到规定的1/3 进行续期
resetExpireTime();
return true;
}
return false;
}
private void resetExpireTime() {
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
// time调度方法
new Timer().schedule(new TimerTask()
{
@Override
public void run()
{
if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
resetExpireTime();
}
}
},(this.expireTime * 1000)/3);
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
}
//V8版本
public String saleV8(){
String retMessage="";
Lock redisLock= distributeLockFactory.getDistributeLock("redis");
redisLock.lock();
try {
//查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//判断库存是否足够
Integer inventory =result==null?0: Integer.valueOf(result);
//扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,库存剩余:"+inventory;
System.out.println(retMessage+"\t"+"服务端口号"+port);
try {
TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}else{
retMessage="商品卖完了";
}
}finally {
redisLock.unlock();
}
return retMessage+"\t"+"服务端口号"+port;
}
验证续期成功
redis分布式锁总结
视频链接
Redis分布式锁