Python 实战:内网渗透中的信息收集自动化脚本(6)

发布于:2025-09-01 ⋅ 阅读:(17) ⋅ 点赞:(0)

用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。

这次我们主要讨论使用python脚本来读取本地邮箱的重要文件或者信息,代码能够根据不同邮箱的默认路径进行读取,并且根据不同格式进行解析,并且如果默认路径找不到,可以手动进行输入,代码如下

import os
import glob
import mailbox
import sqlite3
from email.header import decode_header
from libratom.lib.pff import PffArchive
from datetime import datetime


class EnhancedEmailReader:
    """增强版本地邮件读取器,支持自动查找和手动输入路径"""
    
    def __init__(self):
        # 本地邮箱客户端配置
        self.client_configs = {
            # 微软系
            "outlook": {
                "path": os.path.expanduser("~\\Documents\\Outlook Files\\"),
                "ext": ".pst",
                "desc": "Outlook",
                "reader": self.read_pst
            },
            "windows_mail": {
                "path": os.path.expanduser("~\\AppData\\Local\\Microsoft\\Windows Mail\\"),
                "ext": ".eml",
                "desc": "Windows 邮件",
                "reader": self.read_eml
            },
            # Mozilla系
            "thunderbird": {
                "path": os.path.expanduser("~\\AppData\\Roaming\\Thunderbird\\Profiles\\"),
                "ext": ".mbox",
                "desc": "Thunderbird",
                "reader": self.read_mbox
            },
            # 网易系
            "netease_mail_master": {
                "path": os.path.expanduser("~\\AppData\\Roaming\\163 Mail\\Data\\"),
                "ext": ".db",
                "desc": "网易邮箱大师",
                "reader": self.read_netease_db
            },
            "netease_163_client": {
                "path": os.path.expanduser("~\\AppData\\Local\\Netease\\163mail\\User Data\\"),
                "ext": ".eml",
                "desc": "163邮箱客户端",
                "reader": self.read_eml
            },
            # QQ系
            "qq_mail": {
                "path": os.path.expanduser("~\\AppData\\Roaming\\Tencent\\QQMail\\Data\\"),
                "ext": ".db",
                "desc": "QQ邮箱客户端",
                "reader": self.read_qq_db
            },
            "foxmail": {  # 腾讯旗下
                "path": os.path.expanduser("~\\AppData\\Roaming\\Foxmail\\Profiles\\"),
                "ext": ".eml",
                "desc": "Foxmail",
                "reader": self.read_eml
            },
            # 其他
            "sina_mail": {
                "path": os.path.expanduser("~\\AppData\\Roaming\\SinaMail\\Data\\"),
                "ext": ".eml",
                "desc": "新浪邮箱",
                "reader": self.read_eml
            }
        }

    def decode_str(self, s):
        """解码邮件中的特殊字符"""
        if not s:
            return ""
        try:
            decoded = decode_header(s)
            result = []
            for part, encoding in decoded:
                if isinstance(part, bytes):
                    result.append(part.decode(encoding or "utf-8", errors="replace"))
                else:
                    result.append(str(part))
            return "".join(result)
        except:
            return str(s)

    # 通用格式读取方法
    def read_pst(self, file_path):
        """读取Outlook的PST文件"""
        try:
            archive = PffArchive(file_path)
            print(f"\n===== 处理 {os.path.basename(file_path)} =====")
            for folder in archive.folders():
                msg_count = folder.get_number_of_sub_messages()
                if msg_count > 0:
                    print(f"文件夹: {folder.name} (共 {msg_count} 封邮件)")
                    for i, msg in enumerate(folder.sub_messages, 1):
                        print(f"\n邮件 {i}/{msg_count}")
                        print(f"发件人: {self.decode_str(msg.get_sender_name())}")
                        print(f"主题: {self.decode_str(msg.get_subject())}")
                        print(f"时间: {msg.get_delivery_time() or '未知'}")
                        print(f"正文: {self.decode_str(msg.get_plain_text_body() or '无正文')[:500]}...")
        except Exception as e:
            print(f"PST文件处理错误: {str(e)}")

    def read_mbox(self, file_path):
        """读取MBOX格式文件(Thunderbird等)"""
        try:
            mbox = mailbox.mbox(file_path)
            msg_count = len(mbox)
            print(f"\n===== 处理 {os.path.basename(file_path)} =====")
            print(f"共发现 {msg_count} 封邮件")
            
            for i, msg in enumerate(mbox, 1):
                sender = self.decode_str(msg.get("from", ""))
                subject = self.decode_str(msg.get("subject", ""))
                date = self.decode_str(msg.get("date", ""))
                body = self._extract_body(msg)
                
                print(f"\n邮件 {i}/{msg_count}")
                print(f"发件人: {sender}")
                print(f"主题: {subject}")
                print(f"时间: {date}")
                print(f"正文: {body[:500]}...")
        except Exception as e:
            print(f"MBOX文件处理错误: {str(e)}")

    def read_eml(self, file_path):
        """读取EML格式文件(通用单封邮件格式)"""
        try:
            with open(file_path, "rb") as f:
                msg = mailbox.mboxMessage(f)
            
            print(f"\n===== 处理 {os.path.basename(file_path)} =====")
            print(f"发件人: {self.decode_str(msg.get('from', ''))}")
            print(f"收件人: {self.decode_str(msg.get('to', ''))}")
            print(f"主题: {self.decode_str(msg.get('subject', ''))}")
            print(f"时间: {self.decode_str(msg.get('date', ''))}")
            print(f"正文: {self._extract_body(msg)[:500]}...")
        except Exception as e:
            print(f"EML文件处理错误: {str(e)}")

    # 网易邮箱特殊格式处理
    def read_netease_db(self, file_path):
        """读取网易邮箱大师的.db数据库文件"""
        try:
            print(f"\n===== 处理网易邮箱数据库 {os.path.basename(file_path)} =====")
            conn = sqlite3.connect(file_path)
            cursor = conn.cursor()
            
            # 网易邮箱数据库表结构分析
            try:
                # 获取邮件列表
                cursor.execute("SELECT id, subject, fromaddr, sendtime, content FROM mail")
                mails = cursor.fetchall()
                print(f"共发现 {len(mails)} 封邮件")
                
                for i, mail in enumerate(mails, 1):
                    mail_id, subject, fromaddr, sendtime, content = mail
                    # 转换时间戳
                    try:
                        send_time = datetime.fromtimestamp(int(sendtime)/1000).strftime('%Y-%m-%d %H:%M:%S')
                    except:
                        send_time = "未知时间"
                    
                    print(f"\n邮件 {i}/{len(mails)}")
                    print(f"发件人: {self.decode_str(fromaddr)}")
                    print(f"主题: {self.decode_str(subject)}")
                    print(f"时间: {send_time}")
                    print(f"正文: {self.decode_str(content)[:500]}...")
            except Exception as e:
                print(f"数据库查询错误: {str(e)}")
                print("尝试备用表结构查询...")
                try:
                    cursor.execute("SELECT id, title, sender, createtime, text FROM email")
                    mails = cursor.fetchall()
                    print(f"共发现 {len(mails)} 封邮件")
                    
                    for i, mail in enumerate(mails, 1):
                        mail_id, title, sender, createtime, text = mail
                        try:
                            send_time = datetime.fromtimestamp(int(createtime)/1000).strftime('%Y-%m-%d %H:%M:%S')
                        except:
                            send_time = "未知时间"
                        
                        print(f"\n邮件 {i}/{len(mails)}")
                        print(f"发件人: {self.decode_str(sender)}")
                        print(f"主题: {self.decode_str(title)}")
                        print(f"时间: {send_time}")
                        print(f"正文: {self.decode_str(text)[:500]}...")
                except:
                    print("无法识别的数据库结构,请手动查看")
            
            conn.close()
        except Exception as e:
            print(f"网易邮箱数据库处理错误: {str(e)}")

    # QQ邮箱特殊格式处理
    def read_qq_db(self, file_path):
        """读取QQ邮箱客户端的.db数据库文件"""
        try:
            print(f"\n===== 处理QQ邮箱数据库 {os.path.basename(file_path)} =====")
            conn = sqlite3.connect(file_path)
            cursor = conn.cursor()
            
            # QQ邮箱数据库表结构分析
            try:
                # 获取邮件列表(不同版本表名可能不同)
                cursor.execute("SELECT msgId, subject, fromAddress, date, textContent FROM Message")
                mails = cursor.fetchall()
                print(f"共发现 {len(mails)} 封邮件")
                
                for i, mail in enumerate(mails, 1):
                    msg_id, subject, from_addr, date, content = mail
                    # 转换时间
                    try:
                        send_time = datetime.fromtimestamp(int(date)).strftime('%Y-%m-%d %H:%M:%S')
                    except:
                        send_time = "未知时间"
                    
                    print(f"\n邮件 {i}/{len(mails)}")
                    print(f"发件人: {self.decode_str(from_addr)}")
                    print(f"主题: {self.decode_str(subject)}")
                    print(f"时间: {send_time}")
                    print(f"正文: {self.decode_str(content)[:500]}...")
            except Exception as e:
                print(f"数据库查询错误: {str(e)}")
                print("尝试备用表结构查询...")
                try:
                    cursor.execute("SELECT id, title, sender, time, content FROM mail")
                    mails = cursor.fetchall()
                    print(f"共发现 {len(mails)} 封邮件")
                    
                    for i, mail in enumerate(mails, 1):
                        msg_id, title, sender, time, content = mail
                        try:
                            send_time = datetime.fromtimestamp(int(time)).strftime('%Y-%m-%d %H:%M:%S')
                        except:
                            send_time = "未知时间"
                        
                        print(f"\n邮件 {i}/{len(mails)}")
                        print(f"发件人: {self.decode_str(sender)}")
                        print(f"主题: {self.decode_str(title)}")
                        print(f"时间: {send_time}")
                        print(f"正文: {self.decode_str(content)[:500]}...")
                except:
                    print("无法识别的数据库结构,请手动查看")
            
            conn.close()
        except Exception as e:
            print(f"QQ邮箱数据库处理错误: {str(e)}")

    def _extract_body(self, msg):
        """提取邮件正文"""
        body = ""
        try:
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_type() == "text/plain" and "attachment" not in str(part.get("Content-Disposition")):
                        body = self.decode_str(part.get_payload(decode=True) or b"")
                        break
            else:
                body = self.decode_str(msg.get_payload(decode=True) or b"")
            return body.strip() or "无正文内容"
        except:
            return "无法解析正文"

    def find_all_email_files(self):
        """查找系统中所有支持的邮箱文件"""
        all_files = []
        print("===== 开始扫描系统中的本地邮箱文件 =====")
        
        for client, config in self.client_configs.items():
            path = config["path"]
            ext = config["ext"]
            desc = config["desc"]
            
            if os.path.exists(path):
                # 递归查找所有匹配的文件
                files = glob.glob(f"{path}/**/*{ext}", recursive=True)
                if files:
                    print(f"\n发现 {len(files)} 个{desc}文件:")
                    for file in files[:5]:  # 只显示前5个,避免输出过长
                        print(f"  - {file}")
                    if len(files) > 5:
                        print(f"  ... 还有 {len(files)-5} 个文件未显示")
                    all_files.extend([(file, config) for file in files])
            else:
                print(f"\n{desc}默认路径不存在: {path}")
        
        return all_files

    def get_reader_for_file(self, file_path):
        """根据文件扩展名获取相应的读取器"""
        ext = os.path.splitext(file_path)[1].lower()
        for config in self.client_configs.values():
            if config["ext"] == ext:
                return config["reader"]
        
        # 如果没有找到精确匹配,尝试根据扩展名猜测
        if ext == ".pst":
            return self.read_pst
        elif ext == ".mbox":
            return self.read_mbox
        elif ext == ".eml":
            return self.read_eml
        elif ext == ".db":
            # 对于db文件,先尝试QQ邮箱格式,再尝试网易邮箱格式
            def try_both_readers(path):
                print("尝试以QQ邮箱格式读取...")
                try:
                    self.read_qq_db(path)
                    return
                except:
                    print("QQ邮箱格式读取失败,尝试以网易邮箱格式读取...")
                    self.read_netease_db(path)
            return try_both_readers
        else:
            return None

    def handle_manual_input(self):
        """处理用户手动输入文件路径"""
        while True:
            print("\n===== 手动输入文件路径 =====")
            file_path = input("请输入邮箱文件路径(直接回车返回主菜单): ").strip()
            
            if not file_path:
                return False
            
            if not os.path.exists(file_path):
                print(f"错误: 文件 '{file_path}' 不存在")
                continue
                
            if not os.path.isfile(file_path):
                print(f"错误: '{file_path}' 不是一个文件")
                continue
            
            # 获取合适的读取器
            reader = self.get_reader_for_file(file_path)
            if reader:
                reader(file_path)
                return True
            else:
                print(f"不支持的文件格式: {os.path.splitext(file_path)[1]}")
                continue

    def run(self):
        """主运行函数"""
        print("===== 增强版本地邮箱读取器 =====")
        print("支持的邮箱客户端: " + ", ".join([v["desc"] for v in self.client_configs.values()]))
        
        while True:
            # 查找所有邮件文件
            email_files = self.find_all_email_files()
            
            if email_files:
                print(f"\n共发现 {len(email_files)} 个邮件文件")
                print("1. 处理找到的文件")
                print("2. 手动输入文件路径")
                print("3. 退出程序")
                
                choice = input("请选择操作(1/2/3): ").strip()
                
                if choice == "1":
                    # 让用户选择要处理的文件
                    print("\n请选择要处理的文件序号(输入数字,0返回):")
                    display_count = min(10, len(email_files))
                    for i in range(display_count):
                        file_path, config = email_files[i]
                        print(f"  {i+1}. {os.path.basename(file_path)} ({config['desc']})")
                    if len(email_files) > 10:
                        print(f"  ... 共 {len(email_files)} 个文件")
                    
                    try:
                        file_choice = int(input("请选择: "))
                        if file_choice == 0:
                            continue
                        if 1 <= file_choice <= len(email_files):
                            file_path, config = email_files[file_choice-1]
                            config["reader"](file_path)
                        else:
                            print("无效选择")
                    except ValueError:
                        print("请输入有效数字")
                
                elif choice == "2":
                    self.handle_manual_input()
                
                elif choice == "3":
                    print("程序已退出")
                    break
                
                else:
                    print("无效选择,请输入1、2或3")
            
            else:
                print("\n未找到任何支持的邮箱文件")
                choice = input("是否要手动输入文件路径? (y/n): ").lower()
                if choice == 'y' or choice == 'yes':
                    if not self.handle_manual_input():
                        print("程序已退出")
                        break
                else:
                    print("程序已退出")
                    break


