虫洞数观系列二 | Python+MySQL高效封装:为pandas数据分析铺路

发布于:2025-04-03 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

系列文章

1. 引言

2. 常规写法mysql

3. 封装设计接口mysql

3.1dbname.py文件

3.1.1. 导入和基类定义

3.1.2. 具体表定义类

3.1.3. 表定义整合函数

3.1.4. 全局字典和测试代码

3.2mysql_dao文件

3.2.1. 模块导入与配置

3.2.2. 数据库连接池初始化

3.2.3. CommonSQL 类功能

3.3db文件使用

4总结


系列文章

虫洞数观系列总览 | 技术全景:豆瓣电影TOP250数据采集→分析→可视化完整指南

虫洞数观系列一 | 豆瓣电影TOP250数据采集与MySQL存储实战

虫洞数观系列三 | 数据分析全链路实践:Pandas清洗统计 + Navicat可视化呈现 

1. 引言

在上一篇文章中,我们完成了豆瓣TOP250电影数据的爬取,存储字段包括:

  • 基础信息(中英文片名、详情页链接)

  • 制作信息(导演、主演、年份、国家、类型)

  • 评分数据(分数、评分人数、经典评语)

这些数据已存入MySQL数据库doubantop250movie表中。

本文核心目标

  1. 用Python封装MySQL的CRUD(增删改查)操作类

  2. 建立高效的数据存取管道

  3. 为后续的Pandas透视分析(如:

    • 按年份/国家的评分分布

    • 类型与评分的关联性

    • 导演/演员的作品统计等)奠定基础

通过标准化数据库操作接口,后续数据分析时只需关注业务逻辑,无需重复编写SQL语句。

2. 常规写法mysql

可以参考之前的文章

知识周汇 | MySQL增删改查与Python连接

对以下的数据表格实现增删改查,

# coding=utf-8


import mysql.connector.pooling
import pandas as pd

# 本地数据库
__config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "faw-vw.1901",
    "database": "douban"
}

try:
    pool = mysql.connector.pooling.MySQLConnectionPool(
        **__config,
        pool_size=10
    )
except Exception as e:
    print(e)


class DoubanDao():
    # 增
    def add_infro_from_douban(self):
        sql = "REPLACE INTO top250movie (feature) VALUES (2);"

        print(sql)
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            cursor.execute(sql)
            con.commit()
        except Exception as e:
            if "con" in dir():
                con.rollback()
        finally:
            if "con" in dir():
                con.close()

    # 删
    def del_infro_from_douban(self):
        sql = "DELETE FROM top250movie WHERE feature ='TOP0001';"

        print(sql)
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            cursor.execute(sql)
            con.commit()
        except Exception as e:
            if "con" in dir():
                con.rollback()
        finally:
            if "con" in dir():
                con.close()

    # 改
    def update_infro_from_douban(self):
        sql = "UPDATE top250movie SET movie_ch = '" + str("你好") + "' WHERE feature = '" + "TOP0002" + "';"

        # print(sql)
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            cursor.execute(sql)
            print(sql)
            con.commit()
        except Exception as e:
            print(e)
            if "con" in dir():
                con.rollback()
        finally:
            if "con" in dir():
                con.close()

    # 查
    def select_infro_from_douban(self):
        sql = "SELECT update_date ,movie_ch,movie_en,movie_url FROM top250movie;"

        print(sql)
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            cursor.execute(sql)
            result = cursor.fetchall()
            return result
        except Exception as e:
            if "con" in dir():
                con.rollback()
        finally:
            if "con" in dir():
                con.close()

这种实现方式存在代码冗余问题,当数据库表数量增加时,需要为每个表单独编写定制化逻辑,显著增加了开发维护成本。

3. 封装接口设计mysql

这边设计3个py文件,dbname.py - 表定义模块,mysql_dao.py - 数据访问对象(DAO),main.py - 主程序。

架构设计图

+-------------------+    +-------------------+    +-------------------+
|    dbname.py      |    |   mysql_dao.py    |    |     main.py       |
| 表结构定义模块     |<--->| 数据访问层        |<--->| 业务逻辑层        |
+-------------------+    +-------------------+    +-------------------+
                                      |
                                      v
                            +-------------------+
                            |     MySQL 数据库   |
                            +-------------------+

3.1dbname.py文件

