Project structure
opensearch-demo/
├── docker-compose.yml
├── requirements.txt        # Python dependencies
├── opensearch_client.py
├── README.md
├── .venv                   # virtual environment, if one is used
├── .gitignore              # Git ignore rules
├── .python-version         # pinned Python version, if one is required
├── opensearch_demo.ipynb   # optional Jupyter notebook
└── pyproject.toml          # project metadata for uv, Poetry, or a similar tool
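Features
- ✅ Start an OpenSearch cluster with a single Docker Compose command
- ✅ OpenSearch Dashboards web UI
- ✅ Connect and operate from a Python client
- ✅ Complete CRUD walkthrough
- ✅ Full-text search, exact matching, and range queries
- ✅ Aggregations and statistical analysis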
1. Docker Compose configuration
Create the docker-compose.yml file:
services:
  opensearch:
    container_name: opensearch-demo
    image: opensearchproject/opensearch:2.19.1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=StrongPass123!@
      - plugins.security.disabled=true
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data:/usr/share/opensearch/data
    ports:
      - "9200:9200"
      - "9600:9600"
    networks:
      - opensearch-net
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9200 || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5
  opensearch-dashboards:
    container_name: opensearch-dashboards-demo
    image: opensearchproject/opensearch-dashboards:2.19.1
    ports:
      - "5601:5601"
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
    networks:
      - opensearch-net
    depends_on:
      opensearch:
        condition: service_healthy
volumes:
  opensearch-data:
networks:
  opensearch-net:
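The healthcheck in the compose file only gates the Dashboards container; a script running on the host still has to wait for port 9200 on its own. The following is a minimal sketch of that wait loop, using only the standard library. The names `http_probe` and `wait_until_ready` are hypothetical helpers, not part of any OpenSearch tooling, and the default retry count and interval simply mirror the healthcheck values.

```python
import time
import urllib.request
from typing import Callable


def http_probe(url: str = "http://localhost:9200", timeout: float = 10.0) -> bool:
    """Return True if the endpoint answers with HTTP 200 (raises on connection errors)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.status == 200


def wait_until_ready(
    probe: Callable[[], bool] = http_probe,
    retries: int = 5,
    interval: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call probe() up to `retries` times, sleeping `interval` seconds between attempts."""
    for attempt in range(1, retries + 1):
        try:
            if probe():
                return True
        except Exception:
            pass  # any probe error counts as "not ready yet"
        if attempt < retries:
            sleep(interval)
    return False
```

Passing the probe and the sleep function as parameters keeps the loop easy to exercise without a running container; in a real script, `wait_until_ready(interval=5)` would probably be enough.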
2. Python environment setup
uv init
uv venv
# Windows; on Linux/macOS use: source .venv/bin/activate
.venv\Scripts\activate
uv pip install opensearch-py requests pandas
3. The opensearch_client module
Create the opensearch_client.py file:
import os
from typing import Dict, List, Optional

from opensearchpy import OpenSearch


class OpenSearchClient:
    def __init__(self):
        self.host = 'localhost'
        self.port = 9200
        # The compose file disables the security plugin, so these credentials are not checked
        self.auth = ('admin', os.getenv('OPENSEARCH_PASSWORD', 'Admin123!'))
        # Create the OpenSearch client
        self.client = OpenSearch(
            hosts=[{'host': self.host, 'port': self.port}],
            http_auth=self.auth,
            use_ssl=False,
            verify_certs=False,
            ssl_assert_hostname=False,
            ssl_show_warn=False,
        )

    def test_connection(self) -> bool:
        """Test the connection."""
        try:
            info = self.client.info()
            print(f"Connected! OpenSearch version: {info['version']['number']}")
            return True
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

    def create_index(self, index_name: str, mapping: Optional[Dict] = None) -> bool:
        """Create an index, optionally with a mapping."""
        try:
            if self.client.indices.exists(index=index_name):
                print(f"Index '{index_name}' already exists")
                return True
            body = {}
            if mapping:
                body['mappings'] = mapping
            self.client.indices.create(index=index_name, body=body)
            print(f"Index '{index_name}' created")
            return True
        except Exception as e:
            print(f"Failed to create index: {e}")
            return False

    def delete_index(self, index_name: str) -> bool:
        """Delete an index."""
        try:
            if not self.client.indices.exists(index=index_name):
                print(f"Index '{index_name}' does not exist")
                return True
            self.client.indices.delete(index=index_name)
            print(f"Index '{index_name}' deleted")
            return True
        except Exception as e:
            print(f"Failed to delete index: {e}")
            return False

    def index_document(self, index_name: str, doc_id: str, document: Dict) -> bool:
        """Index a single document."""
        try:
            self.client.index(
                index=index_name,
                id=doc_id,
                body=document
            )
            print(f"Document '{doc_id}' indexed")
            return True
        except Exception as e:
            print(f"Failed to index document: {e}")
            return False

    def get_document(self, index_name: str, doc_id: str) -> Dict:
        """Get a document by ID."""
        try:
            response = self.client.get(index=index_name, id=doc_id)
            return response['_source']
        except Exception as e:
            print(f"Failed to get document: {e}")
            return {}

    def search_documents(self, index_name: str, query: Dict) -> List[Dict]:
        """Search documents and return their sources."""
        try:
            response = self.client.search(index=index_name, body=query)
            hits = response['hits']['hits']
            return [hit['_source'] for hit in hits]
        except Exception as e:
            print(f"Search failed: {e}")
            return []

    def bulk_index(self, index_name: str, documents: List[Dict]) -> bool:
        """Bulk-index documents."""
        try:
            # The Bulk API expects pairs of entries: an action line, then the document source
            actions = []
            for i, doc in enumerate(documents):
                actions.append({"index": {"_index": index_name, "_id": doc.get('id', i)}})
                actions.append(doc)
            response = self.client.bulk(body=actions)
            if response['errors']:
                print("Errors occurred during bulk indexing")
                return False
            else:
                print(f"Bulk-indexed {len(documents)} documents")
                return True
        except Exception as e:
            print(f"Bulk indexing failed: {e}")
            return False

    def get_cluster_health(self) -> Dict:
        """Get the cluster health status."""
        try:
            return self.client.cluster.health()
        except Exception as e:
            print(f"Failed to get cluster health: {e}")
            return {}

    def list_indices(self) -> List[str]:
        """List all indices."""
        try:
            response = self.client.cat.indices(format='json')
            return [index['index'] for index in response]
        except Exception as e:
            print(f"Failed to list indices: {e}")
            return []
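The Bulk API request body is newline-delimited JSON in which every document is preceded by an action line, and the body ends with a newline. The sketch below builds that payload with the standard library only, to make the wire format visible; `to_bulk_lines` and `to_ndjson` are hypothetical helper names, and the fallback to the list position as `_id` follows the `doc.get('id', i)` convention used in the client.

```python
import json
from typing import Dict, Iterable, List


def to_bulk_lines(index_name: str, documents: Iterable[Dict]) -> List[Dict]:
    """Return alternating action/source entries for an 'index' bulk request."""
    lines: List[Dict] = []
    for i, doc in enumerate(documents):
        # Action line: which operation, which index, which document ID
        lines.append({"index": {"_index": index_name, "_id": doc.get("id", i)}})
        # Source line: the document itself
        lines.append(doc)
    return lines


def to_ndjson(lines: Iterable[Dict]) -> str:
    """Serialize the entries as newline-delimited JSON with a trailing newline."""
    return "".join(json.dumps(line, ensure_ascii=False) + "\n" for line in lines)
```

For two documents the payload has four lines: action, source, action, source.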
4. Quick start
Start the services
# Start the OpenSearch services
docker-compose up -d
# Check service status
docker-compose ps
# Follow the logs
docker-compose logs -f opensearch
Access the web interfaces
- OpenSearch API: http://localhost:9200
- OpenSearch Dashboards: http://localhost:5601
OpenSearch basic operations demo
This notebook shows how to connect to OpenSearch from Python and perform basic operations.
# Import the required libraries
from opensearch_client import OpenSearchClient
import json
import pandas as pd
1. Connect to OpenSearch
# Create the OpenSearch client
client = OpenSearchClient()
# Test the connection
client.test_connection()
Connected! OpenSearch version: 2.19.1
True
2. Check cluster health
# Get the cluster health status
health = client.get_cluster_health()
print(json.dumps(health, indent=2))
{
  "cluster_name": "opensearch-cluster",
  "status": "green",
  "timed_out": false,
  "number_of_nodes": 1,
  "number_of_data_nodes": 1,
  "discovered_master": true,
  "discovered_cluster_manager": true,
  "active_primary_shards": 4,
  "active_shards": 4,
  "relocating_shards": 0,
  "initializing_shards": 0,
  "unassigned_shards": 0,
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks": 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 100.0
}
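The `status` field is the part of this response most scripts care about: green means all primary and replica shards are allocated, yellow means all primaries are allocated but some replicas are not, and red means at least one primary is unallocated. A small sketch of turning the dictionary into a one-line summary follows; `summarize_health` is a hypothetical helper and the wording of its messages is an arbitrary choice.

```python
from typing import Dict


def summarize_health(health: Dict) -> str:
    """Map a cluster health response to a short human-readable summary."""
    status = health.get("status", "unknown")
    unassigned = health.get("unassigned_shards", 0)
    if status == "green":
        return "ok"
    if status == "yellow":
        # Primaries are available; some replica shards could not be allocated
        return f"degraded: {unassigned} unassigned shard(s)"
    return f"unavailable: status={status}"
```

On a single-node setup like this one, an index created with the default of one replica per shard cannot place its replicas, so a yellow status after creating indices is expected rather than alarming.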
3. Create an index
# Define the index mapping
mapping = {
    'properties': {
        'title': {'type': 'text'},
        'content': {'type': 'text'},
        'author': {'type': 'keyword'},
        'publish_date': {'type': 'date'},
        'tags': {'type': 'keyword'},
        'views': {'type': 'integer'}
    }
}
# Create the index
index_name = 'blog_posts'
client.create_index(index_name, mapping)
Index 'blog_posts' created
True
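The index body can also carry settings next to the mapping. As a sketch, assuming a single-node demo where replicas can never be allocated, the helper below sets `number_of_replicas` to 0 so the index can report green; `build_index_body` is a hypothetical name, and the shard and replica defaults are illustrative choices rather than anything the tutorial prescribes.

```python
from typing import Dict


def build_index_body(mapping: Dict, shards: int = 1, replicas: int = 0) -> Dict:
    """Combine index settings and a mapping into one create-index request body."""
    return {
        "settings": {
            "index": {
                "number_of_shards": shards,
                "number_of_replicas": replicas,
            }
        },
        "mappings": mapping,
    }
```

The result could be passed straight to the underlying client, for example `client.client.indices.create(index='blog_posts', body=build_index_body(mapping))`.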
4. Index a single document
# Create a sample document
document = {
    'title': 'OpenSearch入门指南',
    'content': '这是一篇关于OpenSearch基本使用的文章...',
    'author': '张三',
    'publish_date': '2024-01-15',
    'tags': ['opensearch', '搜索引擎', '教程'],
    'views': 1250
}
# Index the document
client.index_document(index_name, '1', document)
Document '1' indexed
True
5. Bulk-index documents
# Create several sample documents
documents = [
    {
        'id': '2',
        'title': 'Elasticsearch vs OpenSearch',
        'content': '比较Elasticsearch和OpenSearch的异同点...',
        'author': '李四',
        'publish_date': '2024-01-20',
        'tags': ['elasticsearch', 'opensearch', '比较'],
        'views': 890
    },
    {
        'id': '3',
        'title': '搜索引擎优化技巧',
        'content': '提高搜索性能的各种技巧和最佳实践...',
        'author': '王五',
        'publish_date': '2024-01-25',
        'tags': ['优化', '性能', '搜索'],
        'views': 2100
    },
    {
        'id': '4',
        'title': 'Python与OpenSearch集成',
        'content': '如何在Python项目中集成OpenSearch...',
        'author': '张三',
        'publish_date': '2024-02-01',
        'tags': ['python', 'opensearch', '集成'],
        'views': 1560
    }
]
# Bulk-index them
client.bulk_index(index_name, documents)
Bulk-indexed 3 documents
True
6. Get a single document
# Fetch the document
doc = client.get_document(index_name, '1')
print(json.dumps(doc, indent=2, ensure_ascii=False))
{
  "title": "OpenSearch入门指南",
  "content": "这是一篇关于OpenSearch基本使用的文章...",
  "author": "张三",
  "publish_date": "2024-01-15",
  "tags": [
    "opensearch",
    "搜索引擎",
    "教程"
  ],
  "views": 1250
}
7. Search documents
# Simple full-text search
query = {
    'query': {
        'match': {
            'content': 'OpenSearch'
        }
    }
}
results = client.search_documents(index_name, query)
print(f'Found {len(results)} results:')
for i, result in enumerate(results, 1):
    print(f'{i}. {result["title"]} - author: {result["author"]}')
Found 3 results:
1. Elasticsearch vs OpenSearch - author: 李四
2. Python与OpenSearch集成 - author: 张三
3. OpenSearch入门指南 - author: 张三
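The results come back ordered by relevance score, but `search_documents` returns only each hit's `_source`, so the score and document ID are dropped. A minimal sketch of keeping them, working on the raw response dictionary; `hits_with_scores` is a hypothetical helper, and the scores in the usage example are made-up values, not real output.

```python
from typing import Dict, List


def hits_with_scores(response: Dict) -> List[Dict]:
    """Flatten search hits, keeping the document ID and relevance score."""
    return [
        {"_id": hit["_id"], "_score": hit["_score"], **hit["_source"]}
        for hit in response["hits"]["hits"]
    ]


# Usage with a hand-written response (scores are illustrative only)
fake_response = {
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [{"_id": "2", "_score": 0.5, "_source": {"title": "Elasticsearch vs OpenSearch"}}],
    }
}
rows = hits_with_scores(fake_response)
```

With the client above, the raw response would come from `client.client.search(index=index_name, body=query)`.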
# Compound query: find posts by a specific author
query = {
    'query': {
        'bool': {
            'must': [
                {'term': {'author': '张三'}}
            ]
        }
    }
}
results = client.search_documents(index_name, query)
print(f'Posts by 张三 ({len(results)}):')
for result in results:
    print(f'- {result["title"]} (views: {result["views"]})')
Posts by 张三 (2):
- OpenSearch入门指南 (views: 1250)
- Python与OpenSearch集成 (views: 1560)
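For exact-match conditions like this one, a `bool` query can also place clauses under `filter` instead of `must`; filter clauses do not contribute to the relevance score. The following sketch builds such a query and optionally adds a minimum view count. `author_query` and its `min_views` parameter are hypothetical names introduced only for illustration.

```python
from typing import Dict, List, Optional


def author_query(author: str, min_views: Optional[int] = None) -> Dict:
    """Build a bool query that filters by author and, optionally, by minimum views."""
    filters: List[Dict] = [{"term": {"author": author}}]
    if min_views is not None:
        filters.append({"range": {"views": {"gte": min_views}}})
    return {"query": {"bool": {"filter": filters}}}
```

Calling `client.search_documents(index_name, author_query('张三', min_views=1500))` would then combine both conditions in one request.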
# Range query: find posts with more than 1000 views
query = {
    'query': {
        'range': {
            'views': {
                'gt': 1000
            }
        }
    },
    'sort': [
        {'views': {'order': 'desc'}}
    ]
}
results = client.search_documents(index_name, query)
print('Posts with more than 1000 views (descending by views):')
for result in results:
    print(f'- {result["title"]} (views: {result["views"]})')
Posts with more than 1000 views (descending by views):
- 搜索引擎优化技巧 (views: 2100)
- Python与OpenSearch集成 (views: 1560)
- OpenSearch入门指南 (views: 1250)
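Range queries work the same way on the `publish_date` field, since it is mapped as `date`. A sketch of an inclusive date-range query with the results sorted oldest first; `published_between` is a hypothetical helper, and ISO `YYYY-MM-DD` strings are assumed to be accepted because the mapping does not override the default date format.

```python
from datetime import date
from typing import Dict


def published_between(start: date, end: date) -> Dict:
    """Build a query for posts published within [start, end], oldest first."""
    return {
        "query": {
            "range": {
                "publish_date": {
                    "gte": start.isoformat(),  # inclusive lower bound
                    "lte": end.isoformat(),    # inclusive upper bound
                }
            }
        },
        "sort": [{"publish_date": {"order": "asc"}}],
    }
```

With the sample data, `published_between(date(2024, 1, 20), date(2024, 1, 31))` should match the posts dated 2024-01-20 and 2024-01-25.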
8. Aggregations
# Aggregation query: count posts per author and compute average views
query = {
    'size': 0,
    'aggs': {
        'authors': {
            'terms': {
                'field': 'author'
            }
        },
        'avg_views': {
            'avg': {
                'field': 'views'
            }
        }
    }
}
try:
    response = client.client.search(index=index_name, body=query)
    print('Posts per author:')
    for bucket in response['aggregations']['authors']['buckets']:
        print(f'- {bucket["key"]}: {bucket["doc_count"]} post(s)')
    avg_views = response['aggregations']['avg_views']['value']
    print(f'\nAverage views: {avg_views:.2f}')
except Exception as e:
    print(f'Aggregation query failed: {e}')
Posts per author:
- 张三: 2 post(s)
- 李四: 1 post(s)
- 王五: 1 post(s)

Average views: 1450.00
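Aggregation results live under `aggregations` in the response, keyed by the names chosen in the request: a `terms` aggregation returns a list of buckets and an `avg` aggregation returns a single `value`. The sketch below pulls both into plain Python values. `parse_author_stats` is a hypothetical helper, and the response in the usage example is hand-written to mirror the output shown in this step.

```python
from typing import Dict, Tuple


def parse_author_stats(response: Dict) -> Tuple[Dict[str, int], float]:
    """Return ({author: post count}, average views) from an aggregation response."""
    aggs = response["aggregations"]
    counts = {bucket["key"]: bucket["doc_count"] for bucket in aggs["authors"]["buckets"]}
    return counts, aggs["avg_views"]["value"]


# Usage with a hand-written response shaped like the one in this step
sample = {
    "aggregations": {
        "authors": {"buckets": [
            {"key": "张三", "doc_count": 2},
            {"key": "李四", "doc_count": 1},
            {"key": "王五", "doc_count": 1},
        ]},
        "avg_views": {"value": 1450.0},
    }
}
counts, avg_views = parse_author_stats(sample)
```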
9. Display results with Pandas
# Fetch all documents and convert them to a DataFrame
query = {'query': {'match_all': {}}}
all_docs = client.search_documents(index_name, query)
df = pd.DataFrame(all_docs)
print('All posts:')
print(df[['title', 'author', 'views', 'publish_date']])
All posts:
                         title author  views publish_date
0               OpenSearch入门指南     张三   1250   2024-01-15
1  Elasticsearch vs OpenSearch     李四    890   2024-01-20
2                     搜索引擎优化技巧     王五   2100   2024-01-25
3          Python与OpenSearch集成     张三   1560   2024-02-01
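A search request returns at most 10 hits unless `size` is set, so the `match_all` query here covers everything only because the index holds four documents. For larger indices, a sketch of explicit paging with `from` and `size` follows. `page_query` is a hypothetical helper, and this style of paging is bounded by the index's `max_result_window` setting (10,000 by default).

```python
from typing import Dict


def page_query(page: int, size: int = 10) -> Dict:
    """Build a match_all query for a 1-based page number."""
    if page < 1 or size < 1:
        raise ValueError("page and size must be positive")
    return {
        "query": {"match_all": {}},
        "from": (page - 1) * size,  # number of hits to skip
        "size": size,               # number of hits to return
    }
```

For example, `client.search_documents(index_name, page_query(2, size=50))` would request hits 51 to 100.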
# Simple data analysis
print('Statistics:')
print(f'Total posts: {len(df)}')
print(f'Total views: {df["views"].sum()}')
print(f'Average views: {df["views"].mean():.2f}')
print(f'Max views: {df["views"].max()}')
print(f'Min views: {df["views"].min()}')
print('By author:')
author_stats = df.groupby('author').agg({
    'title': 'count',
    'views': ['sum', 'mean']
}).round(2)
print(author_stats)
Statistics:
Total posts: 4
Total views: 5800
Average views: 1450.00
Max views: 2100
Min views: 890
By author:
       title views
       count   sum    mean
author
张三         2  2810  1405.0
李四         1   890   890.0
王五         1  2100  2100.0
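The per-author figures can be cross-checked without pandas. The following sketch repeats the grouping with the standard library on the same four view counts; `views_by_author` is a hypothetical helper and only the `author` and `views` fields of each document are used.

```python
from collections import defaultdict
from typing import Dict, Iterable


def views_by_author(docs: Iterable[Dict]) -> Dict[str, Dict]:
    """Group documents by author and compute count, sum, and mean of views."""
    grouped = defaultdict(list)
    for doc in docs:
        grouped[doc["author"]].append(doc["views"])
    return {
        author: {"count": len(v), "sum": sum(v), "mean": round(sum(v) / len(v), 2)}
        for author, v in grouped.items()
    }


posts = [
    {"author": "张三", "views": 1250},
    {"author": "李四", "views": 890},
    {"author": "王五", "views": 2100},
    {"author": "张三", "views": 1560},
]
stats = views_by_author(posts)
```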
10. Clean up
# List all indices
indices = client.list_indices()
print('Current indices:', indices)
Current indices: ['.plugins-ml-config', '.opensearch-observability', 'blog_posts', '.kibana_1']
client.delete_index(index_name)
print(f'Index {index_name} has been deleted')
Index 'blog_posts' deleted
Index blog_posts has been deleted
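The index list also contains names starting with a dot, which by convention belong to plugins and Dashboards rather than to user data. When cleaning up in a loop, a small guard like the one sketched here can keep those out of the delete list; `user_indices` is a hypothetical helper and the dot-prefix rule is a convention, not a guarantee.

```python
from typing import Iterable, List


def user_indices(names: Iterable[str]) -> List[str]:
    """Return index names that do not start with a dot, sorted alphabetically."""
    return sorted(name for name in names if not name.startswith("."))


# The list printed by list_indices() in this step
current = [".plugins-ml-config", ".opensearch-observability", "blog_posts", ".kibana_1"]
to_delete = user_indices(current)
```

Each name in `to_delete` could then be passed to `client.delete_index`.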