1. 连接Milvus
import random
from datetime import datetime
from time import sleep
from pymilvus import MilvusClient, AnnSearchRequest, RRFRanker
# client = MilvusClient(
# uri="http://localhost:19530",
# token="root:Milvus"
# )
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
# Open the "default" connection to the local Milvus server.
# (The original notes also kept alias="cluster" as a commented-out alternative.)
connections.connect(alias="default", host="localhost", port=19530)

# Connectivity check: ask the server for its version and print it.
server_version = utility.get_server_version()
print("服务器版本:", server_version)
2. 创建包含多种标量和向量字段的集合
def create_collection():
    """Create the ``full_example_collection`` collection and return its handle.

    The schema has an explicit (non auto-generated) INT64 primary key, two
    float-vector fields (128-d ``embedding`` and 64-d ``keyword_embedding``)
    and a set of scalar fields, including a VARCHAR array ``tags``.
    The collection is created on the "default" connection with two shards.
    """
    collection_name = "full_example_collection"

    # Primary key: supplied by the caller (auto_id is off).
    primary_key = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False)

    # Vector fields.
    vector_fields = [
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
        FieldSchema(name="keyword_embedding", dtype=DataType.FLOAT_VECTOR, dim=64),
    ]

    # Scalar fields.
    scalar_fields = [
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="subcategory", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="price", dtype=DataType.FLOAT),
        FieldSchema(name="rating", dtype=DataType.INT64),
        FieldSchema(name="is_active", dtype=DataType.BOOL),
        FieldSchema(name="created_time", dtype=DataType.INT64),  # Unix timestamp
        FieldSchema(
            name="tags",
            dtype=DataType.ARRAY,
            element_type=DataType.VARCHAR,
            max_capacity=100,
            max_length=32,
        ),
    ]

    # Field order matters: column-based inserts elsewhere follow this order.
    schema = CollectionSchema(
        [primary_key, *vector_fields, *scalar_fields],
        description="完整的标量和向量操作示例",
    )

    created = Collection(
        name=collection_name,
        schema=schema,
        using='default',
        num_shards=2,
    )
    print(f"集合 '{collection_name}' 创建成功")
    return created
3. 获取集合
def get_collection():
    """Open ``full_example_collection``, print a short summary, load and return it.

    The summary lists the collection name, whether the schema uses auto IDs,
    the primary-key field name and all field names.
    """
    handle = Collection(name="full_example_collection", using="default")

    # Print basic collection information.
    print("集合名称:", handle.name)
    schema = handle.schema
    print("是否自动 ID:", schema.auto_id)
    print("主键字段:", handle.primary_field.name)
    field_names = [field.name for field in schema.fields]
    print("所有字段:", field_names)

    # A collection must be loaded into memory before it can be searched.
    handle.load()
    return handle
# Fetch (and load) the collection. This runs at module level, i.e. whenever the
# file is executed or imported, not only under the __main__ guard. The helper
# functions below that take no collection argument read this global handle.
collection = get_collection()
4. 创建索引
def create_index(collection: Collection):
    """Create the vector and scalar indexes used by the examples.

    Vector indexes (both IVF_FLAT):
      * ``embedding``          - L2 metric, nlist=128
      * ``keyword_embedding``  - IP (inner product) metric, nlist=64

    Scalar indexes (INVERTED, per the original note supported from Milvus 2.3)
    are created on the fields most often used in filters.

    Args:
        collection: The collection to index.
    """
    vector_indexes = (
        ("embedding", {
            "index_type": "IVF_FLAT",
            "metric_type": "L2",
            "params": {"nlist": 128},
        }),
        ("keyword_embedding", {
            "index_type": "IVF_FLAT",
            "metric_type": "IP",  # inner-product similarity
            "params": {"nlist": 64},
        }),
    )
    for field_name, index_params in vector_indexes:
        collection.create_index(field_name=field_name, index_params=index_params)
    print("向量索引创建完成")

    # Scalar indexes mainly speed up filter expressions.
    inverted_index = {"index_type": "INVERTED"}
    for field_name in ("category", "subcategory", "is_active", "rating"):
        collection.create_index(field_name=field_name, index_params=inverted_index)
    print("标量索引创建完成")
