板凳-------Mysql cookbook学习 (十二--------5)

发布于:2025-07-24 ⋅ 阅读:(15) ⋅ 点赞:(0)

11.4 导入和导出NULL值 404

-- 创建测试数据库
CREATE DATABASE IF NOT EXISTS test_null_db;
USE test_null_db;

-- 创建测试表
CREATE TABLE test_data (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50),
    description VARCHAR(100),
    is_active TINYINT(1),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 插入测试数据(包含NULL和空字符串)
INSERT INTO test_data (name, description, is_active) VALUES
('Alice', 'Regular user', 1),
('Bob', NULL, 0),  -- 显式NULL
('Charlie', '', 1),  -- 空字符串
('David', 'VIP member', NULL);  -- NULL值

使用标准 mysqldump 
bash
复制
下载
# 标准 mysqldump 命令(不指定 NULL 表示方式)
mysqldump -u root -p test_null_db test_data > test_data_dump.sql

mysql> SHOW VARIABLES LIKE 'secure_file_priv';
+------------------+---------------------------------+
| Variable_name    | Value                           |
+------------------+---------------------------------+
| secure_file_priv | D:\software\MySql\Data\Uploads\ |
+------------------+---------------------------------+
1 row in set, 1 warning (0.00 sec)

mysql> SELECT * FROM test_data
    -> INTO OUTFILE 'D:/software/MySql/Data/Uploads/test_data_export.csv'
    -> FIELDS TERMINATED BY ','
    -> ENCLOSED BY '"'
    -> LINES TERMINATED BY '\r\n';
Query OK, 4 rows affected (0.01 sec)

处理 NULL 值的改进版本
sql
SELECT 
    id,
    name,
    IFNULL(description, '\\N') AS description,
    IFNULL(is_active, '\\N') AS is_active,
    created_at
FROM test_data
INTO OUTFILE 'D:/software/MySql/Data/Uploads/test_data_export.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n';

先运行以下命令检查:

sql
SHOW VARIABLES LIKE 'secure_file_priv';
最佳实践建议
对于 Windows 系统,建议:

sql
LOAD DATA LOCAL INFILE 'C:\\Users\\lenovo\\test_data_import.csv'
INTO TABLE test_data_import
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
(id, name, @description, @is_active, created_at)
SET
  description = NULLIF(@description, ''),
  is_active = NULLIF(@is_active, '');
对于 NULL 和空字符串的区别处理:

sql
SET
  description = CASE WHEN @description = '\\N' THEN NULL ELSE @description END,
  is_active = CASE WHEN @is_active = '\\N' THEN NULL ELSE @is_active END;
请根据您实际的 MySQL 版本和需求选择合适的解决方案

11.5 编写数据导出程序 406

mysql> use cookbook;
Database changed
mysql> SELECT * FROM mail  INTO OUTFILE 'D:/software/MySql/Data/Uploads/mail.csv' ;
Query OK, 16 rows affected (0.02 sec)


LOAD DATA INFILE 'D:/software/MySql/Data/Uploads/test_data_import.csv'
INTO TABLE test_data_import
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '"'
LINES TERMINATED BY '\r\n'
-- IGNORE 1 LINES  -- 如果CSV有表头行,请取消注释此行
(id, name, description, is_active, create_time)  -- 替换为你的实际列名,顺序必须与CSV一致
SET
    description = NULLIF(description, '\\N'),
is_active = NULLIF(is_active, '\\N');

11.6 数据文件格式的转换 411

import csv
from pathlib import Path

