MongoDB 聚合管道速成教程

发布于:2025-03-13 ⋅ 阅读:(11) ⋅ 点赞:(0)

一、引言

MongoDB 的聚合管道(Aggregation Pipeline)是一种强大的数据处理工具,它允许你对文档进行一系列的操作,如过滤、转换、分组和聚合等。聚合管道由多个管道组成,每个管道对输入的文档进行特定的处理,并将处理后的结果传递给下一个管道。


二、基础管道

1. 过滤管道 $match 

功能

$match 管道用于过滤文档,它的作用类似于 find() 方法的条件查询。只有满足指定条件的文档才会进入下一个管道。

示例代码

假设我们有一个名为 students 的集合,包含以下文档:

[
  { "name": "Alice", "age": 20, "status": "A" },
  { "name": "Bob", "age": 17, "status": "B" },
  { "name": "Charlie", "age": 22, "status": "A" }
]

 以下是使用 $match 管道过滤出年龄大于等于 18 且状态为 "A" 的学生的聚合管道代码:

from pymongo import MongoClient

# 连接到 MongoDB 数据库
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $match 管道,过滤出 age 大于等于 18 且 status 为 "A" 的文档
        '$match': {
            'age': {'$gte': 18},
            'status': "A"
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

# 关闭连接
client.close()

 ’$gte‘是比较操作符(>=),类似的比较操作符如下:

名称

说明

$eq

匹配等于指定值的值。

$gt

匹配大于指定值的值。

$gte

匹配大于等于指定值的值。

$in

匹配数组中指定的任何值。

$lt

匹配小于指定值的值。

$lte

匹配小于等于指定值的值。

$ne

匹配所有不等于指定值的值。

$nin

不匹配数组中指定的任何值。

 结果

执行上述代码后,预期的结果是返回年龄大于等于 18 且状态为 "A" 的学生文档,即:

[
  { "name": "Alice", "age": 20, "status": "A" },
  { "name": "Charlie", "age": 22, "status": "A" }
]

 2. 分组管道 $group

功能

$group 管道用于按指定字段对文档进行分组,并可以对每个组进行聚合操作,如求和、计数、求平均值等。

示例代码

继续使用 students 集合,以下是按 status 字段分组并统计每组学生数量的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $group 管道,按 status 字段分组
        '$group': {
            # _id 是分组的依据字段
            '_id': "$status",
            # 统计每组的学生数量
            'studentCount': {'$sum': 1}
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

结果

执行上述代码后,预期的结果是按 status 字段分组并统计每组的学生数量,结果如下:

[
  { "_id": "A", "studentCount": 2 },
  { "_id": "B", "studentCount": 1 }
]

 3.投影管道 $project

功能

$project 管道用于选择、重命名或删除字段,还可以计算新的字段。

示例代码

以下是使用 $project 管道选择 name 和 age 字段,并计算 ageInDays 字段(假设一年按 365 天计算)的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $project 管道,选择和计算字段
        '$project': {
            # 保留 name 字段
            'name': 1,
            # 保留 age 字段
            'age': 1,
            # 计算 ageInDays 字段
            'ageInDays': {'$multiply': ["$age", 365]}
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

结果

执行上述代码后,预期的结果是返回包含 nameage 和 ageInDays 字段的文档,结果如下:

[
  { "name": "Alice", "age": 20, "ageInDays": 7300 },
  { "name": "Bob", "age": 17, "ageInDays": 6205 },
  { "name": "Charlie", "age": 22, "ageInDays": 8030 }
]

 4. 排序管道 $sort

 功能

$sort 管道用于按指定字段对文档进行排序,排序规则为 1 表示升序, -1 表示降序。

示例代码

以下是按 age 字段降序排序的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $sort 管道,按 age 字段降序排序
        '$sort': {
            'age': -1
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

 结果

执行上述代码后,预期的结果是按 age 字段降序排列的文档,结果如下:

[
  { "name": "Charlie", "age": 22, "status": "A" },
  { "name": "Alice", "age": 20, "status": "A" },
  { "name": "Bob", "age": 17, "status": "B" }
]

 5. 分页管道 $limit 和 $skip

功能

$limit 管道用于限制返回的文档数量,$skip 管道用于跳过指定数量的文档,通常一起使用来实现分页功能。

示例代码

假设我们要获取第二页的数据,每页显示 1 条记录,以下是对应的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $skip 管道,跳过 1 条记录
        '$skip': 1
    },
    {
        # $limit 管道,限制返回 1 条记录
        '$limit': 1
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

 结果

执行上述代码后,预期的结果是跳过第一条记录,返回第二条记录,结果如下:

[
  { "name": "Bob", "age": 17, "status": "B" }
]

 三、高级管道

1.展开管道 $unwind

功能

$unwind 管道用于展开数组字段,将包含数组字段的文档拆分成多个文档,每个文档包含数组中的一个元素。

示例代码

假设我们有一个名为 books 的集合,包含以下文档:

[
  { "title": "Book 1", "authors": ["Author 1", "Author 2"] },
  { "title": "Book 2", "authors": ["Author 3"] }
]

 以下是使用 $unwind 管道展开 authors 数组的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['books']

# 定义聚合管道
pipeline = [
    {
        # $unwind 管道,展开 authors 数组
        '$unwind': "$authors"
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

结果

执行上述代码后,预期的结果是将包含数组字段的文档拆分成多个文档,每个文档包含数组中的一个元素,结果如下:

[
  { "title": "Book 1", "authors": "Author 1" },
  { "title": "Book 1", "authors": "Author 2" },
  { "title": "Book 2", "authors": "Author 3" }
]

 2.连接管道 $lookup

功能

$lookup 管道用于在不同的集合之间进行左外连接,类似于 SQL 中的 JOIN 操作。

示例代码

假设我们有两个集合:orders 和 customersorders 集合包含以下文档:

[
  { "orderId": 1, "customerId": 1, "amount": 100 },
  { "orderId": 2, "customerId": 2, "amount": 200 }
]

 customers 集合包含以下文档:

[
  { "customerId": 1, "name": "Customer 1" },
  { "customerId": 2, "name": "Customer 2" }
]

以下是使用 $lookup 管道将 orders 集合和 customers 集合进行连接的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
orders_collection = database['orders']

# 定义聚合管道
pipeline = [
    {
        # $lookup 管道,连接 orders 集合和 customers 集合
        '$lookup': {
            # 要连接的集合名称
            'from': "customers",
            # 当前集合中用于连接的字段
            'localField': "customerId",
            # 要连接的集合中用于连接的字段
            'foreignField': "customerId",
            # 连接结果存储的字段名
            'as': "customerInfo"
        }
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
print(result)

client.close()

 结果

执行上述代码后,预期的结果是将 orders 集合和 customers 集合进行左外连接,结果如下:

[
  {
    "orderId": 1,
    "customerId": 1,
    "amount": 100,
    "customerInfo": [
      { "customerId": 1, "name": "Customer 1" }
    ]
  },
  {
    "orderId": 2,
    "customerId": 2,
    "amount": 200,
    "customerInfo": [
      { "customerId": 2, "name": "Customer 2" }
    ]
  }
]

 3.新增管道 $addFields

功能

$addFields 管道用于向文档中添加新的字段,而不影响原有的字段。

示例代码

继续使用 students 集合,以下是使用 $addFields 管道添加 isAdult 字段的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $addFields 管道,添加 isAdult 字段
        '$addFields': {
            'isAdult': {'$gte': ["$age", 18]}
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

 结果

执行上述代码后,预期的结果是向文档中添加 isAdult 字段,该字段表示学生是否成年,结果如下:

[
  { "name": "Alice", "age": 20, "status": "A", "isAdult": true },
  { "name": "Bob", "age": 17, "status": "B", "isAdult": false },
  { "name": "Charlie", "age": 22, "status": "A", "isAdult": true }
]

4.设置管道 $set

功能

$set 管道用于更新或添加字段,如果字段已经存在,则更新其值;如果字段不存在,则添加该字段。

示例代码

以下是使用 $set 管道更新 status 字段为 "completed" 的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $set 管道,更新 status 字段
        '$set': {
            'status': "completed"
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

 结果

执行上述代码后,预期的结果是将所有文档的 status 字段更新为 "completed",结果如下:

[
  { "name": "Alice", "age": 20, "status": "completed" },
  { "name": "Bob", "age": 17, "status": "completed" },
  { "name": "Charlie", "age": 22, "status": "completed" }
]

 5. 地图管道 $geoNear 

功能

$geoNear 管道用于基于地理位置进行查询,返回距离指定坐标点最近的文档。

示例代码

假设我们有一个名为 places 的集合,包含以下文档:

[
  {
    "name": "Place 1",
    "location": {
      "type": "Point",
      "coordinates": [116.4074, 39.9042]
    }
  },
  {
    "name": "Place 2",
    "location": {
      "type": "Point",
      "coordinates": [116.4174, 39.9142]
    }
  }
]

 以下是使用 $geoNear 管道查询距离坐标点 [116.4074, 39.9042] 最近的地点的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['places']

# 创建 2dsphere 索引,用于地理空间查询
collection.create_index([("location", "2dsphere")])

# 定义聚合管道
pipeline = [
    {
        # $geoNear 管道,查询距离指定坐标点最近的地点
        '$geoNear': {
            # 指定查询的中心点坐标
            'near': {'type': "Point", 'coordinates': [116.4074, 39.9042]},
            # 存储计算出的距离的字段名
            'distanceField': "distance",
            # 最大查询距离(单位:米)
            'maxDistance': 10000,
            # 是否返回包含中心点的文档
            'spherical': True
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

结果

执行上述代码后,预期的结果是返回距离指定坐标点最近的地点文档,并在文档中添加 distance 字段表示距离中心点的距离。因为 Place 1 的坐标与查询中心点坐标相同,所以它会排在第一位,Place 2 排在第二位,结果大致如下(distance 值会根据实际计算得出):

[
  {
    "name": "Place 1",
    "location": {
      "type": "Point",
      "coordinates": [116.4074, 39.9042]
    },
    "distance": 0
  },
  {
    "name": "Place 2",
    "location": {
      "type": "Point",
      "coordinates": [116.4174, 39.9142]
    },
    "distance": 1234 // 实际距离值
  }
]

四、其他常用管道

1. 分组连接管道 $graphLookup

功能

$graphLookup 管道用于处理图结构数据,例如树状关系。它可以从一个集合中递归地查找相关文档。

示例代码

假设我们有一个名为 employees 的集合,存储员工及其上级关系,文档结构如下:

[
  { "_id": 1, "name": "Alice", "managerId": null },
  { "_id": 2, "name": "Bob", "managerId": 1 },
  { "_id": 3, "name": "Charlie", "managerId": 2 }
]

 以下是使用 $graphLookup 管道查找每个员工的所有上级的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['employees']

# 定义聚合管道
pipeline = [
    {
        # $graphLookup 管道,查找每个员工的所有上级
        '$graphLookup': {
            # 要查找的集合
            'from': "employees",
            # 起始字段,即当前文档中用于开始查找的字段
            'startWith': "$managerId",
            # 连接字段,用于递归查找
            'connectFromField': "managerId",
            # 被连接字段,与 connectFromField 对应
            'connectToField': "_id",
            # 存储查找结果的字段名
            'as': "managers"
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

结果

执行上述代码后,预期的结果是为每个员工文档添加一个 managers 数组字段,其中包含该员工的所有上级信息。例如:

[
  {
    "_id": 1,
    "name": "Alice",
    "managerId": null,
    "managers": []
  },
  {
    "_id": 2,
    "name": "Bob",
    "managerId": 1,
    "managers": [
      { "_id": 1, "name": "Alice", "managerId": null }
    ]
  },
  {
    "_id": 3,
    "name": "Charlie",
    "managerId": 2,
    "managers": [
      { "_id": 2, "name": "Bob", "managerId": 1 },
      { "_id": 1, "name": "Alice", "managerId": null }
    ]
  }
]

 2.面管道 $facet

功能

$facet 管道允许在单个聚合管道中执行多个不同的聚合操作,并将结果分组返回。这对于在一个查询中进行多维度分析非常有用。

示例代码

继续使用 students 集合,以下是使用 $facet 管道同时进行分页和统计学生总数的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $facet 管道,同时进行分页和统计
        '$facet': {
            # 分页操作
            'paginatedResults': [
                {'$skip': 0},
                {'$limit': 2}
            ],
            # 统计学生总数
            'totalCount': [
                {'$count': "total"}
            ]
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

结果

执行上述代码后,预期的结果是返回一个包含两个字段的文档,paginatedResults 字段包含分页后的学生文档,totalCount 字段包含学生总数。例如:

[
  {
    "paginatedResults": [
      { "name": "Alice", "age": 20, "status": "A" },
      { "name": "Bob", "age": 17, "status": "B" }
    ],
    "totalCount": [
      { "total": 3 }
    ]
  }
]

 3. 分阶管道 $bucket 和 $bucketAuto

功能

$bucket 管道用于将文档分到不同的区间(桶)中,需要手动指定区间边界;$bucketAuto 管道则会自动将文档分到指定数量的区间中。

示例代码($bucket

以下是使用 $bucket 管道按 age 字段将学生文档分到不同区间的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $bucket 管道,按 age 字段分组
        '$bucket': {
            # 分组依据字段
            'groupBy': "$age",
            # 区间边界
            'boundaries': [15, 20, 25],
            # 缺省桶,用于存储不在指定区间内的文档
            'default': "Other",
            # 每个桶中要统计的信息
            'output': {
                'count': {'$sum': 1}
            }
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

 结果

执行上述代码后,预期的结果是将学生文档按 age 字段分到不同的区间中,并统计每个区间的文档数量。例如:

[
  { "_id": 15, "count": 1 },
  { "_id": 20, "count": 1 },
  { "_id": "Other", "count": 1 }
]

示例代码($bucketAuto

以下是使用 $bucketAuto 管道按 age 字段自动将学生文档分到 2 个区间的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $bucketAuto 管道,自动分组
        '$bucketAuto': {
            # 分组依据字段
            'groupBy': "$age",
            # 区间数量
            'buckets': 2,
            # 每个桶中要统计的信息
            'output': {
                'count': {'$sum': 1}
            }
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

结果

执行上述代码后,预期的结果是将学生文档按 age 字段自动分到 2 个区间中,并统计每个区间的文档数量。例如:

[
  {
    "_id": {
      "min": 17,
      "max": 20
    },
    "count": 2
  },
  {
    "_id": {
      "min": 20,
      "max": 22
    },
    "count": 1
  }
]

4.输出管道 $out 

功能

$out 管道用于将聚合结果输出到一个新的集合中。

示例代码

以下是使用 $out 管道将过滤后的学生文档输出到一个新集合 filtered_students 中的聚合管道代码:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $match 管道,过滤出 age 大于等于 18 的学生
        '$match': {
            'age': {'$gte': 18}
        }
    },
    {
        # $out 管道,将结果输出到新集合
        '$out': "filtered_students"
    }
]

# 执行聚合操作
list(collection.aggregate(pipeline))
print("聚合结果已输出到 filtered_students 集合")

client.close()

 结果

执行上述代码后,会在数据库中创建一个名为 filtered_students 的新集合,其中包含年龄大于等于 18 的学生文档。可以通过查询该集合来验证结果:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['filtered_students']

# 查询新集合中的文档
result = list(collection.find())
print(result)

client.close()

 预期结果与 $match 管道过滤后的结果相同:

[
  { "name": "Alice", "age": 20, "status": "A" },
  { "name": "Charlie", "age": 22, "status": "A" }
]

五、管道使用建议

1. 管道的组合使用

通过多个管道的组合可以实现复杂的逻辑。例如,先使用 $match 管道过滤数据,减少后续管道处理的数据量,再使用 $group 管道进行分组统计。以下是一个组合示例:

from pymongo import MongoClient

uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']

# 定义聚合管道
pipeline = [
    {
        # $match 管道,过滤出 age 大于等于 18 的学生
        '$match': {
            'age': {'$gte': 18}
        }
    },
    {
        # $group 管道,按 status 字段分组并统计每组学生数量
        '$group': {
            '_id': "$status",
            'studentCount': {'$sum': 1}
        }
    }
]

# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)

client.close()

 2. 性能优化

  • 优先使用 $match 或 $limit 管道:在聚合管道的开头使用 $match 管道过滤数据,减少后续管道处理的数据量;使用 $limit 管道限制返回的文档数量,避免处理过多不必要的数据。
  • 避免在 $group 管道中使用复杂表达式:复杂的表达式会增加计算量,影响性能。尽量在 $group 管道之前进行数据处理,将复杂的计算放到 $project 管道。

3. 参考官方文档

MongoDB 的官方文档提供了完整的聚合管道列表和详细的示例,建议参考 MongoDB Aggregation Pipeline 获取更多信息。

通过本教程,你应该对 MongoDB 聚合管道的常用管道有了更深入的了解,可以根据具体需求选择合适的管道组合来完成数据处理任务。