5. 插入数据
def generate_complex_data(num_entities):
    """Generate ``num_entities`` random product rows as a list of dicts.

    Each row has the keys ``id``, ``embedding`` (128 floats),
    ``keyword_embedding`` (64 floats), ``title``, ``content``, ``category``,
    ``subcategory``, ``price``, ``rating``, ``is_active``, ``created_time``
    and ``tags``. Ids are the consecutive integers ``0 .. num_entities - 1``.

    Args:
        num_entities: Number of rows to generate.

    Returns:
        A list of row dictionaries (empty when ``num_entities`` is 0).
    """
    categories = ["电子产品", "服装", "食品", "图书", "家居"]
    subcategories = ["手机", "电脑", "上衣", "零食", "小说"]
    tag_pool = ["热门", "新品", "促销", "推荐", "限量"]

    def build_row(index):
        # The dict literal is evaluated top to bottom, so the random draws
        # happen in the same order as the keys are listed.
        return {
            "id": index,
            "embedding": [random.random() for _ in range(128)],
            "keyword_embedding": [random.random() for _ in range(64)],
            "title": f"商品标题_{index}",
            "content": f"这是第{index}个商品的详细描述内容,包含丰富的信息...",
            "category": random.choice(categories),
            "subcategory": random.choice(subcategories),
            "price": round(random.uniform(10.0, 1000.0), 2),
            "rating": random.randint(1, 5),
            "is_active": random.choice([True, False]),
            # "Now" shifted by up to +/- 1,000,000 seconds.
            "created_time": int(datetime.now().timestamp()) + random.randint(-1000000, 1000000),
            # One to three distinct tags.
            "tags": random.sample(tag_pool, random.randint(1, 3)),
        }

    return [build_row(index) for index in range(num_entities)]
def insert_collection(collection):
    """Generate 1000 random rows and insert them into ``collection``.

    The rows are pivoted into column-based form (one list per field, in the
    field order listed below) before being passed to ``collection.insert``.

    Args:
        collection: Target collection.
    """
    rows = generate_complex_data(1000)

    # One column per field, in this order.
    column_order = (
        "id",
        "embedding",
        "keyword_embedding",
        "title",
        "content",
        "category",
        "subcategory",
        "price",
        "rating",
        "is_active",
        "created_time",
        "tags",
    )
    columns = [[row[name] for row in rows] for name in column_order]

    outcome = collection.insert(columns)
    print(f"插入了 {len(outcome.primary_keys)} 条复杂记录")
6. 单向量查询
def single_vector_search_data(query_vector, top_k=10, filter_expr=None):
    """Run a single-vector ANN search on the ``embedding`` field.

    Uses the module-level ``collection`` handle with the L2 metric and
    ``nprobe=10``.

    Notes on the search parameters (carried over from the original notes):
      * ``metric_type``: "L2" is Euclidean distance (smaller means more
        similar), "IP" is inner product (larger means more similar),
        "COSINE" is cosine similarity.
      * ``nprobe`` (IVF-type indexes): how many nearest cluster centres are
        inspected per search. Larger values give higher recall but slower
        searches; smaller values are faster but less accurate. Valid range
        is 1 <= nprobe <= nlist; a common choice is 10%-50% of nlist
        (e.g. nlist=128 -> nprobe 10-64).

    Args:
        query_vector: The query embedding.
        top_k: Number of most similar hits to return.
        filter_expr: Optional scalar filter expression (e.g. ``price > 100``).

    Returns:
        Whatever ``collection.search`` returns for the single query vector.
    """
    returned_fields = ["id", "title", "category", "price", "rating"]
    return collection.search(
        data=[query_vector],          # list of query vectors
        anns_field="embedding",       # vector field to search
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=top_k,                  # keep the top_k closest hits
        expr=filter_expr,             # optional scalar filter
        output_fields=returned_fields,
    )
def single_vector_search():
    """Search with a random 128-d vector and print the top 5 hits."""
    probe = [random.random() for _ in range(128)]
    outcome = single_vector_search_data(probe, top_k=5)

    print("向量搜索结果:")
    # Only one query vector was sent, so only the first result set is read.
    for rank, hit in enumerate(outcome[0], start=1):
        entity = hit.entity
        print(f"排名 {rank}: ID={hit.id}, 标题={entity.get('title')}, "
              f"分类={entity.get('category')}, 价格={entity.get('price')}")
