目录
MinIO 是一个高性能、分布式对象存储系统,兼容 Amazon S3 API,适用于大规模数据存储和管理。它支持多种部署方式,包括 Docker 和 Kubernetes,使其能够轻松集成到现代云原生架构中。通过其简单直观的 API,开发者可以快速实现文件上传、下载、元数据管理、预签名 URL 生成等功能。
下面通过 `docker-compose` 快速搭建一个 MinIO 服务,并使用 Python 客户端库实现完整的对象存储操作。这些功能包括:
- 存储桶管理:创建、列出和删除存储桶。
- 对象操作:上传文本/JSON 数据、下载文件、读取内容、生成预签名 URL。
- 元数据处理:自定义对象元数据并进行检索。
- 批量删除:高效地批量删除多个对象。
- 安全性与可扩展性:结合 `.env` 环境变量文件配置敏感信息,确保开发安全;同时利用 MinIO 的分布式能力支持横向扩展。
MinIO 非常适合用于 AI/ML、大数据分析、日志存储等场景,是构建私有云存储平台的理想选择。
1. 创建Docker Compose文件
首先创建 `docker-compose.yml` 文件:
# MinIO service for the tutorial. Every ${...} value is substituted from the
# .env file that sits next to this compose file.
version: '3.8'

services:
  minio:
    image: quay.io/minio/minio:RELEASE.2023-12-20T01-00-02Z
    container_name: ragflow-minio
    # Store data under /data and serve the web console on container port 9001.
    command: server --console-address ":9001" /data
    ports:
      # Quoted so the HOST:CONTAINER mappings are always read as strings.
      - "${MINIO_PORT}:9000"
      - "${MINIO_CONSOLE_PORT}:9001"
    env_file: .env
    environment:
      - MINIO_ROOT_USER=${MINIO_USER}
      - MINIO_ROOT_PASSWORD=${MINIO_PASSWORD}
      - TZ=${TIMEZONE}
    volumes:
      # Named volume, so objects survive container re-creation.
      - minio_data:/data
    networks:
      - ragflow
    restart: on-failure

volumes:
  minio_data:

networks:
  ragflow:
    driver: bridge