该文件主要是想表达数据表的列名和中文名字对应关系

from typing import Dict


class TableDefinition:
    """表定义基类"""

    @staticmethod
    def _create_table_dict(table_name: str, columns: Dict[str, str]) -> Dict[str, Dict[str, str]]:
        """创建表字典结构"""
        return {table_name: columns}


class DouBan(TableDefinition):
    """分析表定义"""

    @staticmethod
    def top250movie() -> Dict[str, Dict[str, str]]:
        """top250电影"""
        columns = {
            'update_date': '更新日期',
            'feature': '特征值',
            'movie_ch': '电影中文名',
            'movie_en': '电影英文名',
            'movie_url': '电影详情页链接',
            'director': '导演',
            'star': '主演',
            'start_year': '上映年份',
            'country': '国籍',
            'type': '类型',
            'rating': '评分',
            'num_ratings': '评分人数',
            'comment': '评语',
        }
        return DouBan._create_table_dict('top250movie', columns)

    # 其他表定义类...


def get_dbname_dict() -> Dict[str, Dict[str, str]]:
    """获取所有表定义的字典"""
    db_dict = {}

    # 合并所有表定义
    db_dict.update(DouBan.top250movie())
    # 添加其他表...

    return db_dict


# 全局表定义字典
dbname_dic = get_dbname_dict()
for dbname in dbname_dic:
    print('>>>>>>>>>>>>>>>>>')
    print(dbname)
    print(dbname_dic[dbname])

打印的结果:

3.1.1. 导入和基类定义

from typing import Dict

class TableDefinition:
    """表定义基类"""

    @staticmethod
    def _create_table_dict(table_name: str, columns: Dict[str, str]) -> Dict[str, Dict[str, str]]:
        """创建表字典结构"""
        return {table_name: columns}
  • typing模块导入Dict,用于类型注解。

  • 定义了一个基类TableDefinition,包含一个静态方法_create_table_dict,用于创建表结构的字典表示。该方法接收表名和列定义字典,返回一个嵌套字典,外层键是表名,内层是列名到列描述的映射。

3.1.2. 具体表定义类

class DouBan(TableDefinition):
    """分析表定义"""

    @staticmethod
    def top250movie() -> Dict[str, Dict[str, str]]:
        """top250电影"""
        columns = {
            'update_date': '更新日期',
            'feature': '特征值',
            'movie_ch': '电影中文名',
            'movie_en': '电影英文名',
            'movie_url': '电影详情页链接',
            'director': '导演',
            'star': '主演',
            'start_year': '上映年份',
            'country': '国籍',
            'type': '类型',
            'rating': '评分',
            'num_ratings': '评分人数',
            'comment': '评语',
        }
        return DouBan._create_table_dict('top250movie', columns)
  • DouBan继承自TableDefinition,表示豆瓣相关的表定义。

  • 定义了一个静态方法top250movie,返回豆瓣Top250电影的表结构:

    • 包含13个字段,如更新日期、电影中英文名、导演、评分等。

    • 每个字段都有英文名和中文描述。

    • 使用基类的_create_table_dict方法生成最终的字典结构。

3.1.3. 表定义整合函数

def get_dbname_dict() -> Dict[str, Dict[str, str]]:
    """获取所有表定义的字典"""
    db_dict = {}

    # 合并所有表定义
    db_dict.update(DouBan.top250movie())
    # 添加其他表...

    return db_dict
  • 该函数整合所有表定义,返回一个统一的字典。

  • 目前只添加了DouBan.top250movie(),但注释表明可以添加其他表定义。

3.1.4. 全局字典和测试代码

# 全局表定义字典
dbname_dic = get_dbname_dict()
for dbname in dbname_dic:
    print('>>>>>>>>>>>>>>>>>')
    print(dbname)
    print(dbname_dic[dbname])
  • 生成全局表定义字典dbname_dic

  • 遍历并打印每个表名及其列定义,用于测试和验证。

3.2mysql_dao文件

# coding=utf-8
import pandas as pd
from tqdm import tqdm
import mysql.connector.pooling
from db.dbname import dbname_dic

# Database configuration
__config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "faw-vw.1901",
    "database": "douban"
}

# Initialize connection pool
try:
    pool = mysql.connector.pooling.MySQLConnectionPool(
        **__config,
        pool_size=10
    )
