Spring Cache是一个强大的缓存抽象层,提供了统一的缓存操作接口,但原生支持主要集中在单键操作。在高并发场景下,批量操作能力对于提升系统性能至关重要。本文将深入探讨如何通过继承Cache接口,以RedisCache为基础,实现兼容Spring Cache规范的BatchCache扩展。
一、Spring Cache架构与RedisCache源码剖析
1.1 Spring Cache核心组件
Spring Cache的核心是org.springframework.cache.Cache
接口,定义了缓存的基本操作:
public interface Cache {
String getName();
Object getNativeCache();
ValueWrapper get(Object key);
<T> T get(Object key, Class<T> type);
<T> T get(Object key, Callable<T> valueLoader);
void put(Object key, Object value);
ValueWrapper putIfAbsent(Object key, Object value);
void evict(Object key);
void clear();
}
主要实现类包括:
- ConcurrentMapCache:基于ConcurrentHashMap的简单实现
- RedisCache:Redis集成实现
- CaffeineCache:Caffeine缓存实现
1.2 RedisCache源码深入分析
RedisCache是Spring Data Redis提供的缓存实现,核心类结构如下:
public class RedisCache extends AbstractValueAdaptingCache {
private final String name;
private final RedisCacheWriter cacheWriter;
private final RedisCacheConfiguration cacheConfig;
// 构造函数和核心方法实现
}
核心组件解析:
- RedisCacheWriter:负责与Redis通信的底层接口
- RedisCacheConfiguration:缓存配置,如序列化器、TTL等
- AbstractValueAdaptingCache:提供缓存值处理的基础实现
1.3 RedisCacheWriter源码分析
RedisCacheWriter是Redis操作的核心接口:
public interface RedisCacheWriter {
void put(String name, byte[] key, byte[] value, @Nullable Duration ttl);
byte[] get(String name, byte[] key);
byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl);
void remove(String name, byte[] key);
void clean(String name, byte[] pattern);
}
主要实现类:
- DefaultRedisCacheWriter:基于RedisTemplate的默认实现
- LettuceRedisCacheWriter:基于Lettuce客户端的优化实现
- JedisRedisCacheWriter:基于Jedis客户端的实现
二、BatchCache接口设计与实现思路
2.1 BatchCache接口定义
为了实现批量操作能力,我们需要定义一个扩展接口:
/**
* 扩展Spring Cache接口,提供批量操作能力
*
* @author doubao
*/
public interface BatchCache extends Cache {
/**
* 批量获取缓存值
*
* @param keys 缓存键集合
* @return 键值对映射,不存在的键对应的值为null
*/
Map<Object, Object> getAll(Collection<?> keys);
/**
* 批量存入缓存值
*
* @param values 键值对映射
*/
void putAll(Map<?, ?> values);
/**
* 批量存入缓存值,仅当键不存在时
*
* @param values 键值对映射
* @return 实际存入的键值对映射,已存在的键对应的值为null
*/
Map<Object, Object> putAllIfAbsent(Map<?, ?> values);
/**
* 批量删除缓存
*
* @param keys 缓存键集合
*/
void evictAll(Collection<?> keys);
}
2.2 实现思路概述
实现BatchCache接口的核心思路:
- 继承RedisCache类,复用现有功能
- 扩展RedisCacheWriter接口,添加批量操作方法
- 实现BatchRedisCache类,实现BatchCache接口
- 提供配置类,注册自定义CacheManager
- 确保与Spring Cache生态系统兼容
三、核心代码实现
3.1 扩展RedisCacheWriter接口
/**
* 扩展RedisCacheWriter接口,添加批量操作方法
*
* @author doubao
*/
public interface BatchRedisCacheWriter extends RedisCacheWriter {
/**
* 批量获取缓存值
*
* @param name 缓存名称
* @param keys 缓存键集合(字节数组形式)
* @return 键值对映射,不存在的键对应的值为null
*/
Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys);
/**
* 批量存入缓存值
*
* @param name 缓存名称
* @param values 键值对映射(字节数组形式)
* @param ttl 过期时间,null表示不过期
*/
void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl);
/**
* 批量删除缓存
*
* @param name 缓存名称
* @param keys 缓存键集合(字节数组形式)
*/
void removeAll(String name, Collection<byte[]> keys);
}
3.2 实现BatchRedisCacheWriter
/**
* RedisCacheWriter的批量操作实现
*
* @author doubao
*/
public class DefaultBatchRedisCacheWriter implements BatchRedisCacheWriter {
private final RedisTemplate<byte[], byte[]> redisTemplate;
private final Duration sleepTime;
/**
* 构造函数
*
* @param redisOperations Redis操作模板
*/
public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations) {
this(redisOperations, Duration.ZERO);
}
/**
* 构造函数
*
* @param redisOperations Redis操作模板
* @param sleepTime 重试间隔时间
*/
public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations, Duration sleepTime) {
Assert.notNull(redisOperations, "RedisOperations must not be null!");
Assert.notNull(sleepTime, "SleepTime must not be null!");
this.redisTemplate = (RedisTemplate<byte[], byte[]>) redisOperations;
this.sleepTime = sleepTime;
}
@Override
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
connection.setEx(key, ttl.getSeconds(), value);
} else {
connection.set(key, value);
}
return "OK";
});
}
@Override
public byte[] get(String name, byte[] key) {
return execute(name, connection -> connection.get(key));
}
// 其他方法实现...
@Override
public Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys) {
return execute(name, connection -> {
List<byte[]> values = connection.mGet(keys.toArray(new byte[0][]));
Map<byte[], byte[]> result = new LinkedHashMap<>(keys.size());
int index = 0;
for (byte[] key : keys) {
result.put(key, index < values.size() ? values.get(index) : null);
index++;
}
return result;
});
}
@Override
public void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl) {
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
Pipeline pipeline = connection.openPipeline();
for (Map.Entry<byte[], byte[]> entry : values.entrySet()) {
pipeline.setEx(entry.getKey(), ttl.getSeconds(), entry.getValue());
}
pipeline.close();
} else {
Map<byte[], byte[]> nonNullValues = values.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!nonNullValues.isEmpty()) {
connection.mSet(nonNullValues);
}
}
return "OK";
});
}
@Override
public void removeAll(String name, Collection<byte[]> keys) {
execute(name, connection -> {
if (!keys.isEmpty()) {
connection.del(keys.toArray(new byte[0][]));
}
return "OK";
});
}
// 辅助方法...
private <T> T execute(String name, RedisCallback<T> callback) {
try {
return redisTemplate.execute(callback);
} catch (Exception ex) {
throw new CacheOperationFailedException(name, "Redis batch operation failed", ex);
}
}
private boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}
}
3.3 实现BatchRedisCache类
/**
* 支持批量操作的Redis缓存实现
*
* @author doubao
*/
public class BatchRedisCache extends RedisCache implements BatchCache {
private final BatchRedisCacheWriter cacheWriter;
private final RedisSerializationContext<Object, Object> serializationContext;
/**
* 构造函数
*
* @param name 缓存名称
* @param cacheWriter 缓存写入器
* @param cacheConfig 缓存配置
*/
protected BatchRedisCache(String name, BatchRedisCacheWriter cacheWriter,
RedisCacheConfiguration cacheConfig) {
super(name, cacheWriter, cacheConfig);
this.cacheWriter = cacheWriter;
this.serializationContext = cacheConfig.getSerializationContext();
}
@Override
public Map<Object, Object> getAll(Collection<?> keys) {
// 转换键为字节数组
Map<Object, byte[]> keyMappings = new LinkedHashMap<>(keys.size());
for (Object key : keys) {
keyMappings.put(key, serializeCacheKey(createCacheKey(key)));
}
// 批量获取缓存值
Map<byte[], byte[]> results = cacheWriter.getAll(
getName(), keyMappings.values());
// 反序列化结果
Map<Object, Object> valueMappings = new LinkedHashMap<>(results.size());
for (Map.Entry<Object, byte[]> entry : keyMappings.entrySet()) {
byte[] valueBytes = results.get(entry.getValue());
valueMappings.put(entry.getKey(), deserializeCacheValue(valueBytes));
}
return valueMappings;
}
@Override
public void putAll(Map<?, ?> values) {
// 序列化键值对
Map<byte[], byte[]> serializedValues = new LinkedHashMap<>(values.size());
for (Map.Entry<?, ?> entry : values.entrySet()) {
if (entry.getValue() != null) {
String cacheKey = createCacheKey(entry.getKey());
byte[] keyBytes = serializeCacheKey(cacheKey);
byte[] valueBytes = serializeCacheValue(entry.getValue());
serializedValues.put(keyBytes, valueBytes);
}
}
// 批量存入缓存
cacheWriter.putAll(getName(), serializedValues, getTtl());
}
@Override
public Map<Object, Object> putAllIfAbsent(Map<?, ?> values) {
// 实现略,需要使用Redis事务或Lua脚本确保原子性
throw new UnsupportedOperationException("Batch putIfAbsent operation is not supported yet.");
}
@Override
public void evictAll(Collection<?> keys) {
// 转换键为字节数组
Collection<byte[]> keyBytes = keys.stream()
.map(key -> serializeCacheKey(createCacheKey(key)))
.collect(Collectors.toList());
// 批量删除缓存
cacheWriter.removeAll(getName(), keyBytes);
}
// 辅助方法...
private byte[] serializeCacheKey(String cacheKey) {
return serializationContext.getKeySerializationPair().write(cacheKey);
}
private byte[] serializeCacheValue(Object value) {
return serializationContext.getValueSerializationPair().write(value);
}
private Object deserializeCacheValue(byte[] valueBytes) {
if (valueBytes == null) {
return null;
}
return serializationContext.getValueSerializationPair().read(valueBytes);
}
private String createCacheKey(Object key) {
String convertedKey = convertKey(key);
if (!getCacheConfiguration().usePrefix()) {
return convertedKey;
}
return getCacheConfiguration().getKeyPrefixFor(name) + convertedKey;
}
}
3.4 实现BatchRedisCacheManager
/**
* 支持批量操作的Redis缓存管理器
*
* @author doubao
*/
public class BatchRedisCacheManager extends RedisCacheManager {
private final BatchRedisCacheWriter cacheWriter;
private final RedisCacheConfiguration defaultCacheConfig;
/**
* 构造函数
*
* @param cacheWriter 缓存写入器
* @param defaultCacheConfiguration 默认缓存配置
*/
public BatchRedisCacheManager(BatchRedisCacheWriter cacheWriter,
RedisCacheConfiguration defaultCacheConfiguration) {
super(cacheWriter, defaultCacheConfiguration);
this.cacheWriter = cacheWriter;
this.defaultCacheConfig = defaultCacheConfiguration;
}
@Override
protected RedisCache createRedisCache(String name, @Nullable RedisCacheConfiguration cacheConfig) {
return new BatchRedisCache(name, cacheWriter,
cacheConfig != null ? cacheConfig : defaultCacheConfig);
}
/**
* 从RedisCacheManager转换为BatchRedisCacheManager
*
* @param cacheManager Redis缓存管理器
* @return 支持批量操作的Redis缓存管理器
*/
public static BatchRedisCacheManager fromRedisCacheManager(RedisCacheManager cacheManager) {
// 获取RedisCacheManager的私有字段
Field cacheWriterField;
Field defaultCacheConfigField;
try {
cacheWriterField = RedisCacheManager.class.getDeclaredField("cacheWriter");
defaultCacheConfigField = RedisCacheManager.class.getDeclaredField("defaultCacheConfig");
cacheWriterField.setAccessible(true);
defaultCacheConfigField.setAccessible(true);
RedisCacheWriter cacheWriter = (RedisCacheWriter) cacheWriterField.get(cacheManager);
RedisCacheConfiguration defaultCacheConfig =
(RedisCacheConfiguration) defaultCacheConfigField.get(cacheManager);
// 创建BatchRedisCacheWriter
BatchRedisCacheWriter batchCacheWriter;
if (cacheWriter instanceof DefaultRedisCacheWriter) {
DefaultRedisCacheWriter defaultWriter = (DefaultRedisCacheWriter) cacheWriter;
// 使用反射获取RedisOperations
Field redisOperationsField = DefaultRedisCacheWriter.class.getDeclaredField("redisOperations");
redisOperationsField.setAccessible(true);
RedisOperations<byte[], byte[]> redisOperations =
(RedisOperations<byte[], byte[]>) redisOperationsField.get(defaultWriter);
batchCacheWriter = new DefaultBatchRedisCacheWriter(redisOperations);
} else {
// 回退方案,使用RedisTemplate创建
RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(cacheManager.getCacheWriter().getConnectionFactory());
redisTemplate.afterPropertiesSet();
batchCacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate);
}
// 创建BatchRedisCacheManager
return new BatchRedisCacheManager(batchCacheWriter, defaultCacheConfig);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Failed to convert RedisCacheManager to BatchRedisCacheManager", e);
}
}
}
3.5 配置类实现
/**
* 批量缓存配置类
*
* @author doubao
*/
@Configuration
public class BatchCacheConfiguration {
@Bean
public BatchRedisCacheManager batchRedisCacheManager(RedisConnectionFactory redisConnectionFactory) {
// 创建默认配置
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(10))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
// 创建BatchRedisCacheWriter
RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
BatchRedisCacheWriter cacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate);
// 创建BatchRedisCacheManager
BatchRedisCacheManager cacheManager = new BatchRedisCacheManager(cacheWriter, config);
cacheManager.setTransactionAware(true);
// 设置缓存名称和配置的映射
Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();
// 可以为不同的缓存名称配置不同的策略
cacheConfigurations.put("batchCache", config.entryTtl(Duration.ofHours(1)));
cacheManager.setCacheConfigurations(cacheConfigurations);
return cacheManager;
}
/**
* 自定义CacheResolver,支持BatchCache
*
* @param cacheManager 缓存管理器
* @return 缓存解析器
*/
@Bean
public CacheResolver batchCacheResolver(BatchRedisCacheManager cacheManager) {
return new SimpleCacheResolver(cacheManager) {
@Override
protected Collection<? extends Cache> getCaches(CacheOperationInvocationContext<?> context) {
Collection<? extends Cache> caches = super.getCaches(context);
return caches.stream()
.map(cache -> {
if (cache instanceof RedisCache && !(cache instanceof BatchRedisCache)) {
// 将普通RedisCache转换为BatchRedisCache
RedisCache redisCache = (RedisCache) cache;
return new BatchRedisCache(
redisCache.getName(),
(BatchRedisCacheWriter) cacheManager.getCacheWriter(),
redisCache.getCacheConfiguration());
}
return cache;
})
.collect(Collectors.toList());
}
};
}
}
四、使用示例
4.1 定义业务服务
/**
* 示例服务类,演示BatchCache的使用
*
* @author doubao
*/
@Service
public class UserService {
private final UserRepository userRepository;
@Autowired
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
/**
* 批量获取用户信息,使用缓存
*
* @param userIds 用户ID集合
* @return 用户信息映射
*/
@Cacheable(value = "users", key = "#root.methodName + '_' + #userIds", unless = "#result == null")
public Map<Long, User> getUsersBatch(Collection<Long> userIds) {
// 模拟从数据库获取数据
return userIds.stream()
.collect(Collectors.toMap(
Function.identity(),
userId -> userRepository.findById(userId).orElse(null)
));
}
/**
* 批量保存用户信息,并更新缓存
*
* @param users 用户信息集合
*/
@CachePut(value = "users", key = "#root.methodName + '_' + #users.![id]", condition = "#users != null && !#users.isEmpty()")
public Map<Long, User> saveUsersBatch(Collection<User> users) {
// 模拟批量保存到数据库
users.forEach(userRepository::save);
// 返回保存后的用户信息
return users.stream()
.collect(Collectors.toMap(User::getId, Function.identity()));
}
/**
* 批量删除用户信息,并清除缓存
*
* @param userIds 用户ID集合
*/
@CacheEvict(value = "users", allEntries = false, key = "#root.methodName + '_' + #userIds")
public void deleteUsersBatch(Collection<Long> userIds) {
// 模拟批量删除
userIds.forEach(userRepository::deleteById);
}
/**
* 直接使用BatchCache接口的批量操作
*
* @param userIds 用户ID集合
* @return 用户信息映射
*/
public Map<Long, User> getUsersBatchWithBatchCache(Collection<Long> userIds) {
// 通过ApplicationContext获取BatchCache
BatchCache batchCache = (BatchCache) applicationContext.getBean("cacheManager").getCache("users");
// 直接使用BatchCache的批量获取方法
Map<Object, Object> cacheResults = batchCache.getAll(userIds);
// 处理缓存结果
Map<Long, User> result = new HashMap<>();
for (Map.Entry<Object, Object> entry : cacheResults.entrySet()) {
Long userId = (Long) entry.getKey();
User user = (User) entry.getValue();
if (user == null) {
// 缓存未命中,从数据库获取
user = userRepository.findById(userId).orElse(null);
if (user != null) {
// 手动放入缓存
batchCache.put(userId, user);
}
}
result.put(userId, user);
}
return result;
}
}
4.2 配置文件示例
spring:
redis:
host: localhost
port: 6379
password:
timeout: 10000ms
lettuce:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
cache:
type: redis
redis:
time-to-live: 600000 # 10分钟
cache-null-values: true
use-key-prefix: true
key-prefix: batch_cache:
五、性能测试与优化
5.1 性能测试框架
/**
* BatchCache性能测试
*
* @author doubao
*/
@SpringBootTest
public class BatchCachePerformanceTest {
@Autowired
private UserService userService;
@Autowired
private CacheManager cacheManager;
private static final int TEST_SIZE = 1000;
private static final int WARMUP_TIMES = 10;
private static final int TEST_TIMES = 100;
@BeforeEach
public void setUp() {
// 准备测试数据
List<User> users = new ArrayList<>(TEST_SIZE);
for (int i = 0; i < TEST_SIZE; i++) {
User user = new User();
user.setId((long) i);
user.setName("User" + i);
user.setAge(20 + i % 30);
users.add(user);
}
// 预热
for (int i = 0; i < WARMUP_TIMES; i++) {
userService.saveUsersBatch(users);
userService.getUsersBatch(users.stream().map(User::getId).collect(Collectors.toList()));
}
// 清除缓存
Cache usersCache = cacheManager.getCache("users");
if (usersCache != null) {
usersCache.clear();
}
}
@Test
public void testBatchGetPerformance() {
List<Long> userIds = IntStream.range(0, TEST_SIZE)
.mapToObj(Long::valueOf)
.collect(Collectors.toList());
// 测试单键获取性能
long singleStartTime = System.currentTimeMillis();
for (int i = 0; i < TEST_TIMES; i++) {
for (Long userId : userIds) {
userService.getUser(userId);
}
}
long singleEndTime = System.currentTimeMillis();
long singleTotalTime = singleEndTime - singleStartTime;
// 测试批量获取性能
long batchStartTime = System.currentTimeMillis();
for (int i = 0; i < TEST_TIMES; i++) {
userService.getUsersBatch(userIds);
}
long batchEndTime = System.currentTimeMillis();
long batchTotalTime = batchEndTime - batchStartTime;
// 输出性能结果
System.out.printf("单键获取 %d 次,总耗时: %d ms,平均每次: %f ms%n",
TEST_SIZE * TEST_TIMES, singleTotalTime, (double) singleTotalTime / (TEST_SIZE * TEST_TIMES));
System.out.printf("批量获取 %d 次,总耗时: %d ms,平均每次: %f ms%n",
TEST_TIMES, batchTotalTime, (double) batchTotalTime / TEST_TIMES);
System.out.printf("批量操作性能提升: %.2f%%%n",
(1 - (double) batchTotalTime / singleTotalTime) * 100);
}
// 其他性能测试方法...
}
5.2 性能优化策略
- 批量操作优化:
- 使用Redis的MGET、MSET等批量命令
- 合理设置批量操作的大小,避免单次操作过大
- 考虑使用Redis Pipeline提升性能
- 序列化优化:
- 使用高效的序列化方式,如Kryo、Protostuff等
- 避免序列化大对象,可考虑拆分数据
- 缓存配置优化:
- 根据业务场景设置合理的TTL
- 使用分区缓存,避免不同业务数据相互影响
- 考虑使用二级缓存(如Caffeine + Redis)提升性能
六、注意事项与最佳实践
6.1 使用注意事项
- 事务支持:
- Spring Cache的@Cacheable、@CachePut等注解不支持事务回滚
- 如果需要事务支持,建议在业务代码中直接使用BatchCache接口
- 异常处理:
- 批量操作可能部分成功部分失败,需要业务层处理这种情况
- 考虑实现重试机制,确保操作的最终一致性
- 缓存穿透与雪崩:
- 批量操作同样需要防范缓存穿透问题
- 合理设置不同数据的TTL,避免缓存雪崩
6.2 最佳实践
- 批量操作大小控制:
- 对于大量数据的批量操作,建议分批处理
- 每批大小可根据网络情况和Redis性能调整,一般建议在100-1000之间
- 缓存预热:
- 对于热点数据,启动时进行缓存预热
- 使用BatchCache的批量操作能力快速填充缓存
- 监控与告警:
- 监控批量操作的性能指标,如QPS、响应时间等
- 设置合理的告警阈值,及时发现性能问题
通过以上方案,我们成功地扩展了Spring Boot Cache的功能,实现了兼容RedisCache的BatchCache接口。这种实现方式不仅保持了与Spring Cache生态的兼容性,还显著提升了批量数据操作的性能,为高并发场景下的应用提供了有力支持。