class DataConverter:
    # (省略类中的方法,与之前相同,确保完整复制)
    def __init__(self, delimiter=',', quotechar='"'):
        self.delimiter = delimiter
        self.quotechar = quotechar

    def txt_to_csv(self, txt_path, csv_path, has_headers=True):
        with open(txt_path, 'r', encoding='utf-8') as txtfile, \
             open(csv_path, 'w', encoding='utf-8', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
            lines = [line.strip() for line in txtfile if line.strip()]
            if has_headers:
                headers = lines[0].split(self.delimiter)
                writer.writerow(headers)
                data_lines = lines[1:]
            else:
                data_lines = lines
            for line in data_lines:
                writer.writerow(line.split(self.delimiter))
        print(f"已将 {txt_path} 转换为 {csv_path}")

    def csv_to_pl(self, csv_path, pl_path, include_headers=True):
        with open(csv_path, 'r', encoding='utf-8') as csvfile, \
             open(pl_path, 'w', encoding='utf-8') as plfile:
            reader = csv.reader(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
            lines = list(reader)
            plfile.write("# MySQL exported data file\n")
            plfile.write("# Format: field1|field2|field3...\n")
            if not include_headers:
                lines = lines[1:]
            for line in lines:
                plfile.write('|'.join(line) + '\n')
        print(f"已将 {csv_path} 转换为 {pl_path}")

    def pl_to_txt(self, pl_path, txt_path, has_headers=True):
        temp_csv = Path(pl_path).with_suffix('.temp.csv')
        # 先将PL转CSV(内部逻辑)
        with open(pl_path, 'r', encoding='utf-8') as plfile, \
             open(temp_csv, 'w', encoding='utf-8', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
            lines = [line.strip() for line in plfile if line.strip() and not line.startswith('#')]
            if has_headers:
                headers = lines[0].split('|')
                writer.writerow(headers)
                data_lines = lines[1:]
            else:
                data_lines = lines
            for line in data_lines:
                writer.writerow(line.split('|'))
        # 再将CSV转TXT
        with open(temp_csv, 'r', encoding='utf-8') as csvfile, \
             open(txt_path, 'w', encoding='utf-8') as txtfile:
            reader = csv.reader(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)
            lines = list(reader)
            if not has_headers:
                lines = lines[1:]
            for line in lines:
                txtfile.write(self.delimiter.join(line) + '\n')
        temp_csv.unlink()
        print(f"已将 {pl_path} 转换为 {txt_path}")


# --------------------------
# 分步执行,确保每个步骤的输入文件存在
# --------------------------

# 初始化转换器
converter = DataConverter()

# 步骤1:创建一个测试用的data.txt,并转换为CSV
# 先手动创建data.txt,内容如下:
# id,name,age
# 1,张三,30
# 2,李四,25
# 3,王五,35
converter.txt_to_csv(
    txt_path="data.txt",    # 确保当前目录有这个文件
    csv_path="data_from_txt.csv",
    has_headers=True
)

# 步骤2:用步骤1生成的CSV文件(data_from_txt.csv)转换为PL
converter.csv_to_pl(
    csv_path="data_from_txt.csv",  # 使用上一步生成的CSV,确保存在
    pl_path="data_from_csv.pl",
    include_headers=True
)

# 步骤3:用步骤2生成的PL文件(data_from_csv.pl)转换为TXT
converter.pl_to_txt(
    pl_path="data_from_csv.pl",  # 使用上一步生成的PL,确保存在
    txt_path="data_from_pl.txt",
    has_headers=True
)

已将 data.txt 转换为 data_from_txt.csv
已将 data_from_txt.csv 转换为 data_from_csv.pl
已将 data_from_csv.pl 转换为 data_from_pl.txt

11.7 提取并重新排列数据文件的列 412


```python
import csv
from pathlib import Path

def rearrange_columns(
    input_file, 
    output_file, 
    new_order, 
    delimiter=',', 
    has_headers=True
):
    """
    提取并重新排列文本文件中的列
    
    参数:
        input_file: 输入文件路径
        output_file: 输出文件路径
        new_order: 新的列顺序列表,例如 ['name', 'id', 'age'] 或 [1, 0, 2]
        delimiter: 列分隔符,默认是逗号
        has_headers: 是否包含表头
    """
    with open(input_file, 'r', encoding='utf-8') as infile, \
         open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        
        reader = csv.reader(infile, delimiter=delimiter)
        writer = csv.writer(outfile, delimiter=delimiter)
        
        # 处理表头
        if has_headers:
            headers = next(reader)
            
            # 确定新列顺序的索引
            if all(isinstance(x, str) for x in new_order):
                # 如果new_order是列名,转换为索引
                try:
                    new_indices = [headers.index(col) for col in new_order]
                except ValueError as e:
                    raise ValueError(f"列名不存在: {e}")
            else:
                # 如果new_order是索引
                new_indices = new_order
                
                # 验证索引有效性
                for idx in new_indices:
                    if idx < 0 or idx >= len(headers):
                        raise IndexError(f"无效的列索引: {idx}")
            
            # 写入新表头
            new_headers = [headers[i] for i in new_indices]
            writer.writerow(new_headers)
        else:
            # 无表头时,直接使用索引
            new_indices = new_order
            # 先读取一行获取列数
            first_row = next(reader)
            for idx in new_indices:
                if idx < 0 or idx >= len(first_row):
                    raise IndexError(f"无效的列索引: {idx}")
            # 写回第一行
            writer.writerow([first_row[i] for i in new_indices])
        
        # 处理数据行
        for row in reader:
            # 跳过空行
            if not row:
                continue
            # 按新顺序写入列
            new_row = [row[i] for i in new_indices]
            writer.writerow(new_row)
    
    print(f"列重排完成!\n输入文件: {input_file}\n输出文件: {output_file}")
    print(f"新的列顺序: {new_order}")

if __name__ == "__main__":
    # 示例使用
    
    # 1. 准备测试数据(如果不存在则创建)
    test_data = """id,name,age,city,email
1,张三,30,北京,zhangsan@example.com
2,李四,25,上海,lisi@example.com
3,王五,35,广州,wangwu@example.com
"""
    with open("data.txt", "w", encoding="utf-8") as f:
        f.write(test_data)
    
    # 2. 列重排示例
    
    # 示例1: 使用列名重新排列(保留需要的列)
    rearrange_columns(
        input_file="data.txt",
        output_file="rearranged_by_name.txt",
        new_order=['name', 'age', 'city', 'id'],  # 新的列顺序(按列名)
        delimiter=',',
        has_headers=True
    )
    
    # 示例2: 使用列索引重新排列(索引从0开始)
    rearrange_columns(
        input_file="data.txt",
        output_file="rearranged_by_index.txt",
        new_order=[1, 3, 0],  # 新的列顺序(按索引,对应name, city, id)
        delimiter=',',
        has_headers=True
    )
    
    # 示例3: 处理无表头的文件(假设用Tab分隔)
    # 先创建一个无表头的Tab分隔文件
    no_header_data = """1\t张三\t30\t北京
2\t李四\t25\t上海
3\t王五\t35\t广州
"""
    with open("data_no_header.txt", "w", encoding="utf-8") as f:
        f.write(no_header_data)
    
    # 重排无表头文件的列
    rearrange_columns(
        input_file="data_no_header.txt",
        output_file="rearranged_no_header.txt",
        new_order=[1, 0, 3],  # 按索引重排(姓名, ID, 城市)
        delimiter='\t',
        has_headers=False
)

列重排完成!
输入文件: data.txt
输出文件: rearranged_by_name.txt
新的列顺序: ['name', 'age', 'city', 'id']
列重排完成!
输入文件: data.txt
输出文件: rearranged_by_index.txt
新的列顺序: [1, 3, 0]
列重排完成!
输入文件: data_no_header.txt
输出文件: rearranged_no_header.txt
新的列顺序: [1, 0, 3]

选择建议
追求极致压缩:优先选 Parquet(适合批量分析)或 Feather(适合快速读写)。
需要兼容文本格式:用 GZIP 压缩 CSV(简单易用,支持大多数工具)。
需要查询功能:用 SQLite(单文件数据库,支持 SQL 查询)。
半结构化数据:用 JSONL(比普通 JSON 紧凑,适合日志类数据)。
以文档中的 employees.txt 为例,转换为 Parquet 后大小通常可减少 70% 以上,且保留完整结构化信息,是最推荐的高效存储方式。
import mysql.connector
from mysql.connector import Error
import csv

def mysql_to_txt(
    host,
    database,
    user,
    password,
    table_name="employees",
    output_file="employees.txt",
    delimiter="|",
    include_headers=True,
    query=None
):
    """
    从MySQL数据库提取表数据并保存为文本文件
    
    参数:
        host: 数据库主机地址
        database: 数据库名称
        user: 数据库用户名
        password: 数据库密码
        table_name: 要提取数据的表名,默认是employees
        output_file: 输出的文本文件名,默认是employees.txt
        delimiter: 文本文件的列分隔符,默认是|
        include_headers: 是否包含表头,默认是True
        query: 自定义查询语句,如果提供则忽略table_name
    """
    connection = None
    try:
        # 连接数据库
        connection = mysql.connector.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )
        
        if connection.is_connected():
            cursor = connection.cursor()
            
            # 准备查询语句
            if query is None:
                query = f"SELECT * FROM {table_name}"
            
            # 执行查询
            cursor.execute(query)
            records = cursor.fetchall()
            columns = [column[0] for column in cursor.description]  # 获取列名
            
            # 写入文本文件
            with open(output_file, 'w', encoding='utf-8', newline='') as txtfile:
                writer = csv.writer(
                    txtfile,
                    delimiter=delimiter,
                    quoting=csv.QUOTE_MINIMAL
                )
                
                # 写入表头
                if include_headers:
                    writer.writerow(columns)
                
                # 写入数据行
                writer.writerows(records)
            
            print(f"数据提取成功!")
            print(f"表名: {table_name}")
            print(f"提取记录数: {len(records)}")
            print(f"输出文件: {output_file}")
            print(f"分隔符: '{delimiter}'")

    except Error as e:
        print(f"数据库操作错误: {e}")
    finally:
        # 关闭数据库连接
        if connection is not None and connection.is_connected():
            cursor.close()
            connection.close()
            print("数据库连接已关闭")

if __name__ == "__main__":
    # 配置数据库连接信息(请根据实际情况修改)
    db_config = {
        "host": "localhost",        # 数据库主机地址
        "database": "employees",# 数据库名称
        "user": "root",    # 数据库用户名
        "password": "root" # 数据库密码
    }
    
    # 提取employees表数据到employees.txt
    mysql_to_txt(
        **db_config,
        table_name="employees",
        output_file="employees.txt",
        delimiter="|",  # 使用|作为分隔符,便于后续导入MySQL
        include_headers=True
    )
    
    # 示例:提取特定列(使用自定义查询)
    # custom_query = "SELECT emp_no, first_name, last_name, hire_date FROM employees WHERE hire_date > '2000-01-01'"
    # mysql_to_txt(
    #     ** db_config,
    #     query=custom_query,
    #     output_file="employees_filtered.txt",
    #     delimiter=","
    # )
数据提取成功!
表名: employees
提取记录数: 300024
输出文件: employees.txt
分隔符: '|'
数据库连接已关闭

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path  # 新增导入Path类

# 读取 TXT 文件(按|分隔)
df = pd.read_csv(
    "employees.txt",
    sep="|",
    header=0,  # 第一行为表头
    names=["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"]
)

# 转换为 Parquet 并压缩
pq.write_table(
    pa.Table.from_pandas(df),
    "employees.parquet",
    compression="snappy"  # 可选压缩算法:snappy、gzip、lz4等
)

# 计算并打印文件大小(修正后)
print(f"原文件大小: {Path('employees.txt').stat().st_size / 1024:.2f} KB")  # 更准确的原文件大小计算方式
print(f"Parquet文件大小: {Path('employees.parquet').stat().st_size / 1024:.2f} KB")

原文件大小: 13791.09 KB
Parquet文件大小: 3318.18 KB

import gzip
import csv
from pathlib import Path

# 读取 TXT 并写入 GZIP 压缩的 CSV
with open("employees.txt", "r", encoding="utf-8") as f_in, \
     gzip.open("employees.csv.gz", "wt", encoding="utf-8") as f_out:
    
    reader = csv.reader(f_in, delimiter="|")
    writer = csv.writer(f_out, delimiter=",")  # 转为逗号分隔
    for row in reader:
        writer.writerow(row)

print(f"GZIP压缩后大小: {Path('employees.csv.gz').stat().st_size / 1024:.2f} KB")

GZIP压缩后大小: 4971.34 KB

import sqlite3
import csv
from pathlib import Path

# 连接 SQLite 数据库(文件不存在则创建)
conn = sqlite3.connect("employees.db")
cursor = conn.cursor()

# 创建表结构
cursor.execute("""
    CREATE TABLE IF NOT EXISTS employees (
        emp_no INT,
        birth_date DATE,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        hire_date DATE
    )
""")

# 读取 TXT 并插入数据
with open("employees.txt", "r", encoding="utf-8") as f:
    reader = csv.reader(f, delimiter="|")
    next(reader)  # 跳过表头
    cursor.executemany("""
        INSERT INTO employees VALUES (?, ?, ?, ?, ?, ?)
    """, reader)

conn.commit()
conn.close()

print(f"SQLite文件大小: {Path('employees.db').stat().st_size / 1024:.2f} KB")

SQLite文件大小: 14884.00 KB

为什么 Parquet 压缩效率这么高?
列存储特性:Parquet 按列存储数据,对于重复值较多的列(如 gender 只有 M/F 两种值),压缩算法(如 snappy)能高效消除冗余。
类型优化:自动识别数据类型(如 emp_no 为整数、birth_date 为日期),用更紧凑的二进制格式存储,比文本格式更节省空间。
按需读取:后续使用时可以只读取需要的列(如只查 name 和 hire_date),无需加载整个文件,进一步提升效率。
后续操作建议
验证数据完整性:可以用以下代码确认转换后的数据是否完整:
python
运行
import pandas as pd
# 读取Parquet文件
parquet_df = pd.read_parquet("employees.parquet")
# 对比行数是否与原文件一致
txt_df = pd.read_csv("employees.txt", sep="|")
print(f"原文件行数: {len(txt_df)}, Parquet文件行数: {len(parquet_df)}")  # 应相等

原文件行数: 300024, Parquet文件行数: 300024

当数据量达到 5 亿行以上时,选择存储格式需要重点考虑压缩效率、分布式处理兼容性、读写性能和查询效率(尤其是列级操作和过滤能力)。此时,Parquet 是最优选择,其次是 ORC(针对特定生态),以下是具体分析:
一、5 亿 + 数据量的核心需求
极致压缩率:减少存储成本(5 亿行数据若用 CSV 可能占用数十 TB,压缩后需控制在数 TB 内)。
分布式友好:支持 Spark、Flink、Hadoop 等分布式框架,避免单机处理瓶颈。
列级操作支持:可只读取需要的列(如仅查询hire_date和gender),减少 IO 和计算量。
稳定的读写性能:大规模数据下避免内存溢出,支持批量读写和并行处理。
二、最优选择:Parquet
核心优势:
压缩效率碾压文本格式
5 亿行数据中,大量列(如gender仅 2 个值、birth_date格式固定)存在极高重复度,Parquet 的列存储 + 压缩算法(如 ZSTD、Gzip)可将压缩比做到 10:1 甚至 20:1(远高于 CSV 的 3:1)。例如:5 亿行员工数据用 CSV 可能占 50TB,Parquet 压缩后可降至 3-5TB。
完美适配分布式生态
支持 Spark、Hive、Flink 等分布式计算框架,可直接进行分区存储(如按hire_year分区),实现 “数据分片 + 并行处理”,避免单机加载 5 亿行数据的内存压力。
谓词下推与列剪枝
查询时可通过where条件(如hire_date > '2000-01-01')直接在存储层过滤数据,且只读取需要的列(如仅emp_no和last_name),IO 量减少 80% 以上。
成熟的工业级支持
广泛用于大数据场景(如 Netflix、Uber 的 PB 级数据存储),兼容性强,工具链完善(Python、Java、SQL 均可直接操作)。
import pandas as pd
from sqlalchemy import create_engine
from pathlib import Path


# 1. 读取 Parquet 文件
parquet_df = pd.read_parquet("employees.parquet")
print(f"待导入数据量:{len(parquet_df)} 行")

# 2. 连接 MySQL 数据库(替换为你的数据库信息)
db_config = {
    "host": "localhost",
    "user": "你的用户名",
    "password": "你的密码",
    "database": "目标数据库名",
    "port": 3306
}
engine = create_engine(f"mysql+mysqlconnector://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}")

# 3. 写入 MySQL(自动创建表,若表已存在可添加 if_exists='append' 追加)
parquet_df.to_sql(
    name="employees_from_parquet",  # 目标表名
    con=engine,
    index=False,  # 不导入 DataFrame 的索引
    if_exists="replace"  # 若表存在则替换(可选:'fail' 报错 / 'append' 追加)
)

print("导入完成!可在 MySQL 中查询表 employees_from_parquet")

待导入数据量:300024 行
导入完成!可在 MySQL 中查询表 employees_from_parquet


网站公告

今日签到

点亮在社区的每一天
去签到