Cache Utility Service (One Wrapper for Cache Breakdown, Penetration, and Avalanche)

Published: 2025-09-03

Design Overview

  • Wrap everything in one high-level cache service (AdvancedCacheService): lookup, null-value caching, locking, and expiry policy are all encapsulated, so the logic is transparent to upstream business code.
  • Distributed lock: solves cache breakdown by ensuring only one request rebuilds the cache.
  • Null-value caching: solves cache penetration; null placeholders get a deliberately short TTL.
  • Randomized expiry: solves cache avalanche by adding random jitter to the base TTL.
  • Degradation: when the lock cannot be acquired or the database query fails, the service degrades gracefully (e.g. returns a stale value or null).
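Before the Redis implementation below, the combined flow can be illustrated with a minimal in-memory sketch: a `ConcurrentHashMap` stands in for Redis, a `ReentrantLock` for the distributed lock, and the jittered TTL is only computed, not enforced. All names and structure here are illustrative, not the production API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Minimal in-memory illustration of the design: null-value caching (penetration),
// a mutex around rebuilds (breakdown), and TTL jitter (avalanche).
public class InMemoryCacheSketch {
    private static final Object NULL_PLACEHOLDER = new Object();
    private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();
    private final ReentrantLock rebuildLock = new ReentrantLock(); // stand-in for the distributed lock
    public long lastTtlSeconds; // exposed only so the sketch can show the jittered TTL

    public Object getOrCalculate(String key, Supplier<Object> loader, long baseTtlSeconds) {
        Object cached = store.get(key);
        if (cached != null) {
            return cached == NULL_PLACEHOLDER ? null : cached; // hit (real value or cached null)
        }
        rebuildLock.lock(); // only one caller rebuilds
        try {
            cached = store.get(key); // double check after acquiring the lock
            if (cached != null) {
                return cached == NULL_PLACEHOLDER ? null : cached;
            }
            Object value = loader.get();
            // jitter the TTL so entries written together do not expire together
            lastTtlSeconds = baseTtlSeconds + ThreadLocalRandom.current().nextLong(0, 300);
            store.put(key, value == null ? NULL_PLACEHOLDER : value); // TTL not enforced in this sketch
            return value;
        } finally {
            rebuildLock.unlock();
        }
    }
}
```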

① Distributed Lock Service

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class RedisDistributedLock {

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Try to acquire the distributed lock.
     *
     * @param lockKey    lock key
     * @param requestId  request identifier (e.g. a UUID)
     * @param expireTime how long the lock is held before auto-expiring
     * @param timeUnit   time unit
     * @return whether the lock was acquired
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit) {
        // SET with NX semantics plus an expiry: only set if the key does not exist;
        // acquire and expire happen atomically in Redis
        return Boolean.TRUE.equals(
                redisTemplate.opsForValue().setIfAbsent(
                        lockKey,
                        requestId,
                        expireTime,
                        timeUnit
                )
        );
    }

    /**
     * Release the distributed lock (a Lua script keeps check-and-delete atomic).
     *
     * @param lockKey   lock key
     * @param requestId request identifier
     * @return whether the lock was released
     */
    public boolean releaseLock(String lockKey, String requestId) {
        // Lua: delete the key only if its value is still our requestId
        String luaScript =
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                        "return redis.call('del', KEYS[1]) " +
                        "else " +
                        "return 0 " +
                        "end";
        Long result = redisTemplate.execute(
                new DefaultRedisScript<>(luaScript, Long.class),
                Collections.singletonList(lockKey),
                requestId
        );
        return result != null && result == 1;
    }

    /**
     * Generate a lock request ID.
     */
    public String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
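The Lua script's compare-then-delete can be mimicked in plain Java to show why the requestId check matters: without it, a client whose lock already expired could delete a lock now held by someone else. An `AtomicReference` stands in for the Redis key; this illustrates the semantics only, not the production code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simulates SET NX (tryLock) and the Lua compare-then-delete (releaseLock)
// on a single in-process "key" to illustrate the ownership check.
public class LockSemanticsDemo {
    private final AtomicReference<String> lockValue = new AtomicReference<>(null);

    public boolean tryLock(String requestId) {
        return lockValue.compareAndSet(null, requestId); // SET key value NX
    }

    public boolean releaseLock(String requestId) {
        // Only the holder may delete -- this is what the Lua script guarantees atomically in Redis
        return lockValue.compareAndSet(requestId, null);
    }
}
```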

② High-Level Cache Service (Combining the Three Solutions)

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
@RequiredArgsConstructor
public class AdvancedCacheService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisDistributedLock distributedLock;

    // Null placeholder, cached to stop cache penetration
    private static final String NULL_PLACEHOLDER = "__NULL__";
    // Default lock expiry in seconds (prevents deadlock if the holder dies)
    private static final long DEFAULT_LOCK_EXPIRE_TIME = 10L;
    // Default base cache TTL in seconds
    private static final long DEFAULT_CACHE_EXPIRE_TIME = 3600L;
    // TTL for cached nulls, deliberately short (seconds)
    private static final long NULL_CACHE_EXPIRE_TIME = 300L;
    // Upper bound of random TTL jitter in seconds (anti-avalanche, 5 minutes)
    private static final long RANDOM_EXPIRE_BOUND = 300L;

    /**
     * Get a cached value or compute it (the core method combining all three protections).
     *
     * @param key         cache key
     * @param valueLoader loader invoked on a cache miss (e.g. a database query)
     * @param clazz       return type
     * @param <T>         generic type
     * @return the cached or freshly computed value
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz) {
        return getOrCalculate(key, valueLoader, clazz, DEFAULT_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
    }

    /**
     * Overload with a custom base TTL.
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz,
                               long baseExpireTime, TimeUnit timeUnit) {
        // 1. Try the cache first
        Object cachedValue = redisTemplate.opsForValue().get(key);

        // 2. Cache hit
        if (cachedValue != null) {
            // 2.1 Hitting the null placeholder means "known absent": return null (anti-penetration)
            if (NULL_PLACEHOLDER.equals(cachedValue)) {
                log.debug("Cache hit null placeholder for key: {}", key);
                return null;
            }
            // 2.2 Valid hit, return it directly
            log.debug("Cache hit for key: {}", key);
            return clazz.cast(cachedValue);
        }

        // 3. Cache miss: guard the rebuild against cache breakdown
        String lockKey = "lock:" + key; // lock key
        String requestId = distributedLock.generateRequestId();
        boolean lockAcquired = false;
        T value;

        try {
            // 3.1 Try to acquire the distributed lock
            lockAcquired = distributedLock.tryLock(lockKey, requestId,
                                                   DEFAULT_LOCK_EXPIRE_TIME, TimeUnit.SECONDS);
            if (!lockAcquired) {
                // 3.2 Lock not acquired: another thread is already rebuilding the cache
                log.warn("Failed to acquire lock for key: {}, waiting and retrying...", key);
                // Strategy: sleep briefly, then retry once (a richer retry policy is also possible)
                try {
                    Thread.sleep(100); // sleep 100 ms
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Retry the cache read once
                Object retryValue = redisTemplate.opsForValue().get(key);
                if (retryValue != null) {
                    return NULL_PLACEHOLDER.equals(retryValue) ? null : clazz.cast(retryValue);
                }
                // Still nothing: return null or throw, per the business degradation policy
                log.error("Cache breakdown occurred for key: {}, after retry still failed.", key);
                return null; // degradation: return null
            }

            // 3.3 Lock acquired: check the cache again (double-checked locking)
            log.info("Lock acquired for key: {}, recalculating...", key);
            cachedValue = redisTemplate.opsForValue().get(key);
            if (cachedValue != null) {
                return NULL_PLACEHOLDER.equals(cachedValue) ? null : clazz.cast(cachedValue);
            }

            // 4. Anti-penetration: run the loader (e.g. query the database)
            try {
                value = valueLoader.get();
            } catch (Exception e) {
                log.error("Error executing value loader for key: {}", key, e);
                // On a database error, cache a very short-lived null to stop a penetration storm
                setCache(key, NULL_PLACEHOLDER, 30L, TimeUnit.SECONDS);
                throw e; // or return null, depending on the business
            }

            // 5. Write to the cache
            if (value == null) {
                // 5.1 The database has no row: cache the null placeholder (anti-penetration)
                setCache(key, NULL_PLACEHOLDER, NULL_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
            } else {
                // 5.2 The database has a value: write it with randomized expiry (anti-avalanche).
                // Normalize the base TTL to seconds so the jitter bound (seconds) applies
                // correctly no matter which timeUnit the caller passed.
                long finalExpireTime = timeUnit.toSeconds(baseExpireTime) +
                                      ThreadLocalRandom.current().nextLong(0, RANDOM_EXPIRE_BOUND);
                setCache(key, value, finalExpireTime, TimeUnit.SECONDS);
            }
            return value;

        } finally {
            // 6. Release the lock, but only if we actually acquired it
            if (lockAcquired) {
                distributedLock.releaseLock(lockKey, requestId);
            }
        }
    }

    /**
     * Single entry point for cache writes.
     */
    private void setCache(String key, Object value, long expireTime, TimeUnit timeUnit) {
        try {
            redisTemplate.opsForValue().set(key, value, expireTime, timeUnit);
        } catch (Exception e) {
            // A failed cache write must not break the main flow; just log it
            log.error("Failed to set cache for key: {}", key, e);
        }
    }

    /**
     * Evict a cache entry (used when the underlying data changes).
     */
    public void evict(String key) {
        try {
            redisTemplate.delete(key);
        } catch (Exception e) {
            log.error("Failed to evict cache for key: {}", key, e);
        }
    }
}
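The single fixed 100 ms sleep above is the simplest retry policy, and the comment already hints at richer logic. A bounded exponential backoff schedule spreads retries out and caps the total wait; the helper below is a hypothetical sketch, not part of the service.

```java
import java.util.ArrayList;
import java.util.List;

// Computes a capped exponential backoff schedule: base, 2*base, 4*base, ... up to maxDelayMs.
public class BackoffSchedule {
    public static List<Long> delaysMs(long baseMs, long maxDelayMs, int attempts) {
        List<Long> delays = new ArrayList<>();
        long d = baseMs;
        for (int i = 0; i < attempts; i++) {
            delays.add(Math.min(d, maxDelayMs)); // never exceed the cap
            d *= 2; // double each attempt; random jitter could be added on top
        }
        return delays;
    }
}
```

The caller would sleep for each delay in turn, re-reading the cache between attempts, and fall back to the degradation path after the last one.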

③ Using It in Business Code

import java.util.concurrent.TimeUnit;

import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductMapper productMapper;
    private final AdvancedCacheService advancedCacheService;

    public Product getProductById(Long id) {
        String cacheKey = "product:" + id;

        // With the high-level cache service, only the business logic (the loader) is left to write
        return advancedCacheService.getOrCalculate(
                cacheKey,
                () -> {
                    // This Supplier runs only on a cache miss, after the lock is acquired
                    log.info("Querying product from database, id: {}", id);
                    return productMapper.selectById(id);
                },
                Product.class,
                1L, TimeUnit.HOURS // base TTL: 1 hour
        );
    }

    public void updateProduct(Product product) {
        // 1. Update the database
        productMapper.updateById(product);
        // 2. Delete the cache entry (subsequent reads rebuild it automatically)
        String cacheKey = "product:" + product.getId();
        advancedCacheService.evict(cacheKey);
    }

    public void deleteProduct(Long id) {
        // 1. Delete the database row
        productMapper.deleteById(id);
        // 2. Delete the cache entry
        String cacheKey = "product:" + id;
        advancedCacheService.evict(cacheKey);
    }
}

Monitoring the Service Above

① Add Monitoring Dependencies

<!-- Micrometer core -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-core</artifactId>
</dependency>
<!-- Micrometer Prometheus registry -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Actuator (exposes the /actuator endpoints) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

application.yml:

management:
  endpoints:
    web:
      exposure:
        include: health, info, metrics, prometheus
  metrics:
    tags:
      application: ${spring.application.name} # tag every metric with the application name
    distribution:
      percentiles-histogram:
        "[cache.lock.acquire.time]": true # publish histogram buckets for the timers
        "[cache.db.query]": true          # (required by the histogram_quantile alert rules)

② Extend the Cache Service with Metrics Collection

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
@RequiredArgsConstructor
public class AdvancedCacheService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisDistributedLock distributedLock;
    private final MeterRegistry meterRegistry; // Micrometer meter registry

    // ... (constants from the previous version, unchanged) ...

    // Metric tag keys and values
    private static final String TAG_CACHE_NAME = "cache.name";
    private static final String TAG_OPERATION = "operation";
    private static final String TAG_RESULT = "result";
    private static final String TAG_HIT = "hit";
    private static final String TAG_MISS = "miss";
    private static final String TAG_SUCCESS = "success";
    private static final String TAG_FAILURE = "failure";

    // Metric names
    private static final String METRIC_CACHE_OPERATION = "cache.operation";
    private static final String METRIC_CACHE_HIT = "cache.hit";
    private static final String METRIC_CACHE_LOAD_TIME = "cache.load.time";
    private static final String METRIC_LOCK_ACQUIRE_TIME = "cache.lock.acquire.time";
    private static final String METRIC_LOCK_HOLD_TIME = "cache.lock.hold.time";
    private static final String METRIC_DB_QUERY = "cache.db.query";

    /**
     * Get or compute a cached value (metrics-instrumented version).
     */
    public <T> T getOrCalculate(String key, Supplier<T> valueLoader, Class<T> clazz,
                               long baseExpireTime, TimeUnit timeUnit) {

        String cacheName = extractCacheName(key); // e.g. "product" from "product:123"
        Timer.Sample totalTimer = Timer.start(meterRegistry); // start timing the whole operation

        try {
            Object cachedValue = redisTemplate.opsForValue().get(key);

            if (cachedValue != null) {
                if (NULL_PLACEHOLDER.equals(cachedValue)) {
                    // record a null-placeholder hit
                    recordCacheHit(cacheName, "null");
                    return null;
                }
                // record a valid hit
                recordCacheHit(cacheName, TAG_HIT);
                return clazz.cast(cachedValue);
            }
            // record a miss
            recordCacheHit(cacheName, TAG_MISS);

            String lockKey = "lock:" + key;
            String requestId = distributedLock.generateRequestId();
            boolean lockAcquired = false;
            T value;
            long lockWaitStartTime = System.currentTimeMillis();
            long lockHeldSince = 0L;

            try {
                lockAcquired = distributedLock.tryLock(lockKey, requestId,
                                                       DEFAULT_LOCK_EXPIRE_TIME, TimeUnit.SECONDS);
                long lockAcquireTime = System.currentTimeMillis() - lockWaitStartTime;
                // record how long lock acquisition took
                recordLockAcquireTime(cacheName, lockAcquireTime, lockAcquired);

                if (!lockAcquired) {
                    log.warn("Failed to acquire lock for key: {}", key);
                    // record a lost lock competition
                    recordLockCompetition(cacheName, false);
                    // ... (retry logic unchanged from the previous version) ...
                    return null;
                }
                // record a won lock competition
                recordLockCompetition(cacheName, true);
                lockHeldSince = System.currentTimeMillis(); // hold time starts after acquisition

                // Double check
                cachedValue = redisTemplate.opsForValue().get(key);
                if (cachedValue != null) {
                    return NULL_PLACEHOLDER.equals(cachedValue) ? null : clazz.cast(cachedValue);
                }

                long loadStartTime = System.currentTimeMillis();
                try {
                    value = valueLoader.get();
                    // record a successful database query
                    recordDbQuery(cacheName, true, System.currentTimeMillis() - loadStartTime);
                } catch (Exception e) {
                    // record a failed database query
                    recordDbQuery(cacheName, false, System.currentTimeMillis() - loadStartTime);
                    setCache(key, NULL_PLACEHOLDER, 30L, TimeUnit.SECONDS);
                    throw e;
                }

                if (value == null) {
                    setCache(key, NULL_PLACEHOLDER, NULL_CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
                } else {
                    // normalize to seconds so the jitter bound (seconds) applies for any timeUnit
                    long finalExpireTime = timeUnit.toSeconds(baseExpireTime) +
                                          ThreadLocalRandom.current().nextLong(0, RANDOM_EXPIRE_BOUND);
                    setCache(key, value, finalExpireTime, TimeUnit.SECONDS);
                }
                return value;

            } finally {
                // release only if we actually acquired the lock
                if (lockAcquired) {
                    // record lock hold time (measured from acquisition, not from the start of the wait)
                    recordLockHoldTime(cacheName, System.currentTimeMillis() - lockHeldSince);
                    distributedLock.releaseLock(lockKey, requestId);
                }
            }
        } finally {
            // record total operation latency
            totalTimer.stop(Timer.builder(METRIC_CACHE_OPERATION)
                    .tags(TAG_CACHE_NAME, cacheName, TAG_OPERATION, "getOrCalculate")
                    .register(meterRegistry));
        }
    }

    // --- Metric recording helpers ---

    private void recordCacheHit(String cacheName, String result) {
        Counter.builder(METRIC_CACHE_HIT)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, result)
                .register(meterRegistry)
                .increment();
    }

    private void recordLockAcquireTime(String cacheName, long millis, boolean success) {
        Timer.builder(METRIC_LOCK_ACQUIRE_TIME)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .record(millis, TimeUnit.MILLISECONDS);
    }

    private void recordLockHoldTime(String cacheName, long millis) {
        Timer.builder(METRIC_LOCK_HOLD_TIME)
                .tags(TAG_CACHE_NAME, cacheName)
                .register(meterRegistry)
                .record(millis, TimeUnit.MILLISECONDS);
    }

    private void recordLockCompetition(String cacheName, boolean success) {
        Counter.builder("cache.lock.competition")
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .increment();
    }

    private void recordDbQuery(String cacheName, boolean success, long durationMs) {
        Timer.builder(METRIC_DB_QUERY)
                .tags(TAG_CACHE_NAME, cacheName, TAG_RESULT, success ? TAG_SUCCESS : TAG_FAILURE)
                .register(meterRegistry)
                .record(durationMs, TimeUnit.MILLISECONDS);
    }

    private String extractCacheName(String key) {
        // Naive implementation: "product:123" -> "product"
        int index = key.indexOf(":");
        return index > 0 ? key.substring(0, index) : "unknown";
    }

    // ... setCache and evict need similar instrumentation ...
}

③ Configure Prometheus and Grafana

prometheus.yml configuration

scrape_configs:
  - job_name: 'springboot-cache-app'
    metrics_path: '/actuator/prometheus'
    scrape_interval: 15s # scrape interval
    static_configs:
      - targets: ['your-app-host:8080'] # application host
        labels:
          application: 'user-service-cache'

Grafana Dashboard

Build a comprehensive cache dashboard with the following panels:

  • Hit-rate panel: hit rate per cache; null-placeholder hits count as hits, since they were answered from cache:
    (sum(rate(cache_hit_total{cache_name="$cache_name", result="hit"}[5m])) + sum(rate(cache_hit_total{cache_name="$cache_name", result="null"}[5m]))) / sum(rate(cache_hit_total{cache_name="$cache_name"}[5m])) * 100
  • QPS & latency panel: cache-operation QPS and P95/P99 latency
  • Lock contention panel: lock wait time and lock-acquisition success rate
  • Database query panel: count and latency of database queries triggered by cache misses
  • Redis memory panel: Redis memory usage (requires the Redis Exporter)
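The hit-rate formula above is plain counter arithmetic over the three `result` values of `cache_hit_total`. A self-contained check of that arithmetic (the class name is illustrative):

```java
// Hit rate as used in the Grafana formula: valid hits plus null-placeholder hits,
// divided by all lookups (hit + null + miss), as a percentage.
public class HitRate {
    public static double percent(long hits, long nullHits, long misses) {
        long total = hits + nullHits + misses;
        return total == 0 ? 0.0 : (hits + nullHits) * 100.0 / total;
    }
}
```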

④ Alerting Rules (prometheus-rules.yml)

groups:
- name: cache-alerts
  rules:
  - alert: CacheHitRateLow
    expr: (sum(rate(cache_hit_total{result="hit"}[5m])) by (cache_name) + sum(rate(cache_hit_total{result="null"}[5m])) by (cache_name)) / sum(rate(cache_hit_total[5m])) by (cache_name) * 100 < 80
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Cache hit rate too low (instance {{ $labels.instance }})"
      description: "Hit rate of cache {{ $labels.cache_name }} is below 80%, currently {{ $value }}%"

  - alert: LockCompetitionHigh
    expr: rate(cache_lock_competition_total[5m]) > 10
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "Heavy cache lock contention (instance {{ $labels.instance }})"
      description: "Lock contention on cache {{ $labels.cache_name }} is too frequent; there may be a hot key or a badly tuned lock expiry"

  - alert: DbQueryRateHigh
    expr: rate(cache_db_query_seconds_count[5m]) > 50
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "Too many database queries behind the cache (instance {{ $labels.instance }})"
      description: "Cache {{ $labels.cache_name }} is triggering database queries too often; a large share of lookups are probably missing"

  - alert: LockAcquireSlow
    expr: histogram_quantile(0.95, rate(cache_lock_acquire_time_seconds_bucket[5m])) > 0.5
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Slow distributed-lock acquisition (instance {{ $labels.instance }})"
      description: "P95 lock-acquisition latency exceeds 500ms, currently {{ $value }}s"

⑤ Ongoing Tuning

Based on the monitoring data, apply the following tuning strategies:

1. Tune the cache hit rate

Problem: hit rate < 80%

Actions:

  • Analyze the cache.hit metric to find which caches have low hit rates
  • Check whether the TTL is too short, e.g. cache operations per database query:
    increase(cache_operation_seconds_count{cache_name="product", operation="getOrCalculate"}[1h]) / increase(cache_db_query_seconds_count{cache_name="product"}[1h])
  • Improve cache-key design so hot data is actually cached
  • Consider a local first-level cache (Caffeine) in front of Redis
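The local-cache suggestion can be sketched without any library via `LinkedHashMap`'s access-order mode; Caffeine is the production-grade equivalent with TTLs, weighing, and statistics. This toy version is not thread-safe and would need external synchronization in a concurrent service:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A tiny LRU map: access-order LinkedHashMap that evicts the eldest entry past maxEntries.
public class LocalLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LocalLruCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true -> get() refreshes recency
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least recently used entry
    }
}
```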
2. Tune lock contention

Problem: frequent lock contention or long wait times

Actions:

  • Analyze cache.lock.competition and cache.lock.acquire.time
  • For extreme hot keys, use logical expiration instead of a mutex: store a logical expiry time inside the cached value and refresh it asynchronously
  • Adjust the lock expiry DEFAULT_LOCK_EXPIRE_TIME so the lock is held neither too long nor too briefly
  • Add a backoff policy (e.g. exponential backoff) to lock acquisition
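The logical-expiration idea can be sketched as a wrapper stored as the cache value: the Redis key itself never expires, so readers always get an answer immediately, and a stale read merely schedules an asynchronous rebuild under the lock. The class and field names here are illustrative:

```java
// Wrapper for logical expiration: staleness is decided from the embedded timestamp
// instead of a Redis TTL, so readers never block on a rebuild.
public class LogicalExpireValue<T> {
    private final T data;
    private final long expireAtMillis; // logical expiry, not a Redis TTL

    public LogicalExpireValue(T data, long ttlMillis, long nowMillis) {
        this.data = data;
        this.expireAtMillis = nowMillis + ttlMillis;
    }

    public T getData() { return data; }

    // Stale means: serve this value now, and trigger an async refresh in the background.
    public boolean isStale(long nowMillis) {
        return nowMillis >= expireAtMillis;
    }
}
```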
3. Tune database load

Problem: cache.db.query rate too high

Actions:

  • Optimize the SQL inside valueLoader and add database indexes
  • Pre-warm the cache for batch queries
  • Consider a read-through pattern, letting the cache layer own all data loading

4. Tune Redis memory

Problem: memory usage > 80%

Actions:

  • Find big keys: redis-cli --bigkeys
  • Optimize data structures: use a Hash instead of many Strings for object fields
  • Set a suitable eviction policy: maxmemory-policy allkeys-lru
  • Consider sharding or a cluster deployment