if __name__ == "__main__":
    reader = EnhancedEmailReader()
    reader.run()
    

一、类定义与初始化(EnhancedEmailReader 类及 __init__ 方法)

这部分是整个程序的基础,定义了邮件读取器的核心配置。

class EnhancedEmailReader:
    """增强版本地邮件读取器,支持自动查找和手动输入路径"""
    
    def __init__(self):
        # 本地邮箱客户端配置
        self.client_configs = {
            # 微软系
            "outlook": {
                "path": os.path.expanduser("~\\Documents\\Outlook Files\\"),
                "ext": ".pst",
                "desc": "Outlook",
                "reader": self.read_pst
            },
            # ... 其他邮箱客户端配置(省略)
        }

核心作用

  • 定义了一个 EnhancedEmailReader 类,作为整个邮件读取功能的载体。
  • 在 __init__ 方法中,通过 self.client_configs 字典存储了主流邮箱客户端的配置信息,包括:
    • path:客户端默认存储路径(使用 os.path.expanduser("~") 获取用户主目录,适配不同系统用户);
    • ext:邮件文件扩展名(如 .pst 是 Outlook 专用格式);
    • desc:客户端中文描述(用于用户展示);
    • reader:对应的读取方法(每个格式有专属解析逻辑)。

二、编码解码工具(decode_str 方法)

