多线程增量ZIP文件生成器

发布于:2025-05-01 ⋅ 阅读:(43) ⋅ 点赞:(0)

在原有的zip文件内容基础上批量生成增量zip压缩包
每个zip包含原始文件内容和一个新生成的文本文件

代码示例

import os
import zipfile
import shutil
import concurrent.futures
from time import perf_counter


def process_single_zip(args):
    """处理单个ZIP文件的生成(线程工作函数)"""
    i, original_zip, original_path, temp_dir, compression = args

    txt_name = os.path.join(temp_dir, f"{i:03d}.txt")
    new_zip = f"{original_zip}-{i:03d}.zip"

    try:
        # 1. 创建临时文本文件
        with open(txt_name, 'wb') as f:
            f.write(f"这是独立添加的第{i:03d}个文件\n".encode('utf-8'))

        # 2. 创建新ZIP文件
        with zipfile.ZipFile(new_zip, 'w', **compression) as zip_out:
            # 2.1 复制原始ZIP内容
            with zipfile.ZipFile(original_path, 'r') as zip_in:
                for item in zip_in.infolist():
                    with zip_in.open(item) as src_file:
                        zip_out.writestr(item, src_file.read())

            # 2.2 添加新文件
            zip_out.write(txt_name, os.path.basename(txt_name))

        return (True, new_zip)
    except Exception as e:
        return (False, f"{new_zip} 生成失败: {str(e)}")


def non_cumulative_zip_mt(
        original_zip="source",
        num_files=10,
        compression_level=9,
        max_workers=4
):
    """多线程版本增量ZIP生成器

    参数:
        original_zip: 原始ZIP名称
        num_files: 生成文件数量
        compression_level: 压缩级别(0-9)
        max_workers: 最大线程数
    """
    start_time = perf_counter()
    original_path = f"{original_zip}.zip"
    temp_dir = "temp_zip_files"

    # 1. 准备环境
    os.makedirs(temp_dir, exist_ok=True)
    compression = {
        'compression': zipfile.ZIP_DEFLATED if compression_level > 0 else zipfile.ZIP_STORED,
        'compresslevel': min(9, max(0, compression_level))
    }

    # 2. 多线程处理
    success_count = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 2.1 准备任务参数
        tasks = [(i, original_zip, original_path, temp_dir, compression)
                 for i in range(1, num_files + 1)]

        # 2.2 提交并处理任务
        futures = [executor.submit(process_single_zip, task) for task in tasks]

        # 2.3 获取结果
        for future in concurrent.futures.as_completed(futures):
            status, message = future.result()
            if status:
                print(f"✓ {message}")
                success_count += 1
            else:
                print(f"✗ {message}")

    # 3. 清理临时文件
    shutil.rmtree(temp_dir, ignore_errors=True)

    # 4. 性能报告
    elapsed = perf_counter() - start_time
    print(f"\n处理完成 | 成功: {success_count}/{num_files} | 耗时: {elapsed:.2f}s")
    print(f"线程数: {max_workers} | 压缩级别: {compression_level}")


if __name__ == "__main__":
    file_name = '1D-ZB-1270OBJ'
    # ▶ 参数配置
    non_cumulative_zip_mt(
        original_zip=file_name,
        num_files=10,  # 生成10个文件
        compression_level=6,  # 平衡压缩率和速度
        max_workers=10  # 使用10个线程
    )

代码运行效果

在这里插入图片描述

生成文件效果

在这里插入图片描述