Redis分布式锁的实现

发布于:2022-12-16 ⋅ 阅读:(506) ⋅ 点赞:(0)

目录

 Redis分布式锁实现:

 1.首先是setnx命令实现分布式锁

 2.问题之设置过期时间防止别人拿不到锁)

3.问题之设置uuid防止误删现象发生

4.问题之最后判断+删除这里没有保证原子性: 

5.问题之lock功能可重入的实现+重写加锁流程

 6.lua脚本重写解锁功能

7.自定义分布式锁的构造+uuid识别

所有代码 


上锁lock()和解锁unlock()和ReentrantLock思路一致,逻辑给到我们自定义的DistributeRedisLock实现上锁和解锁逻辑

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /**提供所有实现机制的同步器
    private final Sync sync;

    /**
     * 此锁的同步控制基础。下面分为公平和非公平版本。使用 AQS 状态来表示持有锁的次数。
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

 Redis分布式锁实现:

(66条消息) redis通过lua脚本实现分布式锁_Maxiao1204的博客-CSDN博客_lua实现redis分布式锁

(66条消息) aqs的学习_Fairy要carry的博客-CSDN博客

 1.首先是setnx命令实现分布式锁

加锁:其实就是cas操作,只有当key不存在的情况,才能将key的值设置为value,如果key已经存在了,那么setnx命令不做任何操作(Set if Not Exists)

setnx key value

那么setnx怎么释放锁呢?

 直接del key就ok了

del key

 2.问题之设置过期时间防止别人拿不到锁)

1.比如我们加锁后,如果忘记del锁的话,那么其他线程就一直无法获取到这个锁,造成一线程资源空转——>解决:上过期时间即可,所以说我们常说的redis分布式锁最基本的就是setnx+expire

set lock 746879613 nx ex 20

这里的nx和xx的区别

(35条消息) Redis - NX与XX_平_繁的博客-CSDN博客_redis xx

nx:如果不存在就会设置该值

xx:如果存在就会覆盖该值 

3.问题之设置uuid防止误删现象发生

首先看redis分布式锁的流程

1.加锁设置过期时间(一条命令保证原子性)

2.进行业务处理

3.然后del锁

场景:当A服务拿到了锁然后执行业务,业务时间很长>默认释放锁的时间,那么此时就会B拿到锁,然后再A业务执行完业务执行删除del时,会发现删的就是B服务的锁了

——>解决:加锁的时候除了设置过期时间并且设置UUID,然后删除的时候进行验证是不是该服务的,我们利用setIfAbsent(lockName,uuid,3,TimeUnit.SECONDS)设置lock的名称以及值也就是uuid作为标识——>(后面的可重入实现就得用lua脚本了,毕竟多了一个重复次数值)

  /**
     * 5.redis分布式锁循环实现(cas)
     */
    public void deduct5() {
        String uuid = UUID.randomUUID().toString();//每个请求自己的锁标识
        /**
         * 1.加锁setnx,并且设置过期时间-尽量保证原子性(不存在这个key就进行设置)
         */
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);

        /**
         * 2.判断是否获取锁成功
         */
        while (!lock) {//没获取到锁进行sleep
            try {
                Thread.sleep(50);
//                //2.1不成功进行cas操作——>此时递归操作可能会造成多次扣减库存的操作——>重复几次就扣减几次
//                this.deduct5();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         *  3.成功的执行操作
         */
        try {
            //1.查询库存信息
            Integer stock = (Integer) redisTemplate.opsForValue().get("stock");
            //2.判断库存是否充足
            if (stock != null && stock > 0) {
                redisTemplate.opsForValue().set("stock", stock - 1);
            }
        } finally {
           
            /**
             * 3.解锁需要判断是否是自己的锁
             * 注意:我们需要保证原子性,因为可能出现请求1执行然后3s后释放lock
             * ——>请求2进来的时候获得了自己的uuid并且拿到了锁
             * ——>请求1还没走完刚好到了finally这里,判断锁的值和uuid相等把请求2的锁删了——>需要保证原子性
             */
            if(StringUtils.equals(this.redisTemplate.opsForValue().get("lock").toString(),uuid)){
                /**
                 * 4.进行解锁,将锁释放(可能出现误删现象)
                 * 误删:比如我们第一个请求3s后自动释放锁资源,然后被请求2拿到,但是此时请求1的执行流程还没走完
                 * ——>当请求1在finally的时候会删除lock锁资源,删除请求2的造成误删现象
                 */
                this.redisTemplate.delete("lock");
            }
        }

4.问题之最后判断+删除这里没有保证原子性: 

当服务A业务执行完判断——>lock的值是否和当前事务的uuid值一样——>然后进行del,当判断完还没del时,加入锁过期了,事务B拿到锁资源,然后事务A之前就判断完了一删,B的锁就会被误删(没有保证判断和删除的原子性)

解决:用lua脚本

 /**
     * 5.redis分布式锁循环实现(cas)
     */
    public void deduct5() {
        String uuid = UUID.randomUUID().toString();//每个请求自己的锁标识
        /**
         * 1.加锁setnx,并且设置过期时间-尽量保证原子性(不存在这个key就进行设置)
         */
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);

        /**
         * 2.判断是否获取锁成功
         */
        while (!lock) {//没获取到锁进行sleep
            try {
                Thread.sleep(50);
//                //2.1不成功进行cas操作——>此时递归操作可能会造成多次扣减库存的操作——>重复几次就扣减几次
//                this.deduct5();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         *  3.成功的执行操作
         */
        try {
            //1.查询库存信息
            Integer stock = (Integer) redisTemplate.opsForValue().get("stock");
            //2.判断库存是否充足
            if (stock != null && stock > 0) {
                redisTemplate.opsForValue().set("stock", stock - 1);
            }
        } finally {
             String script="if redis.call('get',KEYS[1]) == ARGV[1] "+
                     "then "+
                     "  return redis.call('del','KEYS[1]')"+
                     "else"+
                     "  return 0 "+
                     "end";
             this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList("lock"),uuid);

            }
        }

5.问题之lock功能可重入的实现+重写加锁流程

原来的加锁方式是:setIfAbsent()——>这样加锁,lock只有一个值,就是uuid,作为我们线程的标识,那么我们如果可重入的话就需要再来个值放在uuid下面

解决:我们可以利用hset——>设置锁名称为testlock,拥有标识为wyh,重入次数为1

hset testlock wyh 1

也可以利用incr,更推荐方便lua脚本编写,既可以作为增加重入次数,又可以作为创建新的锁

加锁核心代码

1.我们自定义一个分布式锁,然后实现Lock接口(类似ReentranLock,实现lock接口,并且aqs帮助了一个思路,但是具体的逻辑还是ReentrantLock自己实现)

2.这里主要看我们的lua逻辑——>首先判断是否有这个锁,没有就直接创建一个锁并且设置标识以及当前重入次数1——>如果当前有锁并且是自己本身,也是一样利用incr增加重入次数——>否则返回0(注:参数lockName,uuid,过期时间u)

3.然后redisTemplate执行我们这个lua脚本,如果执行失败就进行cas——>sleep,获取锁成功返回true锁自动续期问题后面说——>当前线程执行业务的时间太长>锁释放时间,导致锁提前释放——>导致B事务拿到锁后,A事务执行业务完后删不掉锁(需要比较uuid标识),确实也不需要删,只是这样隔离性较差)

 /**
     * 2.核心加锁方法
     *
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time != -1) {
            //2.1指定了时间设置为你指定的
            this.expire = unit.toSeconds(time);
        }

        /**
         * 2.2判断锁是否存在,判断对应锁的标识,也就是uuid下的拥有者是否存在
         * 如果不存在(也就是为0)的话就加锁值为1(代表第一次加锁),并且设置过期时间——>返回1,否则返回0
         */
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +
                "then " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                "   redis.call('expire', KEYS[1], ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
            //2.3执行失败,循环尝试获取锁
            Thread.sleep(50);
        }

        /**
         * 2.4获取锁成功
         * 先获取自动续期
         */
        this.renewExpire();
        return true;
    }

 6.lua脚本重写解锁功能

