【Python模块】——pymysql

发布于:2025-02-26 ⋅ 阅读:(16) ⋅ 点赞:(0)

pymysql是python操作mysql的标准库,可以通过pip install快速导入pymysql包操作数据库

使用pymysql操作mysql

简单demo

import pymysql
connect = pymysql.connect(
    host="localhost",
    port=3306,
    user="root",
    password="root",
    database="my_database",
    # charset="utf8mb4"
)
cursor = connect.cursor()

# 查询语句1
sql = "select * from user where name = %(name)s"
ret = cursor.execute(sql, {"name": "ls"})
# 查询语句2
sql = "select * from user where name = %s"
ret = cursor.execute(sql, "ls")
print(ret)

result = cursor.fetchall()
print("result", result)

cursor.close()
connect.close()

自定义SqlHelper

import pymysql

class MySQLClient(object):
    def __init__(self, **kwargs):
        self.conn = pymysql.connect(
            **kwargs
        )
        self.cursor = self.conn.cursor()


    def query(self, sql, *args):
        try:
            rowcount = self.cursor.execute(sql, *args)
            return rowcount
        except Exception as e:
            raise e


    def update(self, sql, *args):
        self.cursor.execute(sql, *args)
        self.conn.commit()


    def insert(self, sql, *args):
        self.cursor.execute(sql, *args)
        self.conn.commit()


    def fetch_one(self, sql, *args):
        self.query(sql, *args)
        result = self.cursor.fetchone()
        return result


    def fetch_all(self, sql, *args):
        self.query(sql, *args)
        result = self.cursor.fetchone()
        return result

    def close(self):
        self.cursor.close()
        self.conn.close()


config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "root",
    "database": "my_database",
}

mysql_client = MySQLClient(**config)
sql = "select * from user where name=%s"
ret = mysql_client.fetch_one(sql, "ls")
print(ret)

# mysql_client.close()

借助DButils创建数据库连接池

DButils模块可以通过创建数据库连接池,提升数据库操作性能;
实现思路:

  1. 定义SqlHelper类
  2. 通过__init__方法定义pool=PoolDB(**kwargs),_local=threading.local()
  3. 定义__enter__获取connection与cursor和__exit__关闭connection与cursor,可支持with 上下文操作
  4. 为了保证每次获取的connection与cursor不会将之前的覆盖掉,引入threading.local进行保存;self._local = {thread_id: {“stack”: [(connection, cursor)]}}
#!/usr/bin/env python  
# -*- coding:utf-8 -*-  
import pymysql
from dbutils.pooled_db import PooledDB
from threading import local

class SqlHelper(object):
    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            # maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表
            host='localhost',
            port=3306,
            user='root',
            password='root',
            database='my_database',
            charset='utf8'
        )
        self._local = local()

    def open(self):
        connection = self.pool.connection()
        cursor = connection.cursor()
        return  connection, cursor

    def close(self, cursor, conn):
        cursor.close()
        conn.close()

    def __enter__(self):
        conn, cursor = self.open()
        rv =  getattr(self._local, "stack", None)
        if not rv:
            self._local.stack = [(conn, cursor)]
        else:
            self._local.stack.append((conn, cursor))
        return cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        rv = getattr(self._local, "stack", None)
        if not rv:
            # del self._local.stack
            return
        elif len(rv) == 1:
            conn, cursor = rv[-1]
            # del self._local.stack
            return
        else:
            conn, cursor = rv.pop()
        cursor.close()
        conn.close()


    def fetchone(self, sql, *args):
        conn, cursor = self.open(self)
        try:
            rowcount = cursor.execute(sql, *args)
            ret = cursor.fetchone()
            return ret
        except Exception as e:
            raise


    def fetchall(self, sql, *args):
        conn, cursor = self.open(self)
        try:
            rowcount = cursor.execute(sql, *args)
            ret = cursor.fetchall()
            return ret
        except Exception as e:
            raise

db = SqlHelper()

sql = "select * from user"
with db as c1:
    ret = c1.execute(sql)
    print(ret)
    with db as c2:
        ret = c2.execute(sql)
        print(ret)

使用DButils的另一种写法

使用这种写法,每次都实例化SqlHelper,保证每次获取的connection和cursor不被覆盖

#!/usr/bin/env python  
# -*- coding:utf-8 -*-  
""" 
1. 定义全局变量POOL=pooledDB(**kwargs)
2. 每次用到db就实例化一次
"""
import pymysql
from dbutils.pooled_db import PooledDB
from threading import local

pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=0,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            # maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            blocking=False,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表
            host='localhost',
            port=3306,
            user='root',
            password='root',
            database='my_database',
            charset='utf8'
        )

class SqlHelper(object):
    def __init__(self):
        self.conn = None
        self.cursor = None

    def open(self):
        self.connection = pool.connection()
        self.cursor = self.connection.cursor()
        return  self.connection, self.cursor

    def close(self):
        self.cursor.close()
        self.conn.close()

    def __enter__(self):
        self.conn, self.cursor = self.open()
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()



db = SqlHelper()

sql = "select * from user"
with db as c1:
    ret = c1.execute(sql)
    print("c1.cursor: ", db.cursor)
    print(ret)
    with db as c2:
        ret = c2.execute(sql)
        print("c2.cursor: ", db.cursor)  # 一个实例对象是可以多次调用enter方法的,但db.cursor发生了改变,即上一次的连接丢了
        print(ret)

        print(type(c1), type(c2))

        print(c1 is c2) # false


    print("c1.cursor: ", db.cursor) # c2.cursor将c1.cursor覆盖了


网站公告

今日签到

点亮在社区的每一天
去签到