except Exception as e:
    print(f"Error initializing connection pool: {e}")


class CommonSQL:
    def __init__(self):
        self.pool = pool

    def execute_sql_no_return(self, sql):
        """Execute SQL without return value."""
        try:
            con = self.pool.get_connection()
            cursor = con.cursor()
            cursor.execute(sql)
            con.commit()
            self._print_success(sql)
        except Exception as e:
            if "con" in locals():
                con.rollback()
            self._print_failure(sql)
            print(f"Error: {e}")
        finally:
            if "con" in locals():
                con.close()

    def executemany_sql_no_return(self, sql, value_list):
        """Execute many SQL statements without return value."""
        try:
            con = self.pool.get_connection()
            cursor = con.cursor()
            cursor.executemany(sql, value_list)
            con.commit()
            self._print_success(sql)
        except Exception as e:
            if "con" in locals():
                con.rollback()
            self._print_failure(sql)
            print(f"Error: {e}")
        finally:
            if "con" in locals():
                con.close()

    def execute_sql_return_value(self, dbname):
        """Execute SQL and return values as a DataFrame."""
        try:
            con = self.pool.get_connection()
            cursor = con.cursor()
            sql = f"SELECT * FROM {dbname};"
            cursor.execute(sql)
            rows = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            df = pd.DataFrame(rows, columns=columns)
            print(df)
            # 将df的英文列名更换为中文列名
            print(dbname)
            print(dbname_dic)
            print(dbname_dic[dbname])

            if dbname in dbname_dic:
                dbname_columns_dic = dbname_dic[dbname]
                print(dbname_columns_dic)
                for each_column in list(df.columns):
                    if each_column in dbname_columns_dic:
                        df.rename(columns={each_column: dbname_columns_dic[each_column]}, inplace=True)

            return df
        except Exception as e:
            if "con" in locals():
                con.rollback()
            print(f"Error: {e}")
        finally:
            if "con" in locals():
                con.close()

    def bulk_update_infor_in_db(self, df, PRIMARY_KEY, update_cols, dbname):
        """Bulk update database with DataFrame."""
        sql = self._create_update_sql(dbname, update_cols, PRIMARY_KEY)
        self._bulk_operation(df, sql, update_cols, PRIMARY_KEY, dbname, "update")

    def bulk_insert_infor_in_db(self, df, insert_cols, dbname):
        """Bulk insert into database with DataFrame."""
        sql = self._create_insert_sql(dbname, insert_cols)
        self._bulk_operation(df, sql, insert_cols, None, dbname, "insert")

    def bulk_replace_infor_in_db(self, df, insert_cols, dbname):
        """Bulk replace into database with DataFrame."""
        sql = self._create_replace_sql(dbname, insert_cols)
        self._bulk_operation(df, sql, insert_cols, None, dbname, "replace")

    def clear_db_table(self, dbname):
        """Clear database table."""
        sql = f"TRUNCATE TABLE {dbname}"
        self.execute_sql_no_return(sql)

    def _create_update_sql(self, dbname, update_cols, PRIMARY_KEY):
        set_parts = ", ".join([f"{col} = %s" for col in update_cols])
        sql = f"UPDATE {dbname} SET {set_parts} WHERE {PRIMARY_KEY[0]} = %s;"
        return sql

    def _create_insert_sql(self, dbname, insert_cols):
        columns = ", ".join(insert_cols)
        placeholders = ", ".join(["%s"] * len(insert_cols))
        sql = f"INSERT INTO {dbname} ({columns}) VALUES ({placeholders});"
        return sql

    def _create_replace_sql(self, dbname, insert_cols):
        columns = ", ".join(insert_cols)
        placeholders = ", ".join(["%s"] * len(insert_cols))
        sql = f"REPLACE INTO {dbname} ({columns}) VALUES ({placeholders});"
        return sql

    def _bulk_operation(self, df, sql, cols, PRIMARY_KEY, dbname, operation):
        """Helper method to perform bulk operations."""
        df_copy = df.copy()
        i_max0 = df_copy.shape[0]
        num = i_max0 // 5000
        for j in range(num + 1):
            value_list = []
            start = j * 5000
            end = min((j + 1) * 5000, i_max0)
            for i in tqdm(range(start, end), desc=f"Batch {operation}"):
                row = df_copy.iloc[i]
                values = [str(row[cols[col]]) for col in cols]
                if PRIMARY_KEY:
                    values.append(str(row[PRIMARY_KEY[1]]))
                value_list.append(tuple(values))

            self.executemany_sql_no_return(sql, value_list)
            print(f"Database {dbname} {operation}d {end - start} rows!!!")

    def _print_success(self, sql):
        """Print success message."""
        operation = "insert" if "INSERT" in sql else "update" if "UPDATE" in sql else "execute"
        print(f"Successfully {operation} {sql}")

    def _print_failure(self, sql):
        """Print failure message."""
        operation = "insert" if "INSERT" in sql else "update" if "UPDATE" in sql else "execute"
        print(f"Failed {operation} {sql}")

3.2.1. 模块导入与配置

# coding=utf-8
import pandas as pd
from tqdm import tqdm
import mysql.connector.pooling
from db.dbname import dbname_dic
  • pandas:用于将查询结果转换为 DataFrame。

  • tqdm:显示批量操作的进度条。

  • mysql.connector.pooling:MySQL 连接池,提高数据库连接效率。

  • dbname_dic:从自定义模块导入表名和字段名的映射字典(如 {'update_date': '更新日期'})。

3.2.2. 数据库连接池初始化

__config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "faw-vw.1901",
    "database": "douban"
}

pool = mysql.connector.pooling.MySQLConnectionPool(
    **__config,
    pool_size=10
)
  • 使用连接池管理数据库连接,默认大小为 10,避免频繁创建/销毁连接。

3.2.3. CommonSQL 类功能

初始化方法
def __init__(self):
    self.pool = pool
  • 直接使用全局连接池 pool

基础 SQL 操作方法
execute_sql_no_return
def execute_sql_no_return(self, sql):
    """执行无返回值的 SQL(如 INSERT/UPDATE/DELETE)"""
    try:
        con = self.pool.get_connection()
        cursor = con.cursor()
        cursor.execute(sql)
        con.commit()
        self._print_success(sql)  # 打印成功日志
    except Exception as e:
        con.rollback()  # 回滚事务
        self._print_failure(sql)  # 打印失败日志
    finally:
        con.close()  # 释放连接
  • 用于执行不需要返回结果的 SQL(如 DML 语句)。

executemany_sql_no_return
def executemany_sql_no_return(self, sql, value_list):
    """批量执行无返回值的 SQL"""
    try:
        con = self.pool.get_connection()
        cursor = con.cursor()
        cursor.executemany(sql, value_list)  # 批量执行
        con.commit()
    except Exception as e:
        con.rollback()
    finally:
        con.close()
  • 高效批量插入/更新数据(如 INSERT INTO ... VALUES (%s, %s))。

execute_sql_return_value
def execute_sql_return_value(self, dbname):
    """执行查询并返回 DataFrame(自动转换列名为中文)"""
    sql = f"SELECT * FROM {dbname};"
    cursor.execute(sql)
    rows = cursor.fetchall()
    columns = [desc[0] for desc in cursor.description]  # 获取列名
    df = pd.DataFrame(rows, columns=columns)
    
    # 将英文列名替换为中文(通过 dbname_dic 映射)
    if dbname in dbname_dic:
        df.rename(columns=dbname_dic[dbname], inplace=True)
    return df
  • 查询结果转换为 DataFrame,并自动替换列名为中文(如 movie_ch → 电影中文名)。

批量操作方法
bulk_insert_infor_in_db / bulk_update_infor_in_db / bulk_replace_infor_in_db
def bulk_insert_infor_in_db(self, df, insert_cols, dbname):
    sql = self._create_insert_sql(dbname, insert_cols)  # 生成 INSERT SQL
    self._bulk_operation(df, sql, insert_cols, None, dbname, "insert")

def _create_insert_sql(self, dbname, insert_cols):
    """生成 INSERT 语句模板,如: INSERT INTO table (col1, col2) VALUES (%s, %s)"""
    columns = ", ".join(insert_cols)
    placeholders = ", ".join(["%s"] * len(insert_cols))
    return f"INSERT INTO {dbname} ({columns}) VALUES ({placeholders});"
  • 将 DataFrame 数据分批次(每批 5000 行)插入数据库,通过 tqdm 显示进度。

