Redis企业开发实战(五)——点评项目之分布式锁Redission与秒杀优化

发布于:2025-02-11 ⋅ 阅读:(14) ⋅ 点赞:(0)

目录

一、Redisson

(一)Redisson基本介绍 

(二)Redisson入门

1.引入依赖

2.配置Redisson客户端

3.使用Redission的分布式锁

4.tryLock参数解析

4.1tryLock()

4.2tryLock(long waitTime, TimeUnit unit)

4.3tryLock(long waitTime, long leaseTime, TimeUnit unit)

4.4注意事项

(三)Redisson可重入锁原理

(四)redission锁重试和WatchDog机制 

(五)redission锁的MutiLock原理

二、秒杀优化 

(一)异步秒杀思路

1.优惠券秒杀下单流程

2.优化方案

3.整体思路

(二)Redis完成秒杀资格判断 

1.保存秒杀优惠券的库存到redis中

2.重启项目,新增优惠券

3.编写seckill.lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

4.从阻塞队列中获取优惠券信息,开启异步线程执行下单操作

5.秒杀业务的优化思路总结


一、Redisson

(一)Redisson基本介绍 

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redission提供了分布式锁的多种多样的功能:

官网地址: https://redisson.org

GitHub地址: GitHub - redisson/redisson: Redisson - Valkey and Redis Java client. Real-Time Data Platform. Sync/Async/RxJava/Reactive API. Over 50 Valkey and Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache..

(二)Redisson入门

1.引入依赖

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>3.13.6</version>
</dependency>

2.配置Redisson客户端

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        // 配置
        Config config = new Config();
        // 创建RedissonClient对象
        config.useSingleServer().setAddress("redis://192.168.22.145:6379").setPassword("root");
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

3.使用Redission的分布式锁

@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
    // 查询优惠券是否存在
    SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
    if (seckillVoucher == null) {
        return Result.fail("优惠券不存在");
    }
    // 查询秒杀是否开始
    LocalDateTime beginTime = seckillVoucher.getBeginTime();
    if (beginTime.isAfter(LocalDateTime.now())) {
        return Result.fail("秒杀尚未开始");
    }
    // 查询秒杀是否结束
    LocalDateTime endTime = seckillVoucher.getEndTime();
    if (endTime.isBefore(LocalDateTime.now())) {
        return Result.fail("秒杀已经结束");
    }
    // 判断库存是否充足
    Integer stock = seckillVoucher.getStock();
    if (stock < 1) {
        return Result.fail("库存不足");
    }
    Long userId = UserHolder.getUser().getId();   
    // 获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("lock:order:" + userId);
    // 尝试获取锁
    boolean isLock = lock.tryLock();
    // 判断获取锁是否成功
    if (!isLock) {
        // 获取锁失败返回错误信息
        return Result.fail("不允许重复下单!");
    }
    // 获取锁成功进行事务操作
    try {
        // 获取和事务有关的代理对象
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        // 返回订单id
        return proxy.createVoucherOrder(voucherId);
    } finally {
        // 最后必须要释放锁
        // simpleRedisLock.unLock();
        lock.unlock();
    }
}

4.tryLock参数解析

tryLock()还有其他几个重载版本,可以接受不同的参数来提供更灵活的锁定行为:

4.1tryLock()
boolean isLock = lock.tryLock();

描述:尝试立即获取锁。
返回值:如果成功获取锁,则返回true;否则返回false。

4.2tryLock(long waitTime, TimeUnit unit)
boolean isLock = lock.tryLock(10, TimeUnit.SECONDS);

参数:

  1. waitTime: 等待获取锁的最大时间。
  2. unit: 时间单位(如TimeUnit.SECONDS,TimeUnit.MILLISECONDS等)。

描述:尝试获取锁,直到指定的等待时间结束。如果在这段时间内成功获取了锁,则返回true;如果超时仍未获取到锁,则返回false。

异常:可能会抛出InterruptedException如果当前线程在等待过程中被中断。

4.3tryLock(long waitTime, long leaseTime, TimeUnit unit)
boolean isLock = lock.tryLock(10, 15, TimeUnit.SECONDS);

参数:

  1. waitTime: 等待获取锁的最大时间。
  2. leaseTime: 锁的租约时间,在此时间后锁将自动释放(即使持有锁的客户端未显式解锁)。
  3. unit: 时间单位(如TimeUnit.SECONDS,TimeUnit.MILLISECONDS等)。

描述:尝试获取锁,最多等待waitTime指定的时间。一旦获取锁,将在leaseTime后自动释放锁,除非在此之前已经显式调用了unlock()方法。
异常:可能会抛出InterruptedException如果当前线程在等待过程中被中断。

示例代码

最多等待 10 秒钟来获取锁,同时设置锁的租约时间为 15 秒:

