Python包版本分析工具开发:从PyPI私有源快速提取元数据

发布于:2025-07-05 ⋅ 阅读:(15) ⋅ 点赞:(0)
import subprocess
import re
import os
import sys
import tempfile
import zipfile
from email.parser import Parser
from typing import List, Dict, Optional, Any

from jinja2 import Environment
from packaging.version import parse as parse_version
from tqdm import tqdm

# --- 1. 配置与常量 (Configuration & Constants) ---
# 将所有可配置项和常量集中管理,清晰明了

# PyPI源配置
PYPI_INDEX_URL = "https://nexus.xxx-tech.com/repository/pypi/simple"
TRUSTED_HOST = "nexus.xxx-tech.com"

# 用于从'Summary'字段中提取特定信息的正则表达式
# 匹配6位以上的数字(日期版本)和一个7位以上的十六进制哈希值
SUMMARY_REGEX = re.compile(r'(\d{6,})\.([a-f0-9]{7,})')

# --- 2. 核心功能函数 (Core Functions) ---


def get_all_versions(package_name: str) -> List[str]:
    """
    通过pip的“无效版本”技巧,高效获取指定包在私有源上的所有可用版本。

    Args:
        package_name: 需要查询的包名。

    Returns:
        一个从新到旧排序的版本号字符串列表。如果找不到则返回空列表。
    """
    print(f"正在从 {TRUSTED_HOST} 查找 {package_name} 的所有版本...")
    # 使用一个不可能存在的版本号,触发pip报错并列出所有可用版本
    command = [
        sys.executable, "-m", "pip", "install", f"{package_name}==INVALID_VERSION_MARKER", "--index-url",
        PYPI_INDEX_URL, "--trusted-host", TRUSTED_HOST
    ]

    # 执行命令并捕获输出
    result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
    # 合并 stdout 和 stderr,因为错误信息可能在任一输出流中
    output = result.stdout + result.stderr

    # 从输出中用正则表达式查找版本列表
    match = re.search(r'\(from versions: ([^\)]+)\)', output)
    if not match:
        print(f"错误:无法找到 {package_name} 的版本列表。")
        print("请检查:\n1. 包名是否正确。\n2. 是否已连接到公司网络/VPN。\n3. Nexus源是否可用。")
        print("\n--- pip 输出详情 ---\n", output)
        return []

    versions_str = match.group(1)
    # 清理并分割版本号字符串
    versions = [v.strip() for v in versions_str.split(',')]

    # 使用 packaging.version.parse 进行语义化版本排序(从新到旧)
    versions.sort(key=parse_version, reverse=True)

    print(f"成功找到 {len(versions)} 个版本。")
    return versions


def fetch_and_parse_metadata(package_spec: str, download_dir: str) -> Optional[Dict[str, Any]]:
    """
    【性能优化的核心】
    下载指定的包(不安装),并直接从wheel文件中解析元数据。
    这比创建虚拟环境并安装要快得多。

    Args:
        package_spec: 包的精确规约,例如 "python_package==1.0.0"。
        download_dir: 用于存放下载文件的临时目录。

    Returns:
        一个包含包元数据的字典,失败则返回 None。
    """
    try:
        # 1. 下载包文件(.whl),不安装其依赖项,速度极快
        download_command = [
            sys.executable,
            "-m",
            "pip",
            "download",
            "--no-deps",  # 关键:不下载依赖项
            "--dest",
            download_dir,  # 指定下载目录
            "--index-url",
            PYPI_INDEX_URL,
            "--trusted-host",
            TRUSTED_HOST,
            package_spec
        ]
        subprocess.run(download_command, check=True, capture_output=True)

        # 2. 在下载目录中找到 wheel 文件
        wheel_file = next((f for f in os.listdir(download_dir) if f.endswith('.whl')), None)
        if not wheel_file:
            # print(f"  -> 警告: 未能为 {package_spec} 下载 wheel 文件。可能是一个sdist包。")
            return None

        wheel_path = os.path.join(download_dir, wheel_file)

        # 3. 从 wheel 文件(zip格式)中提取并解析 METADATA 文件
        with zipfile.ZipFile(wheel_path, 'r') as zf:
            # METADATA 文件通常在 .dist-info 目录下
            metadata_path = next((f for f in zf.namelist() if f.endswith('.dist-info/METADATA')), None)
            if not metadata_path:
                # print(f"  -> 错误: 在 {wheel_file} 中未找到 METADATA 文件。")
                return None

            with zf.open(metadata_path) as meta_file:
                metadata_content = meta_file.read().decode('utf-8')

        # 4. 使用email.parser解析类RFC 822格式的METADATA文件
        # 这比手动解析更健壮
        headers = Parser().parsestr(metadata_content)
        metadata = dict(headers.items())

        # 'Requires-Dist' 可能出现多次,需要特殊处理成列表
        metadata['Requires-Dist'] = headers.get_all('Requires-Dist') or []

        return metadata

    except subprocess.CalledProcessError as e:
        # 如果pip命令失败,打印关键错误信息
        error_message = e.stderr.decode('utf-8', errors='ignore')
        # print(f"  -> 处理 {package_spec} 失败: {error_message.strip().splitlines()[-1]}")
        return None
    except Exception as e:
        # 捕获其他潜在异常
        # print(f"  -> 处理 {package_spec} 时发生意外错误: {e}")
        return None
    finally:
        # 清理下载的包文件,为下一个版本做准备
        for item in os.listdir(download_dir):
            os.remove(os.path.join(download_dir, item))


