sqlalchemy
SQLAlchemy 是 Python 里最常用、最强大的 数据库工具包,主要用来跟各种关系型数据库(如 MySQL、PostgreSQL、SQLite、Oracle 等)打交道。
构建session对象
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config import app_config
# Create database engine
url = f"mysql+pymysql://{user}:{pass}@{host}:{port}/{db_name}?charset=utf8mb4"
kwargs = {
"pool_size": 30
"max_overflow": 10
"pool_recycle": 1200
"pool_pre_ping": False,
"connect_args": {"init_command": f'SET time_zone="{sql_timezone}"'},
"echo": False,
}
db = create_engine(url=url, **app_config.SQLALCHEMY_ENGINE_OPTIONS)
# Create database session
Session = sessionmaker(bind=db, expire_on_commit=False)
DDL
定义实体
from datetime import datetime
from sqlalchemy import TIMESTAMP, String, text, INT, Index
from sqlalchemy.dialects.mysql import BIGINT, MEDIUMTEXT
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class RequestLog(DeclarativeBase):
__tablename__ = 'request_log'
id: Mapped[int] = mapped_column(BIGINT(unsigned=True), primary_key=True, autoincrement=True)
token: Mapped[str] = mapped_column(String(64), comment="token", server_default="")
qps: Mapped[int] = mapped_column(INT, comment="qps", server_default="0")
request_id: Mapped[str] = mapped_column(String(64), comment="请求id", server_default="")
client_ip: Mapped[str] = mapped_column(String(64), comment="客户端ip", server_default="")
request_url: Mapped[str] = mapped_column(String(255), comment="请求地址", server_default="")
request_body: Mapped[str] = mapped_column(MEDIUMTEXT, comment="请求体", server_default="")
status: Mapped[str] = mapped_column(String(32), comment="请求状态", server_default="0")
elapsed_time: Mapped[int] = mapped_column(INT, comment="请求耗时", server_default="0")
error_msg: Mapped[str] = mapped_column(String(255), comment="错误信息", server_default="")
created_by: Mapped[str] = mapped_column(String(32), comment="创建人", server_default="system")
date_created: Mapped[datetime] = mapped_column(
"date_created", TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), comment="创建时间"
)
updated_by: Mapped[str] = mapped_column(String(32), comment="更新人", server_default="system")
date_updated: Mapped[datetime] = mapped_column(
"date_updated",
TIMESTAMP,
server_default=text("CURRENT_TIMESTAMP"),
onupdate=text("CURRENT_TIMESTAMP"),
comment="更新时间",
)
__table_args__ = (
Index('idx_token', 'token'),
Index('idx_request_url', 'request_url'),
Index('idx_status', 'status'),
Index('idx_date_created', 'date_created'),
{
"comment": "请求日志",
"mysql_engine": "InnoDB",
"mysql_charset": "utf8mb4",
},
)
生成建表语句
from sqlalchemy.schema import CreateTable, CreateIndex
from sqlalchemy.dialects.mysql import dialect as mysql_dialect
# 生成建表语句
print(CreateTable(RequestLog.__table__).compile(dialect=mysql_dialect(), compile_kwargs={"literal_binds": True}))
# 生成索引语句
for idx in RequestLog.__table__.indexes:
print(str(CreateIndex(idx).compile(dialect=mysql_dialect(), compile_kwargs={"literal_binds": True})))
DML
from typing import Any, List, Optional, Type, TypeVar
from sqlalchemy import desc, Executable
from extensions.ext_database import Session
# 定义一个通用的类型变量,用于表示任何 ORM 模型类
_T = TypeVar("_T")
def select_one(model: Type[_T], **conditions: Any) -> Optional[_T]:
"""Execute a statement and return the first result."""
with Session() as session:
query = session.query(model).filter_by(**conditions).order_by(desc(model.id))
return query.first()
def select(stmt: Executable) -> list[Any]:
"""Execute a statement and return the first result."""
with Session() as session:
return session.execute(stmt).all()
def add(instance: _T) -> None:
"""Add a new instance to the database."""
with Session() as session:
session.add(instance)
session.flush()
session.commit()
session.refresh(instance)
def add_all(instances: List[_T]) -> None:
"""Add multiple instances to the database."""
with Session() as session:
session.add_all(instances)
session.commit()
def update(instance: _T) -> Optional[_T]:
"""Update an existing instance in the database."""
with Session() as session:
db_instance = session.query(type(instance)).get(instance.id)
if db_instance:
merged_inst = session.merge(db_instance)
session.commit()
return merged_inst
return None
def update_by_id(model: Type[_T], obj_id: Any, **kwargs: Any) -> Optional[_T]:
"""Update an existing instance in the database."""
with Session() as session:
db_instance = session.get(model, obj_id)
if db_instance:
for field_name, field_value in kwargs.items():
setattr(db_instance, field_name, field_value)
session.commit()
return db_instance
return None
def delete(instance: _T) -> None:
"""Delete an instance from the database."""
with Session() as session:
session.delete(instance)
session.commit()
def get_by_id(model: _T, ident: int) -> Optional[_T]:
"""Retrieve an instance by its primary key."""
with Session() as session:
return session.get(model, ident)
def get_all(model: _T) -> List[_T]:
"""Retrieve all instances of a model."""
with Session() as session:
return session.query(model).all()