整合方案设计思路
- 封装一个高级缓存服务 (AdvancedCacheService):将所有逻辑(查询、空值缓存、锁、过期策略)封装在内,对上游业务代码透明。
- 使用分布式锁:解决缓存击穿,确保只有一个请求重建缓存。
- 缓存空值:解决缓存穿透,并设置较短的过期时间。
- 差异化过期时间:解决缓存雪崩,为基础过期时间添加随机扰动。
- 降级策略:当获取锁失败或数据库查询异常时,具备降级能力(如返回旧值或null)。
①、分布式锁服务
@Service
@RequiredArgsConstructor
public class RedisDistributedLock {
    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Release script, compiled once instead of on every call.
     * Compares the lock's current value with the caller's requestId and deletes
     * the key only on a match, so a client can never release a lock it no longer
     * owns (e.g. after its lock expired and another client acquired it).
     * NOTE(review): GET returns the value as stored by the configured value
     * serializer — requestId must round-trip to the same bytes; verify the
     * template uses a String-compatible serializer.
     */
    private static final DefaultRedisScript<Long> RELEASE_LOCK_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "return redis.call('del', KEYS[1]) " +
            "else " +
            "return 0 " +
            "end",
            Long.class);

    /**
     * Tries to acquire a distributed lock.
     *
     * @param lockKey    lock key
     * @param requestId  request identifier (e.g. a UUID) marking lock ownership
     * @param expireTime lock hold time; the key auto-expires to avoid deadlock
     * @param timeUnit   unit for {@code expireTime}
     * @return true if the lock was acquired
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit) {
        // Atomic SET ... NX PX: set only if absent, with an expiry in one command.
        return Boolean.TRUE.equals(
                redisTemplate.opsForValue().setIfAbsent(
                        lockKey,
                        requestId,
                        expireTime,
                        timeUnit
                )
        );
    }

    /**
     * Releases a distributed lock (Lua script guarantees check-and-delete atomicity).
     *
     * @param lockKey   lock key
     * @param requestId request identifier used at acquisition
     * @return true if this call actually deleted the lock
     */
    public boolean releaseLock(String lockKey, String requestId) {
        Long result = redisTemplate.execute(
                RELEASE_LOCK_SCRIPT,
                Collections.singletonList(lockKey),
                requestId
        );
        return result != null && result == 1;
    }

    /**
     * Generates a unique request id for lock ownership.
     */
    public String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
②、高级缓存服务(整合三大问题的解决方案)
@Slf4j
@Service
@RequiredArgsConstructor
public class AdvancedCacheService {
    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisDistributedLock distributedLock;

    // Null placeholder cached on a DB miss to stop cache-penetration attacks.
    private static final String NULL_PLACEHOLDER = "__NULL__";
    // Lock auto-expiry (seconds), protects against deadlock if the owner dies.
    private static final long DEFAULT_LOCK_EXPIRE_TIME = 10L;
    // Default base cache TTL in seconds.
    private static final long DEFAULT_CACHE_EXPIRE_TIME = 3600L;
    // Short TTL (seconds) for cached null placeholders.
    private static final long NULL_CACHE_EXPIRE_TIME = 300L;
    // Upper bound (seconds, 5 min) of random TTL jitter to avoid cache avalanche.
    private static final long RANDOM_EXPIRE_BOUND = 300L;