2. 创建环境变量文件
创建 `.env` 文件:
# MinIO settings; docker-compose.yml substitutes these values via ${...}.
# NOTE(review): MINIO_USER / MINIO_PASSWORD look like demo credentials -
# replace them before exposing the service beyond local development.
MINIO_PORT=9000
MINIO_CONSOLE_PORT=9001
MINIO_USER=minioadmin
MINIO_PASSWORD=minioadmin123
TIMEZONE=Asia/Shanghai
3. 创建Python连接和操作脚本
创建 `minio_client.py` 文件:
from minio import Minio
from minio.error import S3Error
import os
from datetime import timedelta
import io
class MinIOClient:
    """Convenience wrapper around a ``minio.Minio`` client.

    Every method reports its outcome with ``print`` and handles ``S3Error``
    itself, so S3-level failures surface as a printed message plus a
    fallback return value (``None`` or ``[]``) instead of an exception.
    """

    def __init__(self, endpoint="localhost:9000", access_key="minioadmin", secret_key="minioadmin123", secure=False):
        """Create the underlying MinIO client.

        Args:
            endpoint: Address of the MinIO service as ``host:port``.
            access_key: Access key (user name).
            secret_key: Secret key (password).
            secure: Whether to connect over HTTPS.

        Note:
            The default credentials match the demo ``.env`` values; pass real
            credentials explicitly outside local development.
        """
        self.client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )

    def create_bucket(self, bucket_name):
        """Create a bucket unless it already exists.

        Args:
            bucket_name: Name of the bucket.
        """
        try:
            if not self.client.bucket_exists(bucket_name):
                self.client.make_bucket(bucket_name)
                print(f"存储桶 '{bucket_name}' 创建成功")
            else:
                print(f"存储桶 '{bucket_name}' 已存在")
        except S3Error as e:
            print(f"创建存储桶失败: {e}")

    def list_buckets(self):
        """Print and return all buckets.

        Returns:
            The buckets reported by the server, or ``[]`` on ``S3Error``.
        """
        try:
            buckets = self.client.list_buckets()
            print("存储桶列表:")
            for bucket in buckets:
                print(f" - {bucket.name} (创建时间: {bucket.creation_date})")
            return buckets
        except S3Error as e:
            print(f"列出存储桶失败: {e}")
            return []

    def upload_file(self, bucket_name, object_name, file_path):
        """Upload a local file.

        Args:
            bucket_name: Name of the bucket.
            object_name: Object name (the file's name inside MinIO).
            file_path: Path of the local file.
        """
        try:
            self.client.fput_object(bucket_name, object_name, file_path)
            print(f"文件 '{file_path}' 上传成功,对象名: '{object_name}'")
        except S3Error as e:
            print(f"上传文件失败: {e}")

    def upload_data(self, bucket_name, object_name, data):
        """Upload in-memory data (string or bytes).

        Args:
            bucket_name: Name of the bucket.
            object_name: Object name.
            data: Payload to upload; a ``str`` is encoded as UTF-8 first.
        """
        try:
            if isinstance(data, str):
                data = data.encode('utf-8')
            data_stream = io.BytesIO(data)
            self.client.put_object(
                bucket_name,
                object_name,
                data_stream,
                # Byte length of the encoded payload, not the character count.
                length=len(data),
            )
            print(f"数据上传成功,对象名: '{object_name}'")
        except S3Error as e:
            print(f"上传数据失败: {e}")

    def download_file(self, bucket_name, object_name, file_path):
        """Download an object to a local file.

        Args:
            bucket_name: Name of the bucket.
            object_name: Object name.
            file_path: Local path to save the object to.
        """
        try:
            self.client.fget_object(bucket_name, object_name, file_path)
            print(f"文件下载成功,保存到: '{file_path}'")
        except S3Error as e:
            print(f"下载文件失败: {e}")

    def get_object_data(self, bucket_name, object_name):
        """Read an object's full content into memory.

        Args:
            bucket_name: Name of the bucket.
            object_name: Object name.

        Returns:
            The object's content as bytes, or ``None`` on ``S3Error``.
        """
        try:
            response = self.client.get_object(bucket_name, object_name)
            try:
                return response.read()
            finally:
                # Close the response and release its connection even when
                # read() fails; otherwise the connection is never returned.
                response.close()
                response.release_conn()
        except S3Error as e:
            print(f"获取对象数据失败: {e}")
            return None

    def list_objects(self, bucket_name, prefix=None):
        """Print and return the objects in a bucket.

        Args:
            bucket_name: Name of the bucket.
            prefix: Optional object-name prefix to filter by.

        Returns:
            A list of the objects that were printed, or ``[]`` on ``S3Error``.
        """
        try:
            objects = self.client.list_objects(bucket_name, prefix=prefix)
            print(f"存储桶 '{bucket_name}' 中的对象:")
            found = []
            for obj in objects:
                print(f" - {obj.object_name} (大小: {obj.size} bytes, 修改时间: {obj.last_modified})")
                found.append(obj)
            return found
        except S3Error as e:
            print(f"列出对象失败: {e}")
            return []

    def delete_object(self, bucket_name, object_name):
        """Delete a single object.

        Args:
            bucket_name: Name of the bucket.
            object_name: Object name.
        """
        try:
            self.client.remove_object(bucket_name, object_name)
            print(f"对象 '{object_name}' 删除成功")
        except S3Error as e:
            print(f"删除对象失败: {e}")

    def delete_bucket(self, bucket_name):
        """Delete a bucket (it must be empty).

        Args:
            bucket_name: Name of the bucket.
        """
        try:
            self.client.remove_bucket(bucket_name)
            print(f"存储桶 '{bucket_name}' 删除成功")
        except S3Error as e:
            print(f"删除存储桶失败: {e}")

    def generate_presigned_url(self, bucket_name, object_name, expires=timedelta(hours=1)):
        """Generate a presigned download URL for an object.

        Args:
            bucket_name: Name of the bucket.
            object_name: Object name.
            expires: How long the URL stays valid.

        Returns:
            The presigned URL, or ``None`` on ``S3Error``.
        """
        try:
            url = self.client.presigned_get_object(bucket_name, object_name, expires=expires)
            print(f"预签名URL生成成功: {url}")
            return url
        except S3Error as e:
            print(f"生成预签名URL失败: {e}")
            return None
