SQLAlchemy 2.x 异步查询中常用的 结果处理方法速查表,包含方法说明、使用场景、返回类型及典型用途。
SQLAlchemy 查询结果处理方法速查表(适用于 AsyncSession)
方法 |
说明 |
返回类型 |
示例 SQL |
示例输出 |
|
获取单列所有值 |
|
|
|
|
获取单列的第一行 |
|
|
|
|
获取第一行的第一列(聚合) |
|
|
|
|
多列,每行为 dict 映射 |
|
|
|
|
返回第一行 dict 映射 |
|
|
|
|
返回第一行的 Row |
`Row |
None` |
|
|
返回所有行 Row 对象列表 |
|
|
|
|
返回一行 Row |
|
|
|
|
返回所有 Row(非字典) |
|
|
|
|
获取唯一一行,若多行/无行报错 |
|
|
|
|
获取一行或 None |
`Row |
None` |
|
使用建议(按类型):
- 获取单列(如用户 ID) → 用
scalars().all()
- 统计总数/最大值 → 用
scalar()
- 获取结构化多列数据(用于 JSON 返回) → 用
mappings().all()
- 查找某条记录详情 → 用
mappings().first()
或one_or_none()
- 返回原始 SQL 风格 Row → 用
fetchall()
/all()
(不推荐)
详细解释与建议:
scalars()
:
-
- 专门用于查询 单列,如
select(User.id)
- 会自动去除 Row 封装,只返回具体值
- 专门用于查询 单列,如
mappings()
:
-
- 用于查询 多列,将每一行封装为 dict(字段名:值)
- 最常用在结构化响应返回中(如 JSON)
first()
、one()
:
-
first()
:最多一行,超过无所谓,返回第一行one()
:必须返回一行,返回多行或无行都抛异常
fetchall()
/fetchone()
:
-
- 老式用法,返回
Row
对象(类似元组) - 不推荐在现代异步代码中使用,建议用
scalars()
/mappings()
替代
- 老式用法,返回
推荐使用方式总结:
查询类型 |
推荐方式 |
单列、多行 |
|
单列、单值 |
|
多列、多行 |
|
多列、单行 |
|
真实使用示例(异步 FastAPI):
async def test_get_single_column():
async with session_factory() as session:
query = select(User.id)
result = await session.execute(query)
user_ids = result.scalars().all()
print("用户 ID 列表:", user_ids) # 用户 ID 列表: [1, 3]
# 2. 统计总数/最大值 → 用 scalar()
async def test_aggregate_functions():
async with session_factory() as session:
# 统计用户总数
total_count = await session.execute(select(func.count(User.id)))
total = total_count.scalar()
print("用户总数:", total) # 用户总数: 2
# 查找最大的角色 ID
max_role_id = await session.execute(select(func.max(User.role_id)))
max_role_id_value = max_role_id.scalar()
print("最大的角色 ID:", max_role_id_value) # 最大的角色 ID: 2
# 3. 获取结构化多列数据(用于 JSON 返回) → 用 mappings().all()
async def test_get_structured_data():
async with session_factory() as session:
query = select(User.id, User.user_name, User.email)
result = await session.execute(query)
users = result.mappings().all()
print("结构化多列数据:", users)
# 结构化多列数据: [{'id': 1, 'user_name': 'test', 'email': '123@qq.com'},
# {'id': 3, 'user_name': 'test1', 'email': '456@qq.com'}]
# 4. 查找某条记录详情 → 用 mappings().first() 或 one_or_none()
async def test_find_single_record():
async with session_factory() as session:
# 使用 mappings().first()
query = select(User.id, User.user_name, User.email).where(User.id == 1)
result = await session.execute(query)
user = result.mappings().first()
print("使用 mappings().first() 查找的记录:", user)
# 使用 mappings().first() 查找的记录: {'id': 1, 'user_name': 'test', 'email': '123@qq.com'}
# 使用 one_or_none()
user_obj = await session.execute(select(User).where(User.id == 1))
user_result = user_obj.scalars().one_or_none()
print("使用 one_or_none() 查找的记录:", user_result)
# 使用 one_or_none() 查找的记录: <login_related.model.user.User object at 0x000001477DC206D0>
# 5. 返回原始 SQL 风格 Row → 用 fetchall() / all()(不推荐)
async def test_get_raw_rows():
async with session_factory() as session:
query = select(User.id, User.user_name, User.email)
result = await session.execute(query)
rows = result.all()
print("原始 SQL 风格 Row:", rows)
# 原始 SQL 风格 Row: [(1, 'test', '123@qq.com'), (3, 'test1', '456@qq.com')]