    /**
     * Gets or computes a cached value using the default base TTL.
     *
     * @param key         cache key
     * @param valueLoader loader invoked on a cache miss (e.g. a DB query)
     * @param clazz       expected value type
     * @param <T>         value type
     * @return cached or freshly loaded value; null if the loader returned null
     *         or the degradation path was taken
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz) {
        return getOrCalculate(key, valueLoader, clazz, DEFAULT_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
    }

    /**
     * Overload with a caller-supplied base TTL.
     * Combines: null-value caching (penetration), a distributed mutex with
     * double-check (breakdown) and randomized TTL jitter (avalanche).
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz,
                                long baseExpireTime, TimeUnit timeUnit) {
        // 1. Fast path: read the cache.
        Object cachedValue = redisTemplate.opsForValue().get(key);
        if (cachedValue != null) {
            // 1.1 Null placeholder hit -> known-absent row, return null (penetration guard).
            if (NULL_PLACEHOLDER.equals(cachedValue)) {
                log.debug("Cache hit null placeholder for key: {}", key);
                return null;
            }
            // 1.2 Valid hit.
            log.debug("Cache hit for key: {}", key);
            return clazz.cast(cachedValue);
        }

        // 2. Miss: serialize cache rebuilds behind a distributed lock (breakdown guard).
        String lockKey = "lock:" + key;
        String requestId = distributedLock.generateRequestId();
        boolean lockAcquired = false;
        T value;
        try {
            // 2.1 Try to acquire the mutex.
            lockAcquired = distributedLock.tryLock(lockKey, requestId,
                    DEFAULT_LOCK_EXPIRE_TIME, TimeUnit.SECONDS);
            if (!lockAcquired) {
                // 2.2 Another thread is rebuilding; back off briefly and re-read once.
                log.warn("Failed to acquire lock for key: {}, waiting and retrying...", key);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                Object retryValue = redisTemplate.opsForValue().get(key);
                if (retryValue != null) {
                    return NULL_PLACEHOLDER.equals(retryValue) ? null : clazz.cast(retryValue);
                }
                // Degradation: give up and return null rather than stampede the DB.
                log.error("Cache breakdown occurred for key: {}, after retry still failed.", key);
                return null;
            }

            // 2.3 Lock won: double-check the cache before hitting the loader.
            log.info("Lock acquired for key: {}, recalculating...", key);
            cachedValue = redisTemplate.opsForValue().get(key);
            if (cachedValue != null) {
                return NULL_PLACEHOLDER.equals(cachedValue) ? null : clazz.cast(cachedValue);
            }

            // 3. Load from the source of truth.
            try {
                value = valueLoader.get();
            } catch (Exception e) {
                log.error("Error executing value loader for key: {}", key, e);
                // A very short null cache shields the DB from repeated failing lookups.
                setCache(key, NULL_PLACEHOLDER, 30L, TimeUnit.SECONDS);
                throw e;
            }

            // 4. Write back.
            if (value == null) {
                // 4.1 Absent row: cache the placeholder with a short TTL (penetration guard).
                setCache(key, NULL_PLACEHOLDER, NULL_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
            } else {
                // 4.2 BUG FIX: the jitter bound is in SECONDS, but it was previously
                // added to baseExpireTime in the caller's unit — a caller passing
                // TimeUnit.HOURS got up to 300 extra HOURS instead of 5 extra minutes.
                // Normalize the base TTL to seconds before adding the jitter.
                long finalExpireSeconds = timeUnit.toSeconds(baseExpireTime)
                        + ThreadLocalRandom.current().nextLong(0, RANDOM_EXPIRE_BOUND);
                setCache(key, value, finalExpireSeconds, TimeUnit.SECONDS);
            }
            return value;
        } finally {
            // 5. Release only if we own the lock; releasing after a failed acquisition
            // is a no-op in the Lua script but wastes a Redis round trip.
            if (lockAcquired) {
                distributedLock.releaseLock(lockKey, requestId);
            }
        }
    }

    /**
     * Single write entry point; a cache write failure must not break the main flow.
     */
    private void setCache(String key, Object value, long expireTime, TimeUnit timeUnit) {
        try {
            redisTemplate.opsForValue().set(key, value, expireTime, timeUnit);
        } catch (Exception e) {
            log.error("Failed to set cache for key: {}", key, e);
        }
    }

