ftp、http下载远程文件(多线程、断点续传)

发布于:2025-05-22 ⋅ 阅读:(23) ⋅ 点赞:(0)
import logging
import os
import threading
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from ftplib import FTP
from logging.handlers import TimedRotatingFileHandler
from urllib.parse import unquote, urlsplit

import requests
from tqdm import tqdm


class Downloader(ABC):
    """
    Base class for multi-threaded, segmented downloads into a pre-allocated local file.

    Subclasses implement ``get_file_size`` (expected to set ``self.file_size``) and
    ``download_segment(start, end)``, which is expected to fill the half-open byte
    range ``[start, end)`` at the same offsets of ``self.local_path`` and report
    progress through ``self.progress_bar``.

    NOTE: an existing local file is reused (resized, not wiped), but ``run`` still
    re-downloads every segment; no per-segment progress is persisted between runs.
    """

    def __init__(self, url, local_path, num_threads=4):
        self.url = url
        self.local_path = local_path
        self.num_threads = num_threads
        self.file_size = 0
        # Guards progress-bar updates coming from the worker threads.
        self.lock = threading.Lock()
        self.progress_bar = None

    @abstractmethod
    def get_file_size(self):
        pass

    @abstractmethod
    def download_segment(self, start, end):
        pass

    def init_local_file(self):
        """
        Make ``self.local_path`` exactly ``self.file_size`` bytes long.

        A missing file is created. A shorter file is extended with a zero byte at
        the end, and a longer (stale) file is truncated. Existing bytes below
        ``file_size`` are kept.
        """
        mode = 'r+b' if os.path.exists(self.local_path) else 'wb'
        with open(self.local_path, mode) as f:
            f.seek(0, os.SEEK_END)
            current_size = f.tell()
            if current_size < self.file_size:
                # Writing the last byte extends the file to its final size in one step.
                f.seek(self.file_size - 1)
                f.write(b'\0')
            elif current_size > self.file_size:
                f.truncate(self.file_size)

    def _plan_segments(self):
        """
        Split ``[0, file_size)`` into half-open ranges, one per worker.

        Uses at most ``num_threads`` segments (at least one) and never more segments
        than bytes, so no segment is empty. The last segment absorbs the remainder.
        An empty file yields no segments.
        """
        if self.file_size <= 0:
            return []
        count = max(1, min(self.num_threads, self.file_size))
        part_size = self.file_size // count
        return [
            (i * part_size, self.file_size if i == count - 1 else (i + 1) * part_size)
            for i in range(count)
        ]

    def run(self):
        """
        Fetch the remote size, prepare the local file and download all segments in parallel.

        Segment failures are logged and do not stop the other workers.

        :return: True if every segment finished without error, False otherwise
        """
        self.get_file_size()
        self.init_local_file()

        segments = self._plan_segments()
        failures = 0

        # Initialise the progress bar
        self.progress_bar = tqdm(total=self.file_size, unit='B', unit_scale=True, desc="Downloading")
        try:
            if segments:
                with ThreadPoolExecutor(max_workers=len(segments)) as executor:
                    futures = [executor.submit(self.download_segment, start, end) for start, end in segments]
                    for future in as_completed(futures):
                        try:
                            future.result()
                        except Exception as e:
                            failures += 1
                            logging.error(f"下载出错: {e}")
        finally:
            self.progress_bar.close()

        if failures:
            logging.error(f"{failures} 个分段下载失败,文件不完整")
            return False
        logging.info("所有线程已完成,文件下载结束")
        return True


