Redis学习Day3——黑马点评项目工程开发`

发布于:2024-09-18 ⋅ 阅读:(128) ⋅ 点赞:(0)

目录

扩展阅读推荐:

一、项目介绍及其初始化

1.1 数据库连接配置

1.2 工程启动演示

二、短信登录功能实现

2.1 基于传统Session实现的短信登录及其校验

2.1.1 基于Session登录校验的流程设计

2.1.2 实现短信验证码发送功能

2.1.3 实现登录、注册功能

2.1.4 实现登录状态校验拦截器

2.1.5 实现获取用户请求

2.1.6 (附加)用户信息脱敏处理

2.2 传统Session在集群环境下的弊端

2.3 基于Redis实现短信登录功能

2.3.1 基于Redis实现短信登录流程设计

2.3.2 修改发送短信验证码功能

2.3.3 修改登录、注册功能

2.3.4 添加刷新Token拦截器逻辑 (只做判断,不做拦截)

2.3.5 (补充)退出登录功能实现

2.3.6 【故障排查】关于一登陆后前端立马闪退回登录界面的问题

2.4 (TODO) 基于阿里云完善验证码功能

三、商户查询缓存系列功能实现

3.1 缓存的理解

3.2 查询商户店铺---添加Redis缓存

3.2.1 添加缓存逻辑理解

3.2.2 添加缓存逻辑实现

3.2.3 缓存功能效果展示

3.3(课后作业)查询商户分类列表---添加Redis缓存

3.3.1 使用String存储类型实现

3.3.2 使用List存储类型实现

3.3.3 缓存功能效果展示

3.4 (知识点)缓存更新策略最佳实践

3.4.1 数据一致性问题

3.4.2 缓存更新策略的最佳实践

总结:最佳的缓存更新策略:

3.4.3 实现查询商户店铺详细的缓存与数据库双写一致

3.5 (知识点)缓存穿透与解决策略

3.5.1 什么是缓存穿透?

3.5.2 缓存穿透的危害?

3.5.3 缓存穿透的解决措施

3.6 (知识点)缓存雪崩与解决策略 

3.7 (知识点)缓存击穿与解决策略

3.7.1 缓存击穿的解决方案

3.7.1.1 基于互斥锁的解决方案

3.7.1.2 基于逻辑过期的解决方案

3.7.1.3 总结比较

3.7.2 基于互斥锁解决缓存击穿案例——以请求店铺信息为例

3.7.2.1 互斥锁解决缓存击穿思路流程

3.7.2.2 代码实现——抽取方法

3.7.2.3 功能测试——基于JMeter的压力测试

3.7.3 基于逻辑过期解决缓存击穿案例——以请求店铺信息为例

3.7.3.1 逻辑过期解决缓存击穿思路流程

3.7.3.2 构建RedisData对象——组合优于继承

3.7.3.3 代码实现

3.7.3.4 功能测试——基于JMeter的压力测试

3.7.4【故障排查】关于逻辑过期时间策略无法查询店铺信息的现象

3.8 封装Redis缓存工具

3.8.1 封装代码

3.8.2 使用方法

四、 优惠卷秒杀系列功能实现

4.1 全局ID生成器

4.1.1 全局ID生成器的选型

4.1.2 全局ID生成器的实现

4.1.3 全局ID生成器的测试

4.1.4 其他ID生成器的拓展

4.2  利用PostMan模拟管理员后台添加秒杀优惠卷信息 

 4.3 优惠卷秒杀下单功能基础逻辑实现

4.4 (优化)解决超卖问题

4.4.1 超卖现象重现及其原因

4.4.2 解决超卖问题的方案比较

悲观锁方案

乐观锁方案

4.4.3 乐观锁两大方案比较

 版本号法(涉及ABA问题时使用)

CAS法(不涉及ABA问题时使用)

4.4.4 基于CAS法解决超卖问题

4.5  (优化)实现单机情况下 限购一人一单

4.5.1 限购功能整体概述

4.5.2 限购功能实现及说明

4.5.3 限购功能测试检验

4.6 (拓展)集群模式下限购一人一单 

4.6.1 单体模式下限购功能实现的局限性

4.6.2 配置集群环境,测试限购代码的并发问题

4.6.3 解决策略——分布式锁

4.7 (知识点)分布式锁 

4.7.1 分布式锁介绍

4.7.2 基于Redis的分布式锁方案

4.7.3 基于Redis实现分布式锁的初级版本

4.7.4 (优化)解决分布式锁出现的误删问题

4.7.4.1 出现误删的原因判断

4.7.4.2 解决方案——检查锁是不是自己的:

4.7.4.3 代码实现及其测试

4.7.5 (优化)解决分布式锁的原子性问题

4.7.5.1 非原子性操作带来的并发安全问题

4.7.5.2 解决方案——引入Rua脚本实现命令原子化

4.7.5.3 Rua脚本在Redis中的基本使用

4.7.5.4 使用Rua编写释放锁脚本

4.7.5.5 在IDEA编写使用lua脚本的代码

4.7.5.6 功能测试

4.8 (拓展知识点) Redisson ——成熟的锁工具包 

4.8.1 基于setnx实现的分布式锁还存在的问题

4.8.2 Redisson介绍

4.8.3 Redisson入门

4.8.4 Redisson的可重入原理

4.8.5 (TODO)Redisson的可重试原理

4.8.6(TODO) Redisson的超时释放原理

4.8.7 Redisson的主从一致性原理

4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题

4.9 Redis优化秒杀系列功能实现 

4.9.1 业务性能瓶颈分析

4.9.2 优化流程分析

4.9.3 改进秒杀业务代码改造及详细解释

4.9.4 Redis优化秒杀功能测试

4.9.5 总结

4.10(知识点) 学习Redis消息队列实现异步秒杀

4.10.1 基于List类型模拟的消息队列原理及其使用

4.10.2  基于PubSub的消息队列原理及其使用

 4.10.3(三者之最) 基于Stream的消息队列原理及其使用

4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能

4.10.4.1 创建Stream消息队列

4.10.4.2 修改秒杀资格判断的Lua脚本

4.10.4.3 完善秒杀下单代码及其代码说明

1. 首先是修改预加载Lua脚本信息

2. 主程序改变逻辑,将消息队列逻辑放入到了lua脚本

3. 重头戏,开启线程任务!!

4.10.4.4 秒杀下单功能测试

1. 简单测试功能是否实现,使用postman发送请求

2. 利用jmeter测试高并发情况

4.11 总结情况

五、达人探店系列功能实现

 5.1 发布和查看探店笔记

5.1.1 发布探店笔记

5.1.2 查看探店笔记

5.2 点赞功能实现

5.2.1 当前点赞功能存在的问题

5.2.2 完善点赞功能需求与实现步骤

5.2.3 点赞功能实现

5.2.4 点赞功能测试

5.3 (TODO)点赞排行榜功能实现

扩展阅读推荐:

Redis学习Day1——配置Linux环境下的运行环境-CSDN博客

Redis学习Day2——Redis基础使用-CSDN博客

黑马程序员Redis入门到实战教程_哔哩哔哩_bilibili

使用git命令行将本地仓库代码上传到gitee/github远程仓库-CSDN博客

布隆(Bloom Filter)过滤器——全面讲解,建议收藏-CSDN博客

实战篇-07.优惠券秒杀-实现一人一单功能_哔哩哔哩_bilibili

从认知到实现:一文读懂实现分布式锁的五种方案-CSDN博客

Lua 教程 | 菜鸟教程 (runoob.com)

Redisson帮助文档————Home · redisson/redisson Wiki (github.com)

深入了解分布式锁 Redisson 原理—可重入原理、可重试原理、主从一致性原理、解决超时锁失效

【黑马点评】已解决java.lang.NullPointerException异常-CSDN博客

一、项目介绍及其初始化

        学习Redis的过程,我们还将遇到各种实际问题,例如缓存穿透、雪崩、击穿等问题,只有在实际的项目实践中解决这些问题,才能更好的掌握和理解Redis的企业开发思维。

        以下是本次【黑马点评】项目的主要内容:

本项目的功能概述
本项目的搭建结构
本项目的搭建结构

1.1 数据库连接配置

在yml配置文件中,配置好自己的数据库连接信息

数据库表结构

1.2 工程启动演示

想要成功启动该项目,需要以下步骤:

1. 打开VM虚拟机,激活Linux系统中事先配置好的Redis数据库

2. 启动nignx服务器(注意nignx服务器必须放在无中文的目录下)

3. 启动后端程序(观察端口,访问初始工程)

4. 访问请求地址,验证工程启动正确http://localhost:8081/shop-type/list

前端网页端口:8080 
后端数据端口:8081

二、短信登录功能实现

2.1 基于传统Session实现的短信登录及其校验

2.1.1 基于Session登录校验的流程设计

2.1.2 实现短信验证码发送功能

请求接口 /user/code
请求类型 post
请求参数 phone
返回值
    /**
     * 发送手机验证码
     */
    @PostMapping("/code")
    public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
        log.info("发送验证码, 手机号:{}", phone);
        return userService.sendCode(phone, session);
    }

    /**
     * 发送验证码
     * @param phone
     * @param session
     * @return
     */
    @Override
    public Result sendCode(String phone, HttpSession session) {
        // 1. 校验手机号码
        if(RegexUtils.isPhoneInvalid(phone)){
            return Result.fail("手机号码格式错误!");
        }
        // 2. 生成验证码
        String code = RandomUtil.randomNumbers(6);
        // 3. 将验证码保存到Session中
        session.setAttribute("code", code);
        //TODO 4. 调用阿里云 将短信信息发送到指定手机
        log.info("发送短信验证码成功,验证码:{}", code);
        return Result.ok();
    }

2.1.3 实现登录、注册功能

请求接口 /user/login
请求类型 post
请求参数 LoginForm---> phone,code,[password]
返回值
    /**
     * 登录功能
     * @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
     */
    @PostMapping("/login")
    public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
        log.info("用户登录, 参数:{}", loginForm);
        return userService.login(loginForm, session);
    }


/**
     * 登录功能
     * @param loginForm
     * @param session
     * @return
     */
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        // 1. 校验手机号
        String phone = loginForm.getPhone();
        if(RegexUtils.isPhoneInvalid(phone)){
            return Result.fail("手机号码格式错误!");
        }
        // 2. 校验验证码
        Object cacheCode = session.getAttribute("code");
        String code = loginForm.getCode();
        if(cacheCode==null || !cacheCode.toString().equals(code)){
            return Result.fail("验证码错误!");
        }
        // 3. 根据手机号查询用户 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();
        //    if 0 :创建新用户,保存数据库,将用户信息存储到Session
        //
        if(user == null){
            user = createUserWithPhone(phone);
        }
        //else: 登录成功,将用户信息存储到Session
        session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
        return Result.ok();
    }


/**
     * 根据手机号创建用户
     * @param phone
     * @return
     */
    private User createUserWithPhone(String phone) {
        //创建用户
        User user = new User();
        user.setPhone(phone);
        user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
        //保存用户
        save(user);
        // 返回
        return user;
    }

2.1.4 实现登录状态校验拦截器

        由于日后项目功能会越来越多,需要登录才能进行访问的界面也会越来越多,我们必须想办法将登录状态校验抽离出来形成一个前置校验的条件,再放行到后续逻辑。

1. 封装TreadLocal工具类

将用户信息保存到 TreadLocal中 并封装TreadLocal工具类用于 保存用户、获取用户、移除用户

在 urils / UserHolder 

/**
 * TreadLocal工具类
 */
public class UserHolder {
    private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
    // 保存用户
    public static void saveUser(UserDTO user){
        tl.set(user);
    }
    // 获取ThreadLocal中的用户
    public static UserDTO getUser(){
        return tl.get();
    }
    // 清空ThreadLocal
    public static void removeUser(){
        tl.remove();
    }
}
2. 创建登录拦截器

在 urils / LoginInterceptor 

public class LoginInterceptor implements HandlerInterceptor {
    /**
     * 前置拦截器
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1. 获取session
        HttpSession session = request.getSession();
        // 2. 获取session中的用户
        Object user = session.getAttribute("user");
        // 3. 判断用户是否存在
        if(user == null){
            response.setStatus(401);
            return false;
        }
        // 4. 如果存在,用户信息保存到 ThreadLocal 并放行
        UserHolder.saveUser((UserDTO) user);
        return true;
    }

    /**
     * 后置拦截器(移除用户)
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 移除用户
        UserHolder.removeUser();
    }
}
3. 添加配置,生效拦截器,并配置放行路径

在 config/ MvcConfig

@Configuration
public class MvcConfig implements WebMvcConfigurer {
    /**
     * 添加拦截器
     * @param registry
     */
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/host",
                        "/shop/**",
                        "/shop-type/**",
                        "/voucher/**"
                );
    }
}

2.1.5 实现获取用户请求

前端点击我的,发送请求到后端,获取当前登录状态,方能进入个人中心

    /**
     * 获取当前登录的用户
     * @return
     */
    @GetMapping("/me")
    public Result me(){
        UserDTO user = UserHolder.getUser();
        return Result.ok(user);
    }

2.1.6 (附加)用户信息脱敏处理

为防止出现以下这种情况(将用户隐私信息暴露过多),我们采用UserDTO对象对用户信息脱敏处理:

@Data
public class UserDTO {
    private Long id;
    private String nickName;
    private String icon;
}

并借助拷贝工具 进行对象拷贝

session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));

2.2 传统Session在集群环境下的弊端

Session共享问题

多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。

解决策略

1. 让Session可以共享

Tomcat提供了Session拷贝功能,但是这会增加服务器的额外内存开销,并且带来数据一致性问题

2. 【推荐】使用Redis进行替代

数据共享、内存存储(快)、key-value结构

2.3 基于Redis实现短信登录功能

2.3.1 基于Redis实现短信登录流程设计

发送短信验证码逻辑
校验登录状态逻辑

        对于验证码,使用 手机号码作为KEY,确保了正确的手机对应着正确的短信验证码。

        对于用户信息唯一标识使用 UUID生成的Token作为 KEY,而不使用手机号码,从而提高了用户数据安全性。

2.3.2 修改发送短信验证码功能

只需要在Session的基础上,将第三步保存到Redis中

格式:

key value TTL
login:code:[手机号码] [验证码] 120S
//        // 3. 将验证码保存到Session中
//        session.setAttribute("code", code);

        // 3. 将验证码保存到Redis中
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);

2.3.3 修改登录、注册功能

1. 手机号校验

2. 从Redis中取出验证码进行校验

3.查询用户信息

4. 将用户信息存储到Redis ---> 需要以Hash结构进行存储 ----> 需要将user对象转成 Map对象

5. 将token返回给客户端 ,充当Session的标识作用

/**
     * 登录功能
     * @param loginForm
     * @param session
     * @return
     */
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        // 1. 校验手机号
        String phone = loginForm.getPhone();
        if(RegexUtils.isPhoneInvalid(phone)){
            return Result.fail("手机号码格式错误!");
        }
        // 2. 校验验证码 REDIS
//        Object cacheCode = session.getAttribute("code");
        // 2.1 从Redis中获取验证码
        String redisCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
        // 2.2 校验验证码
        String code = loginForm.getCode();
        if(redisCode==null || !redisCode.equals(code)){
            return Result.fail("验证码错误!");
        }
        // 3. 根据手机号查询用户 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();
        //    if 0 :创建新用户,保存数据库,将用户信息存储到Session
        if(user == null){
            user = createUserWithPhone(phone);
        }
//        //else: 登录成功,将用户信息存储到Session
//        session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));

        // 4. 将用户信息存储到Redis中
         // 1. 随机生成token,作为登录令牌 ---> UUID导入工具包中的方法,不要导入java自带的
        String token = UUID.randomUUID().toString(true);
         // 2. 以hash结构进行存储
        UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class);
        //TODO 这里报错了,因为UserDTO中有个id属性,不是字符串,在Redis序列化下报错
//        Map<String,Object> userMap = BeanUtil.beanToMap(userDTO);
        Map<String,Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName,fieldValue)-> fieldValue.toString()));
        // 3. 存储到Redis中
        stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token,userMap);

        // 给token设置有效期
        // 超过30分钟不访问任何界面就会剔除,所以还需要设置在访问过程中不断更新token的有效期
        // 实现方式: 在登录拦截器中进行处理
        stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);

        // 5. 返回token到客户端,客户端保存到浏览器中
        return Result.ok(token);
    }

2.3.4 添加刷新Token拦截器逻辑 (只做判断,不做拦截)

        首先,由于需要在自定义的拦截器中使用StringRedisTemplate对象,由于不是交由spring管理的,所以我们需要自己写构造函数进行导入。同时在MvcConfig中直接交给Spring管理

        其次,这里选择了新建一个专门负责刷新Token的“拦截器”,只做判断不做拦截。确保请求在经过登录校验拦截器之前,会统一先被该“拦截器”获取,并对Token进行判断,如果没有Token,则会被接下来的登录拦截器进行拦截

package com.hmdp.config;

import com.hmdp.Interceptor.LoginInterceptor;
import com.hmdp.Interceptor.RefreshTokenInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.annotation.Resource;

@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 添加拦截器
     * @param registry
     */
    public void addInterceptors(InterceptorRegistry registry) {
        // 刷新token拦截器 全部拦截 只做判断
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**");
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/host",
                        "/shop/**",
                        "/shop-type/**",
                        "/voucher/**"
                );

    }
}
package com.hmdp.Interceptor;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.hmdp.dto.UserDTO;
import com.hmdp.utils.UserHolder;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL;

/**
 * Token缓存刷新拦截器 只会放行不会拦截
 */
public class RefreshTokenInterceptor implements HandlerInterceptor {

    private StringRedisTemplate stringRedisTemplate;

    /**
     * 手动创建的对象,需要手动注入,所以需要构造方法
     * @param stringRedisTemplate
     */
    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1.获取请求头中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            return true;
        }
        // 2.基于TOKEN获取redis中的用户
        String key  = LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        // 3.判断用户是否存在
        if (userMap.isEmpty()) {
            return true;
        }
        // 5.将查询到的hash数据转为UserDTO
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // 6.存在,保存用户信息到 ThreadLocal
        UserHolder.saveUser(userDTO);
        // 7.刷新token有效期
        stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
        // 8.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 移除用户
        UserHolder.removeUser();
    }
}

2.3.5 (补充)退出登录功能实现

        前端点击退出登录时发送logout请求,我们只需要将TreadLocal的用户对象给清除掉,这样一来前端的请求就获取不到用户信息,强制被拦截到登录界面了

    /**
     * 登出功能
     * @return 无
     */
    @PostMapping("/logout")
    public Result logout(){
        // 清除用户登录状态
        UserHolder.removeUser();
        return Result.ok();
    }

 2.3.6 (补充)查看用户首页功能实现

点击用户头像,可以进入用户的首页

查看用户首页
    /**
     * 根据id查询用户
     * @param userId
     * @return
     */
    @GetMapping("/{id}")
    public Result getUserById(@PathVariable("id") Long userId){
        User user = userService.getById(userId);
        if(user == null){
            return Result.ok();
        }
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        return Result.ok(userDTO);
    }

2.3.7【故障排查】关于一登陆后前端立马闪退回登录界面的问题

        在跟着视频练习的过程中,在所以代码开发完成后,我发现了每当自己点击登录校验成功后,前端又会重复的闪退回登录界面。对此我进行了以下排除手段:

1. 检查Redis是否 将 短信验证码及其用户token信息保存成功,有则证明这两个大环节没有问题

2. 猜测是拦截器问题:

         一开始,我们模仿老师将刷新Token的逻辑一并写到了登录校验拦截器上。但是!登录校验拦截器在开发的过程中,有一些需要Token【或是说需要校验TreadLocal对象】的接口并没有被拦截器拦截下来,导致前端认为该用户操作并未携带Token【没被存储到TreadLocal中】,从而误判为未登录状态,从而剔除该用户,强制跳转到登录界面。

        为此,秉承单一职责原则,对于Token,我们需要新建一个将所有界面都“拦截”的拦截器,这样子可以保证在进入后续拦截器的请求,不会再有被误判的情况出现。

    而且,也确保了不管用户进行了什么操作,Token都能刷新时长

(该故障经过拆分拦截器已正常解决,但是题主并没有深刻去揪到底是拿一些请求被误判了,这个故障原因也是我分析,如有高见请分享一下)

2.4 (TODO) 基于阿里云完善验证码功能

TODO

三、商户查询缓存系列功能实现

3.1 缓存的理解

        我们的程序如果想要用户有一个比较良好的使用体验,在请求数据速度上必然要有所突出。因此我们一般会在项目中运用缓存策略,从而提高我们程序的响应速度。与此同时,在添加缓存策略之后,数据一致性、缓存穿透、雪崩、热Key、维护成本提高等相继出现。如何平衡好这种关系,成为了我们学习Redis的重要所在。

3.2 查询商户店铺---添加Redis缓存

3.2.1 添加缓存逻辑理解

        由于Redis访问快的优点,我们在客户端与数据库之间添加一层 Redis数据层。用户发送的请求首先打到Redis进行查询,

- 如果在Redis上查询命中,则直接返回给用户,从而减轻了底层数据库服务器数据压力

- 如果没能命中,则请求打到数据库进行查询,查询未命中则说明此次请求为 错误请求

- 数据库命中的话,就将数据写入Redis中 ,接着返回给用户。

缓存作用模型

3.2.2 添加缓存逻辑实现

    /**
     * 根据id查询商铺信息
     * @param id 商铺id
     * @return 商铺详情数据
     */
    @GetMapping("/{id}")
    public Result queryShopById(@PathVariable("id") Long id) {
        return shopService.queryById(id);
    }


 /**
     * 根据id查询店铺(添加Redis缓存版)
     * @param id
     * @return
     */
    @Override
    public Result queryById(Long id) {
        //1. 根据id到Redis中查询用户信息
        //2. Redis命中 ----------------> 返回商户信息 -------> 结束
        //3. Redis未命中 查询数据库
        //4. 查询数据库未命中 ----------------> 返回错误信息 ------> 结束
        //5. 数据库命中 ---------------> 将数据写入Redis ------> 返回商户信息 -------> 结束
        String stopJson =  stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
        if(StrUtil.isNotBlank(stopJson)){
            // 存在直接返回
            Shop shop = JSONUtil.toBean(stopJson, Shop.class); //将JSON字符串转换为对象
            return Result.ok(shop);
        }
//        query().eq("shop_id",id);
        // 查询数据库
        Shop shop = getById(id);
        if(shop == null){
            Result.fail("店铺不存在");
        }
        stringRedisTemplate.opsForValue().set(
                RedisConstants.CACHE_SHOP_KEY + id,
                  JSONUtil.toJsonStr(shop),
                  RedisConstants.CACHE_SHOP_TTL,
                  TimeUnit.MINUTES);
        return Result.ok(shop);
    }

3.2.3 缓存功能效果展示

第一次查询商户店铺,未缓存Redis,后台查询数据库

第二次再次查询商户店铺,已缓存Redis,后台没有查询店铺的数据库语句

3.3(课后作业)查询商户分类列表---添加Redis缓存

3.3.1 使用String存储类型实现

    /**
     * 查询店铺类型 (添加Redis版)
     * @return
     */
    @GetMapping("list")
    public Result queryTypeList() {
//        List<ShopType> typeList = typeService
//                .query().orderByAsc("sort").list();
        return typeService.queryTypeList();
    }


/**
     * 查询店铺类型列表(添加Redis版)
     * String 实现版
     * @return
     */
    @Override
    public Result queryTypeList() {
        // 1. 在Redis中查询店铺类型列表
        // 2. Redis命中 ------> 直接返回店铺类型数据 -------> 结束
        // 3. Redis未命中, 查询数据库
        // 4. 数据库未命中 -------> 返回报错信息 --------> 结束
        // 5. 数据库命中,-------> 将数据存入Redis --------> 返回店铺类型数据 --------> 结束
        String Key = RedisConstants.CACHE_SHOP_TYPE_KEY;
        String shopTypeJSON = stringRedisTemplate.opsForValue().get(Key);
        // 将字符串转换为对象
        List<ShopType> shopTypeList = null;
        if(StrUtil.isNotBlank(shopTypeJSON)){
            shopTypeList = JSONUtil.toList(shopTypeJSON, ShopType.class);
            return Result.ok(shopTypeList); // 返回店铺类型数据
        }
        // 查询数据库
        shopTypeList = query().orderByAsc("sort").list();
        // 将对象转换为字符串
        shopTypeJSON = JSONUtil.toJsonStr(shopTypeList);
        // 将数据存入Redis
        stringRedisTemplate.opsForValue().set(Key, shopTypeJSON);
        return Result.ok(shopTypeList);
    }

3.3.2 使用List存储类型实现

/**
     * 查询店铺类型列表(添加Redis版)
     * List 实现版
     * @return
     */
    @Override
    public Result queryTypeList() {
        // 1. 在Redis中查询店铺类型列表
        // 2. Redis命中 ------> 直接返回店铺类型数据 -------> 结束
        // 3. Redis未命中, 查询数据库
        // 4. 数据库未命中 -------> 返回报错信息 --------> 结束
        // 5. 数据库命中,-------> 将数据存入Redis --------> 返回店铺类型数据 --------> 结束
        String Key = RedisConstants.CACHE_SHOP_TYPE_KEY;
        // 获取列表中所有元素(字符串格式)
        List<String> shopTypeJSON = stringRedisTemplate.opsForList().range(Key, 0, -1);   // 获取列表中所有元素

        if(shopTypeJSON != null && !shopTypeJSON.isEmpty()){
            // Redis中存在数据,需要将所有的Value转换成 ShopType对象
            // 将字符串转换为对象
            List<ShopType> shopTypeList = new ArrayList<>();
            for(String str : shopTypeJSON){
                shopTypeList.add(JSONUtil.toBean(str, ShopType.class));
            }
            return Result.ok(shopTypeList); // 返回店铺类型数据
        }
        // 查询数据库
        List<ShopType> shopTypeList = query().orderByAsc("sort").list();
        if(shopTypeList == null || shopTypeList.isEmpty()){
            return Result.fail("店铺类型不存在");
        }
        // 将对象转换为字符串(每一项都是)
        for (ShopType shopType : shopTypeList) {
            stringRedisTemplate.opsForList().rightPushAll(Key, JSONUtil.toJsonStr(shopType));
        }
        // 设置过期时间
        stringRedisTemplate.expire(Key, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        return Result.ok(shopTypeList);
    }

3.3.3 缓存功能效果展示

分类列表数据成功存储到Redis中
查询效率从1秒提升到45毫秒

3.4 (知识点)缓存更新策略最佳实践

3.4.1 数据一致性问题

        前面引入Redis时已经讲过了,缓存的使用可以降低后端负载、降低响应时间、提高读写效率。但也会带来系列问题,数据一致性便是其中最为常见的问题之一。

        数据一致性问题的根本原因就是缓存和数据库中的数据不同步,因此,我们需要提出一套良好的缓存更新方案,尽可能的使得缓存和数据库中的数据进行及时同步。

3.4.2 缓存更新策略的最佳实践

  • 内存淘汰机制【全自动触发】

        所谓内存淘汰机制,就是当Redis出现内存不足的情况下,会选择性的剔除一部分数据。这种全自动触发的淘汰机制不受控制,且触发概率不高,因此只适用于一些低一致性需求的业务

  • 超时剔除机制【半自动触发】

        超时剔除机制通常被我们当作一个保底策略,就是习惯性的给缓存数据设置一定的过期时间TTL。到期后,由Redis自动进行剔除,从而更好的利用缓存空间

  • 主动更新机制【手动触发】

    手动编码实现缓存更新,在修改数据库的同时更新缓存,实现双写。

    特别灵活,可控性好

    • 读写穿透(Read / Write Through Pattern)

              缓存与数据库整合成为一个服务,由服务来维护统一性。调用者调用该服务,无需关心缓存的一致性问题

      问题:想实现困难,市面上也很难找到这种服务

    • 写回方案  (Write Behind Caching Pattern)

             调用者只操作缓存,由其他一个独立线程异步的将缓存数据持久化到数据库,从而实现最终保持一致

      问题:异步任务复杂,一致性难以保证(异步非同步),宕机及丢失

    • 双写方案  (Cache Aside Pattern)

      由缓存的调用者,在更新数据库的同时去更新缓存

      • 更新缓存模式

        无效写操作较多,不推荐使用:

                例如对于一个数据,我们要进行100次的修改更新。如果每次都去更新,实际上只有最后一次更新是有效操作。所有对于查询少的情况下,更新缓存模式性能反而不太好。

      • 删除缓存模式

        无效写操作相对较少,推荐使用

        • 先操作缓存

          先操作缓存再操作数据库在多线程环境下出错情况说明:

          【线程1 做更新缓存操作    线程2 做查询操作】

                  由于先操作缓存,线程1执行删除缓存操作

                  如果 从删除缓存 到 更新完成这一个过程复杂 耗时特别长

                  同一时间,线程2执行查询操作,由于缓存未命中,查询数据库

                  此时还未更新完成,查询到旧的值,并把旧的值写回了缓存

                  这样一来,缓存数据和数据库数据产生了不一致

          这个概率比较大:

                   线程1先删掉缓存,然后执行一个耗时长的更新动作

                   而线程2 则是进行查询缓存 和 写入缓存的动作,耗时短

        • 先操作数据库

          先操作数据库再删除缓存出错情况说明:

          【线程1 做查询操作    线程2 做更新操作】

           假设恰好缓存失效了 

           线程1来查,刚好处于未命中状态,于是查询数据库【旧值】

           并准备把数据库数据写入缓存

           恰好线程2 执行更新数据库操作,将数据库的值更新成【新值】

           最后线程1终于开始执行写入缓存操作了,但是写的是【旧值】

          这个概率微乎其微:

          两个恰好,其次在线程1查询后 写缓存的这个微妙级别的操作之内,

          必须恰好有另外的线程完成了 更新数据库这一耗时长的操作。

总结:最佳的缓存更新策略:

        使用超时剔除机制作为保底策略,采用主动更新机制中的双写模式,选择删除缓存模式,先操作数据库后操作缓存。

3.4.3 实现查询商户店铺详细的缓存与数据库双写一致

超时剔除机制保底:

// 在原先的 queryById 方法中 添加过期时间
// 将数据写入Redis + 设置过期时间
        stringRedisTemplate.opsForValue().set(
                RedisConstants.CACHE_SHOP_KEY + id,
                  JSONUtil.toJsonStr(shop),
                  RedisConstants.CACHE_SHOP_TTL,
                  TimeUnit.MINUTES);

更新数据时,主动删除缓存:

注意添加事务,确保更新数据库和删除缓存的原子性

/**
     * 更新店铺信息
     * @param shop
     * @return
     */
    @Override
    @Transactional
    public Result update(Shop shop) {
        Long id = shop.getId();
        if(id == null){
            return Result.fail("店铺id不能为空");
        }
        // 1. 更新数据库
        updateById(shop);
        // 2. 删除缓存
        stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + shop.getId());
        // 3. 返回结果
        return Result.ok();
    }

3.5 (知识点)缓存穿透与解决策略

3.5.1 什么是缓存穿透?

        缓存穿透是指客户端请求的数据在缓存中和在数据库中都不存在,这样缓存永远不会生效,这些请求都会直接被达到数据库上。

3.5.2 缓存穿透的危害?

        若是有大量这种无效、恶意请求直接打到数据库。会极大地增加数据库本身的压力,很可能造成数据库宕机。就像DDOS攻击,导致正常的客户端请求无法得到及时的响应。

3.5.3 缓存穿透的解决措施

        以下介绍的两种方式都是被动的解决缓存穿透方案。除此之外,我们还可以采用主动的方案预防缓存穿透,比如:增强id的复杂度避免被猜测id规律做好数据的基础格式校验加强用户权限校验

3.5.3.1 缓存空对象
优点: 缺点:
实现简单,维护方便

1. 额外的内存消耗

2. 可能造成短期的不一致问题

缓存空对象图解
3.5.3.2 布隆过滤器

当一个元素加入布隆过滤器中的时候,会进行如下操作:

  1. 使用布隆过滤器中的哈希函数对元素值进行计算,得到哈希值(有几个哈希函数得到几个哈希值)。
  2. 根据得到的哈希值,在位数组中把对应下标的值置为 1。

当我们需要判断一个元素是否存在于布隆过滤器的时候,会进行如下操作:

  1. 对给定元素再次进行相同的哈希计算;
  2. 得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。
优点: 缺点:
内存占用少,没有多余的Key

1. 实现复杂

2. 存在误判可能

布隆过滤器应用图解
3.5.3.3 解决缓存穿透——以请求店铺信息为例
解决缓存穿透的流程图

 缓存空对象实现代码

/**
     * 根据id查询店铺(添加Redis缓存版 + 解决缓存穿透[缓存空对象])
     * @param id
     * @return
     */
    @Override
    public Result queryById(Long id) {
        //1. 根据id到Redis中查询用户信息
        //2. Redis命中 ----------------> 返回商户信息 -------> 结束
        //2.改:Redis命中 ----------------> 判断是否为空对象-------> 结束
        //                                        | 非空
        //                                        ------> 返回商户信息 -------> 结束
        //3. Redis未命中 查询数据库
        //4. 查询数据库未命中 ----------------> 返回错误信息 ------> 结束
        //4.改: 查询数据库未命中----------> 将空对象写入Redis  ------> 结束
        //5. 数据库命中 ---------------> 将数据写入Redis ------> 返回商户信息 -------> 结束


        String stopJson =  stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
        // 判断Redis中是否存在数据
        if(StrUtil.isNotBlank(stopJson)){
            // 存在直接返回
            Shop shop = JSONUtil.toBean(stopJson, Shop.class); //将JSON字符串转换为对象
            return Result.ok(shop);
        }
        // 判断Redis中是否存在空对象
        if  (RedisConstants.CACHE_PENETRATION_NULL_VALUE.equals(stopJson)){
            return Result.fail("店铺信息不存在");
        }

//        query().eq("shop_id",id);
        // 查询数据库
        Shop shop = getById(id);
        // 查询数据库不存在
        if(shop == null){
            // 将空对象写入Redis
            stringRedisTemplate.opsForValue().set(
                    RedisConstants.CACHE_SHOP_KEY + id,
                    RedisConstants.CACHE_PENETRATION_NULL_VALUE,
                    RedisConstants.CACHE_NULL_TTL,
                    TimeUnit.MINUTES
            );
            return Result.fail("店铺不存在");
        }

        // 将数据写入Redis + 设置过期时间
        stringRedisTemplate.opsForValue().set(
                RedisConstants.CACHE_SHOP_KEY + id,
                JSONUtil.toJsonStr(shop),
                RedisConstants.CACHE_SHOP_TTL,
                TimeUnit.MINUTES
        );
        return Result.ok(shop);
    }

 测试结果

第一次查询不存在的店铺,Redis缓存空对象,报错信息为:店铺不存在
第二次查询,Redis已缓存空对象,报错信息为:店铺信息不存在

3.6 (知识点)缓存雪崩与解决策略 

        缓存雪崩是指在同一时间段大量缓存key同时失效或者Redis服务宕机,导致大量请求打到数据库,带来巨大压力

缓存雪崩示意图

解决策略:

  1. 事前Redis集群部署,主从 + 哨兵机制, 避免全盘崩溃宕机
  2. 事中本地缓存 + 限流和降级,避免数据库压力过大造成宕机
  3. 事后Redis持久化,机器重启后可以自动从磁盘加载数据,恢复缓存数据

3.7 (知识点)缓存击穿与解决策略

        缓存击穿问题又叫热点Key问题,是指一个高访问量并且缓存重建业务较为复杂的key突然失效了,导致无数请求直接打到数据库,造成巨大压力的情况

        如图,在线程1进行缓存重建的过程,由于重建业务耗时较长,在重建业务期间,有其他大量线程执行查询操作,由于缓存未命中,均尝试执行查询数据库重建缓存的重复操作。

3.7.1 缓存击穿的解决方案

3.7.1.1 基于互斥锁的解决方案

        为了防止在缓存重建的过程中,其余线程也去进行查询数据库重建缓存。互斥锁策略则是采用给第一个尝试重建缓存的线程添加互斥锁,其余的线程则在不断地进行 “尝试获取锁 --- 休眠等待”。从而减少了查询数据库造成的压力问题。

互斥锁解决缓存击穿的原理图

3.7.1.2 基于逻辑过期的解决方案

        首先,我们会给这种高访问量业务的Key设置一个逻辑过期时间(到期不会被Redis自动删除,以确保缓存必定命中)。

        然后,线程每次访问时,会先对当前时间与逻辑过期时间进行判断,过期则获取一个互斥锁,来表明自己是第一个发现需要缓存重建的线程。

        接着,该线程就会开启一个独立线程,专门用于执行缓存重建的任务。自己则是先返回旧的数据使用

        在缓存重建线程执行完成之前,互斥锁不会释放,此时其他线程在访问的过程中获取锁失败,则直接返回旧数据

        最后,当缓存重建线程执行完毕后,释放互斥锁。

3.7.1.3 总结比较

互斥锁追求的是对一致性要求较高的业务,但是代价是多线程等待,性能受到了一定的影响

逻辑过期追求的是高性能的服务,但是却牺牲了一致性。

3.7.2 基于互斥锁解决缓存击穿案例——以请求店铺信息为例

3.7.2.1 互斥锁解决缓存击穿思路流程
基于互斥锁解决缓存击穿的原理图
3.7.2.2 代码实现——抽取方法

        在实现功能的过程中,我们人为的在进行缓存重建的过程中添加了Thread休眠,这样使得整个缓存重建的时长变长,模拟复杂的重建业务,从而更容易能展示出互斥锁在高并发情况下减缓数据库压力的作用

        模拟重建的延迟情况 : Thread.sleep(100);

public Result queryById(Long id) {
        //1. 根据id到Redis中查询用户信息
        //2. Redis命中 ----------------> 返回商户信息 -------> 结束
        //2.改:Redis命中 ----------------> 判断是否为空对象-------> 结束
        //                                        | 非空
        //                                        ------> 返回商户信息 -------> 结束
        //3. Redis未命中  -----> 查询数据库
        //3.改:Redis未命中 -------> 尝试获取互斥锁 ------> 获取成功 -----> 查询数据库 -------> 将数据库结果写入Redis -------> 释放锁 ------> 返回商户信息 -------> 结束
        //                                     | 获取失败
        //                                     ------> 休眠一段时间后重试
        //4. 查询数据库未命中 ----------------> 返回错误信息 ------> 结束
        //4.改: 查询数据库未命中----------> 将空对象写入Redis  ------> 结束
        //5. 数据库命中 ---------------> 将数据写入Redis ------> 返回商户信息 -------> 结束
        // -------------------------------------------上述思路将被分装成两个方法分别解决 缓存穿透 和 缓存击穿 问题------------------------------------------------------------------------------------------------ //

        // 基于互斥锁解决缓存击穿问题
        Shop shop = queryWithMutex(id);
        if(shop == null){
            return Result.fail("店铺不存在");
        }
        return Result.ok(shop);
    }



   /**
     * 获取互斥锁  setNx  ----- setIfAbsent
     * @param key
     * @return
     */
    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key,"1",RedisConstants.LOCK_SHOP_TTL,TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 释放互斥锁
     * @param key
     */
    private void unLock(String key){
        stringRedisTemplate.delete(key);
    }


    /**
     * 基于互斥锁解决缓存击穿问题保存代码
     * @param id
     * @return
     */
    private Shop queryWithMutex(Long id){
        String stopJson =  stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
        // 判断Redis中是否存在数据
        if(StrUtil.isNotBlank(stopJson)){
            // 存在直接返回
            Shop shop = JSONUtil.toBean(stopJson, Shop.class); //将JSON字符串转换为对象
//            return Result.ok(shop);
            return shop;
        }
        // 判断Redis中是否存在空对象
        if  (RedisConstants.CACHE_PENETRATION_NULL_VALUE.equals(stopJson)){
//            return Result.fail("店铺信息不存在");
            return null;
        }

        // 4. 实现缓存重建
        //4.1 尝试获取互斥锁
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        Shop shop = null;
        try {
            boolean isLock = tryLock(lockKey);
            //4.2 获取互斥锁失败 休眠一段时间后 重新查询Redis(自旋)
            if(!isLock){
                Thread.sleep(50);
                return queryWithMutex(id);
            }
            //4.3 获取互斥锁成功 查询数据库 将商铺信息写入Redis
//        query().eq("shop_id",id);
            // 查询数据库
            shop = getById(id);

            // 模拟重建的延迟情况
            Thread.sleep(100);

            // 查询数据库不存在
            if(shop == null){
                // 将空对象写入Redis
                stringRedisTemplate.opsForValue().set(
                        RedisConstants.CACHE_SHOP_KEY + id,
                        RedisConstants.CACHE_PENETRATION_NULL_VALUE,
                        RedisConstants.CACHE_NULL_TTL,
                        TimeUnit.MINUTES
                );
    //            return Result.fail("店铺不存在");
                return null;
            }

            // 将数据写入Redis + 设置过期时间
            stringRedisTemplate.opsForValue().set(
                    RedisConstants.CACHE_SHOP_KEY + id,
                    JSONUtil.toJsonStr(shop),
                    RedisConstants.CACHE_SHOP_TTL,
                    TimeUnit.MINUTES
            );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            //4.4 释放互斥锁
            unLock(lockKey);
        }
        //4.5 返回商铺信息
//        return Result.ok(shop);
        return shop;
    }
3.7.2.3 功能测试——基于JMeter的压力测试

配置JMeter测试任务

 配置访问后端数据地址

访问后端数据接口地址:8081 

 清空缓存店铺信息

清空缓存数据

执行测试任务

全部访问通过,QPS200

后台查询数据库只执行了一次

3.7.3 基于逻辑过期解决缓存击穿案例——以请求店铺信息为例

3.7.3.1 逻辑过期解决缓存击穿思路流程
基于逻辑过期时间解决缓存击穿的原理图
3.7.3.2 构建RedisData对象——组合优于继承

        我们先前定义的Shop对象实际上是没有逻辑过期时间字段的。如何解决这个问题呢?明显有两种方法:

基于继承的方法:【对Shop有侵入性】

        创建一个父类,内涵 expireTime字段,让原先的Shop对象继承父类,从而获得逻辑过期时间字段。

基于组合的方法:

        定义一个组合对象RedisData,包含一个逻辑过期时间字段和一个Object对象

3.7.3.3 代码实现

代码包括:

1. 开启一个大小为10的线程池

2. 查询店铺方法

3. 封装RedisData对象的公用方法【需要用于单元测试获取数据,所以要定义成公用方法】

4. 封装利用逻辑过期时间解决缓存击穿的私有方法

单元测试代码

@SpringBootTest
class HmDianPingApplicationTests {

    @Resource
    private ShopServiceImpl shopService;

    @Test
    void testSaveShop() throws InterruptedException {
        shopService.saveShop2Redis(1L,10L);
    }
}

 /**
     * 开启线程池
     */
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);



/**
     * 根据id查询店铺(添加Redis缓存版 + 解决缓存穿透【缓存空对象】+ 互斥锁实现方案)
     * @param id
     * @return
     */
    @Override
    public Result queryById(Long id) {
        
        // 基于逻辑过期解决缓存击穿问题
        Shop shop = queryWithLogicalExpire(id);
        if(shop == null){
            return Result.fail("热点店铺不存在");
        }
        return Result.ok(shop);
    }



/**
     * 创建RedisData对象 【原Shop + 逻辑过期时间字段】
     * @param id
     * @param expireTime
     */
    public void saveShop2Redis(Long id, Long expireTime) throws InterruptedException {
        // 1.查询店铺数据
        Shop shop = getById(id);
        // 模拟休息情况
        Thread.sleep(100);
        // 2.封装逻辑过期时间
        RedisData redisData = new RedisData();
        redisData.setData(shop);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireTime));
        // 3.写入Redis
        stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
    }



    /**
     * 基于逻辑过期解决缓存击穿问题 保存代码、
     * 为什么不用判断缓存穿透问题,因为一定有Key,如果没有Key必然不对,不需要让它继续访问数据库了,直接打回就可
     * @param id
     * @return
     */
    private Shop queryWithLogicalExpire(Long id){

        String stopJson =  stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
        //1. 判断Redis中是否存在数据
        if(StrUtil.isBlank(stopJson)){
            //1.1  未命中直接结束
            return null;
        }
        //2. 命中了,判断缓存是否过期
        //2.1 将JSON字符串反序列为对象
        RedisData redisData = JSONUtil.toBean(stopJson,RedisData.class);
        JSONObject data =(JSONObject) redisData.getData();
        Shop shop = JSONUtil.toBean(data,Shop.class);
        LocalDateTime expireTime = redisData.getExpireTime();

        if(expireTime.isAfter(LocalDateTime.now())){ // 未过期
            // 直接返回
            return shop;

        }
        // 3. 已过期,需要缓存重建
        // 4. 尝试获取互斥锁
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        if(isLock){
            //5. 获取互斥锁成功,开启独立线程,查询数据库,重建缓存
            CACHE_REBUILD_EXECUTOR.submit(() ->{
                try {
                    //  重建缓存
                    this.saveShop2Redis(id, 20L);

                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    // 6. 释放互斥锁
                    unLock(lockKey);
                }
            });
        }
        return shop;
    }


3.7.3.4 功能测试——基于JMeter的压力测试

        想要测试【逻辑过期时间】策略,必须先在缓存中准备好数据,否则测试不成功,具体原因请看3.7.4故障排除

1. 执行测试方法获取Redis缓存店铺数据

等到该缓存数据过期后再执行方法

2. 修改后台数据库中店铺名,用于后续更好的观察该策略的效果

3. JMemet测试

全部请求都通过了,但是前面的请求查询到的数据还是 “102茶餐厅”【旧数据】

从这个请求开始,重建线程已经完成,后续请求都和后台数据库同步了,都是“103茶餐厅”

3.7.4【故障排查】关于逻辑过期时间策略无法查询店铺信息的现象

        当我们使用逻辑过期时间策略时,在直接访问店铺数据时会发现无论怎么样,店铺数据都不会被存储到缓存中,也不会去查询数据库,导致了店铺数据一直无法访问。

        这是出于逻辑过期时间策略 默认要求 热点Key数据是由管理员事先存放到Redis中设置好,等到活动结束后再人为的删除掉的。

        所以在第一次访问时,如果没有提前将店铺数据存放在Redis,该策略直接返回空对象后结束,因而造成了这种现象的发生。

无法正常获取信息
缓存不会存储信息

3.8 封装Redis缓存工具

3.8.1 封装代码

@Component
@Slf4j
public class CacheClient {

    private final StringRedisTemplate stringRedisTemplate;

    /**
     * 开启线程池
     */
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    /**
     * 获取互斥锁  setNx  ----- setIfAbsent
     * @param key
     * @return
     */
    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key,"1",RedisConstants.LOCK_SHOP_TTL,TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 释放互斥锁
     * @param key
     */
    private void unLock(String key){
        stringRedisTemplate.delete(key);
    }

    /**
     * 构造方法
     * @param stringRedisTemplate
     */
    public CacheClient(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * 将任意Java对象序列化为json并存储在String类型的key中,并且可以设置过期时间
     * @param key
     * @param value 任意java对象
     * @param time 过期时间
     * @param unit 过期时间单位
     */
    public void set(String key, Object value, Long time, TimeUnit unit){
        // 序列化
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
    }

    /**
     * 将任意java对象序列化成json并存储在String类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
     * @param key
     * @param value 任意java对象
     * @param time  逻辑过期时间
     * @param unit  逻辑过期时间单位
     */
    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
        // 封装逻辑过期时间
        RedisData redisData = new RedisData();
        redisData.setData(value);
        // 利用uint的 toSeconds 转换成Second
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        // 写入Redis【逻辑过期】
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
    }


    /**
     * 根据指定的key查询缓存,并反序列为指定的类型,利用缓存空值来解决缓存穿透问题
     * @param keyPrefix  key前缀
     * @param id         查询数据库id
     * @param type       返回值类型
     * @param dbFallback 数据库查询方法
     * @param time       过期时间
     * @param unit       过期时间单位
     * @return
     * @param <R>   返回值类型
     * @param <ID>   查询数据库id类型
     */
    public <R,ID> R  queryWithPassThrough(
            String keyPrefix,
            ID id,
            Class<R> type,
            Function<ID,R> dbFallback,
            Long time,
            TimeUnit unit
    ) {
        String key = keyPrefix + id;
        //1. 查询Redis
        String json = stringRedisTemplate.opsForValue().get(key);
        //2. 判断是否存在
        if(StrUtil.isNotBlank(json)){
            //3. 存在,直接返回
            // 反序列化回 type 类型
            return JSONUtil.toBean(json,type);
        }
        //判断是否为空值
        if(json != null){
            // 返回
            return null;
        }
        //4. 不存在,根据id查询数据库
        R r = dbFallback.apply(id);
        //5. 不存在,返回错误
        if(r == null){
            // 写入空值
            stringRedisTemplate.opsForValue().set(key,CACHE_PENETRATION_NULL_VALUE,CACHE_NULL_TTL,TimeUnit.MINUTES);
            return null;
        }
        //6. 存在,写入Redis
        this.set(key,r,time,unit);
        //7. 返回
        return r;
    }


    /**
     * 根据指定的key查询缓存,并反序列成指定类型,利用逻辑过期解决缓存击穿问题
     * @param KeyPrefix   key前缀
     * @param id          查询数据库id
     * @param type        返回值类型
     * @param dbFallback  数据库查询方法
     * @param time        逻辑过期时间
     * @param unit        逻辑过期时间单位
     * @return
     * @param <R>         返回值类型
     * @param <ID>        查询数据库id类型
     */
    public <R,ID> R queryWithLogicalExpire(
            String KeyPrefix,
            ID id,
            Class<R> type,
            Function<ID,R> dbFallback,
            Long time,
            TimeUnit unit
    ) {
        // 查询Redis
        String json =  stringRedisTemplate.opsForValue().get(KeyPrefix + id);
        //1. 判断Redis中是否存在数据
        if(StrUtil.isBlank(json)){
            //1.1  不存在直接结束
            return null;
        }
        //2. 命中了,判断缓存是否过期
        //2.1 将JSON字符串反序列为对象
        RedisData redisData = JSONUtil.toBean(json,RedisData.class);
        JSONObject data =(JSONObject) redisData.getData();
        R r = JSONUtil.toBean(data,type);
        LocalDateTime expireTime = redisData.getExpireTime();

        if(expireTime.isAfter(LocalDateTime.now())){ // 未过期
            // 直接返回
            return r;

        }
        // 3. 已过期,需要缓存重建
        // 4. 尝试获取互斥锁
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        if(isLock){
            //5. 获取互斥锁成功,开启独立线程,查询数据库,重建缓存
            CACHE_REBUILD_EXECUTOR.submit(() ->{
                try {
                    //  重建缓存
                    //1. 查数据库
                    R r1 = dbFallback.apply(id);
                    //2. 写redis
                    this.setWithLogicalExpire(KeyPrefix,r1,time,unit);

                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    // 6. 释放互斥锁
                    unLock(lockKey);
                }
            });
        }
        return r;
    }
}

3.8.2 使用方法

/**
     * 根据id查询店铺(添加Redis缓存版 + 解决缓存穿透【缓存空对象】+ 互斥锁实现方案)
     * @param id
     * @return
     */
    @Override
    public Result queryById(Long id) {
        // 解决缓存穿透问题
//        Shop shop = queryWithPassThrough(id);

        // 基于互斥锁解决缓存击穿问题
//        Shop shop = queryWithMutex(id);

        // 基于逻辑过期解决缓存击穿问题 【需要提前准备好Redis缓存数据】
//        Shop shop = queryWithLogicalExpire(id);


        // 基于自定义封装的Redis缓存工具解决缓存穿透问题
//        Shop shop = cacheClient.queryWithPassThrough(
//                CACHE_SHOP_KEY,
//                id,
//                Shop.class,
//                this::getById,
//                CACHE_SHOP_TTL,
//                TimeUnit.SECONDS);

        // 基于自定义封装的Redis缓存工具解决缓存击穿问题【逻辑过期时间】
        Shop shop = cacheClient.queryWithLogicalExpire(
                CACHE_SHOP_KEY,
                id,
                Shop.class,
                this::getById,
                CACHE_SHOP_TTL,
                TimeUnit.MINUTES);

        if (shop == null){
            return Result.fail("店铺不存在");
        }
        return Result.ok(shop);
    }

四、 优惠卷秒杀系列功能实现

4.1 全局ID生成器

4.1.1 全局ID生成器的选型

为什么需要全局ID生成器呢?

        由于秒杀业务需要,有时候我们会生成数量极为庞大的业务数据。但是对于单张表,往往会存在存储数据的上限(随着存储数据的增加,查询的效率会越来越低),我们往往会进行分表操作。

        这么一来,分表带来的 ID不唯一、ID自增的规律性太强导致安全性低等问题就产生了。为了解决这些问题,我们可以利用全局ID生成器进行开发。

全局ID生成器需要具备的特征:

使用Redis符合上述五点需求

高可用、高性能: Redis的查询能力本身就比数据库要快得多

唯一性: 由于Redis数据库可以全局通用一张表,因此可以更好地维护ID的唯一性

递增性:Redis拥有自增操作,INCREMENT

安全性:通过设置ID的自增规律,可以提高ID的安全性

全局ID生成器的格式规范:

4.1.2 全局ID生成器的实现

步骤:

1. 获取一个基准时间戳,标记为BEGIN_TIMESTAMP

2. 获取当前时间戳标记为now

3. 计算全局ID的时间戳部分 

4. 以天为单位,分割key【可以更好的统计每天的订单数量】

5. 获取全局ID的序列号【调用Redis的自增方法】

6. 利用运算符将时间戳 与 序列号 进行拼接并返回

  在utils包下新建RedisWorkers用于实现全局ID生成:

@Component
public class RedisIdWorker {
    // 开始时间戳 2024-09-12-0-0-0
    private static final long BEGIN_TIMESTAMP = 1726099200L;
    // 序列号位数
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public Long nextId(String keyPrefix){
        // 1. 时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond-BEGIN_TIMESTAMP;

        // 2. 序列号
        //2.1 获取当前日期,精确到天
        String date =  now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
        // 3. 拼接返回
        return timestamp << COUNT_BITS | count;
    }

    /**
     * 获取当前时间戳
     * @param args
     */
    public static void main(String[] args) {
        LocalDateTime time = LocalDateTime.of(2024,9,12,0,0,0);
        Long second = time.toEpochSecond(ZoneOffset.UTC); // 1726099200
        System.out.println("second=" + second);
    }
}

4.1.3 全局ID生成器的测试

开启3000个线程任务,每个线程插入100条订单数据,运行查看效果

@SpringBootTest
class HmDianPingApplicationTests {
    @Resource
    private RedisIdWorker redisIdWorker;

    private ExecutorService es = Executors.newFixedThreadPool(500);

    @Test
    void testIdWorker() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3000);
        Runnable task = () ->{
            for(int i=0;i<100;i++){
                long id = redisIdWorker.nextId("order");
                System.out.println("id = " + id);
            }
            latch.countDown();
        };
        long begin = System.currentTimeMillis();
        for(int i=0;i<3000;i++){
            es.submit(task);
        }
        latch.await();
        long end = System.currentTimeMillis();
        System.out.println("time = " + (end-begin));
    }
}

​​​​​​以日期作为key分割,可以统计当天订单数
7秒生成了30万条不重复的全局ID数据

4.1.4 其他ID生成器的拓展

除了采用Redis自增生成全局ID,还有以下方法可行:

1. UUID 【无序】

2. 数据库自增 【数据量不能太多】

3. 雪花算法 (本次项目使用的Redis自增实际上就是运用了雪花算法的思想)

Redis自增策略

1. 每天一个key,方便统计订单量

2. ID构造是 时间戳 + 计数器

4.2  利用PostMan模拟管理员后台添加秒杀优惠卷信息 

常规优惠券表结构
秒杀优惠券表结构

 【代码实现】

/**
     * 新增秒杀优惠券
     * 多表添加 加事务
     * @param voucher
     */
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        // 关联优惠券id
        seckillVoucher.setVoucherId(voucher.getId());
        // 秒杀库存
        seckillVoucher.setStock(voucher.getStock());
        // 开始时间
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        // 结束时间
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
    }

 【PostMan测试】

 4.3 优惠卷秒杀下单功能基础逻辑实现

【功能说明】

实现基本的下单逻辑

接口说明

请求方式

POST
请求路径         /voucher-order/seckill/{id}
请求参数 优惠卷id
返回值         订单Id

【代码实现】

利用全局Id生成订单id

    @Resource
    private ISeckillVoucherService seckillVoucherService;
    /**
     * 全局ID生成器
     */
    @Resource
    private RedisIdWorker redisIdWorker;

    /**
     * 秒杀优惠券下单
     * @param voucherId
     * @return
     */
    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        //1. 提交优惠卷id 查询优惠卷id信息
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        if(voucher == null){
            return Result.fail("优惠券不存在!");
        }

        //2. 判断优惠卷是否在抢购时间内
        LocalDateTime beginTime = voucher.getBeginTime();
        LocalDateTime endTime = voucher.getEndTime();
        long stockCount =voucher.getStock();
        if(beginTime.isAfter(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀尚未开始!");
        }
        if(endTime.isBefore(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀已经结束!");
        }

        //3. 判断优惠卷库存是否充足
        if(stockCount <= 0){
            //   否----> 返回异常信息---->结束
            return Result.fail("库存不足!");
        }

        //4. 扣减库存
        boolean success =  seckillVoucherService.update()
                            .setSql("stock = stock-1")
                            .eq("voucher_id",voucherId).update();
        if(!success){
            return Result.fail("库存不足!");
        }
        //5. 创建优惠卷订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //5.1 订单id 【全局ID生成器】4
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //5.2 用户id 【当前登录用户】
        long userId =  UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        //5.3 优惠卷id 【传递过来的优惠卷id】
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        //6. 返回订单id ----> 结束
        return Result.ok(orderId);
    }

【功能测试】

点击抢购,成功生成订单信息,库存扣减1

4.4 (优化)解决超卖问题

4.4.1 超卖现象重现及其原因

步骤:

        利用jmeter进行多线程模拟高并发抢购优惠卷时的情况

现象:

        发现优惠卷有超卖的现象

原因:

        在多线程高并发访问的环境下,当库存为1时,正常情况下最后一个线程在执行完最后一个扣减操作的过程中,有其余的线程涌入,此时由于扣减操作未完成,库存量还是为1,从而误判库存剩余,最后导致超卖。

 现象回顾

配置请求路径

 配置JSON断言,根据响应的JSON内容更精确的反映请求是否异常

 

配置请求头,token从自己的Redis里面拿

响应报告:

回看数据库报告

        正常来说异常为50%才是正确的,但是这里的异常没有到50%。在高并发访问的过程中出现了超卖现象,即同一时刻两个线程同时完成导致优惠卷最后的数量异常。

异常不是50%
数据库优惠卷变成了-9

4.4.2 解决超卖问题的方案比较

悲观锁方案
  • 悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:synchronizedlock
  • 悲观锁,适合写入操作较多、冲突频繁的场景
乐观锁方案
  • 乐观锁,认为线程安全问题不一定发生,因此不加锁,而是判断有没有其他线程在自己进行操作时修改了数据,有则重试。常见的实现方式有:版本号法、CAS操作.
  • 乐观锁,适合读取操作较多、冲突较少的场景。

4.4.3 乐观锁两大方案比较

 版本号法(涉及ABA问题时使用

        首先,我们要为数据库表新增一个版本号字段 version

        然后,线程开始查询库存及其库存版本号,记录版本号信息。

        接着,在通过判断后,执行扣减时,检查当前版本号与先前记录的版本号信息是否一致,如若一致(例如线程1),则可以顺利执行扣减更新语句。如若不一致(例如线程2),则无法执行扣减更新语句。

CAS法(不涉及ABA问题时使用

        在版本号的基础上,可以发现其实库存量直接可以充当版本号的作用,于是就出现了更加简洁,不用额外添加字段的方法——CAS法。

        尽管CAS操作具有诸多优点,但它也存在一个潜在的问题,即ABA问题。ABA问题是指在CAS操作过程中,一个线程a将数值改成了b,接着又改成了a,此时CAS认为是没有变化,其实是已经变化过了。这个问题在多线程环境下可能会导致程序行为不符合预期,从而引发并发错误。

4.4.4 基于CAS法解决超卖问题

只需要在扣减库存这里加多一个库存值的判断,同时记得如果判断出问题了要重试!

//4. 扣减库存
        boolean success =  seckillVoucherService.update()
                            .setSql("stock = stock-1")
                            .eq("voucher_id",voucherId).eq("stock",voucher.getStock())
                            .update();

上述代码的异常率特别高 

解决失败率高的问题 

 //4. 扣减库存
        boolean success =  seckillVoucherService.update()
                            .setSql("stock = stock-1")
                            .eq("voucher_id",voucherId).gt("stock",0) // 库存大于0就行了
                            .update();

4.5  (优化)实现单机情况下 限购一人一单

高能瞬间:实战篇-07.优惠券秒杀-实现一人一单功能_哔哩哔哩_bilibili

4.5.1 限购功能整体概述

为什么要一人一单?

        对于秒杀优惠卷,我们当然不希望一个人承包大部分甚至所有的优惠卷,这种行径无疑是电商黄牛。我们希望每个顾客至多只能抢购一张券。

限购一人一单实现逻辑?

        在扣减库存之前,我们对优惠卷数据进行查询,查看当前用户(用户id)是否在数据库中已经存在抢购数据,如果有了,则不再给予抢购机会。否则才会进行扣减库存的操作

实现限购功能需要克服的潜在风险?

存在线程并发问题: 假设有100个相同用户标识的线程同时查询数据库,发现数据库没有数据时,100个线程同时通过了判断,结果导致1个用户抢了多单

4.5.2 限购功能实现及说明

【基础实现】

根据前面的概述,我们大概明白了整个功能的逻辑,实际上就是简单的增加一个判断。

//4. 限制一人一单【悲观锁方案】
        Long userId = UserHolder.getUser().getId();

        //4.1 查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        //4.2 判断订单是否存在
        // 是 -----> 返回异常信息---->结束
        if (count > 0) {
            return Result.fail("该用户已经购买过一次了!");
        }
【基础锁实现】

但是单单只添加判断逻辑,还会造成线程并发问题,因此我们需要给判断过程加锁,最简单的方式就是将一人一单功能抽取成一个方法,在方法上添加synchronized关键字

@Transactional
    public synchronized Result createVoucherOrder(Long voucherId) {
        //4. 限制一人一单【悲观锁方案】
        Long userId = UserHolder.getUser().getId();

        //4.1 查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        //4.2 判断订单是否存在
        // 是 -----> 返回异常信息---->结束
        if (count > 0) {
            return Result.fail("该用户已经购买过一次了!");
        }

        //5. 扣减库存——解决超卖问题【乐观锁方案】
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock-1")
                .eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
                .update();
        if (!success) {
            return Result.fail("库存不足!");
        }


        //6. 创建优惠卷订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1 订单id 【全局ID生成器】4
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2 用户id 【当前登录用户】
//        long userId =  UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        //6.3 优惠卷id 【传递过来的优惠卷id】
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        //7. 返回订单id ----> 结束
        return Result.ok(orderId);
    }
【降低锁粒度实现】

以上代码还无法实现全部功能,由于我们将整个方法都锁住了,相当于每个用户进行抢购时,都需要判断该方法是否已经被执行,这样会导致整个抢购秒杀过程退化成串行执行。 而我们想实现的是同一个用户标识线程串行执行,不同用户之间并行执行。因此我们需要降低锁的粒度,将锁作用在用户上。

【关于如何锁用户】

userId.toString().intern()

由于同一个用户标识的不同线程创建的User对象实际上并不是同一个,为了能达成用户级锁,我们必须是按值锁,因此需要调用toString().intern()方法

@Transactional
    public Result createVoucherOrder(Long voucherId) {
        //4. 限制一人一单【悲观锁方案】
        Long userId = UserHolder.getUser().getId();
        
        synchronized (userId.toString().intern()) {
            //4.1 查询订单
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            //4.2 判断订单是否存在
            // 是 -----> 返回异常信息---->结束
            if (count > 0) {
                return Result.fail("该用户已经购买过一次了!");
            }

            //5. 扣减库存——解决超卖问题【乐观锁方案】
            boolean success = seckillVoucherService.update()
                    .setSql("stock = stock-1")
                    .eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
                    .update();
            if (!success) {
                return Result.fail("库存不足!");
            }


            //6. 创建优惠卷订单
            VoucherOrder voucherOrder = new VoucherOrder();
            //6.1 订单id 【全局ID生成器】4
            long orderId = redisIdWorker.nextId("order");
            voucherOrder.setId(orderId);
            //6.2 用户id 【当前登录用户】
//        long userId =  UserHolder.getUser().getId();
            voucherOrder.setUserId(userId);
            //6.3 优惠卷id 【传递过来的优惠卷id】
            voucherOrder.setVoucherId(voucherId);
            save(voucherOrder);

            //7. 返回订单id ----> 结束
            return Result.ok(orderId);
        }
    }
【扩大锁范围实现】

上述代码实现了将锁的粒度成功的缩小到了用户级,但是还不是最优的代码,还存在一定的风险。具体的,由于@Transactional事务是作用在方法上的,而现在我们的锁仅仅是作用在方法内部,这样一来就会存在一种异常情况:当方法内部都执行完毕了,锁会先释放,然后才是Spring自动提交事务。在提交事务完成之前,锁已经释放掉了,这也就意味着在这期间其他的线程完全有机会趁虚而入。为了解决这个问题,我们必须把锁的范围重新扩大到将方法包裹其中。

/**
     * 秒杀优惠券下单
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        //1. 提交优惠卷id 查询优惠卷id信息
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        if(voucher == null){
            return Result.fail("优惠券不存在!");
        }

        //2. 判断优惠卷是否在抢购时间内
        LocalDateTime beginTime = voucher.getBeginTime();
        LocalDateTime endTime = voucher.getEndTime();
        long stockCount =voucher.getStock();
        if(beginTime.isAfter(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀尚未开始!");
        }
        if(endTime.isBefore(LocalDateTime.now())){
            //   否----> 返回异常信息---->结束
            return Result.fail("秒杀已经结束!");
        }

        //3. 判断优惠卷库存是否充足
        if(stockCount <= 0){
            //   否----> 返回异常信息---->结束
            return Result.fail("库存不足!");
        }


        /* 为什么要加在createVoucherOrder方法中?
         * 因为createVoucherOrder方法中已经加了事务,如果加了锁,事务回滚,锁也会释放,这样就会导致锁失效

         * 为什么要指定锁用户id?
         * 将锁的粒度降低,将锁的粒度降低到用户粒度,这样就可以保证一个用户只会有一个订单
         * 添加.intern() 确保是按值加锁  而不是按对象加锁

         * 为什么要使用代理对象?
         * @Transactional注解是Spring的事务注解,它只能在Spring管理的Bean中生效,
         * 如果直接在ServiceImpl中调用createVoucherOrder方法,
         * 那么@Transactional注解就不会生效,
         * 因为createVoucherOrder方法不是在Spring管理的Bean中调用的,
         * 所以需要使用代理对象来调用createVoucherOrder方法,这样@Transactional注解就会生效
         */
        Long userId = UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            return createVoucherOrder(voucherId);
        }
    }
【解决代理对象事务】

上述代码看起来很完美了,但是事实上还是存在问题@Transactional事务机制是失效的。这是由于我们创建的createVoucherOrder()方法并不受Spring管理,所以没有办法使用Spring的事务机制。要解决这个问题,我们需要引入Spring中AspectJ注解支持来暴露代理对象,通过代理对象机制实现事务功能。具体操作如下:

1. 导入AspectJ注解支持依赖

2. 获取createVoucherOrder()方法的代理对象

3. 通过代理对象调用方法

4. 在启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解,暴露代理对象

        <!--动态代理模式-->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
/**
 * 启动类添加@EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象
 *
 */        
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
// 开启AspectJ注解支持 暴露代理对象
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {

    public static void main(String[] args) {
        SpringApplication.run(HmDianPingApplication.class, args);
    }

}



Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
        // 获取代理对象
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
}

4.5.3 限购功能测试检验

        两百个线程共用一个token向后端发送请求,来模拟同一用户高并发抢购的情况

        200个线程只有1个通过了请求,数据库中优惠卷剩余99张,优惠卷订单只有1个。证明我们的功能实现了 

4.6 (拓展)集群模式下限购一人一单 

4.6.1 单体模式下限购功能实现的局限性

        在4.5时,我们成功的完成了限购一人一单的业务功能。事实上还是有相对较大的局限性。由于秒杀业务的高并发性,为了防止单台服务器压力过大的原因,我们会考虑将多台服务器部署成一个集群环境。这时候,我们在单体环境中写的加用户锁方案就有了并发安全问题。

如下图,我们知道。Java提供的锁方案事实上是由JVM下的锁监视器去进行监听的。因此在单体环境中,由于锁监视器唯一,可以正常的执行线程监听任务。但是放在集群环境中,每一个服务器都有一个JVM平台,都有一个锁监视器。在nginx服务器发送轮询请求时,可以有多个线程同时获得锁,同时执行。于是乎限购功能出现了并发安全问题。 

4.6.2 配置集群环境,测试限购代码的并发问题

配置两台Tomcat服务器 

配置nginx服务器轮询访问

并重新启动nginx服务器

测试集群环境

访问:localhost:8080/api/voucher/list/1  两次

8081、8082 各被轮询了1次

4.6.3 解决策略——分布式锁

        解决策略无非是将监视器统一,且必须要在集成环境中唯一共享。分布式锁正是这样一种策略。

        我们将会在下一小结详细介绍分布式锁的相关知识,并使用分布式锁在集群环境下解决并发安全问题

4.7 (知识点)分布式锁 

4.7.1 分布式锁介绍

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的特点:

1、高可用的获取锁与释放锁;

2、高性能的获取锁与释放锁;

3、多进程可见

4、具备可重入特性;

5、具备锁失效机制,防止死锁;

6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

分布式锁的实现方式:

1. 基于数据库的实现:

        使用数据库的事务和锁机制来实现分布式锁。可靠性高,支持事务机制和锁机制,但是性能较低,对数据库性能有一定影响。适用于需要强一致性和高可靠性的场景

2. 基于缓存的实现:

        使用缓存系统(如Redis)的原子操作来实现分布式锁。性能较高,支持原子操作,但是可靠性相对较低,需要处理缓存故障和失效的情况。适用于高并发场景。

3. 基于分布式协调服务的实现:

        使用分布式协调服务(如ZooKeeper、etcd)来实现分布式锁。通过创建临时顺序节点和监听节点变化来控制锁的获取和释放。可靠性高,支持分布式环境,但是性能较低,对分布式协调服务的性能有一定影响。适用于需要高可靠性和一致性的场景。

4. 基于消息队列的实现:

        使用消息队列来实现分布式锁。通过发送和接收消息来控制锁的获取和释放。

优点简单易用,适用于简单的场景。缺点性能较低,不适用于高并发和高吞吐量的场景。

4.7.2 基于Redis的分布式锁方案

获取锁 + 添加过期时间  【且确保原子性操作】

EX 设置过期时间   NX 不存在才添加

SETNX lock thread1 EX 10 NX

释放锁

DEL key

4.7.3 基于Redis实现分布式锁的初级版本

利用Redis,完成基本的分布式锁构建

4.7.3.1 代码实现

创建Lock接口

//父类

public interface ILock {

    /**
     * 获取锁
     * @param timeoutSec
     * @return
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();
}

实现Redis分布式锁方法 

//实现 ILock接口
public class SimpleRedisLock implements ILock{

    private String name;
    private StringRedisTemplate stringRedisTemplate;

    // 锁前缀
    private static final String KEY_PREFIX = "lock:";

    /**
     * 获取锁
     * @param timeoutSec
     * @return
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        long threadId = Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name ,threadId+"",timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

使用分布式锁 

 /*
         *   分布式锁方案
         */
        // 1. 创建锁对象
        SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

        // 2. 尝试获取锁
         boolean isLock =  lock.tryLock(1200);

        // 3. 判断锁是否获取成功
        if(! isLock){
            return Result.fail("不允许重复下单");
        }
        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        } finally {
            // 4. 释放锁
            lock.unlock();
        }
4.7.3.2 接口测试

利用postman进行测试

发送两个请求,获取锁成功,一个获取锁失败,线程Id为72

 

4.7.4 (优化)解决分布式锁出现的误删问题

4.7.4.1 出现误删的原因判断

        线程1获取分布式锁后运行,结果发生了业务阻塞,阻塞时间大于设定的超时释放锁时间。于是在线程1还未执行完业务时,分布式锁已经被释放了。

        这时候线程2获取锁开始执行业务,恰好线程1结束了阻塞,也顺利的结束了业务,于是将锁释放了。这样一来线程2的锁又没有了,线程3恰巧又来了......如此反复,导致线程1释放线程2拿到的的锁;线程2释放线程3拿到的锁,造成并发安全问题

4.7.4.2 解决方案——检查锁是不是自己的:

如何知道这个锁是不是自己的呢?   -----在获取锁时存入线程标识【UUID】

流程:

4.7.4.3 代码实现及其测试

// 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();

// 判断锁是不是自己的
        if(threadId.equals(id)){    // 如果是自己的锁,则释放锁
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
        // 如果不是自己的锁,则不管

 /**
     * 获取锁
     * @param timeoutSec
     * @return
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();

        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name ,threadId,timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁中的标识
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判断锁是不是自己的
        if(threadId.equals(id)){    // 如果是自己的锁,则释放锁
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
        // 如果不是自己的锁,则不管
    }

集群环境测试

获取锁失败
获取锁成功

4.7.5 (优化)解决分布式锁的原子性问题

4.7.5.1 非原子性操作带来的并发安全问题

        在我们完善了Redis分布式锁误删问题后,程序还是没有达到完美状态。以下这种情况依旧存在并发安全问题:

        线程1获取锁执行业务,在执行业务完成后判断当前锁的标识是否和自己的一致,发现一致后,执行释放锁操作。但恰好在执行释放锁操作时出现了阻塞,知道超过了锁的超时释放时间。锁被释放了,于是乎线程2提前拿到了锁,结果线程1阻塞结束,又把线程2刚刚拿到的锁给释放掉了

4.7.5.2 解决方案——引入Rua脚本实现命令原子化

        出现上述问题,究其原因就是 判断锁标识 和 释放锁 是两个动作,不符合原子性。因此若是在两者间发生了阻塞,就会造成并发安全问题。

       所以解决策略也很简单, 将判断锁的操作和释放锁的操作组合成一个原子性操作,一起执行,要阻塞都阻塞,要通过都通过

        在Redis中, 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。

4.7.5.3 Rua脚本在Redis中的基本使用

1. 执行Redis命令

redis.call('命名名称','key','其他参数')

redi.call('set','name','jack')

2. 执行脚本

EVAL "脚本内容" 脚本需要的key类型的参数个数

# 执行无参脚本
EVAL "return redis.call('set','name','jack') " 0

# EVAL执行带参脚本
redis 127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"
4.7.5.4 使用Rua编写释放锁脚本
-- 释放锁的业务
-- 1. 获取锁中的线程标识
-- 2. 判断是否与指定的标识相同
-- 3. 一致则释放锁
----------------------开始脚本-------------------------

-- 锁的key
--local key = "lock:order:5"
local key = KEYS[1]

-- 当前线程标识
--local threadId = "asdasdasdasd"
local threadId = ARGV[1]

-- 获取锁中的线程标识
local id = redis.call("GET", key)

-- 比较锁中的线程标识与当前线程标识是否一致
if(id == threadId) then
    -- 释放锁
    redis.call("DEL", key)
end
return 0
----------------------结束脚本-------------------------
4.7.5.5 在IDEA编写使用lua脚本的代码

在SimpleRedisLock方法内修改:

// 提前读取脚本
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        // 指定脚本位置
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
        // 指定返回类型
        UNLOCK_SCRIPT.setResultType(Long.class);
    }


/**
     * 释放锁 --- 基于lua脚本
     */
    @Override
    public void unlock() {
        /*
         *   调用Lua脚本
         *   参数1: 预加载的lua脚本
         *   参数2: KEYS[]
         *   参数3: ARGV[]
         */
        String key = KEY_PREFIX + name;
        String argv = ID_PREFIX + Thread.currentThread().getId();
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(key),
                argv
        );
    }
4.7.5.6 功能测试

1.启动集群环境,先让8081 获取锁成功

2. 模拟锁超时,将Redis中的锁删掉,让8082获得一把新的锁

3. 让失去锁的8081执行lua脚本,观察8082的锁会不会被清除 

结果:不会被清除

4. 让获得锁的8082执行lua脚本,观察8082的锁会不会被清除 

结果: 被清除了

4.8 (拓展知识点) Redisson ——成熟的锁工具包 

4.8.1 基于setnx实现的分布式锁还存在的问题

1. 不可重入问题:同一个线程无法多次获取相同的一把锁
2. 不可重试问题:获取锁只尝试一次就返回false,没有重试机制
3. 超时释放问题:业务耗时过长,还是有可能导致锁释放,设置超时时间很讲究
4. 主从一致性问题:主从同步存在延迟,如果在主节点设置锁,在还没有同步到从节点时,主节点宕机。
        当然,以上问题指针对部分有需要的业务才会有问题,我们前面完善的Redis + lua脚本的分布式锁方案已经很优秀了

4.8.2 Redisson介绍

官网:Home · redisson/redisson Wiki (github.com)

        Redisson 是一个在 Redis 的基础上实现的一个 Java 驻内存数据网格(In-Memory Data Grid, IMDG)。它提供了丰富的分布式和可扩展的 Java 数据结构,包括分布式锁和同步器。Redisson 的可重试锁(Retryable Lock)是基于 Redis 的特性来实现的一种锁机制,它允许在锁不可用时进行重试,直到成功获取锁或者达到重试次数上限。

4.8.3 Redisson入门

1. 引入依赖

<!--Redisson-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.6</version>
        </dependency>

2. 创建Config文件

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        // 单机模式
        // 集群模式:config.useClusterServers().addNodeAddress("redis://192.168.186.136:7000");
        config.useSingleServer().setAddress("redis://192.168.186.136:6379").setPassword("123321");
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

3. 使用Redisson——以秒杀优惠卷为例

/*
         *   Redisson方案
         */
        //1. 创建锁对象
        RLock lock =  redissonClient.getLock("lock:order:" + userId);

        //2. 尝试获取锁
        boolean isLock = lock.tryLock();

        // 3. 判断锁是否获取成功
        if(! isLock){
            return Result.fail("不允许重复下单");
        }
        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        } finally {
            // 4. 释放锁
            lock.unlock();
        }

4.8.4 Redisson的可重入原理

不可重入锁的基本流程Demo:

        在method1中获取锁成功后,其调用的method2没办法获取锁。在这种嵌套的业务中,不可重入锁具有局限性

如何实现可重入?
添加一个当前获取锁的次数字段   —— hash结构存储

        
        同一个线程内的业务方法在获取锁的时候,发现如果锁存在了就将锁的次数+1,锁的次数就代表了当前获取锁的方法个数。在进行删除锁时,需要先检查锁的重入次数-1 后是否为 0 ,是才能释放锁,否则只做-1操作。

可重入锁的设计流程

可重入锁的脚本实现

---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by good boy.
--- DateTime: 2024/9/15 16:30
---

------------------------获取可重入锁的Lua脚本-------------------------------------
local key = KEYS[1] -- 获取锁的key
local threadId = ARGV[1] -- 获取线程唯一标识
local releaseTime = ARGV[2] -- 获取锁的自动释放时间

-- 判断锁是否存在
    -- 不存在的逻辑
if(redis.call('exists',key) == 0 ) then
    -- 不存在则,获取锁,标记获取锁的次数
    redis.call('hset', key, threadId, '1');
    -- 设置过期时间
    redis.call('expire', key, releaseTime);
    -- 返回获取锁成功
    return 1;
end;

-- 锁已经存在,判断线程标识是不是自己的
    -- 是自己的逻辑
if(redis.call('hexists', key, threadId) == 1) then
    -- 是自己的,则获取锁的次数+1
    redis.call('hincrby', key, threadId, 1);
    -- 重置有效期
    redis.call('expire', key, releaseTime);
    -- 返回获取锁成功
    return 1;
end;

-- 走到这里说明获取到的锁并不是自己的
return 0;




----------------释放可重入锁的Lua脚本-----------------------------
local key = KEYS[1] --获取锁的key
local threadId = ARGV[1] --获取线程唯一标识
local releaseTime = ARGV[2] --获取锁的自动释放时间

-- 判断锁是不是自己的
    -- 锁不是自己的逻辑
if(redis.call('hexists', key, threadId) == 0 ) then
    return nil; -- 锁不是自己的,直接返回nil,表示自己的锁已经被释放了
end;

-- 锁是自己的逻辑
-- 将锁的计数器-1
local count = redis.call('hincrby', key, threadId, -1);

-- 判断锁的计数器是否为0
if(count > 0) then
    -- 大于0,说明还有别的方法获取了锁,不用释放锁,只需要重置过期时间,让其他业务有足够的时间执行
    redis.call('expire', key, releaseTime);
    return nil;
else
    -- 等于0,说明最外层业务已经完成了,可以释放锁
    redis.call('del', key);
    return nil;
end;

4.8.5 (TODO)Redisson的可重试原理

基于看门狗 + 消息订阅模型实现

(看晕了,源码留后面啃)

实战篇-19.分布式锁-Redisson的可重入锁原理_哔哩哔哩_bilibili

4.8.6(TODO) Redisson的超时释放原理

基于看门狗的定时续约机制

        简单来说,在成功获取锁之后,系统会立即启动一个自动续期机制(通常通过scheduleExpirationRenewal(threadId)方法实现),该机制利用一个映射(map)来关联业务名称与定时任务。这个定时任务被设置为每隔一定时间(例如10秒)执行一次,其主要职责是重置锁的最大超时时间。通过递归或循环调用重置锁时间的逻辑,确保锁在业务执行期间不会因为超时而被自动释放,从而实现了锁的“永久持有”效果,直到业务逻辑执行完毕。

当业务逻辑执行完成并显式释放锁时,系统会同时取消之前设置的定时任务,以避免不必要的资源消耗。这种设计确保了锁的持有与业务执行周期紧密相关,既提高了系统的灵活性,又增强了资源使用的效率。

实战篇-20.分布式锁-Redisson的锁重试和WatchDog机制_哔哩哔哩_bilibili

4.8.7 Redisson的主从一致性原理

Redis主从集群模式造成一致性问题的原因

        Java应用发送命令到主节点,主节点崩溃,Redis哨兵机制选取更新最近主节点的从节点,将其变成新的主节点。但是还是存在先前存储的命令失效问题

Redisson解决主从一致性问题方案——连锁方案

        取消单主多从方案,采用高可用集群 + 主从分布

4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题

实战篇-21.分布式锁-Redisson的multiLock原理_哔哩哔哩_bilibili

太吃电脑性能了,得晚点实现,或者等上docker

4.9 Redis优化秒杀系列功能实现 

4.9.1 业务性能瓶颈分析

优化前的方案

        业务耗时为Tomcat六步总和,其中查询优惠卷、查询订单、减库存、创建订单都去数据库查询,效率慢。此外,目前我们还没实现MySQL集群,就更慢了

优化后的方案

        将校验秒杀资格的两个环节提前到Redis异步执行,从而减少处理的环节,提高整体效率。

4.9.2 优化流程分析

Redis部分——lua脚本实现     

        1. 判断秒杀库存------使用String结构存储
        2. 校验一人一单------使用Set结构存储userId

Java部分

        1. 判断lua脚本执行结果
        2. 返回“小票”,
优惠卷id、用户id、订单id

        

4.9.3 改进秒杀业务代码改造及详细解释

1. 将优惠卷信息存入Redis中
/**
     * 新增秒杀优惠券
     * 多表添加 加事务
     * @param voucher
     */
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);

        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        // 关联优惠券id
        seckillVoucher.setVoucherId(voucher.getId());
        // 秒杀库存
        seckillVoucher.setStock(voucher.getStock());
        // 开始时间
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        // 结束时间
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);

        // 保存库存到Redis
        stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
    }
添加优惠卷
将优惠卷保存到Redis
2. 基于Lua脚本,判断秒杀资格
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by good boy.
--- DateTime: 2024/9/15 22:30
---

-----------判断秒杀资格的lua脚本---------------------------------
local voucherId = ARGV[1]   -- 优惠卷id
local userId = ARGV[2]      -- 用户id

local stockKey = 'seckill:stock:' .. voucherId  -- 库存key
local orderKey = 'seckill:order:' .. voucherId  -- 订单key

-- 脚本开始
-- 1. 判断库存是否充足
if( tonumber(redis.call('get', stockKey))  <=0) then
    -- 库存不足返回1
    return 1
end

-- 2. 判断用户是否下单
if( redis.call('sismember', orderKey, userId) == 1 ) then
    -- 用户已经下单返回2
    return 2
end
-- 3. 扣减库存 incrby -1
redis.call('incrby', stockKey, -1)

-- 4. 下单(保存用户)
redis.call('sadd', orderKey, userId)

-- 5. 返回0表示下单成功
return 0

/**
     * 秒杀优惠券下单------秒杀优化代码----lua脚本
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 获取用户
        Long userId = UserHolder.getUser().getId();

        //1.执行Lua脚本
        Long result = stringRedisTemplate.execute(
                        SECKILL_SCRIPT,
                        Collections.emptyList(),
                        voucherId.toString(),userId.toString()
        );
        //2.判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //3.不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
        }

        long orderId =  redisIdWorker.nextId("order");
        //TODO 4.为0,代表有购买资格,将下单信息保存至阻塞队列

        //5.返回订单id
        return Result.ok(orderId);
    }
测试基本功能:第一次请求成功
第二次请求失败

3. 抢购成功,封装信息到阻塞队列中,实现异步下单

        调用创建订单逻辑,首先会进入到seckillVoucher秒杀优惠卷,进入seckillVouvher方法后,首先调用Lua脚本去判断该用户请求是否具备秒杀资格,如果具备秒杀资格,则创建订单信息,并将订单信息保存到阻塞队列中去.

    /**
     * 预加载lua脚本
     */
    private static DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/Seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);

    }
    /**
     * 秒杀优惠券下单------秒杀优化代码----lua脚本---主线程
     * @param voucherId
     * @return
     */
    private IVoucherOrderService proxy;
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 获取用户
        Long userId = UserHolder.getUser().getId();

        //1.执行Lua脚本
        Long result = stringRedisTemplate.execute(
                        SECKILL_SCRIPT,
                        Collections.emptyList(),
                        voucherId.toString(),userId.toString()
        );
        //2.判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //3.不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
        }

        long orderId =  redisIdWorker.nextId("order");

        //4.为0,代表有购买资格,将下单信息保存至阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);

        // 放入阻塞队列
        orderTasks.add(voucherOrder);
        //提前 获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();

        //5.返回订单id
        return Result.ok(orderId);
    }

        注意标号顺序,我们提前定义了阻塞队列和线程池,并将线程任务放到初始化方法中,只要程序一启动,就会执行init()中的方法。
        当seckillVoucher方法执行成功将订单信息存储到阻塞队列中后。只要阻塞队列有值,我们就会通过线程任务方法VoucherOrderHandler()方法取出队首元素,并且调用handleVocherOrder()方法创建订单。
        进入到handleVocherOrder()方法,我们再次判断是否正常,判断通过后调用创建订单方法createVoucherOrder()
        由于该方法添加了事务,在子线程中事务及其代理对象都失效了,为了能继续使用代理对象进行调用,我们在主方法seckillVoucher()中提前声明并赋值了代理对象,这样在子线程任务中就可以获取到代理对象执行创建订单的方法了。
        进入到最后的创建订单方法createVoucherOrder() ,首先对一人一单、库存超卖问题再次进行判断,均通过以后创建订单。
通过上述步骤实现了异步创建订单,提高了并发效率

//1. 创建-- 阻塞队列
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    //2,创建-- 秒杀线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //5. 初始化方法  一初始化就执行
    @PostConstruct
    public void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //4. 创建-- 订单外部类
    private void handleVocherOrder(VoucherOrder voucherOrder){
        // 获取用户
        Long userId = voucherOrder.getUserId();

        // 1. 创建锁对象
        RLock lock =  redissonClient.getLock("lock:order:" + userId);

        //2. 尝试获取锁
        boolean isLock = lock.tryLock();

        // 3. 判断锁是否获取成功
        if(! isLock){
            log.error("不允许重复下单");
        }
        try {
             proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 4. 释放锁
            lock.unlock();
        }
    }

    //3. 创建-- 秒杀线程任务
    private class VoucherOrderHandler implements Runnable{

        @Override
        public void run() {
            while (true) {
                try {
                    //1. 获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    //2. 创建订单
                    handleVocherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("获取订单异常",e);
                }
            }
        }
    }
/**
     * 秒杀优惠券下单------秒杀优化代码----创建订单
     * @param voucherOrder
     */
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        //4. 限制一人一单【悲观锁方案】
        Long userId = voucherOrder.getUserId();

        //4.1 查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        //4.2 判断订单是否存在
        // 是 -----> 返回异常信息---->结束
        if (count > 0) {
            log.error("用户已经购买了一次了");
        }

        //5. 扣减库存——解决超卖问题【乐观锁方案】
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock-1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // 库存大于0就行了
                .update();
        if (!success) {
            log.error("库存不足");
        }

        //6. 创建订单
        save(voucherOrder);
    }

4.9.4 Redis优化秒杀功能测试

1. 简单测试功能是否实现,使用postman发送请求

【错误提醒】千万不要把Redis中的  seckill:stock:11 删掉啊!这个优惠卷信息是我们创建优惠卷时顺便存进去的,你要是将它清除了,后面的测试就进行不了了,lua脚本会报nil错误的(俺排了大半个小时)

第一次请求成功
第二次请求失败
2. 利用jmeter测试高并发情况

这里我生成了1000个token的文件,实际上是10个token重复了100次,用于模拟1000个请求(10个用户),同时请求jmeter的情况。结果和视频中做出来的不太一样,但是因为没有1000不同用户,所有我也没办法求证我的程序性能到底得到优化没有

准备10个用户token

优惠卷改成了5张

1000个token

准备1000个token的文件
配置基本路径信息
配置请求头读取
配置读取文件

开启测试

测试1s内发送1000个请求 10个不同用户token,抢购5张券

QPS:379,平均耗时:1076
没有出现超卖现象

测试 1s内发送100个请求10个用户 抢5张券

吞吐:595,平均耗时:39

测试5秒内 发送10000个请求 10个不同用户token 抢5张券

吞吐:621,平均耗时:1934

测试1秒内 发送10000个请求 10个不同用户token 抢5张券【有点不行】

吞吐:934,平均耗时:4308

4.9.5 总结

简单来说,就是Java阻塞队列还不够好

4.10(知识点) 学习Redis消息队列实现异步秒杀

消息队列(Message Queue):存放消息的队列。最简单的消息队列模型包括3个角色:

        消息队列:存储和管理消息,也被称为消息代理(Message Broker)
        生产者:发送消息到消息队列
        消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列:
        list结构:基于List结构模拟消息队列
        PubSub:基本的点对点消息模型
        Stream:比较完善的消息队列模型

基础的消息队列模型

4.10.1 基于List类型模拟的消息队列原理及其使用

        在Day2中我就总结过Redis中各个数据结构使用的业务场景,其中基于双向链表实现的list结构则可以用于模拟消息队列的功能(Redis学习Day2——Redis基础使用-CSDN博客

实现原理

  • 使用 LPUSH + RPOP 或 RPUSH + LPOP 实现消息队列
  • 使用 BLPOP 或 BRPOP 实现带有阻塞效果的消息队列

实现效果

 基于List结构实现的消息队列比较简单,只支持单消费者的模式

单消费者

优点

  • 相比于java实现的阻塞队列,不会受限于JVM的存储上限,没有未知的内存溢出风险
  • 属于Redis的基础数据结构,可以实现持久化存储,数据安全性也有所保障
  • List队列,先进先出,消息处理有序

缺点

  • 消息丢失无法避免
  • 只支持单消费者模式

缺点说明

        假设从Redis中取出一条消息,但是还没来得及处理就挂掉了,结果等到恢复正常后,刚从队列中取出来的消息已经不可复现了,因此存在消息丢失的风险。

        由于消息一旦被某个消费者拿走了,就会从消息队列中移除,因此该模式仅仅支持单消费者

常用命令

  • 阻塞监听
  • 存放消息

4.10.2  基于PubSub的消息队列原理及其使用

        PubSub(发布订阅)Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

实现原理

  • 使用 SUBSCRIBE channel [channel] :订阅一个或多个频道
  • 使用 PUBLISH channel msg :向一个频道发送消息
  • 使用 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

实现效果

发布者 + 订阅者模式,也就是说,可以实现多消费者的消息队列模型

多消费者

优点

  • 采用发布订阅模型,支持多生产者、多消费者的模型

缺点

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出是数据丢失

缺点说明

        PubSub本身设计出来是用于消息发送的,不具备存储功能,因而没有持久化策略。当某个频道没有任何人订阅,在该频道发送的数据直接丢失了。此外PubSub发送的信息过多未处理完容易造成堆积丢失。因此PubSub不适用于可靠性要求高的场景。

常见命令使用

  • 订阅单个频道【自带阻塞】
  • 订阅多个频道
  • 发布消息

 4.10.3(三者之最) 基于Stream的消息队列原理及其使用

Stream是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

主要特征

  • 数据结构:Redis Stream是一个由有序消息组成的日志数据结构,每个消息都有一个全局唯一的ID,确保消息的顺序性和可追踪性。

  • 消息ID:消息的ID由两部分组成,分别是毫秒级时间戳和序列号。这种设计确保了消息ID的单调递增有序性。

  • 消费者组:Redis Stream新增消费者组的概念,允许多个消费者以组的形式订阅Stream,并且每个消息只会被组内的一个消费者处理,避免了消息的重复消费

​​​

优点

  • 消息可回溯,可被多个消费者读取,可阻塞读取
  • 消息读完了不消失,会永久的保持在队列当中。

缺点

  • 任然有消息漏读的风险,在处理消息的过程中,如果同时来了多条消息,最后可能只能读到最后一条新消息,从而造成了消息漏读

 常见命令使用

  • 发送消息 —— XADD
  • 读取消息—— XREAD
    • 读取第一条消息,且可重复读
    • 读取最新消息
    • 读取并阻塞等待最新消息
    • BUG——漏读消息

基于Stream的消息队列——消费者组

消费者组(Consumer Group)

将多个消费者划分到一个组中,监听同一个队列。具备下列的特点:

        消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度

        消息标识消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费

        消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。【解决消息丢失问题】

消费者组常见命令

  • 创建消费者组——XGROUP CREATE 
  • 删除消费者组——XGROUP DESTORY
  • 添加消费者到消费组——XGROUP  CREATECONSUMER
  • 将消费者移除消费组——XGROUP  DELCONSUMER
  • 从消费者组读取信息——XREADGROUP 

4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能

步骤说明

①创建一个Stream类型的消息队列,名为stream.orders
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

4.10.4.1 创建Stream消息队列

 1. 登录redis

2. 创建队列和消费者组

4.10.4.2 修改秒杀资格判断的Lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by good boy.
--- DateTime: 2024/9/16 20:32
---

-----------判断秒杀资格的lua脚本---------------------------------
local voucherId = ARGV[1]   -- 优惠卷id
local userId = ARGV[2]      -- 用户id
local orderId = ARGV[3]    -- 订单id

local stockKey = 'seckill:stock:' .. voucherId  -- 库存key
local orderKey = 'seckill:order:' .. voucherId  -- 订单key

-- 脚本开始
-- 1. 判断库存是否充足
if( tonumber(redis.call('get', stockKey))  <=0 ) then
    -- 库存不足返回1
    return 1
end

-- 2. 判断用户是否下单
if( redis.call('sismember', orderKey, userId) == 1 ) then
    -- 用户已经下单返回2
    return 2
end
-- 3. 扣减库存 incrby -1
redis.call('incrby', stockKey, -1)

-- 4. 下单(保存用户)
redis.call('sadd', orderKey, userId)

-- 5. 发送消息到队列中 XADD [队列名]stream.orders * k1 v1 k2 v2 ...
redis.call('XADD','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)

-- 6. 返回0
return 0

4.10.4.3 完善秒杀下单代码及其代码说明
1. 首先是修改预加载Lua脚本信息
/** 方案二、三公共代码
     * 预加载lua脚本
     */
    private static DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        // 这是第二种方案需要执行的lua脚本
        // SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/seckill.lua"));
        // 这是第三种方案需要执行的lua脚本
        SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/streamSeckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);

    }
2. 主程序改变逻辑,将消息队列逻辑放入到了lua脚本
    /**
     *  秒杀优惠券下单------秒杀优化代码----lua脚本---主线程---使用Redis stream的消息队列完成的
     */
    private IVoucherOrderService proxy;
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 获取用户
        Long userId = UserHolder.getUser().getId();
        // 获取订单id
        long orderId =  redisIdWorker.nextId("order");

        //1.执行Lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),userId.toString(),String.valueOf(orderId)
        );
        //2.判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //3.不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
        }

        //5.返回订单id
        return Result.ok(orderId);
    }
3. 重头戏,开启线程任务!!

【注意特意标明】【黑马点评】已解决java.lang.NullPointerException异常-CSDN博客

项目启动时,init()方法执行,开启线程任务VoucherOrderHandler()方法。

进入到VoucherOrderHandler()方法,我们不断循环尝试去去消息队列中的信息,

        获取成功执行handleVocherOrder()方法创建订单

        获取失败: 说明没有消息 ---->继续循环

        出现异常执行handlePendingList()方法处理PendingList的异常

handleVocherOrder()方法和之前java阻塞队列方案写法一致,不过多赘述。

进入到handlePendingList()方法后,一样循环获取PendingList中的消息

        获取成功: 那就反过来调用handleVocherOrder()方法,执行订单创建

        获取失败: 说明Pending List没有消息 ---->结束循环

        出现异常休眠一段时间后自动回到VoucherOrderHandler()方法中的下一次循环

// 1,创建-- 秒杀线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //2. 初始化方法  一初始化就执行
    @PostConstruct
    public void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //3. 创建线程任务用于接收消息队列的信息
    private class VoucherOrderHandler implements Runnable{

        // 消息队列名称
        private String queueName = "stream.orders";


        @Override
        public void run() {
            while (true) {
                try{
                    //1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.oredes >
                    // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
                    List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //2. 判断消息获取是否成功
                    if( list == null || list.isEmpty()){
                        //2.1 获取失败 说明没有消息 ---->继续循环
                        continue;
                    }
                    // 解析消息中的订单信息
                    MapRecord<String,Object,Object> record = list.get(0);
                    //  获取键值对集合
                    Map<Object,Object> values = record.getValue();
                    // 获取订单信息
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);

                    //3. 获取成功,执行订单创建
                    handleVocherOrder(voucherOrder);

                    //4. ACK确认  SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());

                }catch (Exception e) {
                    // 消息没有被ACK确认 进入Pending List
                    log.error("订单处理出现异常",e);
                    handlePendingList();
                }
            }
        }

        // 5.取不到订单————— 处理Pending List中的订单信息
        private void handlePendingList(){
            while (true) {
                try {
                    //1. 获取Pending List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1  STREAMS stream.oredes 0
                    // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //2. 判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        //2.1 获取失败 说明Pending List没有消息 ---->结束循环
                        break;
                    }

                    // 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    //  获取键值对集合
                    Map<Object, Object> values = record.getValue();
                    // 获取订单信息
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);

                    //3. 获取成功,执行订单创建
                    handleVocherOrder(voucherOrder);

                    //4. ACK确认  SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());

                } catch (Exception e) {
                    log.error("Pending List订单处理出现异常", e);
                    try {
                        Thread.sleep(20);
                    }catch (InterruptedException interruptedException){
                        interruptedException.printStackTrace();
                    }
                }
            }
        }
    }


        // 4. 取到了订单—————创建订单
        private void handleVocherOrder(VoucherOrder voucherOrder){
            // 获取用户
            Long userId = voucherOrder.getUserId();

            // 1. 创建锁对象
            RLock lock =  redissonClient.getLock("lock:order:" + userId);

            //2. 尝试获取锁
            boolean isLock = lock.tryLock();

            // 3. 判断锁是否获取成功
            if(! isLock){
                log.error("不允许重复下单");
            }
            try {
                proxy.createVoucherOrder(voucherOrder);
            } finally {
                // 4. 释放锁
                lock.unlock();
            }
        }

4.10.4.4 秒杀下单功能测试
1. 简单测试功能是否实现,使用postman发送请求

2. 利用jmeter测试高并发情况

老规矩10个token

这次我们先测试一下 10个线程请求 10个不同用户 抢5张卷的情况

测试一下 1000个线程请求 10个不同用户 抢5张卷的情况

测试一下 10000个线程请求 10个不同用户 抢5张卷的情况

吞吐669

4.11 总结情况

        思路好像跟着来的,但是耗时和吞吐量始终是上不去,我上网看了看大家做的情况,好像都没有黑马一千多甚至两千的吞吐量。我感觉其实stream实现的消息队列相比java实现的阻塞队列性能上其实也没有优化到哪去,只是更加灵活可靠了吧。

五、达人探店系列功能实现

 5.1 发布和查看探店笔记

5.1.1 发布探店笔记

        这块代码黑马已经完成了,在发布探店笔记界面,有两块内容是需要上传的。一是笔记内容,二是笔记配图。其中笔记配图部分黑马使用的是上传到本地前端服务器上面的。我我觉得可以将图片文件发布在阿里云的OSS存储也行,该功能等后续会完成。等黑马点评后端部分完成后、再去分析黑马写的前端代码。然后再将该功能实现。

文件上传功能实现

在系统常量中修改文件上传的地址(改成你自己的)

// 图片上传路径
    public static final String IMAGE_UPLOAD_DIR = "D:\\software\\hm-dianping-nginx\\nginx-1.18.0\\html\\hmdp\\imgs";
@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {

    @PostMapping("blog")
    public Result uploadImage(@RequestParam("file") MultipartFile image) {
        try {
            // 获取原始文件名称
            String originalFilename = image.getOriginalFilename();
            // 生成新文件名
            String fileName = createNewFileName(originalFilename);
            // 保存文件
            image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
            // 返回结果
            log.debug("文件上传成功,{}", fileName);
            return Result.ok(fileName);
        } catch (IOException e) {
            throw new RuntimeException("文件上传失败", e);
        }
    }


    private String createNewFileName(String originalFilename) {
        // 获取后缀
        String suffix = StrUtil.subAfter(originalFilename, ".", true);
        // 生成目录
        String name = UUID.randomUUID().toString();
        int hash = name.hashCode();
        int d1 = hash & 0xF;
        int d2 = (hash >> 4) & 0xF;
        // 判断目录是否存在
        File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));
        if (!dir.exists()) {
            dir.mkdirs();
        }
        // 生成文件名
        return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);
    }
笔记发布功能实现 
    @PostMapping
    public Result saveBlog(@RequestBody Blog blog) {
        // 获取登录用户
        UserDTO user = UserHolder.getUser();
        blog.setUserId(user.getId());
        // 保存探店博文
        blogService.save(blog);
        // 返回id
        return Result.ok(blog.getId());
    }
发布功能测试

5.1.2 查看探店笔记

        添加TableField注解表示这三个字段不是数据库中的字段。我们查看探店笔记时,需要将创作者的信息也显示出来,但是又不能显示过多暴露,只需要有头像、姓名之类的就好了。

/**
     * 用户图标
     */
    @TableField(exist = false)
    private String icon;
    /**
     * 用户姓名
     */
    @TableField(exist = false)
    private String name;
    
查看探店笔记代码实现
/**
     * 根据id查询探店笔记
     * @param id
     * @return
     */
    @GetMapping("/{id}")
    public Result queryBlogById(@PathVariable("id") Long id) {
        return blogService.queryBlogById(id);
    }


/**
     * 根据id查博客笔记
     * @param id
     * @return
     */
    @Override
    public Result queryBlogById(Long id) {
        //1. 查询blog
        Blog blog = getById(id);
        if(blog==null){
            return Result.fail("博客不存在");
        }
        //2.查用户
        queryBlogUser(blog);
        return Result.ok(blog);
    }

    /**
     * 复用方法封装
     * @param blog
     */
    private void queryBlogUser(Blog blog){
        Long userId = blog.getUserId();
        User user = userService.getById(userId);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }
查看功能测试

5.2 点赞功能实现

5.2.1 当前点赞功能存在的问题

不校验点赞用户信息,一个人可以无限刷赞

    /**
     * 修改点赞数量
     * @param id
     * @return
     */
    @PutMapping("/like/{id}")
    public Result likeBlog(@PathVariable("id") Long id) {
        // 修改点赞数量
        // update tb_blog set liked liked + 1 where id = #{id}
        blogService.update()
                .setSql("liked = liked + 1").eq("id", id).update();
        return Result.ok();
    }

5.2.2 完善点赞功能需求与实现步骤

需求:

1. 同一用户只能点赞一次,再次点击即取消点赞

2. 如果当前用户已经点赞,则点赞按钮高亮显示(isLike 告知前端即可) 

步骤:

1. 给Blog类添加一个isList字段,标记当前用户是否点赞

2. 修改点赞功能,利用Redis的set集合特性存储当前笔记的点赞用户id,用于判断该用户是否给笔记点过赞,为点过则赞 +1 ,否则赞-1

3. 在根据id查询Blog业务时,就判断当前登录用户有无点赞记录,赋值给isList字段.

4. 在分页查询Blog业务时,也去判断并赋值

5.2.3 点赞功能实现

添加一个isList字段

    /**
     * 是否点赞过了
     */
    @TableField(exist = false)
    private Boolean isLike;

修改点赞功能

/**
     * 点赞
     * @param id
     * @return
     */
    @Override
    public Result likeBlog(Long id) {
        //1.判断当前用户有没点赞
        Long userId = UserHolder.getUser().getId();
        String key = RedisConstants.BLOG_LIKED_KEY + id;
        Boolean isMember =  stringRedisTemplate.opsForSet().isMember(key,userId.toString());
        if(BooleanUtil.isFalse(isMember)){
            //2.更新数据库信息 点赞+1
            boolean isSuccess =  update().setSql("liked = liked + 1").eq("id",id).update();
            //3. 保存用户到Redis的set集合
            if(isSuccess){
                stringRedisTemplate.opsForSet().add(key,userId.toString());
            }
        }else{
            //4. 点赞-1
            boolean isSuccess =  update().setSql("liked = liked - 1").eq("id",id).update();
            //5。 移除Redis
            if(isSuccess){
                stringRedisTemplate.opsForSet().remove(key,userId.toString());
            }
        }
        return Result.ok();
    }

添加判断功能

    /**
     * 查询点赞状态
     * @param blog
     */
    private void isBlogLiked(Blog blog) {
        Long userId = UserHolder.getUser().getId();
        String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
        Boolean isMember =  stringRedisTemplate.opsForSet().isMember(key,userId.toString());
        blog.setIsLike(BooleanUtil.isTrue(isMember));
    }



/**
     * 查多个
     * @param current
     * @return
     */
    @Override
    public Result queryHotBlog(Integer current) {
     //    根据用户查询
        Page<Blog> page = query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        // 查询用户
        records.forEach(blog -> {
            this.queryBlogUser(blog);
            // 查询点赞状态
            this.isBlogLiked(blog);
        });
        return Result.ok(records);
    }

    /**
     * 根据id查博客笔记
     * @param id
     * @return
     */
    @Override
    public Result queryBlogById(Long id) {
        //1. 查询blog
        Blog blog = getById(id);
        if(blog==null){
            return Result.fail("博客不存在");
        }
        //2.查用户
        queryBlogUser(blog);

        //3. 查询点赞状态
        isBlogLiked(blog);

        return Result.ok(blog);
    }

5.2.4 点赞功能测试

 5.3 点赞排行榜功能实现

5.3.1 数据结构选型说明

需求是能排序且去重的排行榜功能,因此我们选用zset集合来实现这些需求

5.3.2 排行功能实现

修改点赞及点赞状态逻辑

/**
     * 查询点赞状态
     * @param blog
     */
    private void isBlogLiked(Blog blog) {
        Long userId = UserHolder.getUser().getId();
        String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
        Double score =  stringRedisTemplate.opsForZSet().score(key,userId.toString());
        blog.setIsLike(score != null);
    }

    /**
     * 点赞
     * @param id
     * @return
     */
    @Override
    public Result likeBlog(Long id) {
        //1.判断当前用户有没点赞
        Long userId = UserHolder.getUser().getId();
        String key = RedisConstants.BLOG_LIKED_KEY + id;
        Double score =  stringRedisTemplate.opsForZSet().score(key,userId.toString());
        if(score == null){
            //2.更新数据库信息 点赞+1
            boolean isSuccess =  update().setSql("liked = liked + 1").eq("id",id).update();
            //3. 保存用户到Redis的set集合
            if(isSuccess){
                stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
            }
        }else{
            //4. 点赞-1
            boolean isSuccess =  update().setSql("liked = liked - 1").eq("id",id).update();
            //5。 移除Redis
            if(isSuccess){
                stringRedisTemplate.opsForZSet().remove(key,userId.toString());
            }
        }
        return Result.ok();
    }

实现排行榜功能

ZRANGE 查询范围内的元素

/**
     * 查询点赞排行榜
     * @param id
     * @return
     */
    @Override
    public Result queryBlogLikes(Long id) {
        // 使用zrange查询TOP5
        Set<String> top5StrIds = stringRedisTemplate.opsForZSet().range(RedisConstants.BLOG_LIKED_KEY + id, 0, 4);
        if(top5StrIds==null || top5StrIds.isEmpty()){
            return Result.ok(Collections.emptyList());
        }
        List<Long> ids = top5StrIds.stream().map(Long::valueOf).collect(Collectors.toList());

        // 根据用户id查询数据库
        List<User> users = userService.listByIds(ids);

        // DTO
        List<UserDTO> userDTOList = users
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());

        return Result.ok(userDTOList);
    }

5.3.3 排行功能测试

优化

发现查询点赞排行榜的顺序不正确——原因:数据库in使用时,无法保障顺序

添加ORDER BY FIELD 自定义顺序
 

 // 根据用户id查询数据库
        // 自定义sql查询
        List<User> users = userService.query().in("id", ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();

 5.4 查看用户笔记列表功能实现

在点击用户头像进入首页时,可以查询该用户的博客列表进行展示

博客列表
查看博客列表

/**
     * 根据用户id查询探店笔记列表
     * @param current
     * @param id
     * @return
     */
    @GetMapping("/of/user")
    public Result queryBlogByUserid(
            @RequestParam(value = "current",defaultValue = "1") Integer current,
            @RequestParam("id") Long id) {
        // 根据用户查询
        Page<Blog> page = blogService.query()
                .eq("user_id", id)
                .page(new Page<>(current,SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        return Result.ok(records);
    }


网站公告

今日签到

点亮在社区的每一天
去签到