本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别
概述
Redis除了可以用于缓存临时数据,以及排行榜,共同关注等业务功能的实现之外,最主要应用也是最广的地方是缓存热点数据,防止高并发场景下所有的请求都打到数据库。数据库的并发能力是有限的,数据库链接耗尽,连接超时,导致整个服务从下至上不可用。
真正对于高并发的场景,使用Redis做缓存并非简单地先查询缓存->缓存查询到就返回,查询不到则查询数据库->写回缓存这样的操作,而是需要根据业务选择一套合理的架构去实现,其中需要考虑到:
- 数据冷热分离
- 缓存穿透,击穿,雪崩问题的处理
- 突发性的热点缓存重建问题
- 缓存与数据库双写不一致的问题
- 分布式锁的优化
- 考虑引入多级缓存机制
基础案例工程,一套对于缓存和数据库的增改查操作:
- 新增数据,同步缓存入redis中。
- 修改数据,将修改后的结果以新增时相同的key进行覆盖。
- 查询数据,先查询缓存,缓存查询不到就查数据库,然后写回到缓存中。
@Service
public class ProductServiceTest {
@Autowired
private ProductDao productDao;
@Autowired
private RedisUtil redisUtil;
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult));
return productResult;
}
@Transactional
public Product update(Product product) {
Product productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult));
return productResult;
}
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)) {
product = JSON.parseObject(productStr, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product));
}
return product;
}
}
二、数据冷热分离
在电商网站中,经常被用户访问点击购买的商品称之为热点商品
,这样的商品占比不大,剩下的大部分商品都是访问频次较低的冷门商品
。
热点商品
的访问量大,需要在数据库之上使用缓存拦截,而冷门商品
,因为其访问频次不高的原因,可以让用户直接访问数据库。
进行冷热分离的手段,是使用给缓存加上过期时间 + 续期
的策略实现,这样改造的思想在于,无论是热点商品
还是冷门商品
,在设置进缓存时都统一地给上过期时间,冷门商品
在一段时间内没有访问,就会从缓存中移除,不占用缓存的空间,而热点商品
每次从缓存中获取到后都会进行续期
,保证常驻于缓存中
/**
* 缓存过期时间,默认单位:秒
*/
private final Integer CACHE_EXPIRE_TIMEOUT = 24 * 60 * 60;
/**
* 第一版改动,增加缓存到期时间
* @param product
* @return
*/
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
CACHE_EXPIRE_TIMEOUT, TimeUnit.SECONDS);
return productResult;
}
/**
* 第一版改动,增加缓存到期时间
* @param product
* @return
*/
@Transactional
public Product update(Product product) {
Product productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
CACHE_EXPIRE_TIMEOUT, TimeUnit.SECONDS);
return productResult;
}
/**
* 第一版改动,增加缓存到期时间,缓存续期
* @param productId
* @return
*/
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)) {
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey,CACHE_EXPIRE_TIMEOUT,TimeUnit.SECONDS);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),CACHE_EXPIRE_TIMEOUT, TimeUnit.SECONDS);
}
return product;
}
三、解决缓存击穿
上面的案例,存在一个问题,因为对于所有的商品key,设置的过期时间都是相同的,如果某一时刻大量的key同时到期,又处于高并发的场景,同样会造成数据库的崩溃,也就是缓存击穿问题
。
解决方式是给key加上不同的随机的过期时间:
public Integer makeRandomTimeOut(){
return CACHE_EXPIRE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
四、解决缓存穿透
缓存穿透指的是,请求中的数据,在Redis中不存在,所以会打到数据库,而在数据库中也不存在,无法重建缓存,最终导致数据库崩溃。
一方面可能存在的原因是请求的数据在数据库中被误操作删除,还有可能是对于系统的恶意请求攻击。和缓存击穿
的区别在于,缓存击穿
目标数据只是对于缓存来说不存在,而缓存穿透
则是数据库和缓存均无目标数据。
传统的解决方式是缓存空值:
private final String EMPTY_CACHE_VALUE = "{}";
/**
* 第一版改动,增加缓存到期时间,缓存续期
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,解决缓存穿透,缓存空值
* @param productId
* @return
*/
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//查询缓存
String productStr = redisUtil.get(productCacheKey);
//缓存中存在,续期并返回
if (!StringUtils.isEmpty(productStr)) {
//这里需要区分productStr是否为空缓存标识
if (product.equals(EMPTY_CACHE_VALUE)){
return null;
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey,makeRandomTimeOut(),TimeUnit.SECONDS);
return product;
}
//缓存中不存在,查询数据库
product = productDao.get(productId);
//数据库不为空,存入缓存,返回
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),makeRandomTimeOut(), TimeUnit.SECONDS);
return product;
}
//数据库为空,则缓存空值
redisUtil.set(productCacheKey,EMPTY_CACHE_VALUE);
return null;
}
这样做是否存在问题?如果对于系统的恶意攻击不是单独针对某一个不存在的商品,而是批量伪造出了数十万个不存在的商品,按照上面的方式仅仅设置空值,可能会有:key:product:cache:1,product:cache:2,…,product:cache:100000,它们的value都是"{}"。这样依旧在Redis中占有了不少的空间。
针对这样的问题,可以采用设置过期时间 + 续期
策略的实现。这样改造的思想在于,恶意攻击停止后,设置了超时时间的空值key会逐渐失效,如果攻击还在持续,那么访问到的空值key还应该续期
/**
* 第一版改动,增加缓存到期时间,缓存续期
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,解决缓存穿透,缓存空值
* 第四版改动,设置过期时间 + 续期,避免Redis中存在大量空值。
* @param productId
* @return
*/
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//查询缓存
String productStr = redisUtil.get(productCacheKey);
//缓存中存在,续期并返回
if (!StringUtils.isEmpty(productStr)) {
//这里需要区分productStr是否为空缓存标识
if (productStr.equals(EMPTY_CACHE_VALUE)){
//续期,防止攻击持续
redisUtil.expire(productCacheKey, makeRandomEmptyValTimeOut(), TimeUnit.SECONDS);
return null;
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey,makeRandomTimeOut(),TimeUnit.SECONDS);
return product;
}
//缓存中不存在,查询数据库
product = productDao.get(productId);
//数据库不为空,存入缓存,返回
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),makeRandomTimeOut(), TimeUnit.SECONDS);
return product;
}
//数据库为空,则缓存空值,带上过期时间
redisUtil.set(productCacheKey,EMPTY_CACHE_VALUE,makeRandomEmptyValTimeOut(),TimeUnit.SECONDS);
return null;
}
五、热点缓存重建
如果某个冷门商品,由于一、数据冷热分离 中的操作,上架时同时设置了过期时间,然后很久一段时间没有用户访问,导致过了过期时间,从Redis中移除。这时突然有大量的用户去进行访问,必然导致请求都打到数据库,特别是查询数据库条件复杂,缓存不能快速重建的场景下:
解决这样问题的方案是加分布式锁
,保证只有一个线程可以执行查询数据库,重建缓存的操作,这里有两个注意点:
- 在处理缓存穿透时,如果返回的也是null,则无法区分缓存中不存在从而返回null,后续要去查数据库的情况。还是应对缓存穿透,无需查询数据库的场景。
- 在重建缓存时,需要再次从缓存中查询。假设有n个线程同时执行到分布式锁这一行代码:
- 第一个线程获取到了锁,重建完成了缓存
- 后续的线程获取到锁之后,不用再去查数据库,直接从缓存中获取并返回。
/**
* 重建热点缓存分布式锁key
*/
private final String REBUILD_HOT_CACHE = "rebuild:hot_cache:";
/**
* 第一版改动,增加缓存到期时间,缓存续期
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,解决缓存穿透,缓存空值
* 第四版改动,设置过期时间 + 续期,避免Redis中存在大量空值。
* 第五版改动,加分布式锁,只有一个线程可以去查询数据库重建缓存
* @param productId
* @return
*/
public Product get(Long productId) throws InterruptedException {
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//从缓存中查
Product product = queryFromCache(productCacheKey);
//能查询到就直接返回
if (product != null){
return product;
}
//获取重建缓存的分布式锁
RLock lock = redisson.getLock(REBUILD_HOT_CACHE + productId);
lock.lock();
try {
product = rebuildHotCache(productId, productCacheKey);
} finally {
lock.unlock();
}
return product;
}
/**
* 缓存空值
* @param productId
* @param productCacheKey
* @return
*/
private Product rebuildHotCache(Long productId,String productCacheKey){
//再次从缓存中查
Product product = queryFromCache(productCacheKey);
//能查询到就直接返回
if (product != null){
return product;
}
//缓存中不存在,查询数据库
product = productDao.get(productId);
//数据库不为空,存入缓存,返回
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),makeRandomTimeOut(), TimeUnit.SECONDS);
return product;
}
//数据库为空,则缓存空值,带上过期时间
redisUtil.set(productCacheKey,EMPTY_CACHE_VALUE,makeRandomEmptyValTimeOut(),TimeUnit.SECONDS);
return null;
}
/**
* 从缓存中查
* @param productCacheKey
* @return
*/
/**
* 从缓存中查
* @param productCacheKey
* @return
*/
private Product queryFromCache(String productCacheKey){
Product product = null;
//查询缓存
String productStr = redisUtil.get(productCacheKey);
//缓存中存在,续期并返回
if (!StringUtils.isEmpty(productStr)) {
//这里需要区分productStr是否为空缓存标识
if (productStr.equals(EMPTY_CACHE_VALUE)){
//续期,防止攻击持续
redisUtil.expire(productCacheKey, makeRandomEmptyValTimeOut(), TimeUnit.SECONDS);
//如果这里返回null,无法区分缓存中真的是null,要去查数据库,还是应对缓存穿透,无需查询数据库
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey,makeRandomTimeOut(),TimeUnit.SECONDS);
return product;
}
return null;
}
六、缓存一致性问题
结合下面这张图片,可以很方便的理解本案例中可能出现的缓存一致性问题,即查询缓存,和写入缓存,是非原子性的操作,尤其是数据库查询条件复杂,速度较慢的场景下,在查询和写入缓存之间,其他线程可能趁虚而入,最终导致数据库和缓存的不一致问题。先删除缓存,再更新数据库,两步之间会存在同样的问题。
线程二将最新的数据修改为了30,但是最终缓存中被覆盖为了20。
为了解决这样的问题,同样可以使用分布式锁
,同时只能有一个线程操作缓存:
- 重建缓存,和修改数据库缓存,使用同一把分布式锁,避免上图的问题。
/**
* 对于重建热点缓存,用分布式锁保护,防止数据库和缓存不一致
*/
private final String PROTECT_REBUILD_HOT_CACHE = "protect:rebuild:hot_cache:";
/**
* 第一版改动,增加缓存到期时间
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,更新数据库和缓存的代码,使用重建缓存同一分布式锁保护。
* @param product
* @return
*/
@Transactional
public Product update(Product product) {
RLock protectLock = redisson.getLock("PROTECT_REBUILD_HOT_CACHE" + product.getId());
protectLock.lock();
Product productResult;
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
makeRandomTimeOut(), TimeUnit.SECONDS);
} finally {
protectLock.unlock();
}
return productResult;
}
/**
* 缓存重建
* @param productId
* @param productCacheKey
* @return
*/
private Product rebuildHotCache(Long productId,String productCacheKey){
//再次从缓存中查
Product product = queryFromCache(productCacheKey);
//能查询到就直接返回,
if (product != null){
return product;
}
RLock protectLock = redisson.getLock("PROTECT_REBUILD_HOT_CACHE" + productId);
protectLock.lock();
try {
//缓存中不存在,查询数据库
product = productDao.get(productId);
//数据库不为空,存入缓存,返回
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),makeRandomTimeOut(), TimeUnit.SECONDS);
return product;
}
//数据库为空,则缓存空值,带上过期时间
redisUtil.set(productCacheKey,EMPTY_CACHE_VALUE,makeRandomEmptyValTimeOut(),TimeUnit.SECONDS);
} finally {
protectLock.unlock();
}
return null;
}
没有方案能保证不损失性能的前提下,完全保证缓存和数据库的一致性,写多读多,又对一致性要求高的场景,不要使用缓存。
七、分布式锁的优化
为了解决热点缓存重建
和缓存一致性问题
,查询商品和修改商品的逻辑都加上了分布式锁,特别是查询商品加入了两把分布式锁。分布式锁的特性是set NX命令的互斥,也就是会导致所有的查询操作都会串行化。对于系统性能还是存在一定的损失的。
如果要进行优化,可以从以下的方面入手:
- 降低锁的粒度,锁定的代码范围尽可能小
- 采取分段锁的设计思想,将key进行拆分。
- 使用Redisson提供的
读写锁
,进行读写分离。 - 利用tryLock带有超时时间的API,一段时间获取不到锁就执行其他的操作。
上面的案例中,为了解决缓存一致性问题
而加入的锁,是可以用读写锁
进行优化的:(当然缓存重建的最外层还有一把分布式锁,保证了缓存重建的过程只能有一个线程同时执行,不会存在多个线程同时进入缓存重建代码,获取读锁不互斥的场景,仅仅作为演示读写分离的思想。)
/**
* 第一版改动,增加缓存到期时间
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,更新数据库和缓存的代码,使用重建缓存同一分布式锁保护。
* 第四版改动,第三版的锁改为读写锁
* @param product
* @return
*/
@Transactional
public Product update(Product product) {
//申请写锁,写-写,写-读之间互斥
RReadWriteLock protectReadWriteLock = redisson.getReadWriteLock("PROTECT_REBUILD_HOT_CACHE" + product.getId());
RLock protectRLock = protectReadWriteLock.writeLock();
Product productResult;
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
makeRandomTimeOut(), TimeUnit.SECONDS);
} finally {
protectRLock.unlock();
}
return productResult;
}
/**
* 缓存重建
* @param productId
* @param productCacheKey
* @return
*/
private Product rebuildHotCache(Long productId,String productCacheKey){
//再次从缓存中查
Product product = queryFromCache(productCacheKey);
//能查询到就直接返回,
if (product != null){
return product;
}
//加入读锁,读锁和读锁之间不互斥
RReadWriteLock protectReadWriteLock = redisson.getReadWriteLock("PROTECT_REBUILD_HOT_CACHE" + productId);
RLock protectRLock = protectReadWriteLock.readLock();
protectRLock.lock();
try {
//缓存中不存在,查询数据库
product = productDao.get(productId);
//数据库不为空,存入缓存,返回
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),makeRandomTimeOut(), TimeUnit.SECONDS);
return product;
}
//数据库为空,则缓存空值,带上过期时间
redisUtil.set(productCacheKey,EMPTY_CACHE_VALUE,makeRandomEmptyValTimeOut(),TimeUnit.SECONDS);
} finally {
protectRLock.unlock();
}
return null;
}
防止重建缓存
并发访问数据库的锁,可以通过设置tryLock过期时间优化,利用tryLock的机制,假设通过实验得知,一般情况下执行查询数据库的操作最多需要5s,就可以设置5s的超时时间,第一个线程竞争到了锁,在5s内执行查询数据库,重建缓存的操作,其他线程阻塞5s没有获取到锁,并且没有人为的通过lock.tryLock
的boolean返回值进行限制,则其他线程可以并发地执行后续的代码,查询缓存并且返回。
当然这样做是存在风险的,假如第一个线程在超时时间内没有完成查询数据库,重建缓存的逻辑,那么后续所有的线程依旧会并发访问数据库。
/**
* 第一版改动,增加缓存到期时间,缓存续期
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,解决缓存穿透,缓存空值
* 第四版改动,设置过期时间 + 续期,避免Redis中存在大量空值。
* 第五版改动,加分布式锁,只有一个线程可以去查询数据库重建缓存
* 第六版改动,重建缓存的代码用另一把分布式锁保护,防止其他线程修改数据库和缓存中的值。
* 第七版改动,重建缓存的代码使用读写锁优化
* 第八版改动,重建缓存的分布式锁加入超时时间
* @param productId
* @return
*/
public Product get(Long productId) throws InterruptedException {
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//从缓存中查
Product product = queryFromCache(productCacheKey);
//能查询到就直接返回
if (product != null){
return product;
}
//获取重建缓存的分布式锁
RLock lock = redisson.getLock(REBUILD_HOT_CACHE + productId);
//假设数据库的操作最多需要5s
lock.tryLock(5,TimeUnit.SECONDS);
try {
product = rebuildHotCache(productId, productCacheKey);
} finally {
lock.unlock();
}
return product;
}
八、解决缓存雪崩
缓存虽然扛并发的能力要优于数据库,但是在极端高并发的场景下,Redis单级缓存依旧是可能存在支撑不住的情况,导致大量的请求依旧会将数据库击垮,发生服务的崩溃,这种现象称为缓存雪崩
。
解决缓存雪崩,可以通过Redis的主从-哨兵
架构,或集群部署
,将访问压力进行分担。还可以在服务层面利用中间件进行流控。当然还可以引入多级缓存
。JDK自带的Map本身就可以作为缓存,也可以引入Caffeine
等第三方组件,如果是微服务架构,多点部署的情况下,本地缓存还要保证服务实例间的一致性。
九、最终案例
@Service
public class ProductServiceTest {
@Autowired
private ProductDao productDao;
@Autowired
private RedisUtil redisUtil;
@Autowired
private Redisson redisson;
/**
* 缓存过期时间,默认单位:秒
*/
private final Integer CACHE_EXPIRE_TIMEOUT = 24 * 60 * 60;
/**
* 缓存空值
*/
private final String EMPTY_CACHE_VALUE = "{}";
/**
* 重建热点缓存分布式锁key
*/
private final String REBUILD_HOT_CACHE = "rebuild:hot_cache:";
/**
* 对于重建热点缓存,用分布式锁保护,防止数据库和缓存不一致
*/
private final String PROTECT_REBUILD_HOT_CACHE = "protect:rebuild:hot_cache:";
private Map<String, Product> localCache = new ConcurrentHashMap<>();
/**
* 第一版改动,增加缓存到期时间
* 第二版改动,过期时间随机生成,解决缓存击穿
*
* @param product
* @return
*/
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
makeRandomTimeOut(), TimeUnit.SECONDS);
return productResult;
}
/**
* 第一版改动,增加缓存到期时间
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,更新数据库和缓存的代码,使用重建缓存同一分布式锁保护。
* 第四版改动,第三版的锁改为读写锁
* 第五版改动,加入本地缓存
*
* @param product
* @return
*/
@Transactional
public Product update(Product product) {
//申请写锁,写-写,写-读之间互斥
RReadWriteLock protectReadWriteLock = redisson.getReadWriteLock("PROTECT_REBUILD_HOT_CACHE" + product.getId());
RLock protectRLock = protectReadWriteLock.writeLock();
Product productResult;
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
makeRandomTimeOut(), TimeUnit.SECONDS);
localCache.put(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), productResult);
} finally {
protectRLock.unlock();
}
return productResult;
}
/**
* 第一版改动,增加缓存到期时间,缓存续期
* 第二版改动,过期时间随机生成,解决缓存击穿
* 第三版改动,解决缓存穿透,缓存空值
* 第四版改动,设置过期时间 + 续期,避免Redis中存在大量空值。
* 第五版改动,加分布式锁,只有一个线程可以去查询数据库重建缓存
* 第六版改动,重建缓存的代码用另一把分布式锁保护,防止其他线程修改数据库和缓存中的值。
* 第七版改动,重建缓存的代码使用读写锁优化
* 第八版改动,重建缓存的分布式锁加入超时时间
* 第九版改动,加入本地缓存
*
* @param productId
* @return
*/
public Product get(Long productId) throws InterruptedException {
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//从缓存中查
Product product = queryFromCache(productCacheKey);
//能查询到就直接返回
if (product != null) {
return product;
}
//获取重建缓存的分布式锁
RLock lock = redisson.getLock(REBUILD_HOT_CACHE + productId);
//假设数据库的操作最多需要5s
lock.tryLock(5, TimeUnit.SECONDS);
try {
product = rebuildHotCache(productId, productCacheKey);
} finally {
lock.unlock();
}
return product;
}
/**
* 缓存重建
*
* @param productId
* @param productCacheKey
* @return
*/
private Product rebuildHotCache(Long productId, String productCacheKey) {
//再次从缓存中查
Product product = queryFromCache(productCacheKey);
//能查询到就直接返回,
if (product != null) {
return product;
}
//加入读锁,读锁和读锁之间不互斥
RReadWriteLock protectReadWriteLock = redisson.getReadWriteLock("PROTECT_REBUILD_HOT_CACHE" + productId);
RLock protectRLock = protectReadWriteLock.readLock();
protectRLock.lock();
try {
//缓存中不存在,查询数据库
product = productDao.get(productId);
//数据库不为空,存入缓存,返回
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), makeRandomTimeOut(), TimeUnit.SECONDS);
//存入二级缓存
localCache.put(productCacheKey,product);
return product;
}
//数据库为空,则缓存空值,带上过期时间
redisUtil.set(productCacheKey, EMPTY_CACHE_VALUE, makeRandomEmptyValTimeOut(), TimeUnit.SECONDS);
} finally {
protectRLock.unlock();
}
return null;
}
/**
* 从缓存中查
*
* @param productCacheKey
* @return
*/
private Product queryFromCache(String productCacheKey) {
Product product = null;
product = localCache.get(productCacheKey);
//查询二级缓存
if (product != null) {
return product;
}
//查询缓存
String productStr = redisUtil.get(productCacheKey);
//缓存中存在,续期并返回
if (!StringUtils.isEmpty(productStr)) {
//这里需要区分productStr是否为空缓存标识
if (productStr.equals(EMPTY_CACHE_VALUE)) {
//续期,防止攻击持续
redisUtil.expire(productCacheKey, makeRandomEmptyValTimeOut(), TimeUnit.SECONDS);
//如果这里返回null,无法区分缓存中真的是null,要去查数据库,还是应对缓存穿透,无需查询数据库
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey, makeRandomTimeOut(), TimeUnit.SECONDS);
return product;
}
return null;
}
public Integer makeRandomTimeOut() {
return CACHE_EXPIRE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
public Integer makeRandomEmptyValTimeOut() {
return 60 + new Random().nextInt(30);
}
}