_bulk_operation(核心辅助方法)
def _bulk_operation(self, df, sql, cols, PRIMARY_KEY, dbname, operation):
    """批量操作(插入/更新/替换)的通用逻辑"""
    for j in range(num_batches):
        value_list = []
        for i in tqdm(range(start, end), desc=f"Batch {operation}"):
            row = df.iloc[i]
            values = [str(row[col]) for col in cols]  # 提取数据
            if PRIMARY_KEY:  # 如果是更新操作,追加主键值
                values.append(str(row[PRIMARY_KEY[1]]))
            value_list.append(tuple(values))
        
        self.executemany_sql_no_return(sql, value_list)  # 批量执行
  • 支持分批次处理大数据量,避免内存溢出。

其他工具方法
  • clear_db_table:清空表(TRUNCATE TABLE)。

  • _print_success / _print_failure:格式化打印操作日志。

3.3db文件使用

以下完整演示了"查询→备份→清空→重新插入→更新"的数据处理流程

# 导入必要的库
import pandas as pd  # 用于数据处理和分析
from db.mysql_dao import CommonSQL  # 自定义的MySQL数据库操作类

def main():
    """
    主函数,执行数据库CRUD操作流程
    """
    
    # ==================== 数据查询模块 ====================
    # 从'top250movie'表查询数据并返回DataFrame
    df = CommonSQL().execute_sql_return_value('top250movie')
    print(df)  # 打印原始数据
    print(df.columns)  # 打印列名
    df.to_excel('原始数据.xlsx')  # 导出到Excel备份

    # ==================== 数据清理模块 ====================
    # 清空'top250movie'表中的所有数据
    CommonSQL().clear_db_table('top250movie')

    # ==================== 数据插入模块 ====================
    # 从Excel重新加载数据
    df = pd.read_excel('原始数据.xlsx')
    
    # 定义数据库字段与DataFrame列的映射关系
    insert_cols = {
        'update_date': '更新日期',  # 数据库字段: DataFrame列名
        'feature': '特征值',
        'movie_ch': '电影中文名',
        'movie_en': '电影英文名',
        'movie_url': '电影详情页链接',
        'director': '导演',
        'star': '主演',
        'start_year': '上映年份',
        'country': '国籍',
        'type': '类型',
        'rating': '评分',
        'num_ratings': '评分人数',
        'comment': '评语',
    }
    
    # 执行批量插入操作(两种方式)
    CommonSQL().bulk_insert_infor_in_db(df, insert_cols=insert_cols, dbname='top250movie')
    CommonSQL().bulk_replace_infor_in_db(df, insert_cols=insert_cols, dbname='top250movie')

    # ==================== 数据更新模块 ====================
    # 定义主键和需要更新的字段映射
    PRIMARY_KEY = ['feature', '特征值']  # 主键字段
    update_cols = {
        'movie_ch': '电影中文名',  # 数据库字段: DataFrame列名
        'movie_en': '电影英文名',
        'movie_url': '电影详情页链接',
        'director': '导演',
        'star': '主演',
        'start_year': '上映年份',
        'country': '国籍',
        'type': '类型',
        'rating': '评分',
        'num_ratings': '评分人数',
        'comment': '评语',
    }
    
    # 执行批量更新操作
    CommonSQL().bulk_update_infor_in_db(df, PRIMARY_KEY, update_cols, 'top250movie')

# 程序入口
if __name__ == '__main__':
    main()

4总结

作为一名长期从事数据处理与分析的专业人士,我在实际工作中总结出了一套成熟的MySQL-DataFrame交互方案。该方案有效解决了数据分析过程中常见的"数据搬运"效率瓶颈问题,显著提升了工作效能。

✅ 智能双向无缝转换
• 实现DataFrame与数据库表的自动化映射
• 免除繁琐的SQL查询编写及结果解析过程
• 全面适配各类数据分析场景的特殊需求

⚡ 高性能批处理机制
• 采用智能分块处理技术(5000行/批)
• 基于executemany预编译实现高效数据操作
• 显著降低I/O开销,提升数据处理效率

应用价值:
• 节省90%以上的数据转换时间
• 专注于核心数据分析逻辑开发
• 充分利用DataFrame的强大分析功能

让数据真正流动起来,释放分析潜能!


网站公告

今日签到

点亮在社区的每一天
去签到