单文件:
import alibabacloud_oss_v2 as oss
# 配置
accessKeyId = "***************"
accessKeySecret = "#########"
customRegion = "cn-%%%%%%"
customeBucket = "&&&&&&&&&"
# 配置静态访问凭证(生产环境不推荐硬编码)
static_cred = oss.credentials.StaticCredentialsProvider(
access_key_id=accessKeyId,
access_key_secret=accessKeySecret,
)
def init_client():
cfg = oss.config.load_default()
cfg.credentials_provider = static_cred
cfg.region = customRegion
return oss.Client(cfg)
def multi_upload(files_map):
"""
:param files_map: 字典类型 { "目标object路径": "本地文件绝对路径" }
"""
client = init_client()
uploader = client.uploader()
for obj_key, local_path in files_map.items():
try:
request = oss.PutObjectRequest(bucket=customeBucket, key=obj_key)
result = uploader.upload_file(
request,
filepath=local_path,
part_size=10 * 1024 * 1024, # 分片大小10MB
part_num=3, # 并发分片数
)
print(f"文件 {local_path} 上传成功,ETag: {result.etag}")
except oss as e:
print(f"上传失败:{e.message}")
if __name__ == "__main__":
# 定义多文件映射(目标路径:本地路径)
file_mapping = {
"sunwukongtest/index.html": "C:/Users/57307/Desktop/Test/TestHtml/index.html",
"sunwukongtest/index1.html": "C:/Users/57307/Desktop/Test/TestHtml/index1.html",
"sunwukongtest/index2.html": "C:/Users/57307/Desktop/Test/TestHtml/index2.html",
"sunwukongtest/index3.html": "C:/Users/57307/Desktop/Test/TestHtml/index3.html",
"sunwukongtest/index4.html": "C:/Users/57307/Desktop/Test/TestHtml/index4.html",
}
multi_upload(file_mapping)
分片上传
import os
import alibabacloud_oss_v2 as oss
# 配置
accessKeyId = "***************"
accessKeySecret = "#########"
customRegion = "cn-%%%%%%"
customeBucket = "&&&&&&&&&"
# 配置静态访问凭证(生产环境不推荐硬编码)
static_cred = oss.credentials.StaticCredentialsProvider(
access_key_id=accessKeyId,
access_key_secret=accessKeySecret,
)
def init_client():
cfg = oss.config.load_default()
cfg.credentials_provider = static_cred
cfg.region = customRegion
return oss.Client(cfg)
def multi_upload(files_map):
"""
:param files_map: 字典类型 { "目标object路径": "本地文件绝对路径" }
"""
client = init_client()
for obj_key, local_path in files_map.items():
# 初始化分片上传请求,获取upload_id用于后续分片上传
result = client.initiate_multipart_upload(
oss.InitiateMultipartUploadRequest(
bucket=customeBucket,
key=obj_key,
)
)
# 定义每个分片的大小为5MB
part_size = 5 * 1024 * 1024
# 获取要上传文件的总大小
data_size = os.path.getsize(local_path)
# 初始化分片编号,从1开始
part_number = 1
# 存储每个分片上传的结果
upload_parts = []
# 打开文件以二进制模式读取
with open(local_path, "rb") as f:
# 遍历文件,按照part_size分片上传
for start in range(0, data_size, part_size):
n = part_size
if start + n > data_size: # 处理最后一个分片可能小于part_size的情况
n = data_size - start
# 创建SectionReader来读取文件的特定部分
reader = oss.io_utils.SectionReader(oss.io_utils.ReadAtReader(f), start, n)
# 上传分片
up_result = client.upload_part(
oss.UploadPartRequest(
bucket=customeBucket,
key=obj_key,
upload_id=result.upload_id,
part_number=part_number,
body=reader,
)
)
# 打印每个分片上传的结果信息
print(
f"status code: {up_result.status_code},"
f" request id: {up_result.request_id},"
f" part number: {part_number},"
f" content md5: {up_result.content_md5},"
f" etag: {up_result.etag},"
f" hash crc64: {up_result.hash_crc64},"
)
# 将分片上传结果保存到列表中
upload_parts.append(
oss.UploadPart(part_number=part_number, etag=up_result.etag)
)
# 增加分片编号
part_number += 1
# 对上传的分片按照分片编号排序
parts = sorted(upload_parts, key=lambda p: p.part_number)
# 发送完成分片上传请求,合并所有分片为一个完整的对象
result = client.complete_multipart_upload(
oss.CompleteMultipartUploadRequest(
bucket=customeBucket,
key=obj_key,
upload_id=result.upload_id,
complete_multipart_upload=oss.CompleteMultipartUpload(parts=parts),
)
)
# 下面的代码是另一种方式,通过服务器端列出并合并所有分片数据为一个完整的对象
# 这种方法适用于当您不确定所有分片是否都已成功上传时
# Merge fragmented data into a complete Object through the server-side List method
# result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
# bucket=args.bucket,
# key=args.key,
# upload_id=result.upload_id,
# complete_all='yes'
# ))
# 输出完成分片上传的结果信息
print(
f"status code: {result.status_code},"
f" request id: {result.request_id},"
f" bucket: {result.bucket},"
f" key: {result.key},"
f" location: {result.location},"
f" etag: {result.etag},"
f" encoding type: {result.encoding_type},"
f" hash crc64: {result.hash_crc64},"
f" version id: {result.version_id},"
)
if __name__ == "__main__":
# 定义多文件映射(目标路径:本地路径)
file_mapping = {
"sunwukongtest/indx.html": "C:/Users/57307/Desktop/Test/TestHtml/index.html",
}
multi_upload(file_mapping)