【 Redis | 实战篇 秒杀优化 】

发布于:2025-05-18 ⋅ 阅读:(12) ⋅ 点赞:(0)

目录

 前言:

1.分布式锁

1.1.分布式锁的原理与方案

1.2.Redis的String结构实现分布式锁

1.3.锁误删问题

1.4.锁的原子性操作问题

1.5.Lua脚本解决原子性问题

1.6.基于String实现分布式锁存在的问题

1.7.Redisson分布式锁

2.秒杀优化

3.秒杀的异步优化

3.1.基于消息队列的异步下单思路

3.2.基于List结构的消息队列

3.3.基于PubSub的消息队列

3.4.基于Stream的消息队列

3.5.Redis作为消息队列的3种方式对比

3.6.基于Stream消息队列实现异步秒杀下单 


 前言:

解决集群模式下的安全问题(分布式锁),Redis实现秒杀优化,秒杀的异步优化

1.分布式锁

1.1.分布式锁的原理与方案

前述:由于我们发生了集群问题(不同的jvm下的监视器对象不同,那么同一把锁可以获取多次),【 Redis | 实战篇 秒杀实现 】-CSDN博客(问题描述),因此无法实现多个jvm线程的互斥

分析:其实就是因为我们使用的是jvm的锁,而多个jvm监视器并不共享,因此我们需要使用一把可以实现共享的锁(Redis的分布式锁),因为我们Redis只有一个,那么我们的资源就可以实现共享(互斥),从而避免集群问题

分布式锁介绍:满足分布式系统或集群模式下的进程可见并且互斥的锁

必须满足的要求

  • 多线程可见:所以线程都可以看见
  • 互斥:保证只有一个线程可以拿到锁,其他线程失败
  • 高可用:保证不管什么时候获取锁都会成功
  • 高性能:加锁本来就是会影响性能(串行执行),要加快获取锁的速度
  • 安全性:考虑没有成功释放锁出现的问题(死锁)

必须要求

  • 可重入性:可不可以重新来获取锁
  • 阻塞性:获取锁失败后会不会继续等待
  • 公平/非公平:获取锁是否公平

实现方案:

方案一:MySQL

MySQL:SQL型数据库

  • 多线程可见:可见,线程都可以来访问数据库
  • 互斥:互斥,线程执行操作时,我们可以向数据库来申请一个互斥锁,当事务提交时锁释放(互斥锁只允许一个线程拿到)
  • 高可用:好,利用主从机制
  • 高性能:一般,基于硬盘操作
  • 安全性:好,断开连接,自动释放锁

解释互斥:就是其实我们之前实现数据库更新数据的操作时,数据库会分配一个互斥锁,因此在更新操作时不允许多个线程来执行更新(只允许一个线程),因此我们利用这个特性,自己来从数据库申请互斥锁,实现互斥,而锁的释放数据库会通过事务的方式来进行操作(如果提交成功那么就释放),总的来说就是你只需要申请锁,锁的释放你不需要管数据库会帮你搞定

方案二:Redis

Redis:非SQL性数据库

  • 多线程可见:可见,线程直接访问
  • 互斥:互斥,利用setnx命令来实现(数据不存在才能set成功,存在则失败,因此只有一个线程能成功获取锁)
  • 高可用:好,主从,哨兵,集群机制
  • 高性能:好,基于内存操作
  • 安全性:一般,如果线程获取锁成功,服务宕机,锁没有释放(死锁),因此需要设置过期时间(时间一到自动释放锁)

解释互斥:利用Redis的命令setnx,它的原理就是看Redis中有没有对应key,没有key帮你自动创建(获取锁成功标识),有不会进行任何操作(不会覆盖)(获取锁失败标识),所以它只有第一次执行才可以真正的执行成功,那么利用它就可以实现互斥(只有一个线程才能获取成功)

解释安全性:

  • 问题:当线程获取锁成功后,还未执行释放锁操作,服务却宕机了,锁没有释放(死锁),那么以后的线程都无法获取锁,形成了死锁问题
  • 解决:既然服务宕机问题无法避免,那么我们只能从释放锁出发,因此我们可以给锁设置一个过期时间,时间一到锁自动删除(注意细节,不然还是会出问题)

方案三:Zookeeper

Zookeeper:分布式协调服务

  • 多线程可见:可见,直接访问
  • 互斥:互斥,有两种方法实现互斥,下面解释
  • 高可用:好,集群机制
  • 高性能:一般,主从之前的数据同步需要消耗一定的时间
  • 安全性:好,创建的临时节点,服务宕机自动释放