好处:前面都有说保证了我们加锁解锁的原子性,并且包含了各种可能的逻辑 

1.判断是否有锁(并且符合标识,用hexists),比如wyh这个锁,标识为746879613的是否存在——>2.不存在返回nil——>3.如果存在进行解锁,hincrby将锁注入次数-1——>4.否则解锁失败返回0——>5.进行执行,如果返回为nil,说明解不了当前锁

然后我们锁标识uuid是通过线程+服务结合而来的 

HEXISTS wyh 746879613
  /**
     * 3.解锁方法
     */
    @Override
    public void unlock() {

        /**
         * 3.1判断线程是否拥有该锁(用uuid来判断)——>HEXISTS lock 746879613
         * ——>如果不存在说明解不了return nil——>如果解的了就-1,若-为0就删除
         */
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
                "   then return redis.call('del', KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);
        //判断结果异常
        if (flag == null) {
            throw new IllegalMonitorStateException("锁不属于你");
        }
    }

7.自定义分布式锁的构造+uuid识别

 思路:1.我们的锁名称,redisTemplate这些由我们自定义的一个工厂类传入,并且设置过期时间——>2.uuid:由我们的工厂类(传入第一个标识(服务的)),与当前线程的uuid结合起来

工厂类:

package com.wyh.fbs_demo.com.wyh.lock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * 工厂类:提供redis的mysql的分布式锁
 */
@Component
public class DistributedLockClient {

    @Autowired
    private StringRedisTemplate redisTemplate;

    //不同服务的uuid
    private String uuid;

    public DistributedLockClient() {
        this.uuid= UUID.randomUUID().toString();
    }