RLock lock = redissonClient.getLock("lock:order:" + userId);
try {
    boolean isLockAcquired = lock.tryLock(10, 15, TimeUnit.SECONDS);
    if (isLockAcquired) {
        // 成功获取锁,执行业务逻辑
        // ...
    } else {
        // 未能获取锁,处理失败情况
        // ...
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // 恢复中断状态
    // 处理中断异常
} finally {
    if (lock.isHeldByCurrentThread()) {
        lock.unlock(); // 确保锁被正确释放
    }
}
4.4注意事项
  • 中断处理:当调用tryLock(...)方法时,可能会抛出InterruptedException,可以通过调用Thread.currentThread().interrupt(),并根据需要进行进一步的错误处理。
  • 锁的释放:最好是在finally块中调用unlock(),以保证即使发生异常也能正确释放锁。
  • 租约时间:如果任务执行时间较长,应确保租约时间足够长以避免锁提前被释放。然而,过长的租约时间也可能导致死锁问题,因此需要权衡考虑。

(三)Redisson可重入锁原理

  • 可重入锁允许同一个线程多次获取同一把锁而不会发生死锁。这意味着如果一个线程已经持有了某个锁,并试图再次获取该锁,它将成功获取而不会被阻塞。这为递归调用或在一个方法内多次需要获取同一资源的场景提供了便利。
  • 不可重入锁不允许同一个线程多次获取同一把锁。如果一个线程已经持有了某一把锁,并试图再次获取该锁,则会被阻塞,导致程序陷入死锁状态。

 

tryLock源码底层调用的获取锁的脚本

unlock源码底层调用的释放锁的脚本

(四)redission锁重试和WatchDog机制 

可重入 :利用 hash 结构记录线程 id 和重入次数
可重试 :利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制
超时续约 :利用 watchDog,watchDog默认是30秒,每隔一段时间10s(releaseTime / 3 ),重置超时时间

(五)redission锁的MutiLock原理

        为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例

        此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。

        为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。 

MutiLock加锁原理:

        当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试。

二、秒杀优化 

(一)异步秒杀思路

1.优惠券秒杀下单流程

        当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤:

2.优化方案

        我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,当然这里边有两个难点

        第一个难点是我们怎么在redis中去快速校验一人一单,还有库存判断

        第二个难点是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。 

3.整体思路

        当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束;如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠券存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua脚本来操作。

        当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。

(二)Redis完成秒杀资格判断 

业务实现步骤:

新增秒杀优惠券的同时,将优惠券信息保存到Redis

基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

1.保存秒杀优惠券的库存到redis中

VoucherServiceImpl

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀到redis中,库存信息不需要考虑有效期
    stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

2.重启项目,新增优惠券

可以看到数据库和redis中都新增的优惠券数据:

3.编写seckill.lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

重构seckillVoucher方法: 

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1.执行lua脚本
        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString()
        );
        // 2.判断结果是否为0
        int r = result.intValue();
        if (r != 0) {
            // 2.1 不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }

        // 2.2 为0,代表有购买资格,把下单信息保存到阻塞队列中
        long orderId = redisIdWorker.nextId("order");
        // TODO 保存阻塞队列

        // 3.返回订单id
        return Result.ok(orderId);
    }
}

重启项目后,进行优惠券下单测试: 

优惠券库存成功减少:

已经下过订单的用户id也被存储到set集合中:

再次请求会提示重复下单:

4.从阻塞队列中获取优惠券信息,开启异步线程执行下单操作

先恢复redis中优惠券库存数据:

代码:

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    // 创建阻塞队列
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    // 实现异步下单的线程池,给一个即可,速度不需要太快
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    // 需要让VoucherOrderServiceImpl类初始化的时候就执行线程池中的任务
    @PostConstruct
    private void init() {
        // 类初始完成后,提交线程池中的任务
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    // 线程池执行的任务
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            // 1.获取队列中的订单信息
            try {
                // 从队列中获取并移除队列头部的元素。
                // 如果队列为空,take() 方法会阻塞当前线程,直到队列中有可用的元素。
                VoucherOrder voucherOrder = orderTasks.take();
                // 2.创建订单
                handleVoucherOrder(voucherOrder);
            } catch (Exception e) {
                log.error("处理订单异常", e);
            }
        }
    }

    // 子线程中执行创建订单的逻辑
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        // 获取用户
        Long userId = voucherOrder.getUserId();
        // 获取锁(可重入),指定锁的名称
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        // 判断获取锁是否成功
        if (!isLock) {
            // 获取锁失败
            log.error("不允许重复下单");
            return;
        }
        try {
            // 这里无法通过代理对象调用事务,因为该业务逻辑是在子线程中执行的
            // 因此,我们需要提前加载事务对象
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    // 初始化代理对象
    private IVoucherOrderService proxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1.执行lua脚本
        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString()
        );
        // 2.判断结果是否为0
        int r = result.intValue();
        if (r != 0) {
            // 2.1 不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }

        // 2.2 为0,代表有购买资格,把下单信息保存到阻塞队列中
        // 2.3创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.4订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 2.5用户id
        voucherOrder.setUserId(userId);
        // 2.6代金券id
        voucherOrder.setVoucherId(voucherId);

        // 2.7将订单信息放入阻塞队列中
        orderTasks.add(voucherOrder);

        // 开启异步下单,需要准备线程池
        // 3.在主线程中获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        // 3.返回订单id
        return Result.ok(orderId);
    }

    // 异步请求时创建的订单逻辑
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 获取用户id
        Long userId = voucherOrder.getUserId();

        // 查询该用户的订单
        int count = query().eq("user_id", userId)
                .eq("voucher_id", voucherOrder.getVoucherId())
                .count();
        // 判断订单是否存在
        if (count > 0) {
            // 表示用户已经购买过了
            log.error("用户已经购买过一次!");
            return;
        }

        // 如果订单不存在,则扣减优惠券库存
        // UPDATE seckill_voucher SET stock = stock - 1 WHERE voucher_id = ? AND stock > 0;
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock", 0)
                .update();

        // 如果扣减优惠券库存失败,则返回错误信息
        if (!success){
            log.error("库存不足");
            return;
        }

        // 创建订单
        save(voucherOrder);
    }
}

第一次测试下单: 

第二次测试下单:

数据库优惠券被正常删减,订单被正常添加:

redis中的数据也被成功生成:

5.秒杀业务的优化思路总结

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务

  • 再将下单业务放入阻塞队列,利用独立线程异步下单

  • 基于阻塞队列的异步秒杀存在的问题:内存限制问题;数据安全问题(系统出现故障时,队列中的任务被移除,导致数据丢失)

  • 解决阻塞队列的异步秒杀存在问题的办法:使用消息队列