互斥方法一:利用它的节点有序性,并且节点是单调递增的,Zookeeper约定每次获取时必须获取到最小的节点才成功(保证了先执行的线程先获取小的节点,实现了线程的有序性,从而实现互斥)

互斥方法二:利用它的唯一性,由于它的节点名称都相同,那么所有线程都根据名称来获取,只有一个线程能成功获取

1.2.Redis的String结构实现分布式锁

分析:实现分布式锁那么就需要实现最基础的获取锁,释放锁

获取锁:

  • 利用setnx命令实现互斥
  • 利用expire命令设置过期时间

释放锁:

  • 手动删除锁(key)
  • 超时自动释放锁

问题:因为我们要使用的是setnx与expire两个不同的命令,分步执行,并没有确保原子性操作,那么当我们setnx执行成功,还未执行expire时,服务却宕机了,由于没有设置过期时间,如果出现之前的问题,还是会出现死锁问题(锁未释放)

解决:既然我们是因为没有确保原子性操作,那么我们就使用一个命令同时完成获取锁和设置过期实际的操作,我们可以通过使用set命令,set命令可以设置参数,而这些参数里就可以设置setnx特性(不能重复赋值)(NX),设置过期时间(EX)

思考:当我们获取锁失败后,我们应该执行什么操作?

  • 阻塞式获取:获取锁失败,会阻塞等待,等待锁释放来获取锁
  • 非阻塞式获取:获取锁失败,不继续等待,直接返回信息

实现非阻塞式获取:

步骤:

开始(跳过一些业务)

==》尝试获取锁

==》判断获取锁是否成功

==》获取锁失败

==》返回错误信息,不再重试(由于实现的是一人一单,获取锁失败则已经下过单了)

-------

==》获取锁成功,设置过期时间(原子性操作)

==》执行业务

==》业务超时/服务宕机

==》锁自动释放(删除key)

-------

==》业务执行成功

==》释放锁(删除key)

//接口
public interface ILock {

    boolean tryLock(Long time);

    void unLock();
}
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock{

    private  String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(Long time) {
        //1.设置key
        String key = KEY_PREFIX + name;
        //2.存入Redis,返回
        //获取当前线程id
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId, time, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(isLock);
    }

    @Override
    public void unLock() {
        //1.设置key
        String key = KEY_PREFIX + name;
        //2.删除锁
        stringRedisTemplate.delete(key);
    }
}

解释:由于我们实现的是一人一单的业务,那么不同的用户的锁需要不同,因此我们的key需要拼接用户id,并且我们是将线程id存入value,为了区分是哪个线程执行获取锁的操作(后面要使用的细节)

1.3.锁误删问题

问题:线程1获取锁成功,由于执行业务时间过长,导致锁超时释放,而锁已经释放,线程2获取锁成功,在线程2执行业务时,线程1业务执行完直接将锁释放(删除的是线程2的锁),由于锁释放,线程3获取锁成功,执行业务,最终一人下了多次单,还是出现了并发执行的问题

前提:锁还未获取

线程1获取锁成功

==》线程1执行对应业务

==》线程1由于执行业务时发生阻塞,导致执行时间过长

==》线程1的锁自动释放

==》线程2抢到执行权

==》由于锁已经释放

==》线程2获取锁成功

==》线程2执行业务

==》线程1抢到执行权

==》线程1执行完业务

==》线程1释放锁(细节:没有判断)

==》线程3抢到执行权

==》由于锁已经释放

==》线程3获取锁成功

==》线程3执行业务

==》最终线程1,2,3都执行了业务

-----

最终我们本意是一人一单,但是现在是一人下了3次单,出现了并发执行的问题

解决:其实本质是不是因为线程1误删了线程2的锁,那么我们可不可以在每次删除锁时进行判断,先判断该锁是不是自己线程获取到的锁,如果是的那么就删除锁,不是那么就不执行删除锁操作,而我们之前是不是把线程的id存入了锁对应的value中,那么我们可以从中取出值与执行删除锁的线程id进行比较即可

 步骤:

开始(跳过一些业务)

==》尝试获取锁

==》判断获取锁是否成功

==》获取锁失败