7. 多向量查询
def muli_vector_hybrid_search_data(query_embedding, keyword_embedding, top_k=10):
    """Hybrid search combining the content vector and the keyword vector.

    Two ANN requests are built, one on ``embedding`` (L2) and one on
    ``keyword_embedding`` (IP), each asking for ``2 * top_k`` candidates.
    They are merged by ``collection.hybrid_search`` with an ``RRFRanker``.

    Args:
        query_embedding: Query vector for the ``embedding`` field.
        keyword_embedding: Query vector for the ``keyword_embedding`` field.
        top_k: Number of hits to keep after reranking.

    Returns:
        Whatever ``collection.hybrid_search`` returns.
    """
    candidate_limit = top_k * 2

    # Content-vector request.
    content_request = AnnSearchRequest(
        data=[query_embedding],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=candidate_limit,
    )
    # Keyword-vector request.
    keyword_request = AnnSearchRequest(
        data=[keyword_embedding],
        anns_field="keyword_embedding",
        param={"metric_type": "IP", "params": {"nprobe": 10}},
        limit=candidate_limit,
    )

    return collection.hybrid_search(
        reqs=[content_request, keyword_request],
        rerank=RRFRanker(),
        limit=top_k,
        output_fields=["id", "title", "category", "price"],
    )
def muli_vector_hybrid_search():
    """Run a hybrid search with random 128-d and 64-d vectors; print the top 5."""
    content_probe = [random.random() for _ in range(128)]
    keyword_probe = [random.random() for _ in range(64)]
    outcome = muli_vector_hybrid_search_data(content_probe, keyword_probe, top_k=5)

    print("\n混合搜索结果:")
    for rank, hit in enumerate(outcome[0], start=1):
        print(f"排名 {rank}: ID={hit.id}, 标题={hit.entity.get('title')}")
8. 标量查询
def scalar_query(expr, limit=100):
    """Run a scalar (non-vector) query on the module-level ``collection``.

    Args:
        expr: Filter expression.
        limit: Maximum number of rows to return.

    Returns:
        The rows returned by ``collection.query``, restricted to the id,
        title, category, price, rating and is_active fields.
    """
    returned_fields = ["id", "title", "category", "price", "rating", "is_active"]
    return collection.query(expr=expr, output_fields=returned_fields, limit=limit)
# --- 简单标量搜索 ---
def simple_scalar_query():
    """Run five basic scalar filters and print how many rows each returned.

    Covers equality, range, multi-condition, array membership and a time
    range. Every query goes through ``scalar_query`` with its default
    ``limit`` of 100, so each printed number is capped at 100.
    """
    print("\n--- 标量查询示例 --- :")

    # (filter expression, printed label), in output order:
    # equality, range, combined conditions, array field.
    fixed_examples = (
        ('category == "电子产品"', "电子产品数量"),
        ('price >= 100.0 and price <= 500.0', "价格在100-500之间的商品数量"),
        ('category == "服装" and rating >= 4 and is_active == True', "高评分服装商品数量"),
        ('array_contains(tags, "热门")', "热门商品数量"),
    )
    for expression, label in fixed_examples:
        rows = scalar_query(expression)
        print(f"{label}: {len(rows)}")

    # Time range: rows whose created_time is within the last seven days.
    week_in_seconds = 7 * 24 * 3600
    cutoff = int(datetime.now().timestamp()) - week_in_seconds
    recent_rows = scalar_query(f'created_time >= {cutoff}')
    print(f"一周内创建的商品数量: {len(recent_rows)}")
# --- 复杂标量查询 ---
def complex_scalar_query():
    """Run more advanced scalar filters and print how many rows each returned.

    Covers ``in``, ``like`` prefix matching, ``array_contains_any`` and a
    combined boolean expression. Every query goes through ``scalar_query``
    with its default ``limit`` of 100, so each printed number is capped at 100.
    """
    print("\n--- 复杂标量查询示例 --- :")

    # 1. IN query
    results = scalar_query('category in ["电子产品", "服装"]')
    print(f"电子产品和服装商品数量: {len(results)}")

    # 2. LIKE query (pattern match on a title prefix)
    results = scalar_query('title like "商品标题_1%"')
    print(f"标题以'商品标题_1'开头的商品数量: {len(results)}")

    # 3. Array contains any of several elements
    results = scalar_query('array_contains_any(tags, ["热门", "推荐"])')
    print(f"热门或推荐商品数量: {len(results)}")

    # 4. Combined condition.
    # `and` binds tighter than `or`, so without the outer parentheses the
    # is_active check would only apply to the second (clothing) clause.
    # The two category clauses are grouped so that is_active filters both.
    complex_expr = (
        '((category == "电子产品" and price > 200.0) or '
        '(category == "服装" and rating >= 4)) and '
        'is_active == True'
    )
    results = scalar_query(complex_expr)
    print(f"复杂条件查询结果数量: {len(results)}")
def vector_search_with_scalar_filter(query_vector, filter_expr, top_k=10):
    """ANN search on ``embedding`` restricted by a scalar filter expression.

    Uses the module-level ``collection`` with the L2 metric and ``nprobe=10``.

    Args:
        query_vector: The query embedding.
        filter_expr: Scalar filter expression applied to the search.
        top_k: Number of hits to return.

    Returns:
        Whatever ``collection.search`` returns for the single query vector.
    """
    returned_fields = ["id", "title", "category", "price", "rating"]
    return collection.search(
        data=[query_vector],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=top_k,
        expr=filter_expr,
        output_fields=returned_fields,
    )
9. 组合查询(标量+向量)
def vector_search_with_scalar():
    """Filtered vector search demo: random query vector, top 5 matching hits.

    The filter keeps electronics priced below 500 with a rating of at least 3.
    """
    probe = [random.random() for _ in range(128)]
    condition = 'category == "电子产品" and price < 500.0 and rating >= 3'
    outcome = vector_search_with_scalar_filter(probe, condition, top_k=5)

    print("\n=== 带标量过滤的向量搜索结果 ===")
    for rank, hit in enumerate(outcome[0], start=1):
        entity = hit.entity
        print(f"排名 {rank}: ID={hit.id}, 标题={entity.get('title')}, "
              f"价格={entity.get('price')}, 评分={entity.get('rating')}")
10. 数据更新
def update_record(record_id, update_fields):
    """Update scalar and/or vector fields of one record.

    The current row is read, merged with ``update_fields`` and written back
    with a single ``collection.upsert`` call. The previous implementation
    deleted the row and then inserted the replacement; if the insert failed
    after the delete, the record was lost.

    ``id`` and ``created_time`` are always taken from the stored row; any
    values for them in ``update_fields`` are ignored. Keys in
    ``update_fields`` that are not collection fields are ignored as well.

    Args:
        record_id: Primary key of the record to update.
        update_fields: Mapping of field name to new value.

    Returns:
        True if the record existed and was written back, False if no record
        with ``record_id`` was found.
    """
    # Column order expected by the column-based write below.
    field_order = [
        "id", "embedding", "keyword_embedding", "title", "content",
        "category", "subcategory", "price", "rating", "is_active",
        "created_time", "tags",
    ]
    # Fields that keep their stored value regardless of update_fields.
    immutable = {"id", "created_time"}
    # Fallbacks used only if the stored row lacks the field.
    fallbacks = {"keyword_embedding": [0] * 64, "tags": []}

    # 1. Read the current row.
    expr = f"id == {record_id}"
    found = collection.query(expr=expr, output_fields=field_order)
    if not found:
        print(f"记录 {record_id} 不存在")
        return False
    original = found[0]

    # 2. Merge: new value when provided, stored value otherwise.
    columns = []
    for name in field_order:
        if name in fallbacks:
            stored = original.get(name, fallbacks[name])
        else:
            stored = original[name]
        value = stored if name in immutable else update_fields.get(name, stored)
        columns.append([value])

    # 3. Overwrite the row in one operation.
    collection.upsert(columns)
    return True