4. 启动和使用说明
启动MinIO服务
docker-compose up -d
环境安装
uv init
uv venv
# Windows
.venv\Scripts\activate
# Linux / macOS
source .venv/bin/activate
uv pip install minio notebook
5. MinIO 基本操作示例
下面演示如何使用Python连接MinIO并进行基本的对象存储操作。
前提条件
- 确保MinIO服务已启动:
docker-compose up -d
- 安装依赖:
uv pip install minio jupyter
# 导入必要的库
from minio import Minio
from minio.error import S3Error
import io
import json
from datetime import timedelta
import os
1. 连接MinIO服务器
# Create the MinIO client from a single settings mapping.
connection_settings = {
    'endpoint': 'localhost:9000',
    'access_key': 'minioadmin',
    'secret_key': 'minioadmin123',
    'secure': False,  # plain HTTP; HTTPS is recommended in production
}
client = Minio(**connection_settings)
print('MinIO客户端创建成功!')
MinIO客户端创建成功!
2. 存储桶(Bucket)操作
# List all buckets.
try:
    buckets = client.list_buckets()
except S3Error as e:
    print(f'列出存储桶失败: {e}')
else:
    print('现有存储桶:')
    for item in buckets:
        print(f' - {item.name} (创建时间: {item.creation_date})')
现有存储桶:
# Create the demo bucket unless it already exists.
bucket_name = 'demo-bucket'
try:
    if client.bucket_exists(bucket_name):
        print(f'存储桶 "{bucket_name}" 已存在')
    else:
        client.make_bucket(bucket_name)
        print(f'存储桶 "{bucket_name}" 创建成功')
except S3Error as e:
    print(f'创建存储桶失败: {e}')
存储桶 "demo-bucket" 创建成功
3. 对象(Object)上传操作
# Upload text data.
text_data = "这是一个测试文件的内容\n包含中文字符\n用于演示MinIO的基本功能"
# Encode once so the stream and the declared length both use the byte form.
text_payload = text_data.encode('utf-8')
try:
    client.put_object(
        bucket_name=bucket_name,
        object_name='test.txt',
        data=io.BytesIO(text_payload),
        length=len(text_payload),
        content_type='text/plain',
    )
    print('文本文件上传成功')
except S3Error as e:
    print(f'上传失败: {e}')
文本文件上传成功
# Upload JSON data.
json_data = {
    'name': '张三',
    'age': 30,
    'city': '北京',
    'hobbies': ['读书', '旅行', '编程'],
}
# Serialise to UTF-8 bytes; ensure_ascii=False keeps the Chinese text unescaped.
json_text = json.dumps(json_data, ensure_ascii=False, indent=2)
json_payload = json_text.encode('utf-8')
try:
    client.put_object(
        bucket_name=bucket_name,
        object_name='data.json',
        data=io.BytesIO(json_payload),
        length=len(json_payload),
        content_type='application/json',
    )
    print('JSON文件上传成功')
except S3Error as e:
    print(f'上传失败: {e}')
JSON文件上传成功
# Create a temporary local file, upload it, then remove it.
local_file = 'sample.txt'
with open(local_file, 'w', encoding='utf-8') as f:
    f.write('这是一个本地文件\n')
    f.write('用于演示文件上传功能\n')
    f.write('MinIO对象存储服务')
try:
    client.fput_object(bucket_name, 'uploaded_file.txt', local_file)
    print(f'本地文件 {local_file} 上传成功')
except S3Error as e:
    print(f'上传失败: {e}')
finally:
    # Remove the temporary file whether or not the upload succeeded, so a
    # failed upload does not leave sample.txt behind.
    os.remove(local_file)
    print(f'本地文件 {local_file} 已删除')
本地文件 sample.txt 上传成功
本地文件 sample.txt 已删除
4. 列出对象
# List every object in the bucket with its size, modification time and ETag.
try:
    listing = client.list_objects(bucket_name)
    print(f'存储桶 "{bucket_name}" 中的对象:')
    for entry in listing:
        print(f' - {entry.object_name}')
        for detail in (
            f' 大小: {entry.size} bytes',
            f' 修改时间: {entry.last_modified}',
            f' ETag: {entry.etag}',
        ):
            print(detail)
        print()  # blank line between objects