==》返回错误信息,不再重试(由于实现的是一人一单,获取锁失败则已经下过单了

-------

==》获取锁成功,设置过期时间(原子性操作)

==》执行业务

==》业务超时/服务宕机

==》锁自动释放(删除key)

-------

==》业务执行成功

==》取出锁中存储的value(获取锁的线程id)

==》获取id与执行删除锁线程id进行判断

==》id一致

==》释放锁(删除key)

----

==》不一致,不执行删除操作

//接口
public interface ILock {

    boolean tryLock(Long time);

    void unLock();
}
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock{

    private  String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(Long time) {
        //1.设置key
        String key = KEY_PREFIX + name;
        //2.存入Redis,返回
        //获取当前线程id
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId, time, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(isLock);
    }

    @Override
    public void unLock() {
        //1.设置key
        String key = KEY_PREFIX + name;
        //2.获取标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //3.获取Redis中的标识
        String id = stringRedisTemplate.opsForValue().get(key);
        if(threadId.equals(id)){
            //标识相同,释放锁
            //4.删除
            stringRedisTemplate.delete(key);
        }
    }
}

解释:由于我们之前存入了线程id,那么当我们进行删除锁操作时,我们先进行判断id是否正确,再进行删除

思考:由于线程id的创建是不断递增的,但是我们现在在集群情况下存在多个jvm它们之间的线程id不会共享,那么线程的id有可能重复,不同的jvm的线程id很可能出现重复,因此还是会出现误删问题

解决:就是说我们不仅仅是要区分不同的线程,还需要区分不同的tomcat(jvm),总的来说就是需要区分不同jvm下的不同线程,所以我们在存入vaule值的时候就需要存入区分jvm的标识

实现:我们生成一个全局变量UUID(static,final),用UUID区分不同的jvm,用线程id区分不同的线程,将生成的UUID拼接上线程id存入value即可

1.4.锁的原子性操作问题

问题:当线程1获取锁成功后执行业务,在判断锁是否一致,锁一致,但是此时发生了阻塞(jvm的垃圾回收可能会阻塞),由于阻塞时间过长,发生锁的超时释放,由于锁释放了,线程2获取锁成功后,线程1来执行删除锁的操作(已经判断过一致了),其实是把线程2的锁删除,线程3获取锁,执行业务,最终还是出现了多个线程执行业务,出现并发执行问题

前提:此时锁还未获取

线程1获取锁成功

==》线程1执行业务

==》线程1判断锁是否一致

==》线程1判断成功

==》线程1还未删除锁发生了阻塞(jvm有垃圾回收机制可能会操作阻塞)

==》线程1由于阻塞时间过长,导致锁超时释放

==》由于锁已经释放

==》线程2抢到执行权

==》线程2获取锁成功

==》线程2执行业务

==》线程2判断锁是否一致

==》线程2判断成功

==》线程1抢到执行权

==》由于之前已经进行了判断操作,可以直接删除

==》线程1执行删除锁操作(删除线程2的锁)

==》线程3抢到执行权

==》线程3获取锁成功

==》线程3执行业务

==》线程2也会删除线程3的锁

==》循环执行

------

最终一个用户可以下多个单,出现了并发执行问题

 原因:其实出现问题的原因还是因为判断锁标识和释放锁标识是两个动作(如果之间发生阻塞,那么就会出现问题),因此我们还是需要进行原子性操作

思考:一般想到的解决方案是不是进行事务管理,同时成功才事务提交,失败一个就事务回滚

Redis的事务:Redis的事务是一个批处理操作只会一次性就全部执行完,并不会有分布操作),因为你的判断操作是需要查询数据来进行判断,如果你将判断锁和删除锁加入Redis事务,那么你的查询数据的结果需要等到删除锁操作执行时才会有数据(一次性全部执行),因此此方法行不通

1.5.Lua脚本解决原子性问题

Redis来执行Lua脚本:Redis提供了Lua脚本功能,在一个脚本中可以编写多条Redis命令,确保命令执行时的原子性,而Lua是一个编程语言

语法:redis.call('命令名称','key','其他参数',........)(脚本)

在Redis中调用Lua脚本:EVAL "脚本"  0(代表脚本要使用的key个数)(如果你是在命令中写死了key,那么个数就写0,没有写死,而是写的KEYS[N],那么个数就写N)

比如:EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name  Rose

解释:就是说你这里写了key的个数为1,那么它就会从开头找一位参数(name),找完了那么剩余的就是其他参数,Rose就是ARGV[1]

RedisTemplate调用Lua脚本:

-- 比较线程标识是否与锁中的标识一致
if (redis.call('get',KEYS[1]) == ARGV[1]) then
    -- 释放锁
    return redis.call('del',KEYS[1])
end
return 0
public class SimpleRedisLock implements ILock{

    private  String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<Long>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(Long time) {
        //1.设置key
        String key = KEY_PREFIX + name;
        //2.存入Redis,返回
        //获取当前线程id
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId, time, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(isLock);
    }

    @Override
    public void unLock() {
        //调用lua脚本
        stringRedisTemplate.execute(UNLOCK_SCRIPT
                , Collections.singletonList(KEY_PREFIX + name)
                ,ID_PREFIX + Thread.currentThread().getId());
    }

}

1.6.基于String实现分布式锁存在的问题

问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 例子:在同一个线程中方法a获取锁后调用方法b,b也要获取锁,b获取锁失败,如果你是可重试机制,b就会一直等待a将锁释放,而a需要调用b执行完才能释放,从而出现死锁问题
  • ----------
  • 不可重试:实现了非阻塞式,尝试一次获取,失败就返回错误信息
  • 注意:由于实现了不可重试机制,其实上面的例子只能用来理解一下
  • ----------
  • 超时释放:虽然可以避免死锁问题,但是会出现超时误删锁的问题,存在安全隐患
  • ----------
  • 主从一致性:主从同步存在延迟,当主宕机时,就会出现问题
  • 原理:主节点负责写操作,从节点负责读操作,读是从多个节点读,并且当主出现问题时,从会代主
  • 例子:线程1获取锁(set写操作),主节点完成(同步延迟),还未同步到从节点,主节点宕机,从代主(未同步锁),线程2就可以获取锁
  • 解释:虽然有这种情况,但是由于主从延迟可以做到毫秒及一下,所以其概率极低

解决:简单说一下

不可重入是因为你锁只有一次使用权,那么我们可以给锁加个次数,先判断是不是同一个jvm下的同一个线程,是的那就给锁的次数加一,当每次删除锁时先进行判断是不是自己的锁,然后进行次数减一,最后判断次数是不是已经为0,为0才可以删除锁,细节:由于现在有三个字段(key,value,次数)因此我们要使用Hash结构来实现

不可重试:就是更改一段业务代码,既然你需要重试,那么就重试(细节:不要获取锁失败就之间重试,可以等一等,利用订阅和信号量来解决)

超时释放:其实就是因为我们执行业务时,由于业务时间过长导致释放,那么我们可以进行一个判断,在超时时间的三分之一处(别处也可以)你的业务还在执行,那么我就刷新你的锁超时时间,你一直在执行,那么我就一直刷新(细节:利用watchDog)

主从一致性:既然是因为主从同步出现问题,那就不要主从了,直接让所有节点变成Redis的独立节点(都可以进行读写操作),以前获取锁只需要访问主节点,现在你需要访问所有的独立节点,都同意你才能获取到锁(都存入了锁数据)

1.7.Redisson分布式锁

介绍:在Redis的基础上实现了一个分布式工具集合(类似工具包),就是说你不需要自己来实现分布式锁了,直接用它就行

实现步骤:

  • 引入依赖:
  • 配置Redisson客户端:

 使用:直接调用方法,给参数就行(和之前我们自己定义的差不多)

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;

    //代理对象
    IVoucherOrderService proxy;


    @Override
    public Result seckillVoucher(Long voucherId) throws InterruptedException {
        //1.根据id查询数据库优惠券信息
        SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);

        //2.获取时间
        LocalDateTime beginTime = voucher.getBeginTime();
        LocalDateTime endTime = voucher.getEndTime();
        //3.判断时间
        if (beginTime.isAfter(LocalDateTime.now())) {
            return Result.fail("秒杀还未开始");
        }
        if (endTime.isBefore(LocalDateTime.now())) {
            return Result.fail("秒杀已经结束");
        }
        //4.获取库存
        Integer stock = voucher.getStock();
        //库存不足
        if (stock < 1) {
            return Result.fail("库存不足");
        }
        Long userId = UserHolder.getUser().getId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            //获取锁失败
            return Result.fail("只能下一单");
        }
        //获取锁成功
        //获取代理对象
        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.creatOrder(voucherId);
        } catch (IllegalStateException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    @Transactional
    public Result creatOrder(Long voucherId) {
        //根据用户id和优惠券id查询数据库
        Long userId = UserHolder.getUser().getId();
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0) {
            //该用户已经下过单了
            return Result.fail("一个用户只能下一单");
        }
        //库存足
        //5.库存减1
        boolean success = iSeckillVoucherService.update()
                .setSql("stock = stock -1")
                .eq("voucher_id", voucherId).gt("stock", 0)//乐观锁
                .update();
        if (!success) {
            return Result.fail("库存不足");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //优惠券id
        voucherOrder.setVoucherId(voucherId);
        //订单id
        Long orderId = redisIdWorker.setId("order");
        voucherOrder.setId(orderId);
        //用户id
        voucherOrder.setUserId(userId);
        //存入数据库
        save(voucherOrder);
        return Result.ok(orderId);
    }
}