邮件内容(如主题、发件人)常包含特殊编码(如 Base64、GBK),此方法用于统一解码。

def decode_str(self, s):
    """解码邮件中的特殊字符"""
    if not s:
        return ""
    try:
        decoded = decode_header(s)  # 解析带编码的字符串(如 "=?UTF-8?B?xxx?=")
        result = []
        for part, encoding in decoded:
            if isinstance(part, bytes):
                # 字节类型按指定编码解码,默认UTF-8,错误用"replace"避免崩溃
                result.append(part.decode(encoding or "utf-8", errors="replace"))
            else:
                result.append(str(part))  # 字符串直接拼接
        return "".join(result)
    except:
        return str(s)  # 异常时返回原始字符串,保证程序不崩溃

核心作用

  • 利用 email.header.decode_header 解析邮件中带编码标记的字符串(如主题中的中文可能被编码为 =?GB2312?B?...?=);
  • 兼容字节和字符串类型,处理编码缺失或错误的情况,确保中文等特殊字符能正常显示。

三、邮件格式读取方法(核心功能)

针对不同邮箱文件格式(PST、MBOX、EML、DB 等),定义了专属读取方法,以下是典型示例:

1. PST 格式(Outlook):read_pst 方法
def read_pst(self, file_path):
    """读取Outlook的PST文件"""
    try:
        archive = PffArchive(file_path)  # 用libratom库解析PST文件
        print(f"\n===== 处理 {os.path.basename(file_path)} =====")
        for folder in archive.folders():  # 遍历PST中的文件夹
            msg_count = folder.get_number_of_sub_messages()
            if msg_count > 0:
                print(f"文件夹: {folder.name} (共 {msg_count} 封邮件)")
                for i, msg in enumerate(folder.sub_messages, 1):  # 遍历邮件
                    print(f"\n邮件 {i}/{msg_count}")
                    print(f"发件人: {self.decode_str(msg.get_sender_name())}")
                    print(f"主题: {self.decode_str(msg.get_subject())}")
                    print(f"时间: {msg.get_delivery_time() or '未知'}")
                    print(f"正文: {self.decode_str(msg.get_plain_text_body() or '无正文')[:500]}...")
    except Exception as e:
        print(f"PST文件处理错误: {str(e)}")

核心作用

  • 使用 libratom.lib.pff.PffArchive 解析 Outlook 专属的 PST 格式文件(PST 是二进制格式,需专用库解析);
  • 遍历 PST 中的文件夹和邮件,提取发件人、主题、时间、正文等核心信息,并用 decode_str 解码特殊字符;
  • 正文只显示前 500 字符(避免输出过长),并通过 try-except 捕获错误(如文件损坏)。
2. MBOX 格式(Thunderbird):read_mbox 方法
def read_mbox(self, file_path):
    """读取MBOX格式文件(Thunderbird等)"""
    try:
        mbox = mailbox.mbox(file_path)  # 用Python标准库mailbox解析MBOX
        msg_count = len(mbox)
        print(f"\n===== 处理 {os.path.basename(file_path)} =====")
        print(f"共发现 {msg_count} 封邮件")
        
        for i, msg in enumerate(mbox, 1):
            sender = self.decode_str(msg.get("from", ""))  # 从邮件头获取发件人
            subject = self.decode_str(msg.get("subject", ""))  # 获取主题
            date = self.decode_str(msg.get("date", ""))  # 获取时间
            body = self._extract_body(msg)  # 调用辅助方法提取正文
            
            print(f"\n邮件 {i}/{msg_count}")
            print(f"发件人: {sender}")
            print(f"主题: {subject}")
            print(f"时间: {date}")
            print(f"正文: {body[:500]}...")
    except Exception as e:
        print(f"MBOX文件处理错误: {str(e)}")