    /**
     * 1.返回redis分布式锁,并且指定锁名称
     * @param name:锁名称
     * @return
     */
    public DistributeRedisLock getRedisLock(String name){
        return  new DistributeRedisLock(redisTemplate,name,uuid);
    }

}

锁构造: 

/**
 * Redis分布式锁没有被容器管理
 * 所以切记不能Autowired
 */
public class DistributeRedisLock implements Lock {

    private StringRedisTemplate redisTemplate;

    public String lockName;//锁的名称

    private String uuid;//拥有锁的标识

    private long expire = 30;//默认过期时间

    /**
     * 1.构造redis分布式锁
     *
     * @param redisTemplate
     * @param lockName:锁名称
     * @param uuid:每个服务的uuid标识
     */
    public DistributeRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = uuid + ":" + Thread.currentThread().getId();//每个服务的id标识
    }

所有代码 

package com.wyh.fbs_demo.com.wyh.lock;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Redis分布式锁没有被容器管理
 * 所以切记不能Autowired
 */
public class DistributeRedisLock implements Lock {

    private StringRedisTemplate redisTemplate;

    public String lockName;//锁的名称

    private String uuid;//拥有锁的标识

    private long expire = 30;//默认过期时间

    /**
     * 1.构造redis分布式锁
     *
     * @param redisTemplate
     * @param lockName:锁名称
     * @param uuid:每个服务的uuid标识
     */
    public DistributeRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = uuid + ":" + Thread.currentThread().getId();//每个服务的id标识
    }

    @Override
    public void lock() {
        //1.尝试获取锁
        this.tryLock();
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            //1.加锁核心,设置默认的过期时间
            return this.tryLock(-1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 2.核心加锁方法
     *
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time != -1) {
            //2.1指定了时间设置为你指定的
            this.expire = unit.toSeconds(time);
        }

        /**
         * 2.2判断锁是否存在,判断对应锁的标识,也就是uuid下的拥有者是否存在
         * 如果不存在(也就是为0)的话就加锁值为1(代表第一次加锁),并且设置过期时间——>返回1,否则返回0
         */
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +
                "then " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                "   redis.call('expire', KEYS[1], ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
            //2.3执行失败,循环尝试获取锁
            Thread.sleep(50);
        }

        /**
         * 2.4获取锁成功
         * 先获取自动续期
         */
        this.renewExpire();
        return true;
    }

    /**
     * 3.解锁方法
     */
    @Override
    public void unlock() {

        /**
         * 3.1判断线程是否拥有该锁(用uuid来判断)——>HEXISTS lock 746879613
         * ——>如果不存在说明解不了return nil——>如果解的了就-1,若-为0就删除
         */
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
                "   then return redis.call('del', KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);
        //判断结果异常
        if (flag == null) {
            throw new IllegalMonitorStateException("锁不属于你");
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    /**
     * 4.获取当前线程的id,与服务id做拼接保证唯一性
     *
     * @return
     */
//    String getId() {
//        return uuid + ":" + Thread.currentThread().getId();
//    }

    /**
     * 5.定时器功能——>重置时间自动续期
     */
    private void renewExpire() {
        String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +
                "then " +
                "   return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "   return 0 " +
                "end";
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                //这里注意子线程需要的uuid肯定得和父id一样,不然获得不了锁
                Boolean flag = redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire));
                if (flag) {
                   renewExpire();
                }
            }
        }, this.expire * 1000 / 3, this.expire * 1000 / 3);
    }

}


测试:

@Service
//@Scope(value = "prototype",proxyMode = ScopedProxyMode.TARGET_CLASS)
public class StockService {

    @Resource(name = "redisTemplate")
    private RedisTemplate redisTemplate;
    @Autowired
    private StockMapper stockMapper;
    //注入工厂类
    @Autowired
    private DistributedLockClient client;


//    @Autowired
//    private ReentrantLock lock = new ReentrantLock();

    /**
     * 6.redis分布式锁实现——可重入加锁减锁
     */
    public void deduct6(){
        /**
         * 1.获取redis分布式锁
         */
        DistributeRedisLock redisLock = this.client.getRedisLock("lock");
        redisLock.lock();

        try {
            /**
             * 2.对库存信息进行操作
             */
            String stock = redisTemplate.opsForValue().get("stock").toString();

            //2.1判断库存是否充足
            if(stock!=null&&stock.length()!=0){
                Integer st = Integer.valueOf(stock);
                //2.2将库存减-1
                if(st>0) redisTemplate.opsForValue().set("stock",String.valueOf(--st));
            }
            //2.3可重入测试
            this.test();
        } finally {
            /**
             * 3.解锁
             */
            redisLock.unlock();
        }

    }

    /**
     * 7.测试可重入
     */
    public void test(){
        DistributeRedisLock lock = this.client.getRedisLock("lock");
        lock.lock();
        System.out.println("测试可重入锁....");
        lock.unlock();
    }
}

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到