    /**
     * Evicts a key (used after data updates; the next read rebuilds the cache).
     */
    public void evict(String key) {
        try {
            redisTemplate.delete(key);
        } catch (Exception e) {
            log.error("Failed to evict cache for key: {}", key, e);
        }
    }
}
③、业务中使用
// FIX: @Slf4j was missing although getProductById's loader calls log.info —
// without it the class does not compile.
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {
    private final ProductMapper productMapper;
    private final AdvancedCacheService advancedCacheService;

    /**
     * Looks up a product, cache-first. The Supplier runs only on a cache miss
     * after the rebuild lock is won.
     */
    public Product getProductById(Long id) {
        String cacheKey = "product:" + id;
        return advancedCacheService.getOrCalculate(
                cacheKey,
                () -> {
                    log.info("Querying product from database, id: {}", id);
                    return productMapper.selectById(id);
                },
                Product.class,
                1L, TimeUnit.HOURS // base TTL of 1 hour
        );
    }

    /**
     * Updates the DB first, then evicts the cache so the next read rebuilds it
     * (cache-aside write path).
     */
    public void updateProduct(Product product) {
        productMapper.updateById(product);
        String cacheKey = "product:" + product.getId();
        advancedCacheService.evict(cacheKey);
    }

    /**
     * Deletes the DB row first, then evicts the cache entry.
     */
    public void deleteProduct(Long id) {
        productMapper.deleteById(id);
        String cacheKey = "product:" + id;
        advancedCacheService.evict(cacheKey);
    }
}
对上述业务进行监控
①、集成监控依赖
<!-- Micrometer 核心 -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<!-- Micrometer Prometheus 注册中心 -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Actuator (提供/metrics端点) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
management:
endpoints:
web:
exposure:
include: health, info, metrics, prometheus
metrics:
tags:
application: ${spring.application.name} # 为所有指标添加应用标签
distribution:
percentiles-histogram:
"[cache.lock.acquire.time]": true # 开启直方图(锁获取延迟计时器需要bucket数据,供后文histogram_quantile告警使用)
②、改造高级缓存服务,集成指标收集
@Slf4j
@Service
@RequiredArgsConstructor
public class AdvancedCacheService {
    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisDistributedLock distributedLock;
    private final MeterRegistry meterRegistry; // Micrometer metrics registry

    // ... (constants from the previous version unchanged) ...

    // Metric tag keys and canonical tag values.
    private static final String TAG_CACHE_NAME = "cache.name";
    private static final String TAG_OPERATION = "operation";
    private static final String TAG_RESULT = "result";
    private static final String TAG_HIT = "hit";
    private static final String TAG_MISS = "miss";
    private static final String TAG_SUCCESS = "success";
    private static final String TAG_FAILURE = "failure";

    // Metric names.
    private static final String METRIC_CACHE_OPERATION = "cache.operation";
    private static final String METRIC_CACHE_HIT = "cache.hit";
    private static final String METRIC_CACHE_LOAD_TIME = "cache.load.time"; // reserved, not yet recorded
    private static final String METRIC_LOCK_ACQUIRE_TIME = "cache.lock.acquire.time";
    private static final String METRIC_LOCK_HOLD_TIME = "cache.lock.hold.time";
    private static final String METRIC_DB_QUERY = "cache.db.query";

