使用python进行数据库的数据迁移

发布于:2025-06-29 ⋅ 阅读:(21) ⋅ 点赞:(0)

1 数据迁移

数据迁移在现代软件开发和数据管理中扮演着至关重要的角色,它涉及将数据从一个存储系统或数据库转移到另一个系统。
这可能涉及到:

  • 从一个数据库迁移到另一个数据库。
  • 从一个表迁移到另一个表。
  • 对数据进行清洗、转换后再导入。
    本篇给大家介绍的是如何将远程数据库中的数据全部迁移到本地,因为有时网络连接不稳定,使用远程数据库时就会出现连接中断,所以功能在本地实现在连通远程库表就行测试是比较不错的选择,这个过程也是比较简单,先将远程数据查出来在插入本地库中。

2 数据库相关配置

2.1 数据库连接配置

首先配置源数据库和本地数据库信息用于数据传输

# 远程数据库配置
REMOTE_DB = {
   'host': 'your_remote_host',        # 远程数据库主机地址
   'port': 3306,                      # 远程数据库端口
   'user': 'your_username',           # 远程数据库用户名
   'password': 'your_password',       # 远程数据库密码
   'database': 'your_database_name'   # 远程数据库名
}

# 本地数据库配置
LOCAL_DB = {
    'host': 'localhost',               # 本地数据库主机地址
    'port': 3306,                      # 本地数据库端口
    'user': 'root',                    # 本地数据库用户名
    'password': 'your_local_password', # 本地数据库密码
    'database': 'copied_database'      # 本地数据库名(将要创建的)
}

2.2 迁移设置

在某些情况下,需要迁移的数据量较大,无法一次性查询并插入表,可以进行分批次处理,同时可以引入进度条查看处理进度。

