sqlachemy

发布于:2025-08-30 ⋅ 阅读:(17) ⋅ 点赞:(0)
from sqlalchemy import create_engine, String, select, update, delete
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

# 定义模型基类


class Base(DeclarativeBase):
    pass

# 定义用户模型


class User(Base):
    __tablename__ = "user1"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    email: Mapped[str] = mapped_column(String(50), unique=True)

    def __repr__(self) -> str:
        return f"User(id={self.id!r}, name={self.name!r}, email={self.email!r})"


# 创建数据库引擎(使用SQLite内存数据库)
engine = create_engine("postgresql://postgres:carizon@10.11.96.70:5432/hdmap_check", echo=True)

# 创建表
Base.metadata.create_all(engine)

# 使用上下文管理器创建会话
with Session(engine) as session:
    # 1. 增加(Create) - 添加新用户
    # print("=== 创建用户 ===")
    # new_user = User(name="张三", email="zhangsan@example.com")
    # new_user2 = User(name="张三2", email="zhangsan@example.com")
    # # session.add(new_user)
    # session.add_all([new_user,new_user2])
    # session.commit()
    # print(f"已创建用户: {new_user}")

    # # 添加更多用户
    # users_data = [
    #     User(name="李四", email="lisi@example.com"),
    #     User(name="王五", email="wangwu@example.com"),
    #     User(name="赵六", email="zhaoliu@example.com")
    # ]
    # session.add_all(users_data)
    # session.commit()
    # print("已批量添加用户")

    # # 2. 查询(Read) - 获取所有用户
    # print("\n=== 查询所有用户 ===")
    # stmt = select(User)
    # users = session.execute(stmt).scalars().all()
    # for user in users:
    #     print(user)

    # # 查询特定用户
    # print("\n=== 查询特定用户 ===")
    # stmt = select(User).where(User.name == "张三2")
    # # print("sql=",stmt)
    # # user = session.execute(stmt).scalar_one()
    # # user = session.execute(stmt).scalar_one_or_none()
    # user = session.execute(stmt).first()
    # print(f"找到用户: {user}")

    # 3. 更新(Update) - 修改用户信息
    print("\n=== 更新用户 ===")
    stmt = update(User).where(User.name == "张三2").values(email="wangwu_updated@example.com", name="张三_updated")
    session.execute(stmt)
    session.commit()

    # 验证更新
    stmt = select(User).where(User.name == "张三_updated")
    updated_user = session.execute(stmt).scalar_one()
    updated_user.email="sss"
    session.commit()
    print(f"更新后的用户: {updated_user}")

    # # 4. 删除(Delete) - 删除用户
    # print("\n=== 删除用户 ===")
    # stmt = delete(User).where(User.name == "赵六")
    # session.execute(stmt)
    # session.commit()

    # # 验证删除
    # stmt = select(User)
    # remaining_users = session.execute(stmt).scalars().all()
    # print("删除后的剩余用户:")
    # for user in remaining_users:
    #     print(user)

# # 使用事务的另一种方式
# print("\n=== 使用事务操作 ===")
# with Session(engine) as session:
#     with session.begin():
#         # 在事务中执行多个操作
#         new_user = User(name="钱七", email="qianqi@example.com")
#         session.add(new_user)

#         # 更新操作
#         stmt = update(User).where(User.name == "张三").values(email="zhangsan_updated@example.com")
#         session.execute(stmt)

#     # 事务已提交,验证结果
#     users = session.execute(select(User)).scalars().all()
#     print("事务操作后的所有用户:")
#     for user in users:
#         print(user)