原文链接:Lost connection to MySQL server during query < Ping通途说
0.前言
这个问题挺容易触发的,实际问题就是SQLalachemy与MySQL连接时间过长,MySQL主动断掉了当前连接,但SQLalachemy这边的连接回收还没到时间,需要操作数据库时取出了这条过期连接,于是错误就产生了。
1. 解决方案
从以上博文我们可以知道,我们需要先知道MySQL那边的过期时间,然后再手动修改我们这边的过期策略。那么我们能不能将这一个过程自动化?
下面是我项目中常用的数据库初始化方法,在这里分享给大家
import threading
from config import get_settings
from loguru import logger
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (AsyncSession, async_sessionmaker,
create_async_engine)
config = get_settings()
async def get_mysql_timeout_settings():
"""获取MySQL超时设置并自动调整SQLAlchemy连接池参数"""
# 创建一个临时引擎用于查询MySQL参数
temp_engine = create_async_engine(
config.mysql_url,
echo=config.db_echo,
pool_size=1,
max_overflow=0
)
try:
async with temp_engine.connect() as conn:
# 查询MySQL超时相关参数
result = await conn.execute(text("SHOW VARIABLES LIKE '%timeout%'"))
mysql_settings = {}
for row in result:
mysql_settings[row[0]] = row[1]
logger.info(f"MySQL timeout settings: {mysql_settings}")
# 根据MySQL设置调整SQLAlchemy参数
# interactive_timeout 和 wait_timeout 的最小值(单位:秒)
interactive_timeout = int(
mysql_settings.get('interactive_timeout', 28800))
wait_timeout = int(mysql_settings.get('wait_timeout', 28800))
connect_timeout = int(mysql_settings.get('connect_timeout', 10))
# pool_recycle 应该比 interactive_timeout 和 wait_timeout 小
min_timeout = min(interactive_timeout, wait_timeout)
pool_recycle = min(
config.pool_recycle or min_timeout - 10, min_timeout - 10)
if pool_recycle <= 0:
pool_recycle = min_timeout - 10 if min_timeout > 10 else 28790
# pool_timeout 应该小于或等于 connect_timeout
pool_timeout = min(
config.db_pool_timeout or connect_timeout, connect_timeout)
logger.info(
f"Adjusted SQLAlchemy settings - pool_recycle: {pool_recycle}, pool_timeout: {pool_timeout}")
return {
'pool_recycle': pool_recycle,
'pool_timeout': pool_timeout
}
except Exception as e:
logger.error(f"Failed to get MySQL timeout settings: {e}")
# 如果查询失败,使用默认配置
return {
'pool_recycle': config.pool_recycle,
'pool_timeout': config.db_pool_timeout
}
finally:
# 清理临时引擎
await temp_engine.dispose()
async def create_db_engine():
"""创建数据库引擎,根据MySQL设置自动调整参数"""
# 获取调整后的参数
adjusted_settings = await get_mysql_timeout_settings()
# 创建主引擎
engine = create_async_engine(
config.mysql_url,
connect_args={
# "check_same_thread": False
},
echo=config.db_echo,
pool_size=config.db_pool_size,
pool_timeout=adjusted_settings['pool_timeout'],
pool_recycle=adjusted_settings['pool_recycle'],
max_overflow=config.db_max_overflow
)
return engine
# 创建引擎
async_engine = None
async def init_db_engine():
"""初始化数据库引擎"""
global async_engine
if async_engine is None:
async_engine = await create_db_engine()
return async_engine
async_session = None
async def init_async_session():
"""初始化异步会话"""
global async_session
engine = await init_db_engine()
if async_session is None:
async_session = async_sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession)
return async_session
可以看到我们在进程初始化时,创建了一个临时引擎来查询数据库中interactive_timeout
,wait_timeout
,connect_timeout
三个参数,查询后我们可以将这些数字减1或除半,以避免再取出过期连接,之后销毁临时引擎并传递超时参数到新的主引擎上配置连接池的超时时间。
到此问题就解决了。之后如果想要取用数据库连接,我们可以用以下方式操作:
async def get_user_info(uid: str = None, id: str = None):
if uid == id == None:
return None
session_maker = await init_async_session()
async with session_maker() as session:
return (await session.execute(select(User).where(or_(User.uid == uid, User.id == id)))).scalar_one_or_none()
完善其他数据库代码后,我们就可以在应用初始化时自动调整数据库配置,也方便项目迁移到其他设备时无需再手动修改数据库配置。
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化数据库引擎
await init_db_engine()
# 初始化数据库
await init_db()
yield