2.秒杀优化

分析:由于我们进行的查询更新操作都是直接对数据库进行操作,而数据库的并发能力本身是比较差的(写操作更慢),并且为了安全问题,我们还加入了分布式锁(影响性能),假设同时有大量的用户来访问(串行执行),一个接一个(等待时间过长)

例子:在一家成本有限的饭店里,店主既要当服务员又要当厨师,当一名顾客来点单,店主需要接待顾客,然后进行炒菜(这个时间就长了),如果有下一个顾客也来点单,但是由于店主正在炒菜,顾客需要等待,你说他会不会等待这么久(工作效率低)

例子解决:是不是因为店主需要干的事情太多了,那么店主就必须多聘用几人,分别负责不同的工作,这样效率就提高了

问题解决:因此我们也需要将业务操作分别由不同的线程来执行,效率就提高了

思考:怎么分开呢?

例子:在饭店里,当用户点餐后,服务员是不是需要给用户一个小票记录用户的单号,而厨师那里也需要一个小票,他需要根据单号来依次炒菜,这样是不是实现了异步执行,服务员只需要等待用户下完单后给小票,然后他就可以接待下一个顾客了(无需过度等待),而用户只需要等餐就行,工作效率大大提高(给完小票就是代表下单成功,之前是给餐后才是下单成功)

再次解决:因此我们也可以实现该思想,我们可以将查询,判断库存,校验一人一单的操作类比于服务员接单操作,而我们的创建订单操作类比于厨师炒菜操作(时间长的你就可以类比厨师炒菜),我们判断校验成功后直接给用户返回下单成功,而具体的创建下单操作用户无需等待,类比后台执行(它会帮我们执行完,异步执行)

总结思路下单操作是不是只需要是判断校验成功,那么他就可以下单,我们就是在判断校验成功直接返回下单成功信息(而下单操作异步执行),这样就大大增加了效率

优化:既然是先查询判断校验,然后异步更新数据库,那么我们可不可以将查询数据库转变成查询Redis(效率再次提高)

实现思路:我们将需要用到的查询数据存入Redis,判断校验成功后,将具体订单信息存入阻塞队列中,然后直接返回订单id即可,异步(新的线程)从队列中取出数据,执行创建订单操作(更新数据库)

思考如何将数据存入Redis:判断时间操作不需要我们判断了,其实前端就已经进行了判断,符合要求的你才能下单,判断库存呢,使用Redis的String结构(key为优惠券id,value为库存数量),校验一人一单呢?我们是不是可以这样思考:我们使用set集合(不可重复特性),key为优惠券id,value为用户id,因为value不可重复,因此只能存在不同的用户id,用户下单时,如果查到这个优惠券已经有该用户时,校验不通过,反之通过

细节:由于我们Redis同步了数据库的库存,那么其实当校验通过时,我们的Redis是不是也需要扣减库存,并且在操作Redis时,我们是不是也需要保证原子性操作(使用Lua脚本)

步骤:

Lua脚本

开始(操作Redis)

==》判断库存是否充足

==》库存不足

==》返回1(约定标识)

-------

==》库存充足

==》判断用户是否下过单

==》用户已经下过单

==》返回2(约定标识)

-------

==》用户没有下过单

==》扣减库存(-1)

==》将用户id存入当前优惠券的set集合

==》返回0(约定标识)


--获取id
local voucherId = ARGV[1]
local userId = ARGV[2]


--获取key
local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId

--获取库存,判断
local stock = tonumber(redis.call('get', stockKey))
if not stock or stock <= 0 then
    return 1 -- 库存不足
end
--判断用户是否重复下单
if(redis.call('sismember',orderKey,userId) == 1) then
    --已经下过单
    return 2