class FTPDownloader(Downloader):
    """
    Segmented FTP downloader.

    Each worker opens its own control connection and starts ``RETR`` at its
    segment offset via ``REST``. The file is saved as
    ``download_dir/<basename of remote_path>``.
    """

    def __init__(self, host, port, username, password, remote_path, num_threads=4, download_dir="downloads",
                 encoding='utf-8', timeout=None):
        """
        :param encoding: text encoding for FTP commands and replies, which matters for
            non-ASCII paths; some servers may need a different one (e.g. 'gbk')
        :param timeout: socket timeout in seconds; None keeps ftplib's default
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.remote_path = remote_path
        self.download_dir = download_dir
        self.encoding = encoding
        self.timeout = timeout

        # Local file name = last component of the remote path
        self.filename = os.path.basename(self.remote_path)
        self.local_path = os.path.join(self.download_dir, self.filename)

        super().__init__(f"ftp://{host}/{self.remote_path}", self.local_path, num_threads)

        os.makedirs(self.download_dir, exist_ok=True)  # make sure the target directory exists

    def _connect(self):
        """Open a logged-in control connection switched to binary (TYPE I) mode."""
        ftp = FTP()
        # Set before connecting so the banner and the USER/PASS commands already use it.
        ftp.encoding = self.encoding
        try:
            if self.timeout is None:
                ftp.connect(self.host, self.port)
            else:
                ftp.connect(self.host, self.port, timeout=self.timeout)
            ftp.login(self.username, self.password)
            # Binary mode: some servers refuse SIZE in ASCII mode, and REST offsets
            # must be raw byte offsets.
            ftp.voidcmd('TYPE I')
        except Exception:
            ftp.close()
            raise
        return ftp

    @staticmethod
    def _disconnect(ftp):
        """Close the control connection, attempting a polite QUIT first."""
        try:
            ftp.quit()
        except Exception:
            # Best effort: after an aborted RETR the reply read by QUIT may be a
            # transfer error; the socket is closed below either way.
            pass
        finally:
            ftp.close()

    def get_file_size(self):
        """
        Set ``self.file_size`` from the server's SIZE reply.

        :raises ftplib.Error, OSError: on connection or command failure
        :raises ValueError: if SIZE does not yield a size (non-213 reply)
        """
        ftp = self._connect()
        try:
            size = ftp.size(self.remote_path)
        except Exception as e:
            logging.error(f"无法获取远程文件大小: {e}")
            raise
        finally:
            self._disconnect(ftp)

        # ftplib's FTP.size returns None for a 2xx reply other than 213.
        if size is None:
            raise ValueError("无法获取远程文件大小: 服务器未返回有效的 SIZE 响应")
        self.file_size = size
        logging.info(f"远程文件大小: {self.file_size} 字节")

    def download_segment(self, start_byte, end_byte):
        """
        Download bytes ``[start_byte, end_byte)`` into the same offsets of the local file.

        Exactly ``end_byte - start_byte`` bytes are read, then the data connection is
        closed, instead of streaming the rest of the file to EOF.

        :raises OSError: if the transfer ends before the segment is complete
        """
        segment_length = end_byte - start_byte
        downloaded = 0
        if segment_length <= 0:
            return

        ftp = self._connect()
        try:
            with open(self.local_path, 'r+b') as f:
                f.seek(start_byte)
                try:
                    # REST start_byte + RETR: the server begins sending at our offset.
                    with ftp.transfercmd(f'RETR {self.remote_path}', rest=start_byte) as conn:
                        while downloaded < segment_length:
                            data = conn.recv(min(65536, segment_length - downloaded))
                            if not data:
                                break
                            f.write(data)
                            downloaded += len(data)
                            with self.lock:
                                self.progress_bar.update(len(data))
                except Exception as e:
                    logging.error(f"FTP 下载失败: {e}")
                    raise
        finally:
            self._disconnect(ftp)

        logging.info(f"线程完成 {start_byte}-{end_byte}, 实际下载: {downloaded}/{segment_length} 字节")
        if downloaded < segment_length:
            raise OSError(f"FTP 分段不完整 {start_byte}-{end_byte}: {downloaded}/{segment_length} 字节")


class HTTPDownloader(Downloader):
    """
    Segmented HTTP(S) downloader based on ``Range`` requests.

    The local file name is the percent-decoded last path component of the URL
    (query string and fragment ignored). It falls back to ``"download"`` when
    the URL path has no usable name.
    """

    def __init__(self, url, num_threads=4, headers=None, download_dir="downloads", timeout=30):
        """
        :param headers: extra request headers sent with every request
        :param timeout: requests timeout in seconds (connect and per read), so a
            stalled server cannot block a worker forever
        """
        self.headers = headers or {}
        self.download_dir = download_dir
        self.timeout = timeout

        # Derive the file name from the URL path only; decoding before basename()
        # keeps an encoded "/" from introducing directories.
        filename = os.path.basename(unquote(urlsplit(url).path))
        self.filename = filename if filename not in ("", ".", "..") else "download"
        self.local_path = os.path.join(self.download_dir, self.filename)

        super().__init__(url, self.local_path, num_threads)

        os.makedirs(self.download_dir, exist_ok=True)

    def _request_headers(self):
        """Return a copy of the user headers that asks for the unencoded body unless overridden."""
        headers = dict(self.headers)
        # With transparent gzip, Content-Length and byte ranges refer to the compressed
        # body while iter_content yields decoded bytes, so offsets would not line up.
        if not any(key.lower() == 'accept-encoding' for key in headers):
            headers['Accept-Encoding'] = 'identity'
        return headers

    def get_file_size(self):
        """
        Set ``self.file_size`` from the Content-Length of a HEAD request.

        :raises requests.HTTPError: if the server answers with an error status
        :raises Exception: if the response has no Content-Length header
        """
        response = requests.head(self.url, headers=self._request_headers(), allow_redirects=True,
                                 timeout=self.timeout)
        response.raise_for_status()
        content_length = response.headers.get('Content-Length')
        if content_length is None:
            raise Exception("无法获取文件大小,请确保服务器支持 Content-Length 头")
        self.file_size = int(content_length)
        logging.info(f"远程文件大小: {self.file_size} 字节")

    def download_segment(self, start_byte, end_byte):
        """
        Download bytes ``[start_byte, end_byte)`` into the same offsets of the local file.

        :raises requests.HTTPError: on an error status
        :raises RuntimeError: if the server ignores the Range header for a partial segment
        :raises OSError: if the body ends before the segment is complete
        """
        logging.debug(f"开始下载 HTTP 段 {start_byte} - {end_byte}")
        segment_length = end_byte - start_byte
        if segment_length <= 0:
            return

        headers = self._request_headers()
        headers['Range'] = f"bytes={start_byte}-{end_byte - 1}"
        written = 0

        with requests.get(self.url, headers=headers, stream=True, timeout=self.timeout) as response:
            response.raise_for_status()
            # A 200 reply carries the whole file from offset 0; writing it at start_byte
            # would corrupt the file unless this segment *is* the whole file.
            covers_whole_file = start_byte == 0 and end_byte >= self.file_size
            if response.status_code != 206 and not covers_whole_file:
                raise RuntimeError(f"服务器未按 Range 返回分段 (HTTP {response.status_code}),无法多线程下载")

            with open(self.local_path, 'r+b') as f:
                f.seek(start_byte)
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    # Never write past the end of this segment.
                    chunk = chunk[:segment_length - written]
                    f.write(chunk)
                    written += len(chunk)
                    with self.lock:
                        self.progress_bar.update(len(chunk))
                    if written >= segment_length:
                        break

        if written < segment_length:
            raise OSError(f"HTTP 分段不完整 {start_byte}-{end_byte}: {written}/{segment_length} 字节")


def setup_logger(log_dir="logs", log_level=logging.INFO, backup_days=7, console=False):
    """
    Configure the root logger with a log file that rotates at midnight.

    Calling this more than once for the same ``log_dir`` does not attach a second
    file handler, so records are not written twice.

    :param log_dir: directory holding ``app.log`` and its rotated copies (created if missing)
    :param log_level: level set on the root logger
    :param backup_days: number of rotated daily files to keep
    :param console: also echo records to stderr; off by default because console
        output interleaves with the tqdm progress bar
    """
    os.makedirs(log_dir, exist_ok=True)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    logger = logging.getLogger()
    logger.setLevel(log_level)

    # FileHandler stores the absolute path in baseFilename, so compare against that.
    log_file = os.path.abspath(os.path.join(log_dir, "app.log"))
    already_attached = any(
        isinstance(handler, TimedRotatingFileHandler) and handler.baseFilename == log_file
        for handler in logger.handlers
    )
    if not already_attached:
        file_handler = TimedRotatingFileHandler(
            log_file,
            when="midnight",
            backupCount=backup_days,
            encoding='utf-8'
        )
        file_handler.setFormatter(formatter)
        file_handler.suffix = "%Y-%m-%d.log"
        logger.addHandler(file_handler)

    # Exact type check: file handlers subclass StreamHandler but do not write to the console.
    if console and not any(type(handler) is logging.StreamHandler for handler in logger.handlers):
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)


# if __name__ == "__main__":
#     setup_logger()
#     downloader = FTPDownloader(
#         host="",
#         port=21,
#         username="",
#         password="",
#         remote_path="",
#         num_threads=4,
#         download_dir="C:/Users/Administrator/Desktop/test",  # 可选,默认 downloads/
#     )
#     downloader.run()

if __name__ == "__main__":
    # Without a handler the root logger only shows WARNING and above, so the
    # downloaders' INFO messages would be lost.
    setup_logger()
    downloader = HTTPDownloader(
        url="",  # fill in the URL of the file to download
        num_threads=4,
        download_dir="C:/Users/Administrator/Desktop/test",  # optional; defaults to downloads/
    )
    downloader.run()

调试过程中发现,FTP 连接的编码(`ftp.encoding`)可能需要根据服务器调整(例如部分服务器使用 GBK 而不是 UTF-8),其他部分没有发现问题。

最终效果如下(原文此处的效果图未能保留):


网站公告

今日签到

点亮在社区的每一天
去签到