except S3Error as e:
    print(f'列出对象失败: {e}')
存储桶 "demo-bucket" 中的对象:
- data.json
大小: 116 bytes
修改时间: 2025-06-20 06:36:40.507000+00:00
ETag: 4dde3c79af4dc9daec0d6732299fd978
- test.txt
大小: 85 bytes
修改时间: 2025-06-20 06:35:24.762000+00:00
ETag: 94f45a1da8526a43e60345353fa49828
- uploaded_file.txt
大小: 81 bytes
修改时间: 2025-06-20 06:36:54.572000+00:00
ETag: 3e067e47a4a226f953fae4e395f54fba
5. 下载和读取对象
# Read the text object's content.
try:
    response = client.get_object(bucket_name, 'test.txt')
    try:
        content = response.read().decode('utf-8')
    finally:
        # Close the response and release its connection even if reading or
        # decoding fails.
        response.close()
        response.release_conn()
    print('文本文件内容:')
    print(content)
except S3Error as e:
    print(f'读取文件失败: {e}')
文本文件内容:
这是一个测试文件的内容
包含中文字符
用于演示MinIO的基本功能
# Read and parse the JSON object's content.
try:
    response = client.get_object(bucket_name, 'data.json')
    try:
        json_content = json.loads(response.read().decode('utf-8'))
    finally:
        # Close the response and release its connection even if reading or
        # parsing fails.
        response.close()
        response.release_conn()
    print('JSON文件内容:')
    print(json.dumps(json_content, ensure_ascii=False, indent=2))
except S3Error as e:
    print(f'读取JSON文件失败: {e}')
JSON文件内容:
{
"name": "张三",
"age": 30,
"city": "北京",
"hobbies": [
"读书",
"旅行",
"编程"
]
}
# Download an object to a local file.
download_file = 'downloaded_file.txt'
try:
    client.fget_object(bucket_name, 'uploaded_file.txt', download_file)
    print(f'文件下载成功: {download_file}')
    # Read the downloaded file back to show its content.
    with open(download_file, encoding='utf-8') as downloaded:
        content = downloaded.read()
    print('下载文件内容:')
    print(content)
    # Optional cleanup (disabled in the original):
    # os.remove(download_file)
    # print(f'本地文件 {download_file} 已删除')
except S3Error as e:
    print(f'下载文件失败: {e}')
文件下载成功: downloaded_file.txt
下载文件内容:
这是一个本地文件
用于演示文件上传功能
MinIO对象存储服务
6. 生成预签名URL
# Generate a presigned download URL (valid for 1 hour).
try:
    url = client.presigned_get_object(bucket_name, 'test.txt', expires=timedelta(hours=1))
except S3Error as e:
    print(f'生成预签名URL失败: {e}')
else:
    print('预签名下载URL(1小时有效):')
    print(url)
预签名下载URL(1小时有效):
http://localhost:9000/demo-bucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250620%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250620T064216Z&X-Amz-Expires=3600&X-Amz-SignedHeaders=host&X-Amz-Signature=8f9822f2994820097d6420df1414a24e1cedfa1f042299fbb1022fc5c4d8176e
# Generate a presigned upload URL (valid for 30 minutes).
try:
    upload_url = client.presigned_put_object(bucket_name, 'new_upload.txt', expires=timedelta(minutes=30))
except S3Error as e:
    print(f'生成预签名上传URL失败: {e}')
else:
    print('预签名上传URL(30分钟有效):')
    print(upload_url)
预签名上传URL(30分钟有效):
http://localhost:9000/demo-bucket/new_upload.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250620%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250620T064649Z&X-Amz-Expires=1800&X-Amz-SignedHeaders=host&X-Amz-Signature=272f7c56aad68ea815a78732c3e4cc3cebd2dd3f397b3729c60787a3585a44f2
7. 对象元数据操作
# Fetch the object's stat information and print each field.
try:
    stat = client.stat_object(bucket_name, 'test.txt')
    print('对象信息:')
    for line in (
        f' 对象名: {stat.object_name}',
        f' 大小: {stat.size} bytes',
        f' ETag: {stat.etag}',
        f' 内容类型: {stat.content_type}',
        f' 最后修改时间: {stat.last_modified}',
        f' 元数据: {stat.metadata}',
    ):
        print(line)
except S3Error as e:
    print(f'获取对象信息失败: {e}')
对象信息:
对象名: test.txt
大小: 85 bytes
ETag: 94f45a1da8526a43e60345353fa49828
内容类型: text/plain
最后修改时间: 2025-06-20 06:35:24+00:00
元数据: HTTPHeaderDict({'Accept-Ranges': 'bytes', 'Content-Length': '85', 'Content-Type': 'text/plain', 'ETag': '"94f45a1da8526a43e60345353fa49828"', 'Last-Modified': 'Fri, 20 Jun 2025 06:35:24 GMT', 'Server': 'MinIO', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains', 'Vary': 'Origin, Accept-Encoding', 'X-Amz-Id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8', 'X-Amz-Request-Id': '184AAD60DBDA67DD', 'X-Content-Type-Options': 'nosniff', 'X-Xss-Protection': '1; mode=block', 'Date': 'Fri, 20 Jun 2025 06:47:16 GMT'})
# Upload an object with custom metadata.
import base64

# The values are Base64-encoded so only ASCII text is passed as metadata.
metadata = {
    field: base64.b64encode(text.encode('utf-8')).decode('ascii')
    for field, text in (
        ('author', '张三'),
        ('department', '技术部'),
        ('project', 'MinIO演示'),
    )
}
try:
    data = '这是一个带有自定义元数据的文件'.encode('utf-8')
    client.put_object(
        bucket_name=bucket_name,
        object_name='metadata_file.txt',
        data=io.BytesIO(data),
        length=len(data),
        metadata=metadata,
    )
    print('带元数据的文件上传成功')
    # Fetch the object's stat and print every metadata entry it returns.
    stat = client.stat_object(bucket_name, 'metadata_file.txt')
    print('自定义元数据:')
    for header, header_value in stat.metadata.items():
        print(f' {header}: {header_value}')
except S3Error as e:
    print(f'操作失败: {e}')
带元数据的文件上传成功
自定义元数据:
Accept-Ranges: bytes
Content-Length: 45
Content-Type: application/octet-stream
ETag: "7a2d55727bfb2f17005600941e4ba45e"
Last-Modified: Fri, 20 Jun 2025 06:49:54 GMT
Server: MinIO
Strict-Transport-Security: max-age=31536000; includeSubDomains
Vary: Origin
Vary: Accept-Encoding
X-Amz-Id-2: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8
X-Amz-Request-Id: 184AAD859BF00043
X-Content-Type-Options: nosniff
X-Xss-Protection: 1; mode=block
x-amz-meta-author: 5byg5LiJ
x-amz-meta-department: 5oqA5pyv6YOo
x-amz-meta-project: TWluSU/mvJTnpLo=
Date: Fri, 20 Jun 2025 06:49:54 GMT
8. 删除操作
# Delete a single object.
try:
    client.remove_object(bucket_name, 'metadata_file.txt')
except S3Error as e:
    print(f'删除对象失败: {e}')
else:
    print('对象 metadata_file.txt 删除成功')
对象 metadata_file.txt 删除成功
# Batch-delete several objects.
from minio.deleteobjects import DeleteObject

objects_to_delete = ['test.txt', 'data.json', 'uploaded_file.txt']
try:
    # Wrap each object name in a DeleteObject, as remove_objects is given here.
    targets = list(map(DeleteObject, objects_to_delete))
    # Iterate the returned errors and count them to decide the final message.
    failure_count = 0
    for failure in client.remove_objects(bucket_name, targets):
        print(f'删除失败: {failure}')
        failure_count += 1
    if failure_count == 0:
        print('所有对象删除成功')
except S3Error as e:
    print(f'批量删除失败: {e}')
所有对象删除成功
# Delete the bucket (it must be empty).
try:
    # Check for remaining objects first so a clear message can be printed.
    remaining = list(client.list_objects(bucket_name))
    if not remaining:
        client.remove_bucket(bucket_name)
        print(f'存储桶 {bucket_name} 删除成功')
    else:
        print(f'存储桶 {bucket_name} 不为空,包含 {len(remaining)} 个对象')
        print('请先删除所有对象再删除存储桶')
except S3Error as e:
    print(f'删除存储桶失败: {e}')
存储桶 demo-bucket 删除成功