end
-- 扣减库存
redis.call('incrby',stockKey,-1)
-- 保存用户id到Redis中
redis.call('sadd',orderKey,userId)

-- 返回
return 0

服务端

前端传过来优惠券id

==》后端接收id

==》传入Lua脚本需要的用户id和优惠券id

==》执行Lua脚本

==》判断返回结果是否为0

==》结果不为0

==》根据返回结果,返回对应的错误信息

==》1(库存不足),2(不能重复下单)

----

==》结果为0

==》将优惠券id和用户id和订单id存入阻塞队列

==》调用新的线程异步执行更新数据库操作(下单操作)

==》直接返回订单id(下单成功信息)

 

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<Long>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    //队列
    private final BlockingQueue<VoucherOrder>  orderTasks = new ArrayBlockingQueue<>(1024*1024);
    //线程池
    private static final ExecutorService  SCRIPT_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    //代理对象
    IVoucherOrderService proxy;

    @PostConstruct
    private void init(){
        //类加载就执行方法
        SCRIPT_ORDER_EXECUTOR.submit(new OrderRunTask());
    }

    private class OrderRunTask implements Runnable{
        @Override
        public void run() {
            while (true) {
                try {
                    //1.获取队列信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    //2.创建订单
                    handleOrder(voucherOrder);
                } catch (Exception e) {
                   log.error("处理订单异常:",e);
                }
            }
        }
    }

    private void handleOrder(VoucherOrder voucherOrder) throws InterruptedException {
        //设置锁
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            //获取锁失败
            log.error("获取锁失败");
            return ;
        }
        //获取锁成功
        //获取代理对象
        try {
            //创建订单
            proxy.creatOrderTask(voucherOrder);
        } catch (IllegalStateException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.获取用户id
        Long userId = UserHolder.getUser().getId();
        //2.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString());
        //3.判断
        long r = result.longValue();
        if (r != 0){
            //执行失败,无法下单
            return Result.fail(r==1?"库存不足":"无法重复下单");
        }
        //4.成功执行,可以下单
        // 阻塞队列
        Long orderId = redisIdWorker.setId("order");
        //5.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //5.1优惠券id
        voucherOrder.setVoucherId(voucherId);
        //5.2订单id
        voucherOrder.setId(orderId);
        //5.3用户id
        voucherOrder.setUserId(userId);
        //6.加入阻塞队列
        orderTasks.add(voucherOrder);
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        return Result.ok(orderId);
    }

 
    @Transactional
    public void creatOrderTask(VoucherOrder voucherOrder) {
        //根据用户id和优惠券id查询数据库
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0) {
            //该用户已经下过单了
            log.error("不能重复下单");
            return;
        }
        //库存足
        //5.库存减1
        boolean success = iSeckillVoucherService.update()
                .setSql("stock = stock -1")
                .eq("voucher_id", voucherId).gt("stock", 0)//乐观锁
                .update();
        if (!success) {
            return;
        }
        //存入数据库
        save(voucherOrder);
    }
}

解释:

阻塞队列:有一个线程尝试去队列中获取元素时,有元素获取成功,没有那么该线程就会被阻塞(一直等待),直到队列中有元素,获取到元素,才能继续执行后续操作

线程池:准备线程池,来实现新线程异步执行下单操作,准备一个线程任务(下单)让线程执行(一直)

  • 思考:是不是当我们的项目一启动,用户就可以进行下单了,因此我们需要在类加载完毕时就开始执行任务,使用注解@PostConstruct
  • 再思考:由于我们是开启一个新的线程来创建订单,而不同的线程的TreadLocal空间并不共享,所以无法从中获取,同理代理对象也不能获取(原理也是根据线程id来获取的,而我们的线程id已经变化)

解决:我们本来就已经将用户id存入了阻塞队列,我们直接从队列中取值就行,而代理对象也可以将其存入阻塞队列中或者是定义一个成员变量(先在主线程将变量赋值,新的线程直接调用即可)

内存限制问题:由于我们使用的阻塞队列基于jvm来实现,使用的是jvm的内存,如果同时有大量用户下单(队列中的任务还没有来得及执行,内存没来得及释放),从而导致队列中的内存用完了,那么在之后下单的用户不会下单成功(内存限制)

