12 数据一致性&自定义starter封装缓存操作
12.1 缓存和数据库的数据一致性
使用缓存的主要目的是为了提升查询的性能。大多数情况下,使用缓存的过程如下图:
- 当应用程序需要从数据库读取数据时,先检查缓存数据是否命中;
- 如果缓存未命中,则查询数据库数据,同时将数据写到缓存中,以便后续读取相同的数据会命中缓存,最后在把数据返回给调用者;
- 如果缓存命中,直接返回;
缓存不一致产生的原因:
- A线程修改数据库,接着修改缓存;
- 在A线程执行操作期间,B线程来读取缓存中数据,此时如果A线程还未将数据同步到缓存,那么B线程读到的就是缓存的旧数据;
- A将更新后的数据同步到缓存会有一个时间差,因此在并发读写的时候可能就会出现缓存不一致的问题;
写数据的流程无非以下四种:
- 先更新缓存在更新数据库
- 先删除缓存在更新数据库
- 先更新数据库在更新缓存
- 先更新数据库在删除缓存
对于缓存,大多还是选择删除,为什么?因为相比于删除缓存,计算机更新缓存的成本更高
那么是先操作缓存?还是先操作数据库?
先删除缓存,再更新数据库
- 在多线程情况下,当A线程把缓存删除后,B线程过来读缓存;
- 但A已经把缓存删除了,所以B线程会因为缓存未命中就会直接去读数据库,然后将读到的数据去更新缓存;
- 此时A线程才来更新数据库,这就造成了缓存脏数据的情况;
- 而且,如果不采用给B更新到缓存中的数据设置过期时间,那么该数据永远都是脏数据;
通过2pc、Paxos算法或者分布式锁保持一致性,即在A将“删除缓存,更新数据库”这一套操作执行完后,其它线程才能来查询缓存和数据库;
但是这可能会影响系统吞吐量、增加系统响应时间,因此通常采用相对宽松的一致性方法,称为最终一致性;
先更新数据库,再删除缓存
- 在多线程情况下,当A线程直接去更新数据库,B线程读取缓存数据,此时B读到的是缓存的旧数据;
- A线程更新完数据库后就会更新缓存,这时缓存就正常了,只产生了一次脏读;
最终一致性
延迟双删
- 对于“先删除缓存,再更新数据库”的方式,可以用延迟双删来解决,即:先删除缓存,再更新数据库,休眠一段时间后,再删除缓存;
- 对于“先更新数据库,再删除缓存”的方式,也可以使用延迟双删,防止第一次删除缓存失败;
- 休眠时间怎么确定呢?需要自行压测项目读取数据的业务逻辑耗时(即第二个线程从数据库读取数据然后完成写入缓存),防止二次删除不起作用;
删除缓存的重试机制
如果缓存删除失败怎么办?比如延迟双删的第二次删除缓存失败,那岂不是无法删除脏数据?
可以启用删除缓存的重试机制,以保证删除缓存成功;
在高并发下,重试最好使用异步方式,比如发送消息到 MQ 中实现异步解耦;
该方案有个缺点,就是会对业务代码造成侵入, 那么可以启动一个专门订阅数据库 binlog 的服务去读取需要删除的数据然后进行缓存删除操作;
读取 binlog 异步删除
Canal 是阿里的一款开源框架,主要用途是基于 MySQL 数据库的增量日志解析,提供增量数据订阅和消费,且 Canal 提供了各种语言的客户端,当Canal监听到 binlog 变化时,会通知 Canal 的客户端;
可以利用 Canal 提供的 Java 客户端,监听 Canal 的通知消息。当收到数据变化的消息时,完成对缓存的更新;
- 更新数据库
- 数据库会把操作信息记录在 binlog 日志中
- Canal 订阅了 binlog 日志,在数据库发生变动时,会获取目标数据和key,并通知缓存处理服务
- 缓存处理服务获取到 Canal 发来的数据,解析得到目标 key,尝试删除缓存
- 如果缓存处理服务处理删除缓存失败,异步发送 key 给 MQ,缓存处理服务或者其他服务会订阅队列的key,继续重试删除缓存
12.2 Canal
12.2.1 简介
Canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费;
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务;
GitHub:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
特点:
- 高性能:Canal 采用了基于网络协议的方式来解析和同步 MySQL 的增量日志,相较于数据库级别的触发器或轮询方式,可以提供更高的同步性能;
- 支持多种数据格式:Canal 可以将 MySQL 的增量日志解析为多种数据格式,包括 JSON、XML 等,方便用户进行二次开发和数据处理;
- 多种同步方式:Canal 支持多种同步方式,包括基于缓存、MQ、HTTP 接口等多种方式,可以根据业务需求选择不同的同步方式;
- 灵活的订阅机制:Canal 支持灵活的订阅机制,可以根据表、库、列等维度进行精确的订阅,同时也支持动态增加和删除订阅;
- 多种部署方式:Canal 可以在单机、集群等多种环境下进行部署,同时也支持 Docker 容器化部署;
- 易于扩展:Canal 采用了插件化的设计,支持用户自定义插件,可以方便地扩展新的功能;
总的来说,Canal 是一款功能强大、性能高效、易于使用、可扩展的数据同步工具,被广泛应用于阿里巴巴和其他企业的数据同步场景中。
12.2.2 工作原理
Canal 的工作原理是将自己伪装成 Mysql 的 slave 节点(即从节点),来订阅 MySQL binlog 的变更,所以在配置启动 Canal 前,需要先配置 MySQL;
MySQL 主从复制原理
- MySQL master 将数据变更写入二进制日志(即binary log),其中的记录叫做二进制日志事件 binary log events,可以通过
show binlog events
进行查看; - MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据;
- MySQL master 将数据变更写入二进制日志(即binary log),其中的记录叫做二进制日志事件 binary log events,可以通过
Canal 工作原理
- Canal 模拟 MySQL slave 的交互协议,将自己伪装成 MySQL slave ,向 MySQL master 发送 dump 协议;
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave(即 Canal);
- Canal 解析 binary log 对象(原始为 byte 流)。
12.2.3 安装 Cannl
先查看当前 MySQL 是否开启了 binlog 模式
# 进入到MySQL容器的工作目录下 docker exec -it 3a0532e74496 bash # 登录MySQL mysql -uroot -proot # 开启binlog模式 SHOW VARIABLES LIKE '%log_bin%';
创建账号并授权
- 创建
canal
用户,允许远程连接; - 授予
SELECT
、REPLICATION SLAVE
、REPLICATION CLIENT
、SUPER
权限,使 Canal 能读取binlog
并监控数据库变更; - 修改认证方式(针对 MySQL 8.0+ 兼容性);
- 刷新权限,确保配置立即生效;
# 创建 canal 用户并设置密码 create user canal@'%' IDENTIFIED by 'canal'; # canal@'%':创建一个用户名为 canal,允许从 任意主机(% 表示所有IP) 连接 MySQL # IDENTIFIED BY 'canal':设置该用户的密码为 canal # 授予 canal 用户必要的权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; # SELECT:允许查询数据(Canal 需要读取 binlog,因此需要查询权限) # REPLICATION SLAVE:允许作为从库读取主库的 binlog(Canal 模拟 MySQL 从库拉取变更日志) # REPLICATION CLIENT:允许查看主库/从库状态(Canal 需要检查复制状态) # SUPER:允许执行某些管理命令(如 SET GLOBAL,某些 MySQL 版本 Canal 需要此权限) # ON *.*:对所有数据库(*.*)生效。 # 修改 canal 用户的认证方式(MySQL 8.0+ 需要) ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; # mysql_native_password:使用 旧版密码认证(MySQL 8.0+ 默认使用 caching_sha2_password,但部分客户端(如 Canal)可能不支持,需切换回旧方式) # BY 'canal':保持密码不变。 # 刷新权限,使更改立即生效 FLUSH PRIVILEGES;
- 创建
安装 Canal 容器:先执行两次
exit
,退出MySQL,退出MySQL容器docker run -p 11111:11111 --name canal --restart=always \ -e canal.destinations=tingshuTopic \ -e canal.instance.master.address=服务器ip地址:3307 \ -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.instance.filter.regex=tingshu_album.album_info \ -d registry.cn-shanghai.aliyuncs.com/atguiguhzk/canal:v1.0 # destinations定义了Canal需要监听的MySQL实例名称 # address指定Cannal连接 MySQL 数据库地址 # dbUsername指定Cannal连接MySQL数据库的用户名 # dbPassword指定Cannal连接MySQL数据库的密码 # connectionCharset用于指定Cannal连接MySQL数据库时使用的字符集编码为UTF-8 # tsdb用于存储和查询元数据信息,例如数据库表的结构、字段类型等。启用TSDB功能后,Canal可以更好地管理和维护这些元数据信息,从而提高数据同步的准确性和效率 # gtion用于决定Canal是否启用GTID(全局事务标识符)支持确保数据的一致性和完整性。如果设置为false,那么Canal不会使用GTID来同步数据,而是依赖于传统的二进制日志位置(binlog position)来进行数据同步 # regex即正则表达式,用于指定需要订阅的数据库和表 比如.*\\..* 表示监听所有库表
查看启动日志。看完
Ctrl+C
终止一下服务器记得开放防火墙。
12.3 搭建service-cdc
工程
在
service
模块下新建子模块service-cdc
(Change Data Capture,变更数据捕获):pom.xml
:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.6.RELEASE</version> <!--relativePath标签用于定位父工程的pom文件,默认去找上一层的pom文件--> <!--如果需要找同级的,比如可以写../service-album/pom.xml--> <!--如果relativePath标签中什么都不写,或者写成下面的单标签形式。代表直接从本地仓库找spring-boot-starter-parent的pom文件,本地仓库找不到就去远程仓库找--> <relativePath/> </parent> <groupId>com.shisan.tingshu.cdc</groupId> <artifactId>service-cdc</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency> <dependency> <groupId>javax.persistence</groupId> <artifactId>persistence-api</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <!--<dependency>--> <!-- <groupId>com.shisan.tingshu</groupId>--> <!-- <artifactId>rabbit-util</artifactId>--> <!-- <version>1.0</version>--> <!--</dependency>--> </dependencies> </project>
配置
application.yaml
:server: port: 10001 spring: redis: host: 服务器ip地址 port: 6379 password: 123456 application: name: service-cdc # canal canal: destination: tingshuTopic # 和安装Cannl容器时指定的一样 server: 服务器ip地址:11111
修改项目的JDK版本为 JDK8:每刷新一次Maven这个设置就会重置,所以如果有刷新Maven的话,需要重新操作一次
启动项目:
新建实体类:
package com.shisan.tingshu.cdc.entity; import lombok.Data; import javax.persistence.Column; /** * 映射对应(监听的)表中的字段 */ @Data public class CdcEntity { @Column(name = "id") private Long id; // 只需要监听到表中的id字段,所以只用定义一个属性就可以了 }
编写处理器自定义实现
EntryHandler
接口:EntryHandler
接口的作用是处理 Canal 数据变更事件;- 在阿里巴巴的开源项目 Canal 中,该接口用于监听数据库表的数据变更,并将变更的数据进行处理和存储;
- 具体来说,当 Canal 客户端连接到 Canal 服务器并订阅了相应的数据库表后,每当表中的数据发生变更时,Canal 服务器会将变更的数据封装成一个
Entry
对象,然后通过 Canal 协议将这个对象发送给 Canal 客户端。Canal 客户端接收到这个Entry
对象后,会调用EntryHandler
类的相应方法来处理这个对象;
实现
EntryHandler
接口的类通常需要实现以下三个方法:public interface EntryHandler<T> { //监听到数据添加 default void insert(T t) { } //监听到数据修改 default void update(T before, T after) { } //监听数据删除 default void delete(T t) { } }
在
EntryHandler
接口的实现类上会加上一个@CanalTable
注解,需要传入一个参数来指定一个表名,用于在 MyBatisPlus 中与 Canal 进行数据同步。当 MyBatisPlus 执行数据库操作时,它会将操作记录到 Canal 中,然后通过监听器将 Canal 中的数据同步到目标数据库。通过使用@CanalTable
注解,可以指定要同步的表名,以便只同步特定的表;新建:
@Component @CanalTable("album_info") // 监听变更表 @Slf4j public class CdcEntityHandler implements EntryHandler<CdcEntity> { /** * 监听的表中有数据新增的时候,会回调该方法 * @param cdcEntity */ @Override public void insert(CdcEntity cdcEntity) { log.info("Canal客户端监听到了album_info表中有数据的新增的id:{}", cdcEntity.getId()); } /** * 监听的表中有数据变更的时候,会回调该方法 * @param before 修改之前的老数据 * @param after 修改后的新数据 */ @Override public void update(CdcEntity before, CdcEntity after) { log.info("Canal客户端监听到了album_info表中有数据的修改,修改之前的id:{}", before.getId()); log.info("Canal客户端监听到了album_info表中有数据的修改,修改之后的id:{}", after.getId()); } /** * 监听的表中有数据删除的时候,会回调该方法 * @param cdcEntity 删除的对象 */ @Override public void delete(CdcEntity cdcEntity) { log.info("Canal客户端监听到了album_info表中有数据的删除,删除的数据的id:{}", cdcEntity.getId()); } }
12.4 先删除缓存再更新数据库实现
流程:
修改:
@Component @CanalTable("album_info") // 监听变更表 @Slf4j public class CdcEntityHandler implements EntryHandler<CdcEntity> { @Autowired private StringRedisTemplate redisTemplate; // @Autowired // private RabbitService rabbitService; /** * 监听的表中有数据新增的时候,会回调该方法 * @param cdcEntity */ @Override public void insert(CdcEntity cdcEntity) { log.info("Canal客户端监听到了album_info表中有数据的新增的id:{}", cdcEntity.getId()); } /** * 监听的表中有数据变更的时候,会回调该方法 * @param before 修改之前的老数据 * @param after 修改后的新数据 */ @Override public void update(CdcEntity before, CdcEntity after) { log.info("Canal客户端监听到了album_info表中有数据的修改,修改之前的id:{}", before.getId()); log.info("Canal客户端监听到了album_info表中有数据的修改,修改之后的id:{}", after.getId()); String cacheKey = "cache:info:" + after.getId(); // 创建一个线程池 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); // 延迟300毫秒删除缓存 try { scheduledExecutorService.schedule(new Runnable() { @Override public void run() { redisTemplate.delete(cacheKey); } }, 300, TimeUnit.MICROSECONDS); } catch (Exception e) { // TODO 使用MQ进行重试删除操作 // rabbitService.sendMessage() } } /** * 监听的表中有数据删除的时候,会回调该方法 * @param cdcEntity 删除的对象 */ @Override public void delete(CdcEntity cdcEntity) { log.info("Canal客户端监听到了album_info表中有数据的删除,删除的数据的id:{}", cdcEntity.getId()); } }
新建:
package com.shisan.tingshu.cdc.listener; import org.springframework.stereotype.Component; @Component public class CdcInfoListener { /** * TODO:监听重试发送3次 */ }
由于包名不一致的问题,
service-cdc
中有一层cdc
包,所以需要在启动类上加上@Import(RabbitService.class)
注解,这样才能导入rabbit-util
依赖;修改:
/** * 修改专辑信息 * @param albumId * @param albumInfoVo */ @Transactional(rollbackFor = Exception.class) @Override public void updateAlbumInfo(Long albumId, AlbumInfoVo albumInfoVo) { // 先删除缓存,再更新数据库,在CdcEntityHandler中使用异步线程,再次删除缓存,同时加入了消息队列重试机制 String cacheKey = RedisConstant.CACHE_INFO_PREFIX + albumId; redisTemplate.delete(cacheKey); // …… 其它逻辑 }
12.5 SpringEL 入门
SpringEL(Spring Expression Language)是Spring框架提供的一种强大的表达式语言,用于在运行时动态查询和操作对象图,支持方法调用、属性访问、运算符运算等特性;
简单使用:
@Test public void testApi1() { // 创建解析表达式的解析器对象 ExpressionParser parser = new SpelExpressionParser(); Expression expression1 = parser.parseExpression("'Hello World'.concat('!')"); // 直接将表达式作为参数传入并解析 Object message1 = expression1.getValue(); System.out.println(message1); // Hello World! String expression2 = "'abc'.length()"; // 定义一个表达式 Expression exp2 = parser.parseExpression(expression2); // 计算abc.length的值 Object message2 = exp2.getValue(); System.out.println(message2); // 3 String expression3 = "1+1"; Expression exp3 = parser.parseExpression(expression3); // 计算1+1的值 Object message3 = exp3.getValue(); System.out.println(message3); // 2 String expression4 = "1==1"; Expression exp4 = parser.parseExpression(expression4); // 判断1==1的布尔值 Object message4 = exp4.getValue(); System.out.println(message4); // true } /** * 表达式中有变量 */ @Test public void testApi2() { Long[] longs = {1l, 2l, 3l}; // 创建解析表达式的解析器对象 ExpressionParser parser = new SpelExpressionParser(); // 创建计算上下文对象 StandardEvaluationContext standardEvaluationContext = new StandardEvaluationContext(); standardEvaluationContext.setVariable("args", longs); // 从args变量中获取第二个元素 Expression expression = parser.parseExpression("#args[1]"); // 获取结果 Object value = expression.getValue(standardEvaluationContext); System.out.println(value); // 2 } /** * 表达式中有临界符 */ @Test public void testApi3() { // 创建解析表达式的解析器对象 ExpressionParser parser = new SpelExpressionParser(); // 创建一个解析上下文模版对象【临界符】 TemplateParserContext templateParserContext = new TemplateParserContext(); // 定义表达式 String expression = "album:Info:#{1+1}"; Expression exp = parser.parseExpression(expression, templateParserContext); Object value = exp.getValue(); System.out.println(value); // album:Info:2 } /** * 表达式中既有变量,又有临界符 */ @Test public void testApi4() { Long[] values = {1l, 2l, 3l}; // 创建解析表达式的解析器对象 ExpressionParser parser = new SpelExpressionParser(); // 创建一个解析上下文模版对象【临界符】 TemplateParserContext templateParserContext = new TemplateParserContext(); // 创建计算上下文对象 StandardEvaluationContext standardEvaluationContext = new StandardEvaluationContext(); standardEvaluationContext.setVariable("args", values); // 定义表达式 String expression="album:info:#{#args[0]}"; // 解析 Expression exp = parser.parseExpression(expression, templateParserContext); String value = exp.getValue(standardEvaluationContext, String.class); System.out.println(value); // album:info:1 }
12.6 自定义starter封装缓存操作
随着业务中缓存及分布式锁的加入,业务代码变的复杂起来,除了需要考虑业务逻辑本身,还要考虑缓存及分布式锁的问题,增加了程序员的工作量及开发难度;
- 接下来将对于缓存的操作都封装成一个 starter,彻底与业务代码解耦,作为一个只操作缓存的依赖来使用(谁要用谁依赖即可);
- 再借助 AOP 和自定义注解,谁需要操作缓存,就加上注解,并传入指定参数即可;
新建:
pom.xml
:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.0.5</version> <relativePath/> </parent> <groupId>org.shisan.cache</groupId> <artifactId>cache-starter</artifactId> <version>1.0</version> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.25.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> </dependencies> <build> <finalName>${project.artifactId}</finalName> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <skip>true</skip> <!--未来cache-starter打成的jar包不用执行,直接让对方依赖就行--> </configuration> </plugin> </plugins> </build> </project>
项目目录的最终结构:
自定义注解
Cacheable
接口:package org.shisan.cache.aspect.annotaion; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface Cacheable { String cacheKey() default "";// 定义缓存key String lockKey(); // 定义锁的key String bloomKey(); // 定义布隆过滤器的key boolean enableBloomFilter() default false; // 布隆开关 boolean enableLock() default false; // 锁的开关 }
自定义注解的AOP逻辑
CacheAspect
类:package org.shisan.cache.aspect; import org.springframework.expression.Expression; import com.fasterxml.jackson.core.type.TypeReference; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RBloomFilter; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.shisan.cache.aspect.annotaion.Cacheable; import org.shisan.cache.constant.CacheAbleConstant; import org.shisan.cache.service.CacheOpsService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.expression.common.TemplateParserContext; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Component; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.lang.reflect.Type; //@Component 此处采用SPI机制注入容器 @Aspect public class CacheAspect { @Autowired private CacheOpsService cacheOpsService; @Autowired private RedissonClient redissonClient; @Autowired private RBloomFilter rBloomFilter; /** * 切面逻辑:查询缓存 >> 根据查询结果决定是否回源 >> 若回源则需要同步到缓存中 * 使用环绕通知 * 使用任何方法 */ @Around(value = "@annotation(org.shisan.cache.aspect.annotaion.Cacheable)") public Object cacheCheck(ProceedingJoinPoint pjp) throws Throwable { // 1.获取目标方法的指定注解对象 Cacheable cacheable = getMethodAnnotaion(pjp, Cacheable.class); // 2.获取目标方法的返回值类型(带泛型) Type genericReturnType = getMethodGerenicReturnType(pjp); // 3.定义变量 // 3.1 定义缓存key表达式 String cacheKeyExpression = cacheable.cacheKey(); // 3.2 计算缓存key String cacheKey = dynamicComputeKey(cacheKeyExpression, pjp, String.class); // 3.3 定义锁key表达式 String lockKeyExpression = cacheable.lockKey(); // 3.4 计算锁的key String lockKey = dynamicComputeKey(lockKeyExpression, pjp, String.class); // 3.5 定义布隆过滤器的key表达式 String bloomKeyExpression = cacheable.bloomKey(); // 3.6 计算布隆过滤器的key Long bloomKey = dynamicComputeKey(bloomKeyExpression, pjp, Long.class); // 3.7 获取分布式布隆过滤器的开关 boolean enableBloomFilter = cacheable.enableBloomFilter(); // 3.8 获取分布式锁的开关 boolean enableLockFlag = cacheable.enableLock(); // 4.使用布隆过滤器 if (enableBloomFilter) { if (!rBloomFilter.contains(bloomKey)) { return null; } } // 5.没有使用布隆过滤器直接查询缓存 Object dataFromCache = cacheOpsService.getDataFromCache(cacheKey, new TypeReference<Object>() { @Override public Type getType() { return genericReturnType; } }); // 6.缓存命中 if (dataFromCache != null) { return dataFromCache; } // 7.缓存未命中且没有开启分布式锁 if (!enableLockFlag) { // 7.1.回源 Object proceed = pjp.proceed(); // 执行目标方法 // 7.2 同步数据到缓存中 cacheOpsService.saveDataToCache(cacheKey, proceed); // 7.3 返回数据 return proceed; } // 8.开启分布式锁,获取锁对象 RLock lock = redissonClient.getLock(lockKey); // 9.抢锁 boolean acquireLock = lock.tryLock(); // 10.抢锁成功 if (acquireLock) { try { // 11.回源 Object proceed = pjp.proceed(); // 执行目标方法 // 12.同步数据到缓存中 cacheOpsService.saveDataToCache(cacheKey, proceed); // 13.返回数据 return proceed; } finally { lock.unlock(); // 释放锁 } } else { // 14.抢锁失败 Thread.sleep(CacheAbleConstant.DATA_SYNC_TTL); // 压测给一个精准值 // 15.查询缓存 Object result = cacheOpsService.getDataFromCache(cacheKey, new TypeReference<Object>() { @Override public Type getType() { return genericReturnType; } }); // 16. 缓存命中 if (result != null) { return result; } // 17. 兜底继续回源 return pjp.proceed(); // 执行目标方法 } } /** * 获取目标方法带泛型的返回值类型 * @param pjp * @return */ private static Type getMethodGerenicReturnType(ProceedingJoinPoint pjp) { MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); Type genericReturnType = method.getGenericReturnType(); return genericReturnType; } /** * 根据表达式获取key * @param cacheKeyExpression * @param pjp * @param resultClass * @return */ private <T> T dynamicComputeKey(String cacheKeyExpression, ProceedingJoinPoint pjp, Class<T> resultClass) { // 1.创建表达式解析器对象 SpelExpressionParser spelExpressionParser = new SpelExpressionParser(); // 2.创建计算上下文对象 StandardEvaluationContext standardEvaluationContext = new StandardEvaluationContext(); standardEvaluationContext.setVariable("args", pjp.getArgs()); // 和注解在使用时的变量名保持一致,都是args // 3.创建解析模版对象 TemplateParserContext templateParserContext = new TemplateParserContext(); // 4.解析表达式 Expression expression = spelExpressionParser.parseExpression(cacheKeyExpression, templateParserContext); // 5.获取计算之后的值 T value = expression.getValue(standardEvaluationContext, resultClass); // 6.返回缓存key的值 return value; } /** * 获取目标方法的指定类型注解 * @param pjp * @param tClass * @param <T> * @return */ private static <T extends Annotation> T getMethodAnnotaion(ProceedingJoinPoint pjp, Class<T> tClass) { MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); T annotation = (T) method.getAnnotation(tClass); return annotation; } }
在
CacheAutoConfiguration
中借助 SPI 机制注入 Bean:package org.shisan.cache.configuration; import org.redisson.Redisson; import org.redisson.api.RBloomFilter; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.shisan.cache.aspect.CacheAspect; import org.shisan.cache.constant.CacheAbleConstant; import org.shisan.cache.service.impl.CacheOpsServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.StringRedisTemplate; @Configuration // 该注解也可以省略 public class CacheAutoConfiguration { Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RedisProperties redisProperties; @Autowired private StringRedisTemplate redisTemplate; /** * 定义Redisson客户端的Bean对象 */ @Bean public RedissonClient redissonClient() { // 给Redisson设置配置信息 Config config = new Config(); config.useSingleServer() // 使用单机模式 .setPassword(redisProperties.getPassword()) .setAddress(CacheAbleConstant.CACHE_REDIS_PROTOCOL + redisProperties.getHost() + CacheAbleConstant.CACHE_REDIS_PORT_SPLIT + redisProperties.getPort()); // 创建Redisson客户端 RedissonClient redissonClient = Redisson.create(config); return redissonClient; } /** * 定义一个BloomFilter的Bean对象 */ @Bean public RBloomFilter rBloomFilter(RedissonClient redissonClient) { // 如果在Redis中没有这个key,那么会自动创建,并返回这个key对应的布隆过滤器对象。反之 直接返回已经创建好的布隆过滤器 // tryInit()方法返回true表示初始化成功(即之前不存在,现在新创建了),返回false表示已经存在(即之前已经初始化过) RBloomFilter<Object> albumIdBloomFilter = redissonClient.getBloomFilter(CacheAbleConstant.DISTRO_BLOOM_FILTER_NAME); // 加个锁,让分布式布隆过滤器只初始化一次且同步数据只做一次 // 当锁存在的时候,表示布隆过滤器已经初始化过了,直接返回布隆过滤器对象 String bloomFilterLockKey = CacheAbleConstant.DISTRO_BLOOM_FILTER_LOCK_KEY; Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(bloomFilterLockKey, CacheAbleConstant.DISTRO_BLOOM_FILTER_LOCK_VALUE); if (aBoolean) { // 初始化布隆过滤器 boolean b = albumIdBloomFilter.tryInit(CacheAbleConstant.DISTRO_BLOOM_FILTER_EXCEPTED_INSERT, CacheAbleConstant.DISTRO_BLOOM_FILTER_FPP); // 利用分布式锁保证分布式布隆的初始化只做一次 if (b) { logger.info("成功创建新的布隆过滤器,等待数据填充"); } else { logger.info("布隆过滤器已存在,直接使用"); } } return albumIdBloomFilter; } /** * 定义缓存切面类组件 */ @Bean public CacheAspect cacheAspect() { return new CacheAspect(); } /** * 定义操作缓存的业务组件 */ @Bean public CacheOpsServiceImpl cacheOpsService() { return new CacheOpsServiceImpl(); } }
同时:
org.shisan.cache.configuration.CacheAutoConfiguration
自定义常量类
CacheAbleConstant
:package org.shisan.cache.constant; /** * 常量类 */ public class CacheAbleConstant { // 数据同步时间(需要通过压测得到) public static final Long DATA_SYNC_TTL = 200l; // 缓存协议 public static final String CACHE_REDIS_PROTOCOL = "redis://"; // 缓存端口的分隔符 public static final String CACHE_REDIS_PORT_SPLIT = ":"; // 布隆过滤器的名字 public static final String DISTRO_BLOOM_FILTER_NAME = "albumIdBloomFilter"; // 布隆过滤器的锁名 public static final String DISTRO_BLOOM_FILTER_LOCK_KEY = "albumIdBloomFilter:lock"; // 布隆过滤器的锁值 public static final String DISTRO_BLOOM_FILTER_LOCK_VALUE = "1"; // 布隆过滤器期望插入的元素个数 public static final Long DISTRO_BLOOM_FILTER_EXCEPTED_INSERT = 1000000l; // 布隆过滤器的误判率 public static final Double DISTRO_BLOOM_FILTER_FPP = 0.001; // 有数据的缓存时间 public static final Long HAS_DATA_TTL = 60 * 60 * 24 * 7l; // 无数据的缓存时间 public static final Long NO_DATA_TTL = 60 * 60 * 24l; }
统一返回结果状态信息类
ResultCodeEnum
:CV一下,由于cache-starter
中没有引入 Lombok 依赖,所以无法使用@Data
注解,需要实现get和set方法自定义全局异常类
ShisanException
:CV一下,由于cache-starter
中没有引入 Lombok 依赖,所以无法使用@Data
注解,需要实现get和set方法对缓存进行读写操作的API封装:
CacheOpsService
接口package org.shisan.cache.service; import com.fasterxml.jackson.core.type.TypeReference; /** * 对缓存进行读写操作的API封装 */ public interface CacheOpsService { /** * 将数据写入到缓存中(写操作) * cacheKey 将数据存储到缓存中用到的key * Object object 要保存进缓存的数据对象 */ public void saveDataToCache(String cacheKey, Object object); /** * 从缓存中将数据读取出来(读操作),不带泛型 * cacheKey 从缓存获取数据要用到的key * clazz 要从缓存中反序列化的类型 */ public <T> T getDataFromCache(String cacheKey, Class<T> clazz); /** * 从缓存中将数据读取出来(读操作),带泛型 * cacheKey 从缓存获取数据要用到的key * clazz 要从缓存中反序列化的类型 */ public <T> T getDataFromCache(String cacheKey, TypeReference<T> tTypeReference); }
对缓存进行读写操作的API封装:
CacheOpsService
接口的实现类CacheOpsServiceImpl
package org.shisan.cache.service.impl; import com.fasterxml.jackson.core.type.TypeReference; import org.shisan.cache.constant.CacheAbleConstant; import org.shisan.cache.service.CacheOpsService; import org.shisan.cache.utils.Jsons; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.util.List; import java.util.concurrent.TimeUnit; //@Service 此处采用SPI机制注入容器 public class CacheOpsServiceImpl implements CacheOpsService { @Autowired private StringRedisTemplate redisTemplate; @Override public void saveDataToCache(String cacheKey, Object object) { // 将对象序列化成字符串 // String s = JSONObject.toJSONString(object); // 将字符串存储到缓存中 // redisTemplate.opsForValue().set(cacheKey,s); // 下面使用自定义的工具类完成对数据的序列化和反序列化操作 String resultStr = Jsons.objToStr(object); long ttl = CacheAbleConstant.HAS_DATA_TTL; List<String> allRegexRules = Jsons.getAllRegexRules(); for (String allRegexRule : allRegexRules) { if (Jsons.isMath(resultStr, allRegexRule)) { ttl = CacheAbleConstant.NO_DATA_TTL; } } // 将字符串存储到缓存中 redisTemplate.opsForValue().set(cacheKey, resultStr, ttl, TimeUnit.SECONDS); } @Override public <T> T getDataFromCache(String cacheKey, Class<T> clazz) { // 从缓存中获取数据 // 将获取的数据反序列化成指定类型的对象 // 返回指定类型的对象 String resultStr = redisTemplate.opsForValue().get(cacheKey); if (StringUtils.isEmpty(resultStr)) { return null; } return Jsons.strToObj(resultStr, clazz); } @Override public <T> T getDataFromCache(String cacheKey, TypeReference<T> tTypeReference) { // 1.从缓存中获取数据 // 2.将获取的数据反序列化成指定类型的对象 // 3.返回指定类型的对象 String resultStr = redisTemplate.opsForValue().get(cacheKey); if (StringUtils.isEmpty(resultStr)) { return null; } return Jsons.strToObj(resultStr, tTypeReference); } }
自定义完成对数据的序列化和反序列化操作
Jsons
类:package org.shisan.cache.utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.shisan.cache.exception.ShisanException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.regex.Pattern; /** * 完成对数据的序列化和反序列化操作 */ public class Jsons { static Logger logger = LoggerFactory.getLogger(Jsons.class); static final ObjectMapper objectMapper = new ObjectMapper(); /** * 序列化操作:接收对象,返回字符串 */ public static String objToStr(Object content) { // 可以完成对任意数据的序列化和反序列化操作 // @RequestBody注解的底层使用的就是ObjectMapper完成的操作:将字符串反序列化成对象 // @ResponseBody注解的底层使用的就是ObjectMapper完成的操作:将对象转成字符串 // 传过来的对象有以下几种情况 // 1、对象有数据 // 2、对象没数据: // 2.1 对象为空,传过来的就是null // 2.2 对象是一个Map(双列),为空时传过来的就是{} // 2.3 对象是是一个List(单列),为空时传过来的就是[] // 2.4 对象是是一个Set(单列),为空时传过来的就是[] // 2.5 对象是一个数组String[],为空时传过来的就是[] try { String resultStr = objectMapper.writeValueAsString(content); return resultStr; } catch (JsonProcessingException e) { logger.error("{}对象序列化成字符串失败,原因是{}", content, e.getMessage()); throw new ShisanException(201, "数据在转换期间出现了序列化异常"); } } /** * 反序列化操作:接收字符串,返回对象【指定的类型】(不带泛型) * @return */ public static <T> T strToObj(String content, Class<T> tClass) { try { // "{"name":"shisan","age":18}"--Map.class // "{}"---Map.class // "[]"---List.class Set.class Array.class T t = objectMapper.readValue(content, tClass); return t; } catch (JsonProcessingException e) { logger.error("{}字符串反序列化成对象失败,原因是{}", content, e.getOriginalMessage()); throw new ShisanException(201, "数据在转换期间出现了反序列化异常"); } } /** * 反序列化操作:接收字符串,返回对象【指定的类型】(带泛型) * @return */ public static <T> T strToObj(String content, TypeReference<T> tClass) { try { // "{"name":"shisan","age":18}"--Map.class // "{}"---Map.class // "[]"---List.class Set.class Array.class T t = objectMapper.readValue(content, tClass); return t; } catch (JsonProcessingException e) { logger.error("{}字符串反序列化成对象失败,原因:{}", content, e.getOriginalMessage()); throw new ShisanException(201, "数据在转换期间出现了反序列化异常"); } } public static List<String> getAllRegexRules() { ArrayList<String> list = new ArrayList<>(); list.add("^null$"); list.add("^\\{\\}$"); list.add("^\\[\\]$"); return list; } public static Boolean isMath(String compareContent, String regexRule) { return Pattern.matches(regexRule, compareContent); } }
12.7 cache-starter
的使用之稍微解耦版
修改:引入
cache-starter
依赖<dependency> <groupId>org.shisan.cache</groupId> <artifactId>cache-starter</artifactId> <version>1.0</version> </dependency>
如果只是调用对缓存进行读写操作的API,即只调用
cache-starter
依赖中的CacheOpsServiceImpl
方法,步骤如下:修改:
@Autowired private CacheOpsService cacheOpsService; /** * 根据专辑id查询专辑详情 * @param albumId * @return */ @Override public Map<String, Object> getAlbumInfo(Long albumId) { // 如果只是调用对缓存进行读写操作的API,即只调用`cache-starter`依赖中的`CacheOpsServiceImpl`方法 return getDistroCacheAndLockFinallyRedissonVersion2(albumId); } /** * 最最终版本:Redisson分布式布隆过滤器+Redisson分布式锁(抽取缓存操作的方法版本) * @param albumId * @return */ @SneakyThrows private Map getDistroCacheAndLockFinallyRedissonVersion2(Long albumId) { // 1.定义缓存key String cacheKey = RedisConstant.CACHE_INFO_PREFIX + albumId; // 缓存key String lockKey = RedisConstant.ALBUM_LOCK_SUFFIX + albumId; // 分布式锁key long ttl = 0l; // 数据的过期时间 // 2.查询分布式布隆过滤器 boolean contains = rBloomFilter.contains(albumId); if (!contains) { return null; } // 3.查询缓存 Map dataFromCache = cacheOpsService.getDataFromCache(cacheKey, Map.class); // 3.1 缓存命中 if (dataFromCache != null) { return dataFromCache; } // 3.2 缓存未命中 查询数据库 // 3.2.1 添加分布式锁 RLock lock = redissonClient.getLock(lockKey); boolean accquireLockFlag = lock.tryLock(); // tryLock:非阻塞、自动续期 if (accquireLockFlag) { // 抢到锁 try { // 3.2.2 回源查询数据 Map<String, Object> albumInfoFromDb = getAlbumInfoFromDb(albumId); // 3.2.3 同步数据到缓存中去 cacheOpsService.saveDataToCache(cacheKey, albumInfoFromDb); return albumInfoFromDb; } finally { lock.unlock();// 释放锁 } } else { // 没抢到锁。等同步时间之后,查询缓存即可 Thread.sleep(200); Map result = cacheOpsService.getDataFromCache(cacheKey, Map.class); if (result != null) { return result; } return getAlbumInfoFromDb(albumId); } }
12.8 cache-starter
的使用之彻底解耦版+测试
修改:给需要操作缓存的接口加上
@Cacheable
注解@GetMapping("/{albumId}") @Operation(summary = "根据专辑id查询专辑详情") @Cacheable(cacheKey = RedisConstant.CACHE_INFO_PREFIX + "#{#args[0]}", // 要传递方法中的第几个参数就写第几个参数 lockKey = RedisConstant.ALBUM_LOCK_SUFFIX+"#{#args[0]}", bloomKey = "#{#args[0]}", enableBloomFilter = true, enableLock = true) public Result getAlbumInfo(@PathVariable(value = "albumId") Long albumId) { Map<String, Object> result = itemService.getAlbumInfo(albumId); return Result.ok(result); }
修改:因为所有关于缓存、分布式锁、布隆过滤器的逻辑都在
cache-starter
中实现了,所以只需要在原方法中回源查询数据库即可/** * 根据专辑id查询专辑详情 * @param albumId * @return */ @Override public Map<String, Object> getAlbumInfo(Long albumId) { // 回源查询数据库 return getAlbumInfoFromDb(albumId); }
测试:
先打断点
浏览器访问
http://localhost:8500/api/search/albumInfo/936
,然后逐条步过:得到结果:
返回前端: