Python操作MySQL的基本使用

发布于:2024-12-18 ⋅ 阅读:(7) ⋅ 点赞:(0)

今天来总结下,怎么使用 Python 操作 MySQL 数据库

Python DB API 规范

Python 支持非常多的数据库管理系统,如 MySQL、Oracle、SQL Server 和 PostgreSQL 等。
为了实现对这些 DBMS 的统一访问,Python 需要遵守一个规范,这就是 DB API 规范。
这个规范提供了数据库对象连接、对象交互和异常处理的方式,为各种 DBMS 提供了统一的访问接口。

在这里插入图片描述

在使用 Python 对 DBMS 进行操作的时候,需要经过下面的几个步骤:

  • 引入 API 模块;
  • 与数据库建立连接;
  • 执行 SQL 语句;
  • 关闭数据库连接。

mysql-connector

使用 Python 对数据库进行访问需要基于 DB API 规范,可以选择以下库,如 MySQLdbmysqlclientPyMySQLpeeweeSQLAIchemy 等。

mysql-connector 是 MySQL 官方提供的驱动器,用来给后端语言,比如 Python 提供连接。

1)安装 mysql-connector

pip install mysql-connector

2)创建数据库连接,然后查看下数据库的版本号,来验证下数据库是否连接成功。

# -*- coding: UTF-8 -*-
import mysql.connector
# 打开数据库连接
connection = mysql.connector.connect(
       host="",
       user="",
       passwd="",
       database=''
)
# 获取操作游标
cursor = connection.cursor()
# 执行SQL语句
cursor.execute("SELECT VERSION()")
# 获取一条数据
data = cursor.fetchone()
print("MySQL版本: %s " % data)
# 关闭游标&数据库连接
cursor.close()
connection.close()

运行结果:

MySQL版本: 5.7.20-log 

其中有两个重要的对象 Connection 和 Cursor。

Connection 就是对数据库的当前连接进行管理,我们可以通过它来进行以下操作:

  • 通过指定 host、user、passwd 和 port 等参数来创建数据库连接;
  • 使用 connection.close() 关闭数据库连接;
  • 使用 connection.cursor() 创建游标,操作数据库中的数据;
  • 使用 connection.begin() 开启事务;
  • 使用 connection.commit() 和 connection.rollback(),对事务进行提交以及回滚。

通过cursor = connection.cursor()创建游标后,就可以通过面向过程的编程方式对数据库中的数据进行操作:

  • 使用cursor.execute(query_sql),执行数据库查询;
  • 使用cursor.fetchone(),读取数据集中的一条数据;
  • 使用cursor.fetchall(),取出数据集中的所有行,返回一个元组 tuples 类型;
  • 使用cursor.fetchmany(n),取出数据集中的多条数据,同样返回一个元组 tuples;
  • 使用cursor.rowcount,返回查询结果集中的行数。如果没有查询到数据或者还没有查询,则结果为 -1,否则会返回查询得到的数据行数;
  • 使用cursor.close(),关闭游标。

增删改查

插入

sql = "INSERT INTO testuser(id,name) VALUES (%s,%s)"
val = (3, 'young')
cursor.execute(sql, val)
connection.commit()
print(cursor.rowcount, "记录插入成功。")

使用 cursor.execute 来执行相应的 SQL 语句,val 为 SQL 语句中的参数,SQL 执行后使用 db.commit() 进行提交。
在使用 SQL 语句的时候,可以向 SQL 语句传递参数,这时 SQL 语句里要统一用(%s)进行占位,否则就会报错。不论插入的数值为整数类型,还是浮点类型,都需要统一用(%s)进行占位。

查询

query_sql = "SELECT * FROM testuser WHERE name = 'young'"
cursor.execute(query_sql)
data = cursor.fetchall()
for each_player in data:
    print(each_player)

运行结果:

(3, 'young')

修改

update_sql = 'UPDATE testuser SET name = %s WHERE name = %s'
update_val = ('youngNew', 'young')
cursor.execute(update_sql, update_val)
connection.commit()

删除

delete_sql = 'DELETE FROM testuser WHERE name = %s'
delete_val = ('youngNew',)
cursor.execute(delete_sql, delete_val)
connection.commit()

最后都执行完了,要记得关闭游标和数据库的连接。

cursor.close()
connection.close()

异常处理

在对数据进行增加、删除和修改的时候,可能会出现异常,需要用try...except捕获异常信息。

import traceback

# 省略其他代码 ...

try:
    # 执行SQL语句
    insert_sql = "INSERT INTO testuser(name) VALUES (%s)"
    insert_val = ('young')
    cursor.execute(insert_sql, insert_val)
    connection.commit()
except Exception as e:
    print('错误了.......')
    print(e)
    # 打印异常信息
    traceback.print_exc()
    # 回滚
    connection.rollback()
finally:
    # 关闭游标&数据库连接
    cursor.close()
    connection.close()

SQLAIchemy

当然了,我们也可以使用SQLAIchemy库来满足我们的需求。

SQLAlchemy 是一个功能强大的 ORM 和数据库工具包,支持多种数据库。

举例如下:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, update, delete, insert, select

# 创建基类
Base = declarative_base()


# 定义模型类
class TestUser(Base):
    __tablename__ = 'testuser'
    id = Column(Integer, primary_key=True)
    name = Column(String)


# 创建数据库引擎
engine = create_engine('mysql+pymysql://user:password@host/database')

# 绑定基类
Base.metadata.bind = engine

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 执行查询
# 使用 ORM 查询
results = session.query(TestUser).all()
for user in results:
    print(user.name)

# ORM 添加数据
new_user = TestUser(name='orm1')
session.add(new_user)

# ORM 查询并更新对象
user = session.query(TestUser).filter_by(id=1).first()  # 假设我们要更新id为1的记录
if user:
    user.name = 'orm_update'  # 更新name字段

# ORM 查询并删除对象
user = session.query(TestUser).filter_by(id=1).first()  # 假设我们要更新id为1的记录
if user:
    session.delete(user)

# stmt = update(TestUser).where(TestUser.id == 7).values(name='young77') Core 更新

# stmt = delete(TestUser).where(TestUser.id == 7) Core 删除

# stmt = insert(TestUser).values(name='young888') Core 插入

# session.execute(stmt)

# 提交
session.commit()
# 关闭会话
session.close()