Python基础(SQLAlchemy)

发布于:2025-02-11 ⋅ 阅读:(11) ⋅ 点赞:(0)

ORM 的作用

传统数据库操作需要使用 SQL 语句,而 ORM 允许你用 面向对象的方式 来操作数据库,不需要手写 SQL,提高开发效率和可维护性。

ORM 的核心概念

  1. 模型(Model)
    • 在 ORM 中,每张表通常对应一个 ,表的字段对应类的 属性
  2. 查询(Query)
    • 通过 类的方法 查询数据库,而不直接写 SQL。
  3. 映射(Mapping)
    • ORM 会自动将对象转换成 SQL 语句,执行查询,并返回结果。
1. 安装 SQLAlchemy
pip install sqlalchemy pymysql
2. 创建 ORM Model(数据表)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 连接 MySQL 数据库
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test_db")

# 创建 ORM 基类
Base = declarative_base()

# 定义一个 User 表
class User(Base):
    __tablename__ = 'users'  # 对应数据库中的表名

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), nullable=False)
    age = Column(Integer, nullable=False)

# 创建表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 插入数据
new_user = User(name="Alice", age=25)
session.add(new_user)
session.commit()

# 查询数据
users = session.query(User).all()
for user in users:
    print(user.id, user.name, user.age)

session.close()
  1. session.add(new_user)

    • 这行代码的意思是:
      “先把 new_user 这个对象放到一个待提交的列表里。”
    • 数据库里还没有真正保存这条数据!
  2. session.commit()

    • 这行代码的意思是:
      “把所有待提交的数据真正保存到数据库里。”
    • 只有执行 commit(),数据库才会更新。

总结:

首先要写一下②=自己的账号密码还有数据库的名字--->然后创建一个ORM基类--->①=创建自己的一个class表(每一个都要放入ORM基类)--->创建表(放入①)--->创建会话(连接②)

backref和back_populates


backref

backref就像是一个自动给书加标签的魔法工具,它让每本书自动知道它属于哪个作者。你不需要特别写代码去做这一点,backref 会帮你搞定。

比如:
class Author:
    # 作者的名字
    name = ""
    
    # 书的列表
    books = []  # 这里是作者的书

class Book:
    # 书名
    title = ""
    # 每本书有一个作者
    author = None  # 通过这个,我们可以知道每本书的作者是谁

当你写了 backref="author",每本书就会自动有一个指向作者的标签。

books = relationship("Book", backref="author")  # 让每本书都知道自己的作者

back_populates

back_populates是手动指定书和作者之间关系的方式。你要告诉作者,你们之间是怎么连接的。就像你说:“我告诉你,我是作者;你告诉我,我是你的书。”

比如:
class Author:
    # 作者的名字
    name = ""
    
    # 书的列表
    books = relationship("Book", back_populates="author")

class Book:
    # 书名
    title = ""
    # 每本书有一个作者
    author = relationship("Author", back_populates="books")

这样做时,你明确告诉书和作者互相知道对方的存在。书知道自己是谁的作者知道自己写了哪些书

backref两种写法:

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

# 创建 ORM 基类
Base = declarative_base()

# 定义 Author 模型
class Author(Base):
    __tablename__ = 'authors'

    id = Column(Integer, primary_key=True)
    name = Column(String)

    # 使用 relationship 和 backref 来建立一对多关系
    books = relationship('Book', backref='author')

# 定义 Book 模型
class Book(Base):
    __tablename__ = 'books'

    id = Column(Integer, primary_key=True)
    title = Column(String)
    author_id = Column(Integer, ForeignKey('authors.id'))  # 外键,指向 Author 表

# 创建 SQLite 数据库引擎
engine = create_engine('sqlite:///books.db', echo=True)

# 创建所有表
Base.metadata.create_all(engine)

# 创建 Session 类
Session = sessionmaker(bind=engine)
session = Session()

# 添加一些数据
author1 = Author(name="J.K. Rowling")
author2 = Author(name="J.R.R. Tolkien")

# 方法 1: 通过书籍与作者直接关联(直接创建书籍对象并与作者关联)
book1 = Book(title="Harry Potter and the Philosopher's Stone", author=author1)
book2 = Book(title="Harry Potter and the Chamber of Secrets", author=author1)

# 方法 2: 先创建书籍对象,再将书籍添加到作者的书籍列表
book3 = Book(title="The Hobbit")
book4 = Book(title="The Lord of the Rings")

# 将书籍对象添加到作者的书籍列表
author2.books.append(book3)
author2.books.append(book4)

# 添加数据到数据库
session.add_all([author1, author2, book1, book2, book3, book4])
session.commit()

# 查询某个作者的所有书籍
author = session.query(Author).filter_by(name="J.K. Rowling").first()

# 方法 1: 通过 author.books 获取所有书籍(反向查找书籍)
print(f"Books written by {author.name} (Method 1):")
for book in author.books:
    print(f"- {book.title}")

# 查询某个作者的所有书籍
author2 = session.query(Author).filter_by(name="J.R.R. Tolkien").first()

# 方法 2: 通过 book.author 获取每本书的作者(正向查找作者)
print(f"\nBooks written by {author2.name} (Method 2):")
for book in author2.books:
    print(f"- {book.title}")

粉丝与偶像数据库设计


 

普通:

# 定义用户、偶像和粉丝关系
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

class Celebrity(Base):
    __tablename__ = 'celebrities'
    id = Column(Integer, primary_key=True)
    name = Column(String)

class FanRelationship(Base):
    __tablename__ = 'fan_relationships'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    celebrity_id = Column(Integer, ForeignKey('celebrities.id'))
    is_favorite = Column(Boolean, default=False)

# 创建数据
user1 = User(name="Alice")
celebrity1 = Celebrity(name="J.K. Rowling")
fan_rel = FanRelationship(user=user1, celebrity=celebrity1, is_favorite=True)

# 添加数据到数据库
session.add_all([user1, celebrity1, fan_rel])
session.commit()

# 查询数据
user = session.query(User).filter_by(name="Alice").first()
print(f"{user.name}关注的偶像:")
for relationship in user.fan_relationships:
    print(f"- {relationship.celebrity.name}")

进阶:

from sqlalchemy import create_engine, Column, Integer, String, Boolean, ForeignKey, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from datetime import datetime

Base = declarative_base()

# User 表
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String, unique=True)
    password = Column(String)
    join_date = Column(DateTime, default=datetime.utcnow)

    # 粉丝关系:粉丝关注的偶像
    fan_relationships = relationship("FanRelationship", back_populates="user")

# Celebrity 表
class Celebrity(Base):
    __tablename__ = 'celebrities'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    bio = Column(String)

    # 粉丝关系:偶像的粉丝
    fan_relationships = relationship("FanRelationship", back_populates="celebrity")

# FanRelationship 表:粉丝与偶像的关系
class FanRelationship(Base):
    __tablename__ = 'fan_relationships'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    celebrity_id = Column(Integer, ForeignKey('celebrities.id'))
    follow_date = Column(DateTime, default=datetime.utcnow)
    is_favorite = Column(Boolean, default=False)

    user = relationship("User", back_populates="fan_relationships")
    celebrity = relationship("Celebrity", back_populates="fan_relationships")

# 创建 SQLite 数据库引擎
engine = create_engine('sqlite:///fansystem.db', echo=True)
Base.metadata.create_all(engine)

# 创建 Session 类
Session = sessionmaker(bind=engine)
session = Session()

# 添加一些数据
user1 = User(name="Alice", email="alice@example.com", password="123456")
user2 = User(name="Bob", email="bob@example.com", password="abcdef")

celebrity1 = Celebrity(name="J.K. Rowling", bio="Author of Harry Potter series.")
celebrity2 = Celebrity(name="J.R.R. Tolkien", bio="Author of The Lord of the Rings.")

# 添加粉丝关系
fan_rel1 = FanRelationship(user=user1, celebrity=celebrity1, is_favorite=True)
fan_rel2 = FanRelationship(user=user2, celebrity=celebrity2)

session.add_all([user1, user2, celebrity1, celebrity2, fan_rel1, fan_rel2])
session.commit()

# 查询某个用户关注的所有偶像
user = session.query(User).filter_by(name="Alice").first()
print(f"{user.name} is following:")
for relationship in user.fan_relationships:
    print(f"- {relationship.celebrity.name}")

# 查询某个偶像的所有粉丝
celebrity = session.query(Celebrity).filter_by(name="J.K. Rowling").first()
print(f"\n{celebrity.name} has the following fans:")
for relationship in celebrity.fan_relationships:
    print(f"- {relationship.user.name}")