数据安全问题:

  • 原理是先在Redis中保证订单信息,再由新的线程操作数据库完成下单,那如果在操作数据库之前(下单之前),服务器宕机了,没有下单成功(数据库中没有订单数据),Redis和数据库中的数据不一致
  • 原理线程从队列中取出任务后,该任务在队列中就已经删除了,那如果线程取出任务执行时发生了事故,导致任务还未执行完就终止了,而此时队列中也没有该任务了(数据库也没有进行下单操作),数据还是会不一致

3.秒杀的异步优化

3.1.基于消息队列的异步下单思路

分析:我们出现了问题是内存泄漏和数据安全问题,内存泄漏好解决,那么换一个,数据安全呢

例子:在之前,还没有快递柜时,快递员送快递需要受限于用户是否有时间接收(用户在上班上面的),如果用户很忙,快递员先把快递放门口,而用户担心快递被偷,不放门口一直等用户也不行,用户请假回家拿快递也不行,这样造成了双方不好的局面

解决:如果我们设置一个快递柜,快递员只需要将快递送到快递柜即可,用户有时间了自己去拿一下就行,而快递柜既保证了快递的安全也保证了快递的存放数量

思考:我们是不是也可以这样,生产者为快递员,队列为快递柜,消费者为用户,那我们该使用什么队列呢?使用消息队列

消息队列的介绍:存放消息的队列,最简单的消息队列包含3个角色

  • 消息队列:存储和管理消息
  • 生成者:发送消息到消息队列中
  • 消费者:从消息队列中获取消息并处理消息

优势:

  • 它独立于jvm,不受jvm内存限制
  • 不仅仅可以做消息的存储还可以做持久化(消息确认机制:你取出消息后,你需要消息确认,没有确认队列中的消息就不会消失,确保消息至少被消费一次)

基于Redis来实现消息队列的方式:

  • List结构:基于List结构来模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

3.2.基于List结构的消息队列

队列的基于原理:先进的先出去,出口与入口不一致

分析:那么我们就可以使用对应的List命令来实现(左边存,右边取,或者反之)

思路:我们具体使用的命令:BRPOP(左边存,右边取),这个命令它可以设置等待时间,那就代表使用该命令可以实现阻塞式获取数据

优点:

  • 使用Redis,不受jvm内存限制
  • List本身是Redis的数据结构,因此支持持久化,保证数据安全
  • 满足有序性

缺点:

  • 无法避免消息丢失:没有消息确认机制,消费者取出消息后,List中消息删除,而如果消费者自己出现了问题没有消费,导致消息丢失
  • 只支持单消费者:拿了消息就会删除消息(只能使用一次)

3.3.基于PubSub的消息队列

介绍:消息传递模型(广播),消费者可以订阅一个或多个channel(类似频道),只要生产者向对应频道发送消息,那么所有订阅该频道的消费者就都可以收到消息

优点:

  • 使用Redis,不受jvm内存限制
  • 采用发布订阅支持多消费多生成
  • 满足有序性

缺点:

  • 不支持数据持久化:发送一条消息,没人订阅,那么消息就会消失,并不会将消息保证到Redis中
  • 无法避免消息丢失:发送消息没人接收,那就丢失了
  • 消息堆积有上限,超出数据丢失:发送消息时,如果有消费者订阅(监听),那么消费者那里会有一个缓存区域(临时存储消息),消费完一条消息,缓存就减一条消息,如果突然有大量消息发出,消费者来不及处理,而缓存空间有限,超出空间数据丢失

3.4.基于Stream的消息队列

基础命令:

命令XREAD特点:

  • 消息可回溯
  • 可以多消费者抢消息(竞争),加快消费速度
  • 可以阻塞读取
  • 没有消息漏读风险
  • 有消息确认机制,保证消息至少被消费一次

消费者组:

3.5.Redis作为消息队列的3种方式对比

1. 消息持久化

  • List
    支持持久化,消息存储在内存中,可通过RDB/AOF机制持久化到磁盘。适合需要简单持久化的场景,但需注意内存容量限制。

  • PubSub
    不支持持久化。消息仅在发布时推送给当前在线的订阅者,若订阅者离线则消息丢失。适用于实时通知等临时性场景。

  • Stream
    支持持久化,消息按时间顺序存储,可长期保留。支持数据备份和恢复,适合需要高可靠性的场景。