核心作用

  • 利用 Python 标准库 mailbox.mbox 解析 MBOX 格式(Thunderbird 等客户端常用,本质是文本文件集合);
  • 通过邮件头(msg.get("from") 等)提取元信息,调用 _extract_body 方法提取正文。
3. EML 格式(通用单封邮件):read_eml 方法
def read_eml(self, file_path):
    """读取EML格式文件(通用单封邮件格式)"""
    try:
        with open(file_path, "rb") as f:
            msg = mailbox.mboxMessage(f)  # EML可直接用mailbox解析
        
        print(f"\n===== 处理 {os.path.basename(file_path)} =====")
        print(f"发件人: {self.decode_str(msg.get('from', ''))}")
        print(f"收件人: {self.decode_str(msg.get('to', ''))}")  # EML通常包含收件人信息
        print(f"主题: {self.decode_str(msg.get('subject', ''))}")
        print(f"时间: {self.decode_str(msg.get('date', ''))}")
        print(f"正文: {self._extract_body(msg)[:500]}...")
    except Exception as e:
        print(f"EML文件处理错误: {str(e)}")

核心作用

  • EML 是单封邮件的通用格式(本质是符合邮件协议的文本文件),直接用 mailbox.mboxMessage 解析;
  • 相比 MBOX,EML 单独存储一封邮件,因此额外提取了收件人信息。
4. 数据库格式(网易 / QQ 邮箱客户端):read_netease_db 和 read_qq_db 方法