# 迁移设置
BATCH_SIZE = 2000                      # 批处理大小
AUTO_DROP_EXISTING = False             # 是否自动删除已存在的本地数据库
SHOW_PROGRESS = True                   # 是否显示进度(需要安装tqdm: pip install tqdm)
USE_EXISTING_DATABASE = True           # 是否使用已存在的数据库(True=连接现有数据库,False=创建新数据库)

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('db_copy.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# 尝试导入tqdm,如果没有安装则使用简单的进度显示
try:
    from tqdm import tqdm
    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False
    logger.warning("未安装tqdm,将使用简单进度显示。安装命令: pip install tqdm")


class SimpleProgress:
    """简单的进度显示类(当没有tqdm时使用)"""
    def __init__(self, total, desc=""):
        self.total = total
        self.current = 0
        self.desc = desc
        print(f"{desc}: 0/{total} (0.0%)")
    
    def update(self, n):
        self.current += n
        percent = (self.current / self.total) * 100
        print(f"\r{self.desc}: {self.current}/{self.total} ({percent:.1f}%)", end="")
        if self.current >= self.total:
            print()  # 换行
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        pass

def create_progress_bar(total, desc=""):
    """创建进度条"""
    if HAS_TQDM and SHOW_PROGRESS:
        return tqdm(total=total, desc=desc)
    elif SHOW_PROGRESS:
        return SimpleProgress(total=total, desc=desc)
    else:
        return SimpleProgress(total=total, desc=desc)  # 即使不显示也需要返回对象

3 数据迁移

迁移数据主要涉及以下操作:
首先连接远处数据库——>连接本地数据库——>检查本地是否已有库(没有库会创建)——>获取远程库表的创建语句——>将创建语句应用在本地创建对应表——>获取远程库表的数据(每次2000条记录)——>将其插入对应表

class DatabaseCopier:
    """数据库复制器"""
    
    def __init__(self):
        self.remote_conn = None
        self.local_conn = None
        
    def connect_remote(self):
        """连接远程数据库"""
        try:
            self.remote_conn = pymysql.connect(
                host=REMOTE_DB['host'],
                port=REMOTE_DB['port'],
                user=REMOTE_DB['user'],
                password=REMOTE_DB['password'],
                database=REMOTE_DB['database'],
                charset='utf8mb4',
                autocommit=False
            )
            logger.info(f"成功连接到远程数据库: {REMOTE_DB['host']}:{REMOTE_DB['port']}")
            return True
        except Exception as e:
            logger.error(f"连接远程数据库失败: {e}")
            return False
    
    def connect_local(self):
        """连接本地数据库(不指定数据库名)"""
        try:
            self.local_conn = pymysql.connect(
                host=LOCAL_DB['host'],
                port=LOCAL_DB['port'],
                user=LOCAL_DB['user'],
                password=LOCAL_DB['password'],
                charset='utf8mb4',
                autocommit=False
            )
            logger.info(f"成功连接到本地MySQL服务器: {LOCAL_DB['host']}:{LOCAL_DB['port']}")
            return True
        except Exception as e:
            logger.error(f"连接本地MySQL服务器失败: {e}")
            return False
    
    def connect_to_existing_database(self):
        """连接到已存在的本地数据库"""
        try:
            self.local_conn = pymysql.connect(
                host=LOCAL_DB['host'],
                port=LOCAL_DB['port'],
                user=LOCAL_DB['user'],
                password=LOCAL_DB['password'],
                database=LOCAL_DB['database'],  # 直接连接到已存在的数据库
                charset='utf8mb4',
                autocommit=False
            )
            logger.info(f"成功连接到现有本地数据库: {LOCAL_DB['database']}")
            return True
        except Exception as e:
            logger.error(f"连接本地数据库失败: {e}")
            return False
    
    def create_local_database(self):
        """创建本地数据库"""
        try:
            with self.local_conn.cursor() as cursor:
                # 检查数据库是否存在
                cursor.execute("SHOW DATABASES LIKE %s", (LOCAL_DB['database'],))
                if cursor.fetchone():
                    logger.warning(f"本地数据库 '{LOCAL_DB['database']}' 已存在")
                    if AUTO_DROP_EXISTING:
                        cursor.execute(f"DROP DATABASE `{LOCAL_DB['database']}`")
                        logger.info(f"已自动删除现有数据库: {LOCAL_DB['database']}")
                    else:
                        response = input("是否删除现有数据库并重新创建?(y/N): ")
                        if response.lower() == 'y':
                            cursor.execute(f"DROP DATABASE `{LOCAL_DB['database']}`")
                            logger.info(f"已删除现有数据库: {LOCAL_DB['database']}")
                        else:
                            logger.info("取消操作")
                            return False
                
                # 创建数据库
                cursor.execute(f"CREATE DATABASE `{LOCAL_DB['database']}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
                self.local_conn.commit()
                logger.info(f"成功创建本地数据库: {LOCAL_DB['database']}")
                
                # 切换到新创建的数据库
                cursor.execute(f"USE `{LOCAL_DB['database']}`")
                return True
        except Exception as e:
            logger.error(f"创建本地数据库失败: {e}")
            return False
    
    def get_tables(self) -> List[str]:
        """获取远程数据库中的所有表名"""
        try:
            with self.remote_conn.cursor() as cursor:
                cursor.execute("SHOW TABLES")
                tables = [row[0] for row in cursor.fetchall()]
                logger.info(f"发现 {len(tables)} 个表: {', '.join(tables)}")
                return tables
        except Exception as e:
            logger.error(f"获取表列表失败: {e}")
            return []
    
    def get_table_create_sql(self, table_name: str) -> str:
        """获取表的创建SQL语句"""
        try:
            with self.remote_conn.cursor() as cursor:
                cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
                result = cursor.fetchone()
                return result[1] if result else None
        except Exception as e:
            logger.error(f"获取表 {table_name} 的创建SQL失败: {e}")
            return None
    
    def create_table(self, table_name: str, create_sql: str) -> bool:
        """在本地创建表"""
        try:
            with self.local_conn.cursor() as cursor:
                # 如果使用现有数据库,检查表是否已存在
                if USE_EXISTING_DATABASE:
                    cursor.execute("SHOW TABLES LIKE %s", (table_name,))
                    if cursor.fetchone():
                        logger.info(f"表 {table_name} 已存在,跳过创建")
                        return True
                
                cursor.execute(create_sql)
                self.local_conn.commit()
                logger.info(f"成功创建表: {table_name}")
                return True
        except Exception as e:
            logger.error(f"创建表 {table_name} 失败: {e}")
            return False
    
    def get_table_row_count(self, table_name: str) -> int:
        """获取表的行数"""
        try:
            with self.remote_conn.cursor() as cursor:
                cursor.execute(f"SELECT COUNT(*) FROM `{table_name}`")
                return cursor.fetchone()[0]
        except Exception as e:
            logger.error(f"获取表 {table_name} 行数失败: {e}")
            return 0
    
    def copy_table_data(self, table_name: str) -> bool:
        """复制表数据"""
        try:
            # 获取总行数
            total_rows = self.get_table_row_count(table_name)
            if total_rows == 0:
                logger.info(f"表 {table_name} 为空,跳过数据复制")
                return True
            
            logger.info(f"开始复制表 {table_name} 的数据,共 {total_rows} 行")
            
            # 获取表结构信息
            with self.remote_conn.cursor() as cursor:
                cursor.execute(f"DESCRIBE `{table_name}`")
                columns = [row[0] for row in cursor.fetchall()]
                columns_str = ', '.join([f"`{col}`" for col in columns])
                placeholders = ', '.join(['%s'] * len(columns))
            
            # 分批复制数据
            offset = 0
            with create_progress_bar(total_rows, f"复制 {table_name}") as pbar:
                while offset < total_rows:
                    # 从远程数据库读取数据
                    with self.remote_conn.cursor() as remote_cursor:
                        remote_cursor.execute(
                            f"SELECT {columns_str} FROM `{table_name}` LIMIT %s OFFSET %s",
                            (BATCH_SIZE, offset)
                        )
                        rows = remote_cursor.fetchall()
                    
                    if not rows:
                        break
                    
                    # 插入到本地数据库
                    with self.local_conn.cursor() as local_cursor:
                        insert_sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
                        local_cursor.executemany(insert_sql, rows)
                        self.local_conn.commit()
                    
                    offset += len(rows)
                    pbar.update(len(rows))
            
            logger.info(f"成功复制表 {table_name} 的所有数据")
            return True
            
        except Exception as e:
            logger.error(f"复制表 {table_name} 数据失败: {e}")
            return False
    
    def copy_database(self) -> bool:
        """复制整个数据库"""
        logger.info("开始数据库复制过程")
        start_time = datetime.now()
        
        # 连接数据库
        if not self.connect_remote():
            return False
        
        # 根据配置选择连接方式
        if USE_EXISTING_DATABASE:
            if not self.connect_to_existing_database():
                return False
        else:
            if not self.connect_local():
                return False
            # 创建本地数据库
            if not self.create_local_database():
                return False
        
        # 获取所有表
        tables = self.get_tables()
        if not tables:
            logger.warning("没有找到任何表")
            return True
        
        success_count = 0
        
        # 复制表结构和数据
        for table_name in tables:
            logger.info(f"处理表: {table_name}")
            
            # 检查表是否已存在
            table_exists = False
            if USE_EXISTING_DATABASE:
                with self.local_conn.cursor() as cursor:
                    cursor.execute("SHOW TABLES LIKE %s", (table_name,))
                    table_exists = cursor.fetchone() is not None
            
            if table_exists:
                logger.info(f"表 {table_name} 已存在,跳过处理")
                success_count += 1
                continue
            
            # 获取并创建表结构
            create_sql = self.get_table_create_sql(table_name)
            if not create_sql:
                logger.error(f"无法获取表 {table_name} 的创建SQL")
                continue
            
            if not self.create_table(table_name, create_sql):
                logger.error(f"创建表 {table_name} 失败")
                continue
            
            # 复制表数据
            if self.copy_table_data(table_name):
                success_count += 1
            else:
                logger.error(f"复制表 {table_name} 数据失败")
        
        end_time = datetime.now()
        duration = end_time - start_time
        
        logger.info(f"数据库复制完成!")
        logger.info(f"成功复制 {success_count}/{len(tables)} 个表")
        logger.info(f"总耗时: {duration}")
        
        return success_count == len(tables)
    
    def close_connections(self):
        """关闭数据库连接"""
        if self.remote_conn:
            self.remote_conn.close()
            logger.info("已关闭远程数据库连接")
        
        if self.local_conn:
            self.local_conn.close()
            logger.info("已关闭本地数据库连接")

网站公告

今日签到

点亮在社区的每一天
去签到