11.4 导入和导出NULL值 404
-- 创建测试数据库
CREATE DATABASE IF NOT EXISTS test_null_db;
USE test_null_db;
-- 创建测试表
CREATE TABLE test_data (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50),
description VARCHAR(100),
is_active TINYINT(1),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入测试数据(包含NULL和空字符串)
INSERT INTO test_data (name, description, is_active) VALUES
('Alice', 'Regular user', 1),
('Bob', NULL, 0), -- 显式NULL
('Charlie', '', 1), -- 空字符串
('David', 'VIP member', NULL); -- NULL值
使用标准 mysqldump
bash
复制
下载
# 标准 mysqldump 命令(不指定 NULL 表示方式)
mysqldump -u root -p test_null_db test_data > test_data_dump.sql
mysql> SHOW VARIABLES LIKE 'secure_file_priv';
+------------------+---------------------------------+
| Variable_name | Value |
+------------------+---------------------------------+
| secure_file_priv | D:\software\MySql\Data\Uploads\ |
+------------------+---------------------------------+
1 row in set, 1 warning (0.00 sec)
mysql> SELECT * FROM test_data
-> INTO OUTFILE 'D:/software/MySql/Data/Uploads/test_data_export.csv'
-> FIELDS TERMINATED BY ','
-> ENCLOSED BY '"'
-> LINES TERMINATED BY '\r\n';
Query OK, 4 rows affected (0.01 sec)
处理 NULL 值的改进版本
sql
SELECT
id,
name,
IFNULL(description, '\\N') AS description,
IFNULL(is_active, '\\N') AS is_active,
created_at
FROM test_data
INTO OUTFILE 'D:/software/MySql/Data/Uploads/test_data_export.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n';
先运行以下命令检查:
sql
SHOW VARIABLES LIKE 'secure_file_priv';
最佳实践建议
对于 Windows 系统,建议:
sql
LOAD DATA LOCAL INFILE 'C:\\Users\\lenovo\\test_data_import.csv'
INTO TABLE test_data_import
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
(id, name, @description, @is_active, created_at)
SET
description = NULLIF(@description, ''),
is_active = NULLIF(@is_active, '');
对于 NULL 和空字符串的区别处理:
sql
SET
description = CASE WHEN @description = '\\N' THEN NULL ELSE @description END,
is_active = CASE WHEN @is_active = '\\N' THEN NULL ELSE @is_active END;
请根据您实际的 MySQL 版本和需求选择合适的解决方案
11.5 编写数据导出程序 406
mysql> use cookbook;
Database changed
mysql> SELECT * FROM mail INTO OUTFILE 'D:/software/MySql/Data/Uploads/mail.csv' ;
Query OK, 16 rows affected (0.02 sec)
LOAD DATA INFILE 'D:/software/MySql/Data/Uploads/test_data_import.csv'
INTO TABLE test_data_import
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '"'
LINES TERMINATED BY '\r\n'
-- IGNORE 1 LINES -- 如果CSV有表头行,请取消注释此行
(id, name, description, is_active, create_time) -- 替换为你的实际列名,顺序必须与CSV一致
SET
description = NULLIF(description, '\\N'),
is_active = NULLIF(is_active, '\\N');
11.6 数据文件格式的转换 411
import csv
from pathlib import Path
class DataConverter:
# (省略类中的方法,与之前相同,确保完整复制)
def __init__(self, delimiter=',', quotechar='"'):
self.delimiter = delimiter
self.quotechar = quotechar
def txt_to_csv(self, txt_path, csv_path, has_headers=True):
with open(txt_path, 'r', encoding='utf-8') as txtfile, \
open(csv_path, 'w', encoding='utf-8', newline='') as csvfile:
writer = csv.writer(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
lines = [line.strip() for line in txtfile if line.strip()]
if has_headers:
headers = lines[0].split(self.delimiter)
writer.writerow(headers)
data_lines = lines[1:]
else:
data_lines = lines
for line in data_lines:
writer.writerow(line.split(self.delimiter))
print(f"已将 {txt_path} 转换为 {csv_path}")
def csv_to_pl(self, csv_path, pl_path, include_headers=True):
with open(csv_path, 'r', encoding='utf-8') as csvfile, \
open(pl_path, 'w', encoding='utf-8') as plfile:
reader = csv.reader(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
lines = list(reader)
plfile.write("# MySQL exported data file\n")
plfile.write("# Format: field1|field2|field3...\n")
if not include_headers:
lines = lines[1:]
for line in lines:
plfile.write('|'.join(line) + '\n')
print(f"已将 {csv_path} 转换为 {pl_path}")
def pl_to_txt(self, pl_path, txt_path, has_headers=True):
temp_csv = Path(pl_path).with_suffix('.temp.csv')
# 先将PL转CSV(内部逻辑)
with open(pl_path, 'r', encoding='utf-8') as plfile, \
open(temp_csv, 'w', encoding='utf-8', newline='') as csvfile:
writer = csv.writer(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
lines = [line.strip() for line in plfile if line.strip() and not line.startswith('#')]
if has_headers:
headers = lines[0].split('|')
writer.writerow(headers)
data_lines = lines[1:]
else:
data_lines = lines
for line in data_lines:
writer.writerow(line.split('|'))
# 再将CSV转TXT
with open(temp_csv, 'r', encoding='utf-8') as csvfile, \
open(txt_path, 'w', encoding='utf-8') as txtfile:
reader = csv.reader(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
lines = list(reader)
if not has_headers:
lines = lines[1:]
for line in lines:
txtfile.write(self.delimiter.join(line) + '\n')
temp_csv.unlink()
print(f"已将 {pl_path} 转换为 {txt_path}")
# --------------------------
# 分步执行,确保每个步骤的输入文件存在
# --------------------------
# 初始化转换器
converter = DataConverter()
# 步骤1:创建一个测试用的data.txt,并转换为CSV
# 先手动创建data.txt,内容如下:
# id,name,age
# 1,张三,30
# 2,李四,25
# 3,王五,35
converter.txt_to_csv(
txt_path="data.txt", # 确保当前目录有这个文件
csv_path="data_from_txt.csv",
has_headers=True
)
# 步骤2:用步骤1生成的CSV文件(data_from_txt.csv)转换为PL
converter.csv_to_pl(
csv_path="data_from_txt.csv", # 使用上一步生成的CSV,确保存在
pl_path="data_from_csv.pl",
include_headers=True
)
# 步骤3:用步骤2生成的PL文件(data_from_csv.pl)转换为TXT
converter.pl_to_txt(
pl_path="data_from_csv.pl", # 使用上一步生成的PL,确保存在
txt_path="data_from_pl.txt",
has_headers=True
)
已将 data.txt 转换为 data_from_txt.csv
已将 data_from_txt.csv 转换为 data_from_csv.pl
已将 data_from_csv.pl 转换为 data_from_pl.txt
11.7 提取并重新排列数据文件的列 412
```python
import csv
from pathlib import Path
def rearrange_columns(
input_file,
output_file,
new_order,
delimiter=',',
has_headers=True
):
"""
提取并重新排列文本文件中的列
参数:
input_file: 输入文件路径
output_file: 输出文件路径
new_order: 新的列顺序列表,例如 ['name', 'id', 'age'] 或 [1, 0, 2]
delimiter: 列分隔符,默认是逗号
has_headers: 是否包含表头
"""
with open(input_file, 'r', encoding='utf-8') as infile, \
open(output_file, 'w', encoding='utf-8', newline='') as outfile:
reader = csv.reader(infile, delimiter=delimiter)
writer = csv.writer(outfile, delimiter=delimiter)
# 处理表头
if has_headers:
headers = next(reader)
# 确定新列顺序的索引
if all(isinstance(x, str) for x in new_order):
# 如果new_order是列名,转换为索引
try:
new_indices = [headers.index(col) for col in new_order]
except ValueError as e:
raise ValueError(f"列名不存在: {e}")
else:
# 如果new_order是索引
new_indices = new_order
# 验证索引有效性
for idx in new_indices:
if idx < 0 or idx >= len(headers):
raise IndexError(f"无效的列索引: {idx}")
# 写入新表头
new_headers = [headers[i] for i in new_indices]
writer.writerow(new_headers)
else:
# 无表头时,直接使用索引
new_indices = new_order
# 先读取一行获取列数
first_row = next(reader)
for idx in new_indices:
if idx < 0 or idx >= len(first_row):
raise IndexError(f"无效的列索引: {idx}")
# 写回第一行
writer.writerow([first_row[i] for i in new_indices])
# 处理数据行
for row in reader:
# 跳过空行
if not row:
continue
# 按新顺序写入列
new_row = [row[i] for i in new_indices]
writer.writerow(new_row)
print(f"列重排完成!\n输入文件: {input_file}\n输出文件: {output_file}")
print(f"新的列顺序: {new_order}")
if __name__ == "__main__":
# 示例使用
# 1. 准备测试数据(如果不存在则创建)
test_data = """id,name,age,city,email
1,张三,30,北京,zhangsan@example.com
2,李四,25,上海,lisi@example.com
3,王五,35,广州,wangwu@example.com
"""
with open("data.txt", "w", encoding="utf-8") as f:
f.write(test_data)
# 2. 列重排示例
# 示例1: 使用列名重新排列(保留需要的列)
rearrange_columns(
input_file="data.txt",
output_file="rearranged_by_name.txt",
new_order=['name', 'age', 'city', 'id'], # 新的列顺序(按列名)
delimiter=',',
has_headers=True
)
# 示例2: 使用列索引重新排列(索引从0开始)
rearrange_columns(
input_file="data.txt",
output_file="rearranged_by_index.txt",
new_order=[1, 3, 0], # 新的列顺序(按索引,对应name, city, id)
delimiter=',',
has_headers=True
)
# 示例3: 处理无表头的文件(假设用Tab分隔)
# 先创建一个无表头的Tab分隔文件
no_header_data = """1\t张三\t30\t北京
2\t李四\t25\t上海
3\t王五\t35\t广州
"""
with open("data_no_header.txt", "w", encoding="utf-8") as f:
f.write(no_header_data)
# 重排无表头文件的列
rearrange_columns(
input_file="data_no_header.txt",
output_file="rearranged_no_header.txt",
new_order=[1, 0, 3], # 按索引重排(姓名, ID, 城市)
delimiter='\t',
has_headers=False
)
列重排完成!
输入文件: data.txt
输出文件: rearranged_by_name.txt
新的列顺序: ['name', 'age', 'city', 'id']
列重排完成!
输入文件: data.txt
输出文件: rearranged_by_index.txt
新的列顺序: [1, 3, 0]
列重排完成!
输入文件: data_no_header.txt
输出文件: rearranged_no_header.txt
新的列顺序: [1, 0, 3]
选择建议
追求极致压缩:优先选 Parquet(适合批量分析)或 Feather(适合快速读写)。
需要兼容文本格式:用 GZIP 压缩 CSV(简单易用,支持大多数工具)。
需要查询功能:用 SQLite(单文件数据库,支持 SQL 查询)。
半结构化数据:用 JSONL(比普通 JSON 紧凑,适合日志类数据)。
以文档中的 employees.txt 为例,转换为 Parquet 后大小通常可减少 70% 以上,且保留完整结构化信息,是最推荐的高效存储方式。
import mysql.connector
from mysql.connector import Error
import csv
def mysql_to_txt(
host,
database,
user,
password,
table_name="employees",
output_file="employees.txt",
delimiter="|",
include_headers=True,
query=None
):
"""
从MySQL数据库提取表数据并保存为文本文件
参数:
host: 数据库主机地址
database: 数据库名称
user: 数据库用户名
password: 数据库密码
table_name: 要提取数据的表名,默认是employees
output_file: 输出的文本文件名,默认是employees.txt
delimiter: 文本文件的列分隔符,默认是|
include_headers: 是否包含表头,默认是True
query: 自定义查询语句,如果提供则忽略table_name
"""
connection = None
try:
# 连接数据库
connection = mysql.connector.connect(
host=host,
database=database,
user=user,
password=password
)
if connection.is_connected():
cursor = connection.cursor()
# 准备查询语句
if query is None:
query = f"SELECT * FROM {table_name}"
# 执行查询
cursor.execute(query)
records = cursor.fetchall()
columns = [column[0] for column in cursor.description] # 获取列名
# 写入文本文件
with open(output_file, 'w', encoding='utf-8', newline='') as txtfile:
writer = csv.writer(
txtfile,
delimiter=delimiter,
quoting=csv.QUOTE_MINIMAL
)
# 写入表头
if include_headers:
writer.writerow(columns)
# 写入数据行
writer.writerows(records)
print(f"数据提取成功!")
print(f"表名: {table_name}")
print(f"提取记录数: {len(records)}")
print(f"输出文件: {output_file}")
print(f"分隔符: '{delimiter}'")
except Error as e:
print(f"数据库操作错误: {e}")
finally:
# 关闭数据库连接
if connection is not None and connection.is_connected():
cursor.close()
connection.close()
print("数据库连接已关闭")
if __name__ == "__main__":
# 配置数据库连接信息(请根据实际情况修改)
db_config = {
"host": "localhost", # 数据库主机地址
"database": "employees",# 数据库名称
"user": "root", # 数据库用户名
"password": "root" # 数据库密码
}
# 提取employees表数据到employees.txt
mysql_to_txt(
**db_config,
table_name="employees",
output_file="employees.txt",
delimiter="|", # 使用|作为分隔符,便于后续导入MySQL
include_headers=True
)
# 示例:提取特定列(使用自定义查询)
# custom_query = "SELECT emp_no, first_name, last_name, hire_date FROM employees WHERE hire_date > '2000-01-01'"
# mysql_to_txt(
# ** db_config,
# query=custom_query,
# output_file="employees_filtered.txt",
# delimiter=","
# )
数据提取成功!
表名: employees
提取记录数: 300024
输出文件: employees.txt
分隔符: '|'
数据库连接已关闭
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path # 新增导入Path类
# 读取 TXT 文件(按|分隔)
df = pd.read_csv(
"employees.txt",
sep="|",
header=0, # 第一行为表头
names=["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"]
)
# 转换为 Parquet 并压缩
pq.write_table(
pa.Table.from_pandas(df),
"employees.parquet",
compression="snappy" # 可选压缩算法:snappy、gzip、lz4等
)
# 计算并打印文件大小(修正后)
print(f"原文件大小: {Path('employees.txt').stat().st_size / 1024:.2f} KB") # 更准确的原文件大小计算方式
print(f"Parquet文件大小: {Path('employees.parquet').stat().st_size / 1024:.2f} KB")
原文件大小: 13791.09 KB
Parquet文件大小: 3318.18 KB
import gzip
import csv
from pathlib import Path
# 读取 TXT 并写入 GZIP 压缩的 CSV
with open("employees.txt", "r", encoding="utf-8") as f_in, \
gzip.open("employees.csv.gz", "wt", encoding="utf-8") as f_out:
reader = csv.reader(f_in, delimiter="|")
writer = csv.writer(f_out, delimiter=",") # 转为逗号分隔
for row in reader:
writer.writerow(row)
print(f"GZIP压缩后大小: {Path('employees.csv.gz').stat().st_size / 1024:.2f} KB")
GZIP压缩后大小: 4971.34 KB
import sqlite3
import csv
from pathlib import Path
# 连接 SQLite 数据库(文件不存在则创建)
conn = sqlite3.connect("employees.db")
cursor = conn.cursor()
# 创建表结构
cursor.execute("""
CREATE TABLE IF NOT EXISTS employees (
emp_no INT,
birth_date DATE,
first_name TEXT,
last_name TEXT,
gender TEXT,
hire_date DATE
)
""")
# 读取 TXT 并插入数据
with open("employees.txt", "r", encoding="utf-8") as f:
reader = csv.reader(f, delimiter="|")
next(reader) # 跳过表头
cursor.executemany("""
INSERT INTO employees VALUES (?, ?, ?, ?, ?, ?)
""", reader)
conn.commit()
conn.close()
print(f"SQLite文件大小: {Path('employees.db').stat().st_size / 1024:.2f} KB")
SQLite文件大小: 14884.00 KB
为什么 Parquet 压缩效率这么高?
列存储特性:Parquet 按列存储数据,对于重复值较多的列(如 gender 只有 M/F 两种值),压缩算法(如 snappy)能高效消除冗余。
类型优化:自动识别数据类型(如 emp_no 为整数、birth_date 为日期),用更紧凑的二进制格式存储,比文本格式更节省空间。
按需读取:后续使用时可以只读取需要的列(如只查 name 和 hire_date),无需加载整个文件,进一步提升效率。
后续操作建议
验证数据完整性:可以用以下代码确认转换后的数据是否完整:
python
运行
import pandas as pd
# 读取Parquet文件
parquet_df = pd.read_parquet("employees.parquet")
# 对比行数是否与原文件一致
txt_df = pd.read_csv("employees.txt", sep="|")
print(f"原文件行数: {len(txt_df)}, Parquet文件行数: {len(parquet_df)}") # 应相等
原文件行数: 300024, Parquet文件行数: 300024
当数据量达到 5 亿行以上时,选择存储格式需要重点考虑压缩效率、分布式处理兼容性、读写性能和查询效率(尤其是列级操作和过滤能力)。此时,Parquet 是最优选择,其次是 ORC(针对特定生态),以下是具体分析:
一、5 亿 + 数据量的核心需求
极致压缩率:减少存储成本(5 亿行数据若用 CSV 可能占用数十 TB,压缩后需控制在数 TB 内)。
分布式友好:支持 Spark、Flink、Hadoop 等分布式框架,避免单机处理瓶颈。
列级操作支持:可只读取需要的列(如仅查询hire_date和gender),减少 IO 和计算量。
稳定的读写性能:大规模数据下避免内存溢出,支持批量读写和并行处理。
二、最优选择:Parquet
核心优势:
压缩效率碾压文本格式
5 亿行数据中,大量列(如gender仅 2 个值、birth_date格式固定)存在极高重复度,Parquet 的列存储 + 压缩算法(如 ZSTD、Gzip)可将压缩比做到 10:1 甚至 20:1(远高于 CSV 的 3:1)。例如:5 亿行员工数据用 CSV 可能占 50TB,Parquet 压缩后可降至 3-5TB。
完美适配分布式生态
支持 Spark、Hive、Flink 等分布式计算框架,可直接进行分区存储(如按hire_year分区),实现 “数据分片 + 并行处理”,避免单机加载 5 亿行数据的内存压力。
谓词下推与列剪枝
查询时可通过where条件(如hire_date > '2000-01-01')直接在存储层过滤数据,且只读取需要的列(如仅emp_no和last_name),IO 量减少 80% 以上。
成熟的工业级支持
广泛用于大数据场景(如 Netflix、Uber 的 PB 级数据存储),兼容性强,工具链完善(Python、Java、SQL 均可直接操作)。
import pandas as pd
from sqlalchemy import create_engine
from pathlib import Path
# 1. 读取 Parquet 文件
parquet_df = pd.read_parquet("employees.parquet")
print(f"待导入数据量:{len(parquet_df)} 行")
# 2. 连接 MySQL 数据库(替换为你的数据库信息)
db_config = {
"host": "localhost",
"user": "你的用户名",
"password": "你的密码",
"database": "目标数据库名",
"port": 3306
}
engine = create_engine(f"mysql+mysqlconnector://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}")
# 3. 写入 MySQL(自动创建表,若表已存在可添加 if_exists='append' 追加)
parquet_df.to_sql(
name="employees_from_parquet", # 目标表名
con=engine,
index=False, # 不导入 DataFrame 的索引
if_exists="replace" # 若表存在则替换(可选:'fail' 报错 / 'append' 追加)
)
print("导入完成!可在 MySQL 中查询表 employees_from_parquet")
待导入数据量:300024 行
导入完成!可在 MySQL 中查询表 employees_from_parquet