# 更新操作(标量和向量)
def update():
    """Demo: update record 10 and print it before and after the update.

    If the record does not exist (for example after the delete demo has
    removed it), a "not found" message is printed instead of raising
    IndexError on an empty query result.
    """
    record_id = 10
    expr = f"id == {record_id}"
    preview_fields = ["id", "title", "price", "rating", "tags"]

    # Show the record as it is before the update.
    before = collection.query(expr=expr, output_fields=preview_fields)
    if not before:
        print(f"记录 {record_id} 不存在")
        return
    print("\n更新前的数据:", before[0])

    # Example update: new title, price, rating and tags.
    update_fields = {
        'title': '更新后的商品标题003',
        'price': 199.99,
        'rating': 4,
        'tags': ['热门', '新品', '促销', '限时']
    }
    if not update_record(record_id=record_id, update_fields=update_fields):
        return

    # Pause before reading back; presumably this gives the write time to
    # become visible to queries (TODO confirm against the consistency level).
    sleep(2)
    print("\n记录更新成功")

    # Verify the result of the update.
    after = collection.query(expr=expr, output_fields=preview_fields)
    if after:
        print("\n更新后的数据:", after[0])
    else:
        print(f"记录 {record_id} 不存在")
11. 数据删除
def delete_by_conditions(expr):
    """Delete the records matching a scalar filter expression.

    Args:
        expr: Filter expression selecting the records to delete.

    Returns:
        The result object returned by ``collection.delete``.
    """
    return collection.delete(expr=expr)
# 删除
def delete():
    """Demo of several delete operations, printing how many rows each removed.

    The count is read from the mutation result's ``delete_count``. The
    previous code used ``len(result.primary_keys)``, which is not the number
    of deleted rows for deletes driven by a filter expression.
    """
    print("\n=== 删除操作示例 ===")

    # 1. Delete a single record by primary key.
    delete_result = delete_by_conditions("id == 50")
    print(f"\n删除ID为50的记录数: {delete_result.delete_count}")

    # 2. Delete records matching a combined condition.
    delete_result = delete_by_conditions('category == "食品" and price < 50.0')
    print(f"\n删除价格低于50的食品记录数: {delete_result.delete_count}")

    # 3. Delete records in several categories.
    delete_result = delete_by_conditions('category in ["图书", "家居"]')
    print(f"\n删除图书和家居记录数: {delete_result.delete_count}")

    # 4. Delete records created more than 30 days ago.
    old_time = int(datetime.now().timestamp()) - 30 * 24 * 3600
    delete_result = delete_by_conditions(f'created_time < {old_time}')
    print(f"\n删除30天前的记录数: {delete_result.delete_count}")
12. 统计与聚合
def get_collection_stats():
    """Print collection statistics: total entities, per-category and per-rating counts.

    The per-category and per-rating numbers are obtained by querying the
    matching rows (ids only) and counting them on the client.
    """
    def matching_rows(expression):
        # Fetch only the ids of the matching rows and count them.
        return len(collection.query(expr=expression, output_fields=["id"]))

    print("=== 集合统计信息 ===")
    print(f"总实体数: {collection.num_entities}")

    # Row count per category.
    for category in ("电子产品", "服装", "食品", "图书", "家居"):
        expression = f'category == "{category}"'
        print(f"{category}分类商品数: {matching_rows(expression)}")

    # Rating distribution (1 to 5 stars).
    for rating in range(1, 6):
        expression = f'rating == {rating}'
        print(f"{rating}星评分商品数: {matching_rows(expression)}")
13. main方法
if __name__ == '__main__':
    # Script entry point. Each tutorial step is listed below; all but the
    # last call are commented out, so uncomment the steps you want to run.
    # Create the collection
    # create_collection()
    # Create the indexes
    # create_index(collection)
    # Insert data
    # insert_collection(collection)
    # Single-vector search
    # single_vector_search()
    # Hybrid (multi-vector) search
    # muli_vector_hybrid_search()
    # Scalar queries
    # simple_scalar_query()
    # Complex scalar queries
    # complex_scalar_query()
    # Combined query (scalar filter + vector search)
    # vector_search_with_scalar()
    # Update data
    # update()
    # Delete
    # delete()
    # Statistics and aggregation
    # get_collection_stats()
    # Release resources
    # cleanup()
    # Performance tuning and best practices
    # NOTE(review): performance_tips() (and the commented-out cleanup()) are
    # not defined in the visible part of this file; confirm they exist.
    performance_tips()