def generate_html_report(package_name: str, packages_info: List[Dict], total_versions: int) -> None:
    """
    使用Jinja2模板生成HTML报告。

    Args:
        package_name: 包名。
        packages_info: 包含所有已处理包元数据的列表。
        total_versions: 发现的总版本数。
    """
    # HTML模板直接嵌入代码中,方便分发
    # 注意:模板中现在使用 'Requires-Dist' 来获取依赖
    html_template_str = """
<!doctype html>
<html lang="zh-CN">
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Package Details for {{ package_name }}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
    <style>
        body { padding: 2rem; background-color: #f8f9fa; }
        .container { max-width: 1140px; }
        .package-card { margin-bottom: 1.5rem; border-left: 5px solid #0d6efd; box-shadow: 0 2px 4px rgba(0,0,0,.1); transition: all 0.2s ease-in-out; }
        .package-card:hover { transform: translateY(-3px); box-shadow: 0 4px 8px rgba(0,0,0,.15); }
        .card-header { font-weight: bold; font-size: 1.25rem; }
        dt { font-weight: 500; }
        .commit-hash { font-family: monospace; }
        .summary-text { font-style: italic; color: #6c757d; }
    </style>
</head>
<body>
    <div class="container">
        <h1 class="mb-2">Package: <span class="text-primary">{{ package_name }}</span></h1>
        <p class="text-muted">共发现 {{ total_versions }} 个版本,成功获取了 {{ packages_info|length }} 个版本的元数据。</p>
        
        {% if not packages_info %}
            <div class="alert alert-warning mt-4" role="alert">
                未能获取到包 "{{ package_name }}" 的任何版本信息。
            </div>
        {% else %}
            {% for pkg in packages_info %}
            <div class="card package-card">
                <div class="card-header bg-light">
                    Version: {{ pkg.get('Version', 'N/A') }}
                </div>
                <div class="card-body">
                    <dl class="row">
                        <dt class="col-sm-3">Summary</dt>
                        <dd class="col-sm-9 summary-text">{{ pkg.get('Summary', 'N/A') }}</dd>
                        
                        {% if pkg.isp_version %}
                        <dt class="col-sm-3">ISP Build Info</dt>
                        <dd class="col-sm-9">
                            <span class="badge bg-success fs-6 me-2" title="ISP Build Date">{{ pkg.isp_version }}</span>
                            <span class="badge bg-info text-dark fs-6 commit-hash" title="Commit Hash">{{ pkg.commit_hash }}</span>
                        </dd>
                        {% endif %}

                        <dt class="col-sm-3">Author</dt>
                        <dd class="col-sm-9">{{ pkg.get('Author-email', pkg.get('Author', 'N/A')) }}</dd>
                        
                        <dt class="col-sm-3">Dependencies</dt>
                        <dd class="col-sm-9">
                            {% if pkg.get('Requires-Dist') %}
                                <ul class="list-unstyled mb-0">
                                {% for req in pkg.get('Requires-Dist') %}
                                    <li><code>{{ req }}</code></li>
                                {% endfor %}
                                </ul>
                            {% else %}
                                N/A
                            {% endif %}
                        </dd>
                    </dl>
                </div>
            </div>
            {% endfor %}
        {% endif %}
    </div>
</body>
</html>
"""
    env = Environment()
    template = env.from_string(html_template_str)
    html_output = template.render(package_name=package_name, packages_info=packages_info, total_versions=total_versions)

    output_filename = f"{package_name}_report.html"
    with open(output_filename, "w", encoding='utf-8') as f:
        f.write(html_output)

    print(f"\n报告生成成功!请在浏览器中打开文件: file://{os.path.abspath(output_filename)}")


# --- 3. 主程序 (Main Execution) ---


def main(package_name: str, limit: Optional[int] = None):
    """
    主执行函数,协调整个流程。

    Args:
        package_name: 目标包名。
        limit: (可选) 限制处理最新版本的数量,用于快速测试。
    """
    # 步骤一:获取所有版本
    versions = get_all_versions(package_name)
    if not versions:
        return

    versions_to_check = versions
    if limit and limit < len(versions):
        print(f"注意:根据限制,将只检查最新的 {limit} 个版本。")
        versions_to_check = versions[:limit]

    all_package_info = []

    # 步骤二:获取每个版本的元数据
    # 创建一个临时目录,在整个循环中复用,避免I/O开销
    with tempfile.TemporaryDirectory() as temp_dir:
        # 使用tqdm创建进度条,总数动态设置
        progress_bar = tqdm(versions_to_check, desc="正在获取元数据", unit=" an")

        for version in progress_bar:
            spec = f"{package_name}=={version}"
            progress_bar.set_postfix_str(spec, refresh=True)

            metadata = fetch_and_parse_metadata(spec, temp_dir)

            if metadata:
                # 尝试从Summary中解析额外信息
                summary = metadata.get('Summary', '')
                match = SUMMARY_REGEX.search(summary)
                if match:
                    metadata['isp_version'] = match.group(1)
                    metadata['commit_hash'] = match.group(2)

                all_package_info.append(metadata)

    # all_package_info 自然地按版本从新到旧排序,因为 `versions` 列表已排序

    # 步骤三:生成并保存HTML报告
    generate_html_report(package_name, all_package_info, len(versions))


if __name__ == "__main__":
    # --- 在这里配置你要查询的包 ---
    target_package = "python_package"

    # === 可选:为了快速测试,可以只检查最新的几个版本 ===
    # 设置为 None 来检查所有版本,或设置为一个数字(如 5)来限制数量。
    version_limit = 5

    main(target_package, limit=version_limit)


网站公告

今日签到

点亮在社区的每一天
去签到