2. 阻塞读取

  • List
    支持阻塞读取(如BLPOP命令),消费者可等待新消息到达,避免轮询资源浪费。适合需要长连接等待消息的场景。

  • PubSub
    不支持阻塞读取。订阅者需在线才能接收消息,消息即时推送后即失效,无法主动拉取历史消息。

  • Stream
    支持阻塞读取(如XREAD命令),消费者可阻塞等待新消息,并支持指定超时时间。结合消费者组时,能实现高效的消息分发。

3. 消息堆积处理

  • List
    消息堆积受限于内存空间,需通过多消费者并行消费(如多个客户端轮询同一List)加快处理速度。适用于低吞吐量场景,但需警惕内存溢出风险。

  • PubSub
    消息堆积能力极弱,受限于消费者缓冲区。若消费者处理速度慢,可能导致消息丢失或缓冲区溢出。仅适合瞬时流量场景。

  • Stream
    支持设定队列最大长度(MAXLEN),超过时自动淘汰旧消息。通过消费者组(Consumer Group)实现负载均衡,多个消费者可并行处理同一队列,显著减少堆积风险。适合高并发场景。

4. 消息确认机制

  • List
    不支持消息确认。消息一旦被消费者读取即从队列移除,若消费失败无法重新投递。需自行实现重试逻辑。

  • PubSub
    不支持消息确认。消息推送后即丢弃,无重试机制,可靠性较低。

  • Stream
    支持消息确认(XACK)。消费者处理消息后需显式确认,若未确认,消息会重新分配给其他消费者。结合消费者组的Pending Entries List(PEL),可实现可靠的消息投递。

5. 消息回溯

  • List
    不支持消息回溯。消息被消费后即从队列头部移除,无法重新访问历史数据。

  • PubSub
    不支持消息回溯。消息发布后仅推送给当前订阅者,无法追溯历史记录。

  • Stream
    支持消息回溯。通过消息ID(时间戳+序号)可精确读取历史消息(如XREAD指定起始ID),便于故障恢复或数据重放。

3.6.基于Stream消息队列实现异步秒杀下单 

Lua脚本:


--获取id
local voucherId = ARGV[1]
local userId = ARGV[2]

--orderId
local orderId = ARGV[3]

--获取key
local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId

--获取库存,判断
local stock = tonumber(redis.call('get', stockKey))
if not stock or stock <= 0 then
    return 1 -- 库存不足
end
--判断用户是否重复下单
if(redis.call('sismember',orderKey,userId) == 1) then
    --已经下过单
    return 2
end
-- 扣减库存
redis.call('incrby',stockKey,-1)
-- 保存用户id到Redis中
redis.call('sadd',orderKey,userId)

-- 发送消息
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
-- 返回
return 0
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<Long>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    //线程池
    private static final ExecutorService  SCRIPT_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    //代理对象
    IVoucherOrderService proxy;
    @PostConstruct
    private void init(){
        //类加载就执行方法
        SCRIPT_ORDER_EXECUTOR.submit(new OrderRunTask());
    }

    private class OrderRunTask implements Runnable{
        String queueName = "stream.orders";
        @Override
        public void run() {
            while (true) {
                try {
                    //1.获取队列信息
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //2.判断消息是否获取成功
                    if(list == null || list.isEmpty()){
                        //2.1.获取失败,没有消息,继续循环
                        continue;
                    }
                    //解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //3.获取成功,可以下单
                    handleOrder(voucherOrder);

                    //4.ACK确认
                  stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常:",e);
                    handlePendingList();
                }
            }
        }

        private void handlePendingList() {
            while (true) {
                try {
                    //1.获取队列信息
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //2.判断消息是否获取成功
                    if(list == null || list.isEmpty()){
                        //2.1.获取失败,说明pendList没有异常消息,退出
                        break;
                    }
                    //解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //3.获取成功,可以下单
                    handleOrder(voucherOrder);

                    //4.ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                } catch (Exception e) {
                    log.error("处理pending-list订单异常:",e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }
        }
    }


    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.获取用户id
        Long userId = UserHolder.getUser().getId();
        //订单id
        Long orderId = redisIdWorker.setId("order");
        //2.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(),orderId.toString());
        //3.判断
        long r = result.longValue();
        if (r != 0){
            //执行失败,无法下单
            return Result.fail(r==1?"库存不足":"无法重复下单");
        }
        //获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        //返回订单id
        return Result.ok(orderId);
    }

}

问题:

  • Redis持久化数据还是会出现数据丢失风险
  • 只支持消费者确认,不支持生产者(如果是生产者发送消息时,出现了消息丢失呢?)

解决:使用专门的消息中间件