步骤 1:添加依赖项
在Maven项目的pom.xml
中添加Jedis依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.0</version>
</dependency>
步骤 2:编写迁移代码
import redis.clients.jedis.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class RedisMigrator {
public static void main(String[] args) {
// 源Redis配置
String sourceHost = "localhost";
int sourcePort = 6379;
String sourcePassword = "source_pass";
int sourceDb = 0;
// 目标Redis配置
String targetHost = "remote.redis.host";
int targetPort = 6380;
String targetPassword = "target_pass";
int targetDb = 0;
// 配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(20);
// 初始化连接池
JedisPool sourcePool = new JedisPool(poolConfig, sourceHost, sourcePort, 5000, sourcePassword, sourceDb);
JedisPool targetPool = new JedisPool(poolConfig, targetHost, targetPort, 5000, targetPassword, targetDb);
try (Jedis source = sourcePool.getResource();
Jedis target = targetPool.getResource();
Pipeline pipeline = target.pipelined()) { // 使用Pipeline提升性能
String cursor = "0";
int batchSize = 100; // 每批处理100个键
do {
ScanParams params = new ScanParams().count(batchSize);
ScanResult<String> scanResult = source.scan(cursor, params);
cursor = scanResult.getCursor();
List<String> keys = scanResult.getResult();
for (String key : keys) {
// 跳过系统键(可选)
if (key.startsWith("__")) continue;
String type = source.type(key);
long ttl = source.ttl(key); // 获取过期时间
switch (type) {
case "string":
String value = source.get(key);
pipeline.set(key, value);
if (ttl > 0) pipeline.expire(key, (int) ttl);
break;
case "hash":
Map<String, String> hash = source.hgetAll(key);
pipeline.hset(key, hash);
if (ttl > 0) pipeline.expire(key, (int) ttl);
break;
case "list":
List<String> list = source.lrange(key, 0, -1);
pipeline.del(key); // 清除目标可能存在的旧数据
pipeline.rpush(key, list.toArray(new String[0]));
if (ttl > 0) pipeline.expire(key, (int) ttl);
break;
case "set":
Set<String> set = source.smembers(key);
pipeline.sadd(key, set.toArray(new String[0]));
if (ttl > 0) pipeline.expire(key, (int) ttl);
break;
case "zset":
Set<Tuple> zset = source.zrangeWithScores(key, 0, -1);
Map<String, Double> scoreMap = zset.stream()
.collect(Collectors.toMap(Tuple::getElement, Tuple::getScore));
pipeline.zadd(key, scoreMap);
if (ttl > 0) pipeline.expire(key, (int) ttl);
break;
default:
System.err.println("Unsupported type: " + type + " for key: " + key);
}
}
pipeline.sync(); // 批量提交命令
System.out.println("Processed batch of " + keys.size() + " keys");
} while (!"0".equals(cursor));
System.out.println("Migration completed!");
} catch (Exception e) {
e.printStackTrace();
} finally {
sourcePool.close();
targetPool.close();
}
}
}
步骤 3:配置说明
• 连接参数:根据实际情况修改源和目标Redis的主机、端口、密码和数据库编号
• 性能调优:
• 调整batchSize
控制每批次处理的键数量
• 使用Pipeline
批量提交命令,减少网络开销
• 连接池参数maxTotal
根据服务器资源调整
• 数据处理:
• 自动识别并处理所有Redis数据类型(String/Hash/List/Set/ZSet)
• 保留原始TTL过期时间
• 使用SCAN命令安全遍历键,避免阻塞服务
• 跳过以双下划线开头的系统键(可选)
步骤 4:运行注意事项
- 确保网络互通,目标Redis允许写入
- 处理大键时可能出现内存问题,建议监控堆内存使用
- 生产环境建议添加重试机制和日志记录
- 迁移期间避免源数据大规模变更
- 可通过多线程进一步提高速度(需处理线程安全)
高级优化建议
// 在多线程环境中使用(示例)
ExecutorService executor = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new ArrayList<>();
// 在SCAN循环中将每个key的处理提交给线程池
for (String key : keys) {
futures.add(executor.submit(() -> processKey(key, source, target)));
}
// 等待所有任务完成
for (Future<?> future : futures) {
future.get();
}
该方案提供了安全高效的Redis数据迁移实现,兼顾了数据完整性和性能要求。根据实际场景调整参数,可在生产环境中处理TB级数据迁移。
👨💻 作者:Teddy(公众号:码尚云软件)
如果本文对您有帮助,欢迎 点赞👍 | 收藏⭐ | 关注👦,获取更多运维干货!
如有疑问,欢迎在评论区留言交流~