    /**
     * Gets or computes a cached value (monitored version).
     * Records: hit/miss counters, lock wait and hold timers, DB load timer,
     * and a total-operation timer.
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz,
                                long baseExpireTime, TimeUnit timeUnit) {
        String cacheName = extractCacheName(key); // e.g. "product" from "product:123"
        Timer.Sample totalTimer = Timer.start(meterRegistry); // total latency sample
        try {
            Object cachedValue = redisTemplate.opsForValue().get(key);
            if (cachedValue != null) {
                if (NULL_PLACEHOLDER.equals(cachedValue)) {
                    // Placeholder hit: counted separately so hit-rate math can include it.
                    recordCacheHit(cacheName, "null");
                    return null;
                }
                recordCacheHit(cacheName, TAG_HIT);
                return clazz.cast(cachedValue);
            }
            recordCacheHit(cacheName, TAG_MISS);

            String lockKey = "lock:" + key;
            String requestId = distributedLock.generateRequestId();
            boolean lockAcquired = false;
            long lockHeldSince = 0L;
            T value;
            long lockWaitStartTime = System.currentTimeMillis();
            try {
                lockAcquired = distributedLock.tryLock(lockKey, requestId,
                        DEFAULT_LOCK_EXPIRE_TIME, TimeUnit.SECONDS);
                long lockAcquireTime = System.currentTimeMillis() - lockWaitStartTime;
                recordLockAcquireTime(cacheName, lockAcquireTime, lockAcquired);
                if (!lockAcquired) {
                    log.warn("Failed to acquire lock for key: {}", key);
                    recordLockCompetition(cacheName, false);
                    // ... (retry logic unchanged) ...
                    return null;
                }
                recordLockCompetition(cacheName, true);
                // FIX: hold time must start at acquisition; measuring from
                // lockWaitStartTime wrongly folded the wait time into "hold time".
                lockHeldSince = System.currentTimeMillis();

                // Double check after winning the lock.
                cachedValue = redisTemplate.opsForValue().get(key);
                if (cachedValue != null) {
                    return NULL_PLACEHOLDER.equals(cachedValue) ? null : clazz.cast(cachedValue);
                }

                long loadStartTime = System.currentTimeMillis();
                try {
                    value = valueLoader.get();
                    recordDbQuery(cacheName, true, System.currentTimeMillis() - loadStartTime);
                } catch (Exception e) {
                    recordDbQuery(cacheName, false, System.currentTimeMillis() - loadStartTime);
                    setCache(key, NULL_PLACEHOLDER, 30L, TimeUnit.SECONDS);
                    throw e;
                }

                if (value == null) {
                    setCache(key, NULL_PLACEHOLDER, NULL_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
                } else {
                    // FIX: jitter bound is in seconds — normalize the base TTL to
                    // seconds before adding it, so non-second TimeUnits stay correct.
                    long finalExpireSeconds = timeUnit.toSeconds(baseExpireTime)
                            + ThreadLocalRandom.current().nextLong(0, RANDOM_EXPIRE_BOUND);
                    setCache(key, value, finalExpireSeconds, TimeUnit.SECONDS);
                }
                return value;
            } finally {
                // FIX: only the owner releases and records hold time; the old
                // requestId != null guard was always true.
                if (lockAcquired) {
                    recordLockHoldTime(cacheName, System.currentTimeMillis() - lockHeldSince);
                    distributedLock.releaseLock(lockKey, requestId);
                }
            }
        } finally {
            // Total operation latency regardless of outcome.
            totalTimer.stop(Timer.builder(METRIC_CACHE_OPERATION)
                    .tags(TAG_CACHE_NAME, cacheName, TAG_OPERATION, "getOrCalculate")
                    .register(meterRegistry));
        }
    }

    // --- metric recording helpers ---

    /** Counts a cache lookup outcome: "hit", "miss" or "null" (placeholder). */
    private void recordCacheHit(String cacheName, String result) {
        Counter.builder(METRIC_CACHE_HIT)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, result)
                .register(meterRegistry)
                .increment();
    }

    /** Records how long the tryLock round trip took, tagged by outcome. */
    private void recordLockAcquireTime(String cacheName, long millis, boolean success) {
        Timer.builder(METRIC_LOCK_ACQUIRE_TIME)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .record(millis, TimeUnit.MILLISECONDS);
    }

    /** Records how long the lock was actually held (acquisition to release). */
    private void recordLockHoldTime(String cacheName, long millis) {
        Timer.builder(METRIC_LOCK_HOLD_TIME)
                .tags(TAG_CACHE_NAME, cacheName)
                .register(meterRegistry)
                .record(millis, TimeUnit.MILLISECONDS);
    }

    /** Counts lock contention outcomes (won vs. lost). */
    private void recordLockCompetition(String cacheName, boolean success) {
        Counter.builder("cache.lock.competition")
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .increment();
    }

    /** Records a loader (DB) call's latency, tagged by success/failure. */
    private void recordDbQuery(String cacheName, boolean success, long durationMs) {
        Timer.builder(METRIC_DB_QUERY)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .record(durationMs, TimeUnit.MILLISECONDS);
    }

    /** Extracts the cache name from a key, e.g. "product" from "product:123". */
    private String extractCacheName(String key) {
        int index = key.indexOf(":");
        return index > 0 ? key.substring(0, index) : "unknown";
    }

    // ... setCache and evict should be instrumented the same way ...
}
③、配置Prometheus和Grafana
prometheus.yml 配置
scrape_configs:
- job_name: 'springboot-cache-app'
metrics_path: '/actuator/prometheus'
scrape_interval: 15s # 抓取间隔
static_configs:
- targets: ['your-app-host:8080'] # 应用服务器地址
labels:
application: 'user-service-cache'
Grafana 仪表盘JSON
创建一个全面的缓存监控看板,包含以下面板:
- 缓存命中率面板: 公式: (sum(rate(cache_hit_total{cache_name="$cache_name", result="hit"}[5m])) + sum(rate(cache_hit_total{cache_name="$cache_name", result="null"}[5m]))) / sum(rate(cache_hit_total{cache_name="$cache_name"}[5m])) * 100
- QPS & 延迟面板: 显示缓存操作QPS和P99/P95延迟
- 锁竞争面板: 显示锁等待时间、锁竞争成功率
- 数据库查询面板: 显示缓存触发的数据库查询次数和延迟
- Redis内存面板: 显示Redis内存使用情况(需配置Redis Exporter)
④、设置告警规则 (prometheus-rules.yml)
groups:
- name: cache-alerts
rules:
- alert: CacheHitRateLow
expr: (sum(rate(cache_hit_total{result="hit"}[5m])) by (cache_name) + sum(rate(cache_hit_total{result="null"}[5m])) by (cache_name)) / sum(rate(cache_hit_total[5m])) by (cache_name) * 100 < 80
for: 5m
labels:
severity: warning
annotations:
summary: "缓存命中率过低 (instance {{ $labels.instance }})"
description: "缓存 {{ $labels.cache_name }} 命中率低于80%,当前为 {{ $value }}%"
- alert: LockCompetitionHigh
expr: rate(cache_lock_competition_total[5m]) > 10
for: 2m
labels:
severity: warning
annotations:
summary: "缓存锁竞争激烈 (instance {{ $labels.instance }})"
description: "缓存 {{ $labels.cache_name }} 锁竞争频率过高,可能存在热点Key或锁过期时间设置不合理"
- alert: DbQueryRateHigh
expr: rate(cache_db_query_seconds_count[5m]) > 50
for: 2m
labels:
severity: warning
annotations:
summary: "缓存层数据库查询过高 (instance {{ $labels.instance }})"
description: "缓存 {{ $labels.cache_name }} 触发的数据库查询频率过高,可能大量缓存未命中"
- alert: LockAcquireSlow
expr: histogram_quantile(0.95, rate(cache_lock_acquire_time_seconds_bucket[5m])) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "分布式锁获取缓慢 (instance {{ $labels.instance }})"
description: "缓存锁获取P95延迟超过500ms,当前为 {{ $value }}s"
⑤、持续调优策略
根据监控数据,实施以下调优策略:
- 调优缓存命中率
- 问题: 命中率 < 80%
行动:
- 分析 cache.hit 指标,找出命中率低的缓存名
- 检查该缓存的过期时间是否过短,可用以下PromQL估算"每次DB查询支撑的缓存操作次数"(比值越低说明命中率越差):
  increase(cache_operation_seconds_count{cache_name="product", operation="getOrCalculate"}[1h]) / increase(cache_db_query_seconds_count{cache_name="product"}[1h])
- 优化缓存Key设计,确保热点数据被正确缓存
- 考虑增加本地缓存(Caffeine)作为一级缓存
- 调优锁竞争
- 问题: 锁竞争频率高或等待时间长
行动:
- 分析 cache.lock.competition 和 cache.lock.acquire.time
- 对于极热点数据,使用逻辑过期而非互斥锁:在缓存值中存储逻辑过期时间,异步更新
- 调整锁过期时间 DEFAULT_LOCK_EXPIRE_TIME,避免持有时间过长或过短
- 实现锁获取的退避策略(如指数退避)
- 调优数据库负载
问题: cache.db.query 频率过高
行动:
- 优化 valueLoader 中的SQL查询,添加数据库索引
- 对批量查询实现缓存预热
- 考虑使用 Read-Through 模式,由缓存层统一管理数据加载
- 调优Redis内存
问题: 内存使用率 > 80%
行动:
- 分析大Key:redis-cli --bigkeys
- 优化数据结构:使用Hash代替多个String存储对象属性
- 设置适当的内存淘汰策略:maxmemory-policy allkeys-lru
- 考虑分片或集群部署