阿里云OSS 上传文件 Python版本

发布于:2025-06-08 ⋅ 阅读:(19) ⋅ 点赞:(0)

单文件:

import alibabacloud_oss_v2 as oss

# 配置

accessKeyId = "***************"
accessKeySecret = "#########"
customRegion = "cn-%%%%%%"
customeBucket = "&&&&&&&&&"

# 配置静态访问凭证(生产环境不推荐硬编码)
static_cred = oss.credentials.StaticCredentialsProvider(
 access_key_id=accessKeyId,
 access_key_secret=accessKeySecret,
)


def init_client():
 cfg = oss.config.load_default()
 cfg.credentials_provider = static_cred
 cfg.region = customRegion

 return oss.Client(cfg)


def multi_upload(files_map):
 """
    :param files_map: 字典类型 { "目标object路径": "本地文件绝对路径" }
    """
 client = init_client()
 uploader = client.uploader()

 for obj_key, local_path in files_map.items():
 try:
 request = oss.PutObjectRequest(bucket=customeBucket, key=obj_key)
 result = uploader.upload_file(
 request,
 filepath=local_path,
 part_size=10 * 1024 * 1024,  # 分片大小10MB
 part_num=3,  # 并发分片数
            )
 print(f"文件 {local_path} 上传成功,ETag: {result.etag}")
 except oss as e:
 print(f"上传失败:{e.message}")


if __name__ == "__main__":

 # 定义多文件映射(目标路径:本地路径)
 file_mapping = {
 "sunwukongtest/index.html": "C:/Users/57307/Desktop/Test/TestHtml/index.html",
 "sunwukongtest/index1.html": "C:/Users/57307/Desktop/Test/TestHtml/index1.html",
 "sunwukongtest/index2.html": "C:/Users/57307/Desktop/Test/TestHtml/index2.html",
 "sunwukongtest/index3.html": "C:/Users/57307/Desktop/Test/TestHtml/index3.html",
 "sunwukongtest/index4.html": "C:/Users/57307/Desktop/Test/TestHtml/index4.html",
    }

 multi_upload(file_mapping)

分片上传

import os
import alibabacloud_oss_v2 as oss

# 配置

accessKeyId = "***************"
accessKeySecret = "#########"
customRegion = "cn-%%%%%%"
customeBucket = "&&&&&&&&&"

# 配置静态访问凭证(生产环境不推荐硬编码)
static_cred = oss.credentials.StaticCredentialsProvider(
 access_key_id=accessKeyId,
 access_key_secret=accessKeySecret,
)


def init_client():
 cfg = oss.config.load_default()
 cfg.credentials_provider = static_cred
 cfg.region = customRegion

 return oss.Client(cfg)


def multi_upload(files_map):
 """
    :param files_map: 字典类型 { "目标object路径": "本地文件绝对路径" }
    """
 client = init_client()

 for obj_key, local_path in files_map.items():
 # 初始化分片上传请求,获取upload_id用于后续分片上传
 result = client.initiate_multipart_upload(
 oss.InitiateMultipartUploadRequest(
 bucket=customeBucket,
 key=obj_key,
            )
        )

 # 定义每个分片的大小为5MB
 part_size = 5 * 1024 * 1024

 # 获取要上传文件的总大小
 data_size = os.path.getsize(local_path)

 # 初始化分片编号,从1开始
 part_number = 1

 # 存储每个分片上传的结果
 upload_parts = []

 # 打开文件以二进制模式读取
 with open(local_path, "rb") as f:
 # 遍历文件,按照part_size分片上传
 for start in range(0, data_size, part_size):
 n = part_size
 if start + n > data_size:  # 处理最后一个分片可能小于part_size的情况
 n = data_size - start

 # 创建SectionReader来读取文件的特定部分
 reader = oss.io_utils.SectionReader(oss.io_utils.ReadAtReader(f), start, n)

 # 上传分片
 up_result = client.upload_part(
 oss.UploadPartRequest(
 bucket=customeBucket,
 key=obj_key,
 upload_id=result.upload_id,
 part_number=part_number,
 body=reader,
                )
            )

 # 打印每个分片上传的结果信息
 print(
 f"status code: {up_result.status_code},"
 f" request id: {up_result.request_id},"
 f" part number: {part_number},"
 f" content md5: {up_result.content_md5},"
 f" etag: {up_result.etag},"
 f" hash crc64: {up_result.hash_crc64},"
            )

 # 将分片上传结果保存到列表中
 upload_parts.append(
 oss.UploadPart(part_number=part_number, etag=up_result.etag)
            )

 # 增加分片编号
 part_number += 1

 # 对上传的分片按照分片编号排序
 parts = sorted(upload_parts, key=lambda p: p.part_number)

 # 发送完成分片上传请求,合并所有分片为一个完整的对象
 result = client.complete_multipart_upload(
 oss.CompleteMultipartUploadRequest(
 bucket=customeBucket,
 key=obj_key,
 upload_id=result.upload_id,
 complete_multipart_upload=oss.CompleteMultipartUpload(parts=parts),
        )
    )

 # 下面的代码是另一种方式,通过服务器端列出并合并所有分片数据为一个完整的对象
 # 这种方法适用于当您不确定所有分片是否都已成功上传时
 # Merge fragmented data into a complete Object through the server-side List method
 # result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
 #     bucket=args.bucket,
 #     key=args.key,
 #     upload_id=result.upload_id,
 #     complete_all='yes'
 # ))

 # 输出完成分片上传的结果信息
 print(
 f"status code: {result.status_code},"
 f" request id: {result.request_id},"
 f" bucket: {result.bucket},"
 f" key: {result.key},"
 f" location: {result.location},"
 f" etag: {result.etag},"
 f" encoding type: {result.encoding_type},"
 f" hash crc64: {result.hash_crc64},"
 f" version id: {result.version_id},"
    )


if __name__ == "__main__":

 # 定义多文件映射(目标路径:本地路径)
 file_mapping = {
 "sunwukongtest/indx.html": "C:/Users/57307/Desktop/Test/TestHtml/index.html",
    }

 multi_upload(file_mapping)