sqlalchemy

发布于:2025-04-19 ⋅ 阅读:(23) ⋅ 点赞:(0)

sqlalchemy

SQLAlchemy 是 Python 里最常用、最强大的数据库工具包,主要用来跟各种关系型数据库(如 MySQL、PostgreSQL、SQLite、Oracle 等)打交道。

构建session对象

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from config import app_config

# Create database engine
# NOTE: `user`, `password`, `host`, `port`, `db_name` and `sql_timezone` are
# placeholders that must be defined before this point (they are not defined in
# this snippet). `pass` is a reserved keyword and cannot be used as a variable
# name, so the password placeholder is spelled `password`. Credentials that
# contain URL-special characters must be URL-escaped before interpolation.
url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{db_name}?charset=utf8mb4"
# NOTE(review): `kwargs` is not passed to create_engine() below; presumably it
# illustrates the shape of app_config.SQLALCHEMY_ENGINE_OPTIONS -- confirm.
kwargs = {
    "pool_size": 30,
    "max_overflow": 10,
    "pool_recycle": 1200,
    "pool_pre_ping": False,
    # Run once on every new DB-API connection to pin the session time zone.
    "connect_args": {"init_command": f'SET time_zone="{sql_timezone}"'},
    "echo": False,
}
db = create_engine(url=url, **app_config.SQLALCHEMY_ENGINE_OPTIONS)

# Create database session
# expire_on_commit=False keeps attributes of loaded objects from being expired
# when the session commits.
Session = sessionmaker(bind=db, expire_on_commit=False)

DDL

定义实体

from datetime import datetime

from sqlalchemy import TIMESTAMP, String, text, INT, Index
from sqlalchemy.dialects.mysql import BIGINT, MEDIUMTEXT
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    """Declarative base for the ORM models.

    SQLAlchemy 2.0 does not allow a mapped class to inherit from
    ``DeclarativeBase`` directly; models must inherit from a subclass of it.
    """


class RequestLog(Base):
    """ORM model for the ``request_log`` table (one row per logged request)."""

    __tablename__ = 'request_log'

    id: Mapped[int] = mapped_column(BIGINT(unsigned=True), primary_key=True, autoincrement=True)
    token: Mapped[str] = mapped_column(String(64), comment="token", server_default="")
    qps: Mapped[int] = mapped_column(INT, comment="qps", server_default="0")
    request_id: Mapped[str] = mapped_column(String(64), comment="请求id", server_default="")
    client_ip: Mapped[str] = mapped_column(String(64), comment="客户端ip", server_default="")
    request_url: Mapped[str] = mapped_column(String(255), comment="请求地址", server_default="")
    # NOTE(review): MySQL may reject a literal DEFAULT on TEXT/BLOB columns
    # depending on version and SQL mode -- confirm against the target server.
    request_body: Mapped[str] = mapped_column(MEDIUMTEXT, comment="请求体", server_default="")
    status: Mapped[str] = mapped_column(String(32), comment="请求状态", server_default="0")
    elapsed_time: Mapped[int] = mapped_column(INT, comment="请求耗时", server_default="0")
    error_msg: Mapped[str] = mapped_column(String(255), comment="错误信息", server_default="")

    # Audit columns.
    created_by: Mapped[str] = mapped_column(String(32), comment="创建人", server_default="system")
    date_created: Mapped[datetime] = mapped_column(
        "date_created", TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), comment="创建时间"
    )
    updated_by: Mapped[str] = mapped_column(String(32), comment="更新人", server_default="system")
    # `onupdate` is applied by SQLAlchemy in the UPDATE statements it emits; it
    # does not add an ON UPDATE clause to the generated DDL.
    date_updated: Mapped[datetime] = mapped_column(
        "date_updated",
        TIMESTAMP,
        server_default=text("CURRENT_TIMESTAMP"),
        onupdate=text("CURRENT_TIMESTAMP"),
        comment="更新时间",
    )

    # Secondary indexes followed by table-level options (the dict must be last).
    __table_args__ = (
        Index('idx_token', 'token'),
        Index('idx_request_url', 'request_url'),
        Index('idx_status', 'status'),
        Index('idx_date_created', 'date_created'),
        {
            "comment": "请求日志",
            "mysql_engine": "InnoDB",
            "mysql_charset": "utf8mb4",
        },
    )

生成建表语句

from sqlalchemy.schema import CreateTable, CreateIndex
from sqlalchemy.dialects.mysql import dialect as mysql_dialect

# Print the CREATE TABLE statement, compiled for MySQL with literal values inlined.
request_log_table = RequestLog.__table__
create_table_sql = CreateTable(request_log_table).compile(
    dialect=mysql_dialect(),
    compile_kwargs={"literal_binds": True},
)
print(create_table_sql)

# Print one CREATE INDEX statement per index declared on the table.
for idx in request_log_table.indexes:
    create_index_sql = CreateIndex(idx).compile(
        dialect=mysql_dialect(),
        compile_kwargs={"literal_binds": True},
    )
    print(str(create_index_sql))

DML

from typing import Any, List, Optional, Type, TypeVar

from sqlalchemy import desc, Executable

from extensions.ext_database import Session

# Generic type variable used to stand for any ORM model class.
_T = TypeVar("_T")


def select_one(model: Type[_T], **conditions: Any) -> Optional[_T]:
    """Return the matching row of ``model`` with the highest ``id``.

    Args:
        model: Mapped class to query; its ``id`` attribute is used for ordering.
        **conditions: Keyword criteria forwarded to ``filter_by``.

    Returns:
        The first match in descending ``id`` order, or ``None`` if there is none.
    """
    with Session() as db_session:
        newest_first = desc(model.id)
        matches = db_session.query(model).filter_by(**conditions)
        return matches.order_by(newest_first).first()


def select(stmt: Executable) -> list[Any]:
    """Execute ``stmt`` in a new session and return all result rows.

    Args:
        stmt: The executable statement to run.

    Returns:
        Every row produced by the statement, as returned by ``Result.all()``.
    """
    with Session() as db_session:
        result = db_session.execute(stmt)
        return result.all()


def add(instance: _T) -> None:
    """Insert ``instance`` into the database and reload its state.

    Args:
        instance: The new object to persist.
    """
    with Session() as session:
        session.add(instance)
        # commit() always flushes pending changes first, so no explicit
        # flush() is needed before it.
        session.commit()
        # Reload the row so database-generated values are populated on the
        # caller's object.
        session.refresh(instance)


def add_all(instances: List[_T]) -> None:
    """Insert every object in ``instances`` using a single commit.

    Args:
        instances: The new objects to persist.
    """
    with Session() as db_session:
        db_session.add_all(instances)
        db_session.commit()


def update(instance: _T) -> Optional[_T]:
    """Persist the current state of ``instance`` onto its existing row.

    Args:
        instance: Object carrying the new values; its ``id`` identifies the row.

    Returns:
        The session-bound copy holding the merged state, or ``None`` when no
        row with that ``id`` exists (nothing is inserted in that case).
    """
    with Session() as session:
        # Only update rows that already exist; merge() alone would insert.
        if session.get(type(instance), instance.id) is None:
            return None
        # Merge the caller's object, not the freshly loaded row, so that the
        # caller's changes are the ones written.
        merged_inst = session.merge(instance)
        session.commit()
        return merged_inst


def update_by_id(model: Type[_T], obj_id: Any, **kwargs: Any) -> Optional[_T]:
    """Set the given attributes on the row of ``model`` identified by ``obj_id``.

    Args:
        model: Mapped class to look up.
        obj_id: Primary-key value of the row to update.
        **kwargs: Attribute names and the new values to assign.

    Returns:
        The updated instance, or ``None`` when no row has that primary key.
    """
    with Session() as session:
        db_instance = session.get(model, obj_id)
        if db_instance is None:
            return None
        for field_name, field_value in kwargs.items():
            setattr(db_instance, field_name, field_value)
        # Commit once after all fields are set so the update is atomic,
        # instead of committing after every individual field.
        session.commit()
        return db_instance


def delete(instance: _T) -> None:
    """Remove ``instance`` from the database and commit the deletion.

    Args:
        instance: The object whose row should be deleted.
    """
    with Session() as db_session:
        db_session.delete(instance)
        db_session.commit()


def get_by_id(model: Type[_T], ident: int) -> Optional[_T]:
    """Retrieve an instance by its primary key.

    Args:
        model: Mapped class to look up (the class itself, not an instance).
        ident: Primary-key value of the wanted row.

    Returns:
        The matching instance, or ``None`` when no row has that primary key.
    """
    with Session() as session:
        return session.get(model, ident)


def get_all(model: Type[_T]) -> List[_T]:
    """Retrieve all instances of a model.

    Args:
        model: Mapped class to query (the class itself, not an instance).

    Returns:
        A list with one instance per row of the model's table.
    """
    with Session() as session:
        return session.query(model).all()


网站公告

今日签到

点亮在社区的每一天
去签到