以网易邮箱为例:

def read_netease_db(self, file_path):
    """读取网易邮箱大师的.db数据库文件"""
    try:
        print(f"\n===== 处理网易邮箱数据库 {os.path.basename(file_path)} =====")
        conn = sqlite3.connect(file_path)  # 连接SQLite数据库
        cursor = conn.cursor()
        
        # 尝试主流表结构查询邮件
        try:
            cursor.execute("SELECT id, subject, fromaddr, sendtime, content FROM mail")
            mails = cursor.fetchall()
            print(f"共发现 {len(mails)} 封邮件")
            
            for i, mail in enumerate(mails, 1):
                mail_id, subject, fromaddr, sendtime, content = mail
                # 时间戳转换(网易邮箱时间戳通常是毫秒级)
                try:
                    send_time = datetime.fromtimestamp(int(sendtime)/1000).strftime('%Y-%m-%d %H:%M:%S')
                except:
                    send_time = "未知时间"
                
                print(f"\n邮件 {i}/{len(mails)}")
                print(f"发件人: {self.decode_str(fromaddr)}")
                print(f"主题: {self.decode_str(subject)}")
                print(f"时间: {send_time}")
                print(f"正文: {self.decode_str(content)[:500]}...")
        except:
            # 尝试备用表结构(应对客户端版本差异)
            cursor.execute("SELECT id, title, sender, createtime, text FROM email")
            # ... 后续逻辑类似
        conn.close()
    except Exception as e:
        print(f"网易邮箱数据库处理错误: {str(e)}")

核心作用

  • 网易 / QQ 邮箱客户端的邮件通常存储在 SQLite 数据库(.db 文件)中,因此用 sqlite3 库连接并查询;
  • 考虑到客户端版本差异(表名 / 字段可能不同),提供了多表结构查询的容错逻辑;
  • 时间戳转换(客户端通常存储毫秒级时间戳,需转为秒级再格式化)。

四、辅助方法:_extract_body 提取邮件正文

邮件可能是 "多部分"(multipart,如包含文本 + 附件),此方法专门提取纯文本正文。

def _extract_body(self, msg):
    """提取邮件正文"""
    body = ""
    try:
        if msg.is_multipart():  # 多部分邮件(可能包含正文、附件、HTML等)
            for part in msg.walk():  # 遍历所有部分
                # 只提取纯文本正文,排除附件
                if part.get_content_type() == "text/plain" and "attachment" not in str(part.get("Content-Disposition")):
                    body = self.decode_str(part.get_payload(decode=True) or b"")
                    break  # 找到文本正文后退出
        else:  # 单部分邮件(直接是纯文本)
            body = self.decode_str(msg.get_payload(decode=True) or b"")
        return body.strip() or "无正文内容"
    except:
        return "无法解析正文"

核心作用

  • 区分 "多部分邮件" 和 "单部分邮件",优先提取 text/plain 类型的内容(避免 HTML 格式或附件);
  • 用 part.get_payload(decode=True) 解码正文(可能被 Base64 等编码),并通过 decode_str 处理字符编码。

五、文件查找与匹配:find_all_email_files 和 get_reader_for_file

1. 自动查找邮件文件:find_all_email_files
def find_all_email_files(self):
    """查找系统中所有支持的邮箱文件"""
    all_files = []
    print("===== 开始扫描系统中的本地邮箱文件 =====")
    
    for client, config in self.client_configs.items():
        path = config["path"]
        ext = config["ext"]
        desc = config["desc"]
        
        if os.path.exists(path):
            # 递归查找所有匹配扩展名的文件(**表示子目录)
            files = glob.glob(f"{path}/**/*{ext}", recursive=True)
            if files:
                print(f"\n发现 {len(files)} 个{desc}文件:")
                for file in files[:5]:  # 只显示前5个,避免输出过长
                    print(f"  - {file}")
                if len(files) > 5:
                    print(f"  ... 还有 {len(files)-5} 个文件未显示")
                all_files.extend([(file, config) for file in files])
        else:
            print(f"\n{desc}默认路径不存在: {path}")
    
    return all_files

核心作用

  • 遍历 client_configs 中定义的所有邮箱路径,用 glob.glob 递归查找对应扩展名的文件(recursive=True 支持子目录搜索);
  • 收集所有找到的文件及对应的配置(用于后续调用专属读取方法),并向用户展示查找结果(限制显示数量,避免刷屏)。
