使用Java将数据从一个Redis迁移到另一个Redis的逐步解决方案

发布于:2025-06-22 ⋅ 阅读:(19) ⋅ 点赞:(0)

步骤 1:添加依赖项

在Maven项目的pom.xml中添加Jedis依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.0</version>
</dependency>

步骤 2:编写迁移代码

import redis.clients.jedis.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class RedisMigrator {

    public static void main(String[] args) {
        // 源Redis配置
        String sourceHost = "localhost";
        int sourcePort = 6379;
        String sourcePassword = "source_pass";
        int sourceDb = 0;

        // 目标Redis配置
        String targetHost = "remote.redis.host";
        int targetPort = 6380;
        String targetPassword = "target_pass";
        int targetDb = 0;

        // 配置连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(20);

        // 初始化连接池
        JedisPool sourcePool = new JedisPool(poolConfig, sourceHost, sourcePort, 5000, sourcePassword, sourceDb);
        JedisPool targetPool = new JedisPool(poolConfig, targetHost, targetPort, 5000, targetPassword, targetDb);

        try (Jedis source = sourcePool.getResource();
             Jedis target = targetPool.getResource();
             Pipeline pipeline = target.pipelined()) { // 使用Pipeline提升性能

            String cursor = "0";
            int batchSize = 100; // 每批处理100个键

            do {
                ScanParams params = new ScanParams().count(batchSize);
                ScanResult<String> scanResult = source.scan(cursor, params);
                cursor = scanResult.getCursor();
                List<String> keys = scanResult.getResult();

                for (String key : keys) {
                    // 跳过系统键(可选)
                    if (key.startsWith("__")) continue;

                    String type = source.type(key);
                    long ttl = source.ttl(key); // 获取过期时间

                    switch (type) {
                        case "string":
                            String value = source.get(key);
                            pipeline.set(key, value);
                            if (ttl > 0) pipeline.expire(key, (int) ttl);
                            break;

                        case "hash":
                            Map<String, String> hash = source.hgetAll(key);
                            pipeline.hset(key, hash);
                            if (ttl > 0) pipeline.expire(key, (int) ttl);
                            break;

                        case "list":
                            List<String> list = source.lrange(key, 0, -1);
                            pipeline.del(key); // 清除目标可能存在的旧数据
                            pipeline.rpush(key, list.toArray(new String[0]));
                            if (ttl > 0) pipeline.expire(key, (int) ttl);
                            break;

                        case "set":
                            Set<String> set = source.smembers(key);
                            pipeline.sadd(key, set.toArray(new String[0]));
                            if (ttl > 0) pipeline.expire(key, (int) ttl);
                            break;

                        case "zset":
                            Set<Tuple> zset = source.zrangeWithScores(key, 0, -1);
                            Map<String, Double> scoreMap = zset.stream()
                                    .collect(Collectors.toMap(Tuple::getElement, Tuple::getScore));
                            pipeline.zadd(key, scoreMap);
                            if (ttl > 0) pipeline.expire(key, (int) ttl);
                            break;

                        default:
                            System.err.println("Unsupported type: " + type + " for key: " + key);
                    }
                }

                pipeline.sync(); // 批量提交命令
                System.out.println("Processed batch of " + keys.size() + " keys");

            } while (!"0".equals(cursor));

            System.out.println("Migration completed!");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            sourcePool.close();
            targetPool.close();
        }
    }
}

步骤 3:配置说明

连接参数:根据实际情况修改源和目标Redis的主机、端口、密码和数据库编号
性能调优
• 调整batchSize控制每批次处理的键数量
• 使用Pipeline批量提交命令,减少网络开销
• 连接池参数maxTotal根据服务器资源调整
数据处理
• 自动识别并处理所有Redis数据类型(String/Hash/List/Set/ZSet)
• 保留原始TTL过期时间
• 使用SCAN命令安全遍历键,避免阻塞服务
• 跳过以双下划线开头的系统键(可选)

步骤 4:运行注意事项

  1. 确保网络互通,目标Redis允许写入
  2. 处理大键时可能出现内存问题,建议监控堆内存使用
  3. 生产环境建议添加重试机制和日志记录
  4. 迁移期间避免源数据大规模变更
  5. 可通过多线程进一步提高速度(需处理线程安全)

高级优化建议

// 在多线程环境中使用(示例)
ExecutorService executor = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new ArrayList<>();

// 在SCAN循环中将每个key的处理提交给线程池
for (String key : keys) {
    futures.add(executor.submit(() -> processKey(key, source, target)));
}

// 等待所有任务完成
for (Future<?> future : futures) {
    future.get();
}

该方案提供了安全高效的Redis数据迁移实现,兼顾了数据完整性和性能要求。根据实际场景调整参数,可在生产环境中处理TB级数据迁移。

👨💻 作者:Teddy(公众号:码尚云软件)
如果本文对您有帮助,欢迎 点赞👍 | 收藏⭐ | 关注👦,获取更多运维干货!
如有疑问,欢迎在评论区留言交流~


网站公告

今日签到

点亮在社区的每一天
去签到