Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个目录下,然后解压缩这个目录中的所有zip文件到第三个目录下。不使用Pandas库,需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。
该脚本已在考虑大数据量、异常处理和性能优化的基础上进行了全面设计,能够处理大多数常见场景。根据具体需求可进一步调整批量大小(batch_size)和线程数(max_workers)以获得最佳性能。
import os
import csv
import zipfile
import logging
import mysql.connector
from datetime import datetime
import time
import concurrent.futures
import glob
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_export.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000):
"""导出单个表的数据到CSV文件,分批处理"""
conn = None
cursor = None
total_rows = 0
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
# 获取数据并写入CSV
with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# 执行查询并获取列名
cursor.execute(f"SELECT * FROM `{table_name}`")
columns = [col[0] for col in cursor.description]
writer.writerow(columns)
# 分批获取数据
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
writer.writerows(rows)
total_rows += len(rows)
logger.debug(f"{table_name} 已导出 {total_rows} 行")
logger.info(f"{table_name} CSV导出完成,总行数:{total_rows}")
return total_rows
except Exception as e:
logger.error(f"导出表 {table_name} 失败: {str(e)}", exc_info=True)
raise
finally:
if cursor:
cursor.close()
if conn and conn.is_connected():
conn.close()
def compress_to_zip(source_path, zip_path):
"""压缩文件为ZIP格式"""
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
zipf.write(source_path, arcname=os.path.basename(source_path))
logger.info(f"成功压缩 {source_path} 到 {zip_path}")
except Exception as e:
logger.error(f"压缩 {source_path} 失败: {str(e)}", exc_info=True)
raise
def process_table(table_name, db_config, csv_dir, zip_dir):
"""处理单个表的导出和压缩"""
start_time = time.time()
logger.info(f"开始处理表: {table_name}")
status = "成功"
rows_exported = 0
try:
# 定义文件路径
csv_filename = f"{table_name}.csv"
zip_filename = f"{table_name}.zip"
csv_path = os.path.join(csv_dir, csv_filename)
zip_path = os.path.join(zip_dir, zip_filename)
# 导出CSV
rows_exported = export_table_to_csv(table_name, csv_path, db_config)
# 压缩文件
compress_to_zip(csv_path, zip_path)
except Exception as e:
status = f"失败: {str(e)}"
# 清理可能存在的中间文件
for path in [csv_path, zip_path]:
if path and os.path.exists(path):
try:
os.remove(path)
logger.warning(f"已清理文件: {path}")
except Exception as clean_error:
logger.error(f"清理文件失败: {clean_error}")
finally:
duration = time.time() - start_time
log_message = (
f"表处理完成 - 表名: {table_name}, "
f"状态: {status}, "
f"导出行数: {rows_exported}, "
f"耗时: {duration:.2f}秒"
)
logger.info(log_message)
def unzip_files(zip_dir, unzip_dir):
"""解压指定目录中的所有ZIP文件"""
zip_files = glob.glob(os.path.join(zip_dir, '*.zip'))
if not zip_files:
logger.warning("未找到ZIP文件,跳过解压")
return
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for zip_path in zip_files:
futures.append(executor.submit(
lambda: extract_zip(zip_path, unzip_dir)
))
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
logger.error(f"解压过程中发生错误: {str(e)}")
def extract_zip(zip_path, unzip_dir):
"""解压单个ZIP文件"""
try:
start_time = time.time()
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(unzip_dir)
duration = time.time() - start_time
logger.info(f"解压完成: {zip_path} => {unzip_dir} (耗时: {duration:.2f}秒)")
except Exception as e:
logger.error(f"解压 {zip_path} 失败: {str(e)}", exc_info=True)
raise
def main():
# 配置参数
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
# 目录配置
base_dir = os.path.dirname(os.path.abspath(__file__))
csv_dir = os.path.join(base_dir, 'csv_exports')
zip_dir = os.path.join(base_dir, 'zip_archives')
unzip_dir = os.path.join(base_dir, 'unzipped_files')
# 创建目录
for dir_path in [csv_dir, zip_dir, unzip_dir]:
os.makedirs(dir_path, exist_ok=True)
logger.info(f"目录已准备: {dir_path}")
# 获取所有表名
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
logger.info(f"发现 {len(tables)} 个需要处理的表")
except Exception as e:
logger.error(f"获取数据库表失败: {str(e)}", exc_info=True)
return
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals() and conn.is_connected():
conn.close()
# 处理所有表(多线程导出和压缩)
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for table in tables:
futures.append(executor.submit(
process_table,
table,
db_config,
csv_dir,
zip_dir
))
# 处理任务结果
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
logger.error(f"表处理异常: {str(e)}")
# 解压所有ZIP文件(多线程解压)
logger.info("开始解压所有ZIP文件")
unzip_files(zip_dir, unzip_dir)
logger.info("全部处理流程完成")
if __name__ == "__main__":
main()
关键特性说明:
分批处理大数据:
- 使用
fetchmany(batch_size)
分批获取数据(默认每批1000行) - 流式处理减少内存占用
- 使用
多线程处理:
- 使用ThreadPoolExecutor并行处理不同表的导出和压缩
- 独立的数据库连接池(每个线程有自己的连接)
- 并行解压处理
异常处理:
- 全面的try-except块覆盖所有关键操作
- 自动清理失败时产生的中间文件
- 详细的错误日志记录(包含堆栈跟踪)
日志记录:
- 同时输出到文件和终端
- 记录时间戳、操作类型、状态、耗时等关键信息
- 包含每个表的处理结果统计
文件管理:
- 自动创建所需目录
- 使用ZIP_DEFLATED进行高效压缩
- 安全的文件路径处理
性能优化:
- 使用服务器端游标避免内存过载
- 可配置的批量大小和线程数
- 异步I/O操作
使用说明:
安装依赖:
pip install mysql-connector-python
修改配置:
- 更新
db_config
中的数据库连接信息 - 根据需要调整目录路径(csv_dir, zip_dir, unzip_dir)
- 更新
运行脚本:
python script.py
查看日志:
- 实时终端输出
- 详细日志文件
data_export.log
扩展建议:
- 通过命令行参数接受数据库配置和目录路径
- 添加邮件通知功能(处理完成或失败时通知)
- 实现断点续传功能
- 添加文件校验(MD5校验和)
- 支持配置文件(YAML/JSON格式)
- 添加进度条显示