1 数据迁移
数据迁移在现代软件开发和数据管理中扮演着至关重要的角色,它涉及将数据从一个存储系统或数据库转移到另一个系统。
这可能涉及到:
- 从一个数据库迁移到另一个数据库。
- 从一个表迁移到另一个表。
- 对数据进行清洗、转换后再导入。
本篇给大家介绍的是如何将远程数据库中的数据全部迁移到本地,因为有时网络连接不稳定,使用远程数据库时就会出现连接中断,所以功能在本地实现在连通远程库表就行测试是比较不错的选择,这个过程也是比较简单,先将远程数据查出来在插入本地库中。
2 数据库相关配置
2.1 数据库连接配置
首先配置源数据库和本地数据库信息用于数据传输
# 远程数据库配置
REMOTE_DB = {
'host': 'your_remote_host', # 远程数据库主机地址
'port': 3306, # 远程数据库端口
'user': 'your_username', # 远程数据库用户名
'password': 'your_password', # 远程数据库密码
'database': 'your_database_name' # 远程数据库名
}
# 本地数据库配置
LOCAL_DB = {
'host': 'localhost', # 本地数据库主机地址
'port': 3306, # 本地数据库端口
'user': 'root', # 本地数据库用户名
'password': 'your_local_password', # 本地数据库密码
'database': 'copied_database' # 本地数据库名(将要创建的)
}
2.2 迁移设置
在某些情况下,需要迁移的数据量较大,无法一次性查询并插入表,可以进行分批次处理,同时可以引入进度条查看处理进度。
# 迁移设置
BATCH_SIZE = 2000 # 批处理大小
AUTO_DROP_EXISTING = False # 是否自动删除已存在的本地数据库
SHOW_PROGRESS = True # 是否显示进度(需要安装tqdm: pip install tqdm)
USE_EXISTING_DATABASE = True # 是否使用已存在的数据库(True=连接现有数据库,False=创建新数据库)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('db_copy.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 尝试导入tqdm,如果没有安装则使用简单的进度显示
try:
from tqdm import tqdm
HAS_TQDM = True
except ImportError:
HAS_TQDM = False
logger.warning("未安装tqdm,将使用简单进度显示。安装命令: pip install tqdm")
class SimpleProgress:
"""简单的进度显示类(当没有tqdm时使用)"""
def __init__(self, total, desc=""):
self.total = total
self.current = 0
self.desc = desc
print(f"{desc}: 0/{total} (0.0%)")
def update(self, n):
self.current += n
percent = (self.current / self.total) * 100
print(f"\r{self.desc}: {self.current}/{self.total} ({percent:.1f}%)", end="")
if self.current >= self.total:
print() # 换行
def __enter__(self):
return self
def __exit__(self, *args):
pass
def create_progress_bar(total, desc=""):
"""创建进度条"""
if HAS_TQDM and SHOW_PROGRESS:
return tqdm(total=total, desc=desc)
elif SHOW_PROGRESS:
return SimpleProgress(total=total, desc=desc)
else:
return SimpleProgress(total=total, desc=desc) # 即使不显示也需要返回对象
3 数据迁移
迁移数据主要涉及以下操作:
首先连接远处数据库——>连接本地数据库——>检查本地是否已有库(没有库会创建)——>获取远程库表的创建语句——>将创建语句应用在本地创建对应表——>获取远程库表的数据(每次2000条记录)——>将其插入对应表
class DatabaseCopier:
"""数据库复制器"""
def __init__(self):
self.remote_conn = None
self.local_conn = None
def connect_remote(self):
"""连接远程数据库"""
try:
self.remote_conn = pymysql.connect(
host=REMOTE_DB['host'],
port=REMOTE_DB['port'],
user=REMOTE_DB['user'],
password=REMOTE_DB['password'],
database=REMOTE_DB['database'],
charset='utf8mb4',
autocommit=False
)
logger.info(f"成功连接到远程数据库: {REMOTE_DB['host']}:{REMOTE_DB['port']}")
return True
except Exception as e:
logger.error(f"连接远程数据库失败: {e}")
return False
def connect_local(self):
"""连接本地数据库(不指定数据库名)"""
try:
self.local_conn = pymysql.connect(
host=LOCAL_DB['host'],
port=LOCAL_DB['port'],
user=LOCAL_DB['user'],
password=LOCAL_DB['password'],
charset='utf8mb4',
autocommit=False
)
logger.info(f"成功连接到本地MySQL服务器: {LOCAL_DB['host']}:{LOCAL_DB['port']}")
return True
except Exception as e:
logger.error(f"连接本地MySQL服务器失败: {e}")
return False
def connect_to_existing_database(self):
"""连接到已存在的本地数据库"""
try:
self.local_conn = pymysql.connect(
host=LOCAL_DB['host'],
port=LOCAL_DB['port'],
user=LOCAL_DB['user'],
password=LOCAL_DB['password'],
database=LOCAL_DB['database'], # 直接连接到已存在的数据库
charset='utf8mb4',
autocommit=False
)
logger.info(f"成功连接到现有本地数据库: {LOCAL_DB['database']}")
return True
except Exception as e:
logger.error(f"连接本地数据库失败: {e}")
return False
def create_local_database(self):
"""创建本地数据库"""
try:
with self.local_conn.cursor() as cursor:
# 检查数据库是否存在
cursor.execute("SHOW DATABASES LIKE %s", (LOCAL_DB['database'],))
if cursor.fetchone():
logger.warning(f"本地数据库 '{LOCAL_DB['database']}' 已存在")
if AUTO_DROP_EXISTING:
cursor.execute(f"DROP DATABASE `{LOCAL_DB['database']}`")
logger.info(f"已自动删除现有数据库: {LOCAL_DB['database']}")
else:
response = input("是否删除现有数据库并重新创建?(y/N): ")
if response.lower() == 'y':
cursor.execute(f"DROP DATABASE `{LOCAL_DB['database']}`")
logger.info(f"已删除现有数据库: {LOCAL_DB['database']}")
else:
logger.info("取消操作")
return False
# 创建数据库
cursor.execute(f"CREATE DATABASE `{LOCAL_DB['database']}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
self.local_conn.commit()
logger.info(f"成功创建本地数据库: {LOCAL_DB['database']}")
# 切换到新创建的数据库
cursor.execute(f"USE `{LOCAL_DB['database']}`")
return True
except Exception as e:
logger.error(f"创建本地数据库失败: {e}")
return False
def get_tables(self) -> List[str]:
"""获取远程数据库中的所有表名"""
try:
with self.remote_conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
tables = [row[0] for row in cursor.fetchall()]
logger.info(f"发现 {len(tables)} 个表: {', '.join(tables)}")
return tables
except Exception as e:
logger.error(f"获取表列表失败: {e}")
return []
def get_table_create_sql(self, table_name: str) -> str:
"""获取表的创建SQL语句"""
try:
with self.remote_conn.cursor() as cursor:
cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
result = cursor.fetchone()
return result[1] if result else None
except Exception as e:
logger.error(f"获取表 {table_name} 的创建SQL失败: {e}")
return None
def create_table(self, table_name: str, create_sql: str) -> bool:
"""在本地创建表"""
try:
with self.local_conn.cursor() as cursor:
# 如果使用现有数据库,检查表是否已存在
if USE_EXISTING_DATABASE:
cursor.execute("SHOW TABLES LIKE %s", (table_name,))
if cursor.fetchone():
logger.info(f"表 {table_name} 已存在,跳过创建")
return True
cursor.execute(create_sql)
self.local_conn.commit()
logger.info(f"成功创建表: {table_name}")
return True
except Exception as e:
logger.error(f"创建表 {table_name} 失败: {e}")
return False
def get_table_row_count(self, table_name: str) -> int:
"""获取表的行数"""
try:
with self.remote_conn.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) FROM `{table_name}`")
return cursor.fetchone()[0]
except Exception as e:
logger.error(f"获取表 {table_name} 行数失败: {e}")
return 0
def copy_table_data(self, table_name: str) -> bool:
"""复制表数据"""
try:
# 获取总行数
total_rows = self.get_table_row_count(table_name)
if total_rows == 0:
logger.info(f"表 {table_name} 为空,跳过数据复制")
return True
logger.info(f"开始复制表 {table_name} 的数据,共 {total_rows} 行")
# 获取表结构信息
with self.remote_conn.cursor() as cursor:
cursor.execute(f"DESCRIBE `{table_name}`")
columns = [row[0] for row in cursor.fetchall()]
columns_str = ', '.join([f"`{col}`" for col in columns])
placeholders = ', '.join(['%s'] * len(columns))
# 分批复制数据
offset = 0
with create_progress_bar(total_rows, f"复制 {table_name}") as pbar:
while offset < total_rows:
# 从远程数据库读取数据
with self.remote_conn.cursor() as remote_cursor:
remote_cursor.execute(
f"SELECT {columns_str} FROM `{table_name}` LIMIT %s OFFSET %s",
(BATCH_SIZE, offset)
)
rows = remote_cursor.fetchall()
if not rows:
break
# 插入到本地数据库
with self.local_conn.cursor() as local_cursor:
insert_sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
local_cursor.executemany(insert_sql, rows)
self.local_conn.commit()
offset += len(rows)
pbar.update(len(rows))
logger.info(f"成功复制表 {table_name} 的所有数据")
return True
except Exception as e:
logger.error(f"复制表 {table_name} 数据失败: {e}")
return False
def copy_database(self) -> bool:
"""复制整个数据库"""
logger.info("开始数据库复制过程")
start_time = datetime.now()
# 连接数据库
if not self.connect_remote():
return False
# 根据配置选择连接方式
if USE_EXISTING_DATABASE:
if not self.connect_to_existing_database():
return False
else:
if not self.connect_local():
return False
# 创建本地数据库
if not self.create_local_database():
return False
# 获取所有表
tables = self.get_tables()
if not tables:
logger.warning("没有找到任何表")
return True
success_count = 0
# 复制表结构和数据
for table_name in tables:
logger.info(f"处理表: {table_name}")
# 检查表是否已存在
table_exists = False
if USE_EXISTING_DATABASE:
with self.local_conn.cursor() as cursor:
cursor.execute("SHOW TABLES LIKE %s", (table_name,))
table_exists = cursor.fetchone() is not None
if table_exists:
logger.info(f"表 {table_name} 已存在,跳过处理")
success_count += 1
continue
# 获取并创建表结构
create_sql = self.get_table_create_sql(table_name)
if not create_sql:
logger.error(f"无法获取表 {table_name} 的创建SQL")
continue
if not self.create_table(table_name, create_sql):
logger.error(f"创建表 {table_name} 失败")
continue
# 复制表数据
if self.copy_table_data(table_name):
success_count += 1
else:
logger.error(f"复制表 {table_name} 数据失败")
end_time = datetime.now()
duration = end_time - start_time
logger.info(f"数据库复制完成!")
logger.info(f"成功复制 {success_count}/{len(tables)} 个表")
logger.info(f"总耗时: {duration}")
return success_count == len(tables)
def close_connections(self):
"""关闭数据库连接"""
if self.remote_conn:
self.remote_conn.close()
logger.info("已关闭远程数据库连接")
if self.local_conn:
self.local_conn.close()
logger.info("已关闭本地数据库连接")