import logging
import os
import threading
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from ftplib import FTP
from logging.handlers import TimedRotatingFileHandler
import requests
from tqdm import tqdm


class Downloader(ABC):
    def __init__(self, url, local_path, num_threads=4):
        self.url = url
        self.local_path = local_path
        self.num_threads = num_threads
        self.file_size = 0
        self.lock = threading.Lock()
        self.progress_bar = None

    @abstractmethod
    def get_file_size(self):
        pass

    @abstractmethod
    def download_segment(self, start, end):
        pass

    def init_local_file(self):
        # Pre-allocate the local file to the full remote size so each thread
        # can seek to its own offset and write its segment independently.
        if not os.path.exists(self.local_path):
            with open(self.local_path, 'wb') as f:
                f.seek(self.file_size - 1)
                f.write(b'\0')
        else:
            existing_size = os.path.getsize(self.local_path)
            if existing_size < self.file_size:
                with open(self.local_path, 'r+b') as f:
                    f.seek(self.file_size - 1)
                    f.write(b'\0')

    def run(self):
        self.get_file_size()
        self.init_local_file()
        # Initialize the progress bar
        self.progress_bar = tqdm(total=self.file_size, unit='B', unit_scale=True, desc="Downloading")
        part_size = self.file_size // self.num_threads
        futures = []
        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            for i in range(self.num_threads):
                start = i * part_size
                end = self.file_size if i == self.num_threads - 1 else (i + 1) * part_size
                futures.append(executor.submit(self.download_segment, start, end))
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    logging.error(f"Download error: {e}")
        self.progress_bar.close()
        logging.info("All threads finished; file download complete")


class FTPDownloader(Downloader):
    def __init__(self, host, port, username, password, remote_path, num_threads=4, download_dir="downloads"):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.remote_path = remote_path
        self.download_dir = download_dir
        # Extract the file name from the remote path
        self.filename = os.path.basename(self.remote_path)
        self.local_path = os.path.join(self.download_dir, self.filename)
        super().__init__(f"ftp://{host}/{self.remote_path}", self.local_path, num_threads)
        os.makedirs(self.download_dir, exist_ok=True)  # make sure the download directory exists

    def get_file_size(self):
        with FTP() as ftp:
            ftp.connect(self.host, self.port)
            ftp.login(self.username, self.password)
            ftp.encoding = 'utf-8'
            try:
                self.file_size = ftp.size(self.remote_path)
                logging.info(f"Remote file size: {self.file_size} bytes")
            except Exception as e:
                logging.error(f"Failed to get remote file size: {e}")
                raise

    def download_segment(self, start_byte, end_byte):
        segment_length = end_byte - start_byte
        downloaded = 0

        def callback(data):
            nonlocal downloaded
            # Any data that arrives after the segment is complete is discarded;
            # only the bytes belonging to this thread's range are written.
            if downloaded >= segment_length:
                return
            write_len = min(len(data), segment_length - downloaded)
            with self.lock:
                f.write(data[:write_len])
                downloaded += write_len
                self.progress_bar.update(write_len)

        with open(self.local_path, 'r+b') as f:
            f.seek(start_byte)
            with FTP() as ftp:
                ftp.connect(self.host, self.port)
                ftp.login(self.username, self.password)
                ftp.encoding = 'utf-8'
                ftp.sendcmd('TYPE I')
                try:
                    ftp.retrbinary(f'RETR {self.remote_path}', callback, rest=start_byte, blocksize=65536)
                except Exception as e:
                    logging.error(f"FTP download failed: {e}")
                    raise
        logging.info(f"Segment {start_byte}-{end_byte} finished, actually downloaded: {downloaded}/{segment_length} bytes")


class HTTPDownloader(Downloader):
    def __init__(self, url, num_threads=4, headers=None, download_dir="downloads"):
        self.url = url
        self.headers = headers or {}
        self.download_dir = download_dir
        # Try to extract the file name from the URL
        self.filename = os.path.basename(url.split("?")[0])
        self.local_path = os.path.join(self.download_dir, self.filename)
        super().__init__(url, self.local_path, num_threads)
        os.makedirs(self.download_dir, exist_ok=True)

    def get_file_size(self):
        response = requests.head(self.url, headers=self.headers, allow_redirects=True)
        if 'Content-Length' in response.headers:
            self.file_size = int(response.headers['Content-Length'])
            logging.info(f"Remote file size: {self.file_size} bytes")
        else:
            raise Exception("Could not determine the file size; make sure the server returns a Content-Length header")

    def download_segment(self, start_byte, end_byte):
        logging.debug(f"Starting HTTP segment {start_byte} - {end_byte}")
        headers = dict(self.headers)
        headers['Range'] = f"bytes={start_byte}-{end_byte - 1}"
        response = requests.get(self.url, headers=headers, stream=True)
        if response.status_code != 206:
            # The server must honor the Range request (206 Partial Content),
            # otherwise every thread would receive the whole file and corrupt the output.
            raise Exception(f"Server did not honor the Range request (status {response.status_code})")
        with open(self.local_path, 'r+b') as f:
            f.seek(start_byte)
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    with self.lock:
                        f.write(chunk)
                        self.progress_bar.update(len(chunk))


def setup_logger(log_dir="logs", log_level=logging.INFO, backup_days=7):
    """
    Configure logging: console output plus a log file rotated daily.
    :param log_dir: directory where log files are stored
    :param log_level: logging level
    :param backup_days: number of days of old logs to keep
    """
    # Create the log directory if it does not exist
    os.makedirs(log_dir, exist_ok=True)
    # Define the log format
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    # File handler: rotate at midnight
    file_handler = TimedRotatingFileHandler(
        os.path.join(log_dir, "app.log"),
        when="midnight",
        backupCount=backup_days,
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    file_handler.suffix = "%Y-%m-%d.log"
    # Console handler so messages also appear on the terminal, as promised above
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    # Configure the root logger
    logger = logging.getLogger()
    logger.setLevel(log_level)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)


# if __name__ == "__main__":
#     setup_logger()
#     downloader = FTPDownloader(
#         host="",
#         port=21,
#         username="",
#         password="",
#         remote_path="",
#         num_threads=4,
#         download_dir="C:/Users/Administrator/Desktop/test",  # optional, defaults to downloads/
#     )
#     downloader.run()

if __name__ == "__main__":
    setup_logger()
    downloader = HTTPDownloader(
        url="",
        num_threads=4,
        download_dir="C:/Users/Administrator/Desktop/test",  # optional, defaults to downloads/
    )
    downloader.run()

During debugging I found that the FTP encoding may need adjusting for some servers; apart from that, everything worked as expected.
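
If a server does not use UTF-8 for file names, one option is to make the encoding configurable instead of hard-coding it. The snippet below is only a minimal sketch under that assumption (GBK is a common case on Chinese Windows FTP servers); `open_ftp` is a hypothetical helper, not part of the code above, and the same idea could be applied inside `FTPDownloader` by accepting an `encoding` argument in its constructor.

from ftplib import FTP

def open_ftp(host, port, username, password, encoding="gbk"):
    """Hypothetical helper: open and log in to an FTP connection with a configurable encoding."""
    ftp = FTP()
    # Set the encoding before connect/login so the greeting, credentials and
    # remote paths are all encoded/decoded consistently.
    ftp.encoding = encoding
    ftp.connect(host, port)
    ftp.login(username, password)
    return ftp

# Usage sketch: the inline FTP() blocks in get_file_size / download_segment could
# call this helper instead, e.g. `with open_ftp(self.host, self.port, self.username,
# self.password, encoding) as ftp: ...`
# On Python 3.9+ the encoding can also be passed directly: FTP(encoding="gbk").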
The final result looks like this: