Flask(六) 数据库操作SQLAlchemy

发布于:2025-06-29 ⋅ 阅读:(19) ⋅ 点赞:(0)

  • 使用 SQLAlchemy:定义模型,配置数据库,执行基本的 CRUD 操作。
  • 创建和管理数据库:使用 db.create_all() 创建表。
  • CRUD 操作:添加、读取、更新和删除记录。
  • 查询操作:执行基本和复杂查询,包括排序和分页。
  • Flask-Migrate:使用 Flask-Migrate 管理数据库迁移。
  • 执行原始 SQL:使用原始 SQL 语句进行数据库操作

一、准备工作

pip install flask flask_sqlalchemy

二、最小化可运行示例

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///mydb.sqlite3'
#或者
#app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://user:password@localhost/db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# 定义数据模型(表)
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    age = db.Column(db.Integer)

# 创建表
with app.app_context():
    db.create_all()

# 启动应用
@app.route('/')
def index():
    return '数据库已初始化'
部分 含义
'sqlite:///mydatabase.db' 使用 SQLite 数据库,数据库文件保存在当前目录
SQLAlchemy(app) 绑定 Flask 应用,初始化 ORM 引擎。也就是将 Flask 应用对象 app 注册到 SQLAlchemy 对象中,从而启用数据库功能。
SQLALCHEMY_TRACK_MODIFICATIONS 关闭事件监听,提升性能,防止警告

✅ 补充

延迟绑定方式(推荐方式)

适用于大型项目或避免循环引用:

# extensions.py
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy() # 先不绑定 app

# app.py
from flask import Flask
from extensions import db

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///mydatabase.db'
    db.init_app(app) # ✅ 后绑定 app
    return app
写法 含义
db = SQLAlchemy(app) 初始化并立刻绑定 app
db = SQLAlchemy() + db.init_app(app) 更灵活,推荐在大型项目或工厂模式中使用

三、数据库基本操作(增删改查)

1. 插入数据(增)

with app.app_context():
    user = User(username='alice', age=20)
    db.session.add(user)
    db.session.commit()

db.session.add(user):将新用户对象添加到会话中。
db.session.commit():提交事务,将更改保存到数据库。

2. 查询数据(查)

# 查询所有
User.query.all()

# 查询第一条
User.query.first()

# 条件查询
User.query.filter_by(username='alice').first()

# 更复杂的筛选
User.query.filter(User.age >= 18).all()

User.query.all():查询所有用户记录。

User.query.all()时报错,RuntimeError: Working outside of application context.
说明你试图在没有 Flask 应用上下文的环境中访问数据库。这是 Flask-SQLAlchemy 的一个常规限制。

正确解决方案:
示例1、直接加上 app.app_context()

import User

with app.app_context():
    users = User.query.all()
    print(users) 

示例2、在 Python 交互模式中(REPL 或 Jupyter Notebook)

from your_project import app from your_project.models import
User

app.app_context().push()  # 手动推入应用上下文 User.query.all()

示例 3:在 Flask 视图函数中(上下文自动存在)

 @app.route("/users") 
 def get_users():
    users = User.query.all()
    return jsonify([u.to_dict() for u in users]) 

🔍 为什么必须有应用上下文?
Flask-SQLAlchemy 底层依赖 current_app 和 g,这些变量必须在上下文中才存在

📌如果你使用的是“应用工厂模式”(create_app()),记得这样写:

app = create_app() with app.app_context():
   db.create_all()
   User.query.all() 

3. 更新数据(改)

with app.app_context():
    user = User.query.filter_by(username='alice').first()
    if user:
        user.age = 25
        db.session.commit()
@app.route('/update_user/<int:user_id>')
def update_user(user_id):
    user = User.query.get(user_id)
    if user:
        user.username = 'new_username'
        db.session.commit()
        return 'User updated!'
    return 'User not found!'

User.query.get(user_id):通过主键查询单个用户记录。
更新字段值并提交事务。

4. 删除数据(删)

with app.app_context():
    user = User.query.filter_by(username='alice').first()
    if user:
        db.session.delete(user)
        db.session.commit()

db.session.delete(user):删除用户记录,并提交事务。

四、其他有用方法

方法 / 类 功能
db.Model 所有模型基类
db.Column 定义字段类型
db.session.add(obj) 添加记录
db.session.commit() 提交更改
User.query.filter(...) ORM 查询
db.create_all() 创建所有表
db.drop_all() 删除所有表

五、常用字段类型

类型 含义
db.String(n) 字符串(最长 n)
db.Integer 整数
db.Boolean 布尔值
db.DateTime 时间戳
db.Float 浮点数

六、初始化数据库脚本(推荐)

可以加一个 Python 脚本 init_db.py,自动初始化数据库:

from app import db
from app import app  # 确保 Flask app 存在

with app.app_context():
    db.create_all()
    print("数据库已初始化!")

db.create_all():创建所有在当前上下文中定义的模型对应的表。

sqlalchemy 实例

基本使用

假设你有模型:

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)

__tablename__ = "users"在 SQLAlchemy 中,这是在 ORM 模型类中定义 对应的数据库表名。
不加 __tablename__ 会怎样?
SQLAlchemy 默认使用类名的小写作为表名,如 User → user
但不推荐依赖默认行为 —— 明确指定更安全、更清晰
模型是数据库表的 Python 类,每个模型类代表数据库中的一张表。
推荐命名风格

模型类名 表名 建议写法
User users __tablename__ = "users"
Order orders __tablename__ = "orders"
UserOrder user_orders __tablename__ = "user_orders"

你可以:

# 查询所有用户
users = db.session.query(User).all()

# 查询用户名为 'root' 的用户
user = db.session.query(User).filter_by(username='root').first()

# 查询 ID 大于 10 的用户
users = db.session.query(User).filter(User.id > 10).all()

常见方法速查

方法 说明
query(Model) 创建查询对象
.filter(condition) 过滤条件,支持表达式
.filter_by(field=value) 简写形式,等同于 == 比较
.first() 获取第一条记录或 None
.all() 获取所有结果(列表)
.count() 获取记录数量
.limit(n) 限制返回数量
.offset(n) 跳过前 n 行记录
.order_by(Model.field) 排序(可加 .desc()
.one() / .one_or_none() 获取一条记录(不唯一会报错)
.distinct() 去重

多表查询(JOIN)

db.session.query(User, Post).join(Post, User.id == Post.user_id).all()
from sqlalchemy import or_

users = User.query.filter(or_(User.username == 'john_doe', User.email == 'john@example.com')).all()

or_():用于执行复杂的查询条件。

原始 SQL 语句(可选)

from sqlalchemy import text
db.session.execute(text("SELECT * FROM users WHERE id = :id"), {"id": 1}).fetchall()

db.session.execute():执行原始 SQL 查询。

示例:分页 + 排序

users = db.session.query(User) \
    .filter(User.username.like('%admin%')) \
    .order_by(User.id.desc()) \
    .offset(0).limit(10) \
    .all()

order_by():按指定字段排序。
paginate():分页查询。

推荐:使用 Flask-SQLAlchemy 提供的简写风格

User.query.filter_by(username='root').first()
User.query.filter(User.id > 10).all()

完整的 Flask-SQLAlchemy 配置示例(含连接池/超时/调试配置)

推荐项目结构(简化)

your_project/
├── app.py
├── config.py           ← 配置集中管理
├── models.py           ← 数据模型
└── init_db.py          ← 初始化数据库

✅ 1. config.py 配置文件

import os

class Config:
    # ✅ 数据库连接(选你要的数据库)
    # SQLite(开发测试用)
    SQLALCHEMY_DATABASE_URI = 'sqlite:///mydb.sqlite3'

    # MySQL 示例
    # SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost:3306/mydb?charset=utf8mb4'

    # PostgreSQL 示例
    # SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://user:password@localhost/mydb'

    # ✅ SQLAlchemy 设置
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # ✅ 调试设置
    DEBUG = True

    # ✅ 数据库连接池设置
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 10,            # 最大连接数
        "max_overflow": 5,          # 超出 pool_size 后允许的连接数
        "pool_timeout": 30,         # 获取连接最大等待时间(秒)
        "pool_recycle": 1800,       # 自动重连时间(秒)防止 MySQL 8 小时断开
        "echo": True,               # 输出执行 SQL(调试用)
    }

    # ✅ 密钥(如启用 Session)
    SECRET_KEY = os.environ.get('SECRET_KEY', 'dev-key')

数据库连接池说明(MySQL/PostgreSQL 推荐)

参数 含义
pool_size 最大连接数
max_overflow 超出最大连接数后的额外连接数
pool_timeout 获取连接等待时长(秒)
pool_recycle 多久重置连接(避免断线),MySQL 默认 8 小时(28800 秒)断连接,推荐 pool_recycle = 1800。
echo 是否打印 SQL(调试用)

✅ 2. app.py 应用初始化

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from config import Config

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    app.config.from_object(Config)
    db.init_app(app)

    # 注册路由/蓝图
    @app.route('/')
    def index():
        return 'Hello Flask + SQLAlchemy'

    return app

✅ 3. models.py 数据模型定义

from app import db

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, nullable=False)
    age = db.Column(db.Integer)

✅ 4. init_db.py 数据库初始化脚本

from app import create_app, db
from models import *

app = create_app()

with app.app_context():
    db.create_all()
    print("✅ 数据库已初始化")

运行方式:

python init_db.py

使用 PyMySQL 连接 MySQL,app.py 文件代码:

from flask import Flask, request, jsonify
import pymysql

app = Flask(__name__)

def get_db_connection():
    connection = pymysql.connect(
        host='localhost',
        user='username',
        password='password',
        database='dbname',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection

@app.route('/add_user', methods=['POST'])
def add_user():
    data = request.json
    name = data['name']
    email = data['email']

    connection = get_db_connection()
    with connection.cursor() as cursor:
        cursor.execute('INSERT INTO user (username, email) VALUES (%s, %s)', (name, email))
        connection.commit()

    connection.close()
    return 'User added!'

@app.route('/get_users')
def get_users():
    connection = get_db_connection()
    with connection.cursor() as cursor:
        cursor.execute('SELECT * FROM user')
        users = cursor.fetchall()

    connection.close()
    return jsonify(users)

if __name__ == '__main__':
    app.run(debug=True)