一、引言
MongoDB 的聚合管道(Aggregation Pipeline)是一种强大的数据处理工具,它允许你对文档进行一系列的操作,如过滤、转换、分组和聚合等。聚合管道由多个管道组成,每个管道对输入的文档进行特定的处理,并将处理后的结果传递给下一个管道。
二、基础管道
1. 过滤管道 $match
功能
$match
管道用于过滤文档,它的作用类似于 find()
方法的条件查询。只有满足指定条件的文档才会进入下一个管道。
示例代码
假设我们有一个名为 students
的集合,包含以下文档:
[
{ "name": "Alice", "age": 20, "status": "A" },
{ "name": "Bob", "age": 17, "status": "B" },
{ "name": "Charlie", "age": 22, "status": "A" }
]
以下是使用 $match
管道过滤出年龄大于等于 18 且状态为 "A" 的学生的聚合管道代码:
from pymongo import MongoClient
# 连接到 MongoDB 数据库
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $match 管道,过滤出 age 大于等于 18 且 status 为 "A" 的文档
'$match': {
'age': {'$gte': 18},
'status': "A"
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
# 关闭连接
client.close()
’$gte‘是比较操作符(>=),类似的比较操作符如下:
名称 |
说明 |
---|---|
匹配等于指定值的值。 |
|
匹配大于指定值的值。 |
|
匹配大于等于指定值的值。 |
|
匹配数组中指定的任何值。 |
|
匹配小于指定值的值。 |
|
匹配小于等于指定值的值。 |
|
匹配所有不等于指定值的值。 |
|
不匹配数组中指定的任何值。 |
结果
执行上述代码后,预期的结果是返回年龄大于等于 18 且状态为 "A" 的学生文档,即:
[
{ "name": "Alice", "age": 20, "status": "A" },
{ "name": "Charlie", "age": 22, "status": "A" }
]
2. 分组管道 $group
功能
$group
管道用于按指定字段对文档进行分组,并可以对每个组进行聚合操作,如求和、计数、求平均值等。
示例代码
继续使用 students
集合,以下是按 status
字段分组并统计每组学生数量的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $group 管道,按 status 字段分组
'$group': {
# _id 是分组的依据字段
'_id': "$status",
# 统计每组的学生数量
'studentCount': {'$sum': 1}
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是按 status
字段分组并统计每组的学生数量,结果如下:
[
{ "_id": "A", "studentCount": 2 },
{ "_id": "B", "studentCount": 1 }
]
3.投影管道 $project
功能
$project
管道用于选择、重命名或删除字段,还可以计算新的字段。
示例代码
以下是使用 $project
管道选择 name
和 age
字段,并计算 ageInDays
字段(假设一年按 365 天计算)的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $project 管道,选择和计算字段
'$project': {
# 保留 name 字段
'name': 1,
# 保留 age 字段
'age': 1,
# 计算 ageInDays 字段
'ageInDays': {'$multiply': ["$age", 365]}
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是返回包含 name
、age
和 ageInDays
字段的文档,结果如下:
[
{ "name": "Alice", "age": 20, "ageInDays": 7300 },
{ "name": "Bob", "age": 17, "ageInDays": 6205 },
{ "name": "Charlie", "age": 22, "ageInDays": 8030 }
]
4. 排序管道 $sort
功能
$sort
管道用于按指定字段对文档进行排序,排序规则为 1 表示升序, -1 表示降序。
示例代码
以下是按 age
字段降序排序的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $sort 管道,按 age 字段降序排序
'$sort': {
'age': -1
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是按 age
字段降序排列的文档,结果如下:
[
{ "name": "Charlie", "age": 22, "status": "A" },
{ "name": "Alice", "age": 20, "status": "A" },
{ "name": "Bob", "age": 17, "status": "B" }
]
5. 分页管道 $limit
和 $skip
功能
$limit
管道用于限制返回的文档数量,$skip
管道用于跳过指定数量的文档,通常一起使用来实现分页功能。
示例代码
假设我们要获取第二页的数据,每页显示 1 条记录,以下是对应的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $skip 管道,跳过 1 条记录
'$skip': 1
},
{
# $limit 管道,限制返回 1 条记录
'$limit': 1
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是跳过第一条记录,返回第二条记录,结果如下:
[
{ "name": "Bob", "age": 17, "status": "B" }
]
三、高级管道
1.展开管道 $unwind
功能
$unwind
管道用于展开数组字段,将包含数组字段的文档拆分成多个文档,每个文档包含数组中的一个元素。
示例代码
假设我们有一个名为 books
的集合,包含以下文档:
[
{ "title": "Book 1", "authors": ["Author 1", "Author 2"] },
{ "title": "Book 2", "authors": ["Author 3"] }
]
以下是使用 $unwind
管道展开 authors
数组的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['books']
# 定义聚合管道
pipeline = [
{
# $unwind 管道,展开 authors 数组
'$unwind': "$authors"
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是将包含数组字段的文档拆分成多个文档,每个文档包含数组中的一个元素,结果如下:
[
{ "title": "Book 1", "authors": "Author 1" },
{ "title": "Book 1", "authors": "Author 2" },
{ "title": "Book 2", "authors": "Author 3" }
]
2.连接管道 $lookup
功能
$lookup
管道用于在不同的集合之间进行左外连接,类似于 SQL 中的 JOIN 操作。
示例代码
假设我们有两个集合:orders
和 customers
,orders
集合包含以下文档:
[
{ "orderId": 1, "customerId": 1, "amount": 100 },
{ "orderId": 2, "customerId": 2, "amount": 200 }
]
customers
集合包含以下文档:
[
{ "customerId": 1, "name": "Customer 1" },
{ "customerId": 2, "name": "Customer 2" }
]
以下是使用 $lookup
管道将 orders
集合和 customers
集合进行连接的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
orders_collection = database['orders']
# 定义聚合管道
pipeline = [
{
# $lookup 管道,连接 orders 集合和 customers 集合
'$lookup': {
# 要连接的集合名称
'from': "customers",
# 当前集合中用于连接的字段
'localField': "customerId",
# 要连接的集合中用于连接的字段
'foreignField': "customerId",
# 连接结果存储的字段名
'as': "customerInfo"
}
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是将 orders
集合和 customers
集合进行左外连接,结果如下:
[
{
"orderId": 1,
"customerId": 1,
"amount": 100,
"customerInfo": [
{ "customerId": 1, "name": "Customer 1" }
]
},
{
"orderId": 2,
"customerId": 2,
"amount": 200,
"customerInfo": [
{ "customerId": 2, "name": "Customer 2" }
]
}
]
3.新增管道 $addFields
功能
$addFields
管道用于向文档中添加新的字段,而不影响原有的字段。
示例代码
继续使用 students
集合,以下是使用 $addFields
管道添加 isAdult
字段的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $addFields 管道,添加 isAdult 字段
'$addFields': {
'isAdult': {'$gte': ["$age", 18]}
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是向文档中添加 isAdult
字段,该字段表示学生是否成年,结果如下:
[
{ "name": "Alice", "age": 20, "status": "A", "isAdult": true },
{ "name": "Bob", "age": 17, "status": "B", "isAdult": false },
{ "name": "Charlie", "age": 22, "status": "A", "isAdult": true }
]
4.设置管道 $set
功能
$set
管道用于更新或添加字段,如果字段已经存在,则更新其值;如果字段不存在,则添加该字段。
示例代码
以下是使用 $set
管道更新 status
字段为 "completed" 的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $set 管道,更新 status 字段
'$set': {
'status': "completed"
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是将所有文档的 status
字段更新为 "completed",结果如下:
[
{ "name": "Alice", "age": 20, "status": "completed" },
{ "name": "Bob", "age": 17, "status": "completed" },
{ "name": "Charlie", "age": 22, "status": "completed" }
]
5. 地图管道 $geoNear
功能
$geoNear
管道用于基于地理位置进行查询,返回距离指定坐标点最近的文档。
示例代码
假设我们有一个名为 places
的集合,包含以下文档:
[
{
"name": "Place 1",
"location": {
"type": "Point",
"coordinates": [116.4074, 39.9042]
}
},
{
"name": "Place 2",
"location": {
"type": "Point",
"coordinates": [116.4174, 39.9142]
}
}
]
以下是使用 $geoNear
管道查询距离坐标点 [116.4074, 39.9042]
最近的地点的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['places']
# 创建 2dsphere 索引,用于地理空间查询
collection.create_index([("location", "2dsphere")])
# 定义聚合管道
pipeline = [
{
# $geoNear 管道,查询距离指定坐标点最近的地点
'$geoNear': {
# 指定查询的中心点坐标
'near': {'type': "Point", 'coordinates': [116.4074, 39.9042]},
# 存储计算出的距离的字段名
'distanceField': "distance",
# 最大查询距离(单位:米)
'maxDistance': 10000,
# 是否返回包含中心点的文档
'spherical': True
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是返回距离指定坐标点最近的地点文档,并在文档中添加 distance
字段表示距离中心点的距离。因为 Place 1
的坐标与查询中心点坐标相同,所以它会排在第一位,Place 2
排在第二位,结果大致如下(distance
值会根据实际计算得出):
[
{
"name": "Place 1",
"location": {
"type": "Point",
"coordinates": [116.4074, 39.9042]
},
"distance": 0
},
{
"name": "Place 2",
"location": {
"type": "Point",
"coordinates": [116.4174, 39.9142]
},
"distance": 1234 // 实际距离值
}
]
四、其他常用管道
1. 分组连接管道 $graphLookup
功能
$graphLookup
管道用于处理图结构数据,例如树状关系。它可以从一个集合中递归地查找相关文档。
示例代码
假设我们有一个名为 employees
的集合,存储员工及其上级关系,文档结构如下:
[
{ "_id": 1, "name": "Alice", "managerId": null },
{ "_id": 2, "name": "Bob", "managerId": 1 },
{ "_id": 3, "name": "Charlie", "managerId": 2 }
]
以下是使用 $graphLookup
管道查找每个员工的所有上级的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['employees']
# 定义聚合管道
pipeline = [
{
# $graphLookup 管道,查找每个员工的所有上级
'$graphLookup': {
# 要查找的集合
'from': "employees",
# 起始字段,即当前文档中用于开始查找的字段
'startWith': "$managerId",
# 连接字段,用于递归查找
'connectFromField': "managerId",
# 被连接字段,与 connectFromField 对应
'connectToField': "_id",
# 存储查找结果的字段名
'as': "managers"
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是为每个员工文档添加一个 managers
数组字段,其中包含该员工的所有上级信息。例如:
[
{
"_id": 1,
"name": "Alice",
"managerId": null,
"managers": []
},
{
"_id": 2,
"name": "Bob",
"managerId": 1,
"managers": [
{ "_id": 1, "name": "Alice", "managerId": null }
]
},
{
"_id": 3,
"name": "Charlie",
"managerId": 2,
"managers": [
{ "_id": 2, "name": "Bob", "managerId": 1 },
{ "_id": 1, "name": "Alice", "managerId": null }
]
}
]
2.面管道 $facet
功能
$facet
管道允许在单个聚合管道中执行多个不同的聚合操作,并将结果分组返回。这对于在一个查询中进行多维度分析非常有用。
示例代码
继续使用 students
集合,以下是使用 $facet
管道同时进行分页和统计学生总数的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $facet 管道,同时进行分页和统计
'$facet': {
# 分页操作
'paginatedResults': [
{'$skip': 0},
{'$limit': 2}
],
# 统计学生总数
'totalCount': [
{'$count': "total"}
]
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是返回一个包含两个字段的文档,paginatedResults
字段包含分页后的学生文档,totalCount
字段包含学生总数。例如:
[
{
"paginatedResults": [
{ "name": "Alice", "age": 20, "status": "A" },
{ "name": "Bob", "age": 17, "status": "B" }
],
"totalCount": [
{ "total": 3 }
]
}
]
3. 分阶管道 $bucket
和 $bucketAuto
功能
$bucket
管道用于将文档分到不同的区间(桶)中,需要手动指定区间边界;$bucketAuto
管道则会自动将文档分到指定数量的区间中。
示例代码($bucket
)
以下是使用 $bucket
管道按 age
字段将学生文档分到不同区间的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $bucket 管道,按 age 字段分组
'$bucket': {
# 分组依据字段
'groupBy': "$age",
# 区间边界
'boundaries': [15, 20, 25],
# 缺省桶,用于存储不在指定区间内的文档
'default': "Other",
# 每个桶中要统计的信息
'output': {
'count': {'$sum': 1}
}
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是将学生文档按 age
字段分到不同的区间中,并统计每个区间的文档数量。例如:
[
{ "_id": 15, "count": 1 },
{ "_id": 20, "count": 1 },
{ "_id": "Other", "count": 1 }
]
示例代码($bucketAuto
)
以下是使用 $bucketAuto
管道按 age
字段自动将学生文档分到 2 个区间的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $bucketAuto 管道,自动分组
'$bucketAuto': {
# 分组依据字段
'groupBy': "$age",
# 区间数量
'buckets': 2,
# 每个桶中要统计的信息
'output': {
'count': {'$sum': 1}
}
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
结果
执行上述代码后,预期的结果是将学生文档按 age
字段自动分到 2 个区间中,并统计每个区间的文档数量。例如:
[
{
"_id": {
"min": 17,
"max": 20
},
"count": 2
},
{
"_id": {
"min": 20,
"max": 22
},
"count": 1
}
]
4.输出管道 $out
功能
$out
管道用于将聚合结果输出到一个新的集合中。
示例代码
以下是使用 $out
管道将过滤后的学生文档输出到一个新集合 filtered_students
中的聚合管道代码:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $match 管道,过滤出 age 大于等于 18 的学生
'$match': {
'age': {'$gte': 18}
}
},
{
# $out 管道,将结果输出到新集合
'$out': "filtered_students"
}
]
# 执行聚合操作
list(collection.aggregate(pipeline))
print("聚合结果已输出到 filtered_students 集合")
client.close()
结果
执行上述代码后,会在数据库中创建一个名为 filtered_students
的新集合,其中包含年龄大于等于 18 的学生文档。可以通过查询该集合来验证结果:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['filtered_students']
# 查询新集合中的文档
result = list(collection.find())
print(result)
client.close()
预期结果与 $match
管道过滤后的结果相同:
[
{ "name": "Alice", "age": 20, "status": "A" },
{ "name": "Charlie", "age": 22, "status": "A" }
]
五、管道使用建议
1. 管道的组合使用
通过多个管道的组合可以实现复杂的逻辑。例如,先使用 $match
管道过滤数据,减少后续管道处理的数据量,再使用 $group
管道进行分组统计。以下是一个组合示例:
from pymongo import MongoClient
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']
# 定义聚合管道
pipeline = [
{
# $match 管道,过滤出 age 大于等于 18 的学生
'$match': {
'age': {'$gte': 18}
}
},
{
# $group 管道,按 status 字段分组并统计每组学生数量
'$group': {
'_id': "$status",
'studentCount': {'$sum': 1}
}
}
]
# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)
client.close()
2. 性能优化
- 优先使用
$match
或$limit
管道:在聚合管道的开头使用$match
管道过滤数据,减少后续管道处理的数据量;使用$limit
管道限制返回的文档数量,避免处理过多不必要的数据。 - 避免在
$group
管道中使用复杂表达式:复杂的表达式会增加计算量,影响性能。尽量在$group
管道之前进行数据处理,将复杂的计算放到$project
管道。
3. 参考官方文档
MongoDB 的官方文档提供了完整的聚合管道列表和详细的示例,建议参考 MongoDB Aggregation Pipeline 获取更多信息。
通过本教程,你应该对 MongoDB 聚合管道的常用管道有了更深入的了解,可以根据具体需求选择合适的管道组合来完成数据处理任务。