2. 匹配文件读取器:get_reader_for_file
def get_reader_for_file(self, file_path):
    """根据文件扩展名获取相应的读取器"""
    ext = os.path.splitext(file_path)[1].lower()  # 获取扩展名(如 .pst)
    for config in self.client_configs.values():
        if config["ext"] == ext:
            return config["reader"]  # 返回配置中对应的读取方法
    
    # 容错:如果没有精确匹配,按扩展名猜测
    if ext == ".pst":
        return self.read_pst
    elif ext == ".db":
        # .db可能是QQ或网易邮箱,定义一个尝试双格式的函数
        def try_both_readers(path):
            print("尝试以QQ邮箱格式读取...")
            try:
                self.read_qq_db(path)
            except:
                print("尝试以网易邮箱格式读取...")
                self.read_netease_db(path)
        return try_both_readers
    # ... 其他扩展名容错逻辑
    else:
        return None

核心作用

  • 根据文件扩展名自动匹配对应的读取方法(如 .pst 对应 read_pst);
  • 对模糊格式(如 .db 可能属于多个客户端)提供容错逻辑,尝试多种读取方法。

六、用户交互逻辑:handle_manual_input 和 run 方法

1. 手动输入文件路径:handle_manual_input
def handle_manual_input(self):
    """处理用户手动输入文件路径"""
    while True:
        print("\n===== 手动输入文件路径 =====")
        file_path = input("请输入邮箱文件路径(直接回车返回主菜单): ").strip()
        
        if not file_path:
            return False  # 回车返回
        
        if not os.path.exists(file_path):
            print(f"错误: 文件 '{file_path}' 不存在")
            continue
            
        if not os.path.isfile(file_path):
            print(f"错误: '{file_path}' 不是一个文件")
            continue
        
        # 调用匹配的读取器处理文件
        reader = self.get_reader_for_file(file_path)
        if reader:
            reader(file_path)
            return True
        else:
            print(f"不支持的文件格式: {os.path.splitext(file_path)[1]}")

核心作用

  • 提供手动输入文件路径的交互界面,验证文件是否存在、是否为有效文件;
  • 调用 get_reader_for_file 获取读取器并处理文件,支持用户灵活处理未被自动扫描到的文件。
2. 主运行逻辑:run 方法
def run(self):
    """主运行函数"""
    print("===== 增强版本地邮箱读取器 =====")
    print("支持的邮箱客户端: " + ", ".join([v["desc"] for v in self.client_configs.values()]))
    
    while True:
        # 自动查找所有邮件文件
        email_files = self.find_all_email_files()
        
        if email_files:
            # 显示操作菜单
            print(f"\n共发现 {len(email_files)} 个邮件文件")
            print("1. 处理找到的文件")
            print("2. 手动输入文件路径")
            print("3. 退出程序")
            
            choice = input("请选择操作(1/2/3): ").strip()
            
            if choice == "1":
                # 让用户选择要处理的文件(显示前10个)
                print("\n请选择要处理的文件序号(输入数字,0返回):")
                display_count = min(10, len(email_files))
                for i in range(display_count):
                    file_path, config = email_files[i]
                    print(f"  {i+1}. {os.path.basename(file_path)} ({config['desc']})")
                # ... 处理用户选择并调用读取器
            
            elif choice == "2":
                self.handle_manual_input()  # 调用手动输入逻辑
            
            elif choice == "3":
                print("程序已退出")
                break
        
        else:
            # 未找到文件时,询问是否手动输入
            choice = input("是否要手动输入文件路径? (y/n): ").lower()
            if not (choice == 'y' or choice == 'yes'):
                print("程序已退出")
                break

核心作用

  • 程序入口逻辑,协调自动查找、用户选择、文件处理的全流程;
  • 通过菜单式交互引导用户操作(处理自动找到的文件 / 手动输入 / 退出),提升易用性;
  • 对未找到文件的场景提供容错,引导用户手动输入。

七、程序入口

if __name__ == "__main__":
    reader = EnhancedEmailReader()
    reader.run()

作用:当脚本直接运行时,创建 EnhancedEmailReader 实例并调用 run 方法启动程序。


网站公告

今日签到

点亮在社区的每一天
去签到