Python Asynchronous Crawler Programming Techniques: A Practical Guide from Beginner to Advanced 🚀

Published: 2025-06-24

📚 Table of Contents

  1. Preface: Why Learn Asynchronous Crawling
  2. Asynchronous Programming Fundamentals
  3. Core Technology Stack for Async Crawlers
  4. Getting Started: Your First Async Crawler
  5. Advanced Techniques: Concurrency Control and Resource Management
  6. Advanced Practice: Distributed Async Crawler Architecture
  7. Anti-Bot Countermeasures
  8. Performance Optimization and Monitoring
  9. Common Problems and Solutions
  10. Best Practices Summary

Preface: Why Learn Asynchronous Crawling? 🤔

Over my past few years of crawler development I have watched the field move from single-threaded sequential fetching, to multi-threaded concurrency, to today's asynchronous programming model. On one news-collection project, a traditional synchronous crawler needed about 2 hours to fetch 100,000 news pages; after switching to an asynchronous design, the same job finished in roughly 15 minutes.

The core advantages of an asynchronous crawler are:

  • High concurrency: a single process can handle thousands of concurrent requests
  • Efficient resource usage: no thread-switching overhead
  • Fast response: non-blocking I/O keeps the program busy doing useful work
  • Scalability: it is easier to grow into a large-scale crawling system

Asynchronous Programming Fundamentals 📖

What Is Asynchronous Programming?

Asynchronous programming is a paradigm that lets a program keep doing other work while an operation is pending, instead of blocking on it. For a crawler this means that after sending an HTTP request we do not sit and wait for the server's response; we can go ahead and send more requests in the meantime.

Core Concepts

Coroutine: a function whose execution can be suspended and resumed; the basic building block of asynchronous programming.

Event loop: the core mechanism that schedules and runs coroutines.

Awaitable: any object that can be waited on with the await keyword, including coroutines, Tasks, and Futures.
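
To make these concepts concrete, here is a minimal, self-contained sketch (the task names and delays are just placeholders) showing a coroutine, a Task being awaited, and the event loop driven by asyncio.run:

import asyncio

async def pretend_fetch(name: str, delay: float) -> str:
    # A coroutine: execution pauses at every await and resumes later
    await asyncio.sleep(delay)  # asyncio.sleep(...) is an awaitable
    return f"{name} finished after {delay}s"

async def main():
    # Wrapping a coroutine in a Task schedules it on the event loop right away
    task = asyncio.create_task(pretend_fetch("background", 1.0))
    print(await pretend_fetch("inline", 0.5))  # awaiting a coroutine directly
    print(await task)                          # awaiting a Task (also an awaitable)

if __name__ == "__main__":
    asyncio.run(main())  # asyncio.run creates and drives the event loop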

Core Technology Stack for Async Crawlers 🛠️

In real projects I usually combine the following components:

Component            Recommended library    Purpose
HTTP client          aiohttp                Asynchronous HTTP requests
HTML parsing         BeautifulSoup4         Extracting page content
Concurrency control  asyncio.Semaphore      Capping the number of concurrent requests
Data storage         aiofiles               Asynchronous file I/O
Proxy management     aiohttp-proxy          Proxy pool management

Getting Started: Your First Async Crawler 🎯

Let's start with a simple but practical example: fetching the titles of several web pages:

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

class AsyncSpider:
    def __init__(self, max_concurrent=10):
        """
        初始化异步爬虫
        :param max_concurrent: 最大并发数
        """
        self.max_concurrent = max_concurrent
        self.session = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def __aenter__(self):
        """异步上下文管理器入口"""
        # 创建aiohttp会话,设置连接池和超时参数
        connector = aiohttp.TCPConnector(
            limit=100,  # 总连接池大小
            limit_per_host=20,  # 单个host的连接数限制
            ttl_dns_cache=300,  # DNS缓存时间
        )
        
        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器退出"""
        if self.session:
            await self.session.close()
    
    async def fetch_page(self, url):
        """
        获取单个页面内容
        :param url: 目标URL
        :return: 页面标题和URL的元组
        """
        async with self.semaphore:  # 控制并发数
            try:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        soup = BeautifulSoup(html, 'html.parser')
                        title = soup.find('title')
                        title_text = title.get_text().strip() if title else "无标题"
                        print(f"✅ 成功获取: {url} - {title_text}")
                        return url, title_text
                    else:
                        print(f"❌ HTTP错误 {response.status}: {url}")
                        return url, None
            except asyncio.TimeoutError:
                print(f"⏰ 超时: {url}")
                return url, None
            except Exception as e:
                print(f"🚫 异常: {url} - {str(e)}")
                return url, None
    
    async def crawl_urls(self, urls):
        """
        批量爬取URL列表
        :param urls: URL列表
        :return: 结果列表
        """
        print(f"🚀 开始爬取 {len(urls)} 个URL,最大并发数: {self.max_concurrent}")
        start_time = time.time()
        
        # 创建所有任务
        tasks = [self.fetch_page(url) for url in urls]
        
        # 并发执行所有任务
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        end_time = time.time()
        print(f"🎉 爬取完成,耗时: {end_time - start_time:.2f}秒")
        
        # 过滤掉异常结果
        valid_results = [r for r in results if isinstance(r, tuple) and r[1] is not None]
        print(f"📊 成功率: {len(valid_results)}/{len(urls)} ({len(valid_results)/len(urls)*100:.1f}%)")
        
        return valid_results

# 使用示例
async def main():
    # 测试URL列表
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2', 
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/json',
    ]
    
    # 使用异步上下文管理器确保资源正确释放
    async with AsyncSpider(max_concurrent=5) as spider:
        results = await spider.crawl_urls(urls)
        
        # 输出结果
        print("\n📋 爬取结果:")
        for url, title in results:
            print(f"  {url} -> {title}")

if __name__ == "__main__":
    asyncio.run(main())

This basic version demonstrates the core elements of an async crawler:

  • aiohttp for asynchronous HTTP requests
  • a Semaphore to cap the number of concurrent requests
  • an asynchronous context manager to manage resources
  • exception handling and timeout control

Advanced Techniques: Concurrency Control and Resource Management ⚡

In real projects, sensible concurrency control and resource management are critical. Here is a more advanced implementation:

import asyncio
import aiohttp
import aiofiles
import json
import random
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any
from urllib.parse import urljoin, urlparse
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class CrawlResult:
    """爬取结果数据类"""
    url: str
    status_code: int
    title: Optional[str] = None
    content_length: Optional[int] = None
    response_time: Optional[float] = None
    error: Optional[str] = None

class AdvancedAsyncSpider:
    def __init__(self, config: Dict[str, Any]):
        """
        高级异步爬虫初始化
        :param config: 配置字典,包含各种爬虫参数
        """
        self.max_concurrent = config.get('max_concurrent', 10)
        self.retry_times = config.get('retry_times', 3)
        self.retry_delay = config.get('retry_delay', 1)
        self.request_delay = config.get('request_delay', 0)
        self.timeout = config.get('timeout', 30)
        
        self.session = None
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        self.results: List[CrawlResult] = []
        self.failed_urls: List[str] = []
        
        # 用户代理池
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]
    
    async def __aenter__(self):
        """创建会话"""
        connector = aiohttp.TCPConnector(
            limit=200,
            limit_per_host=50,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=60,
            enable_cleanup_closed=True
        )
        
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            trust_env=True  # 支持代理环境变量
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """关闭会话"""
        if self.session:
            await self.session.close()
            # 等待连接完全关闭
            await asyncio.sleep(0.1)
    
    def get_random_headers(self) -> Dict[str, str]:
        """生成随机请求头"""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
    
    async def fetch_with_retry(self, url: str) -> CrawlResult:
        """
        带重试机制的页面获取
        :param url: 目标URL
        :return: 爬取结果对象
        """
        async with self.semaphore:
            for attempt in range(self.retry_times + 1):
                start_time = asyncio.get_event_loop().time()
                
                try:
                    # 添加随机延迟,避免被反爬虫检测
                    if self.request_delay > 0:
                        await asyncio.sleep(random.uniform(0, self.request_delay))
                    
                    headers = self.get_random_headers()
                    
                    async with self.session.get(url, headers=headers) as response:
                        response_time = asyncio.get_event_loop().time() - start_time
                        content = await response.text()
                        
                        # 解析标题
                        title = None
                        if 'text/html' in response.headers.get('content-type', ''):
                            from bs4 import BeautifulSoup
                            soup = BeautifulSoup(content, 'html.parser')
                            title_tag = soup.find('title')
                            if title_tag:
                                title = title_tag.get_text().strip()
                        
                        result = CrawlResult(
                            url=url,
                            status_code=response.status,
                            title=title,
                            content_length=len(content),
                            response_time=response_time
                        )
                        
                        logger.info(f"✅ 成功: {url} (状态码: {response.status}, 耗时: {response_time:.2f}s)")
                        return result
                        
                except asyncio.TimeoutError:
                    error_msg = f"超时 (尝试 {attempt + 1}/{self.retry_times + 1})"
                    logger.warning(f"⏰ {url} - {error_msg}")
                    
                except aiohttp.ClientError as e:
                    error_msg = f"客户端错误: {str(e)} (尝试 {attempt + 1}/{self.retry_times + 1})"
                    logger.warning(f"🚫 {url} - {error_msg}")
                    
                except Exception as e:
                    error_msg = f"未知错误: {str(e)} (尝试 {attempt + 1}/{self.retry_times + 1})"
                    logger.error(f"💥 {url} - {error_msg}")
                
                # 如果不是最后一次尝试,等待后重试
                if attempt < self.retry_times:
                    await asyncio.sleep(self.retry_delay * (2 ** attempt))  # 指数退避
            
            # 所有重试都失败了
            result = CrawlResult(
                url=url,
                status_code=0,
                error=f"重试 {self.retry_times} 次后仍然失败"
            )
            self.failed_urls.append(url)
            logger.error(f"❌ 最终失败: {url}")
            return result
    
    async def crawl_batch(self, urls: List[str], callback=None) -> List[CrawlResult]:
        """
        批量爬取URL
        :param urls: URL列表
        :param callback: 可选的回调函数,处理每个结果
        :return: 爬取结果列表
        """
        logger.info(f"🚀 开始批量爬取 {len(urls)} 个URL")
        start_time = asyncio.get_event_loop().time()
        
        # 创建所有任务
        tasks = [self.fetch_with_retry(url) for url in urls]
        
        # 使用as_completed获取完成的任务,可以实时处理结果
        results = []
        completed = 0
        
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)
            completed += 1
            
            # 调用回调函数
            if callback:
                await callback(result, completed, len(urls))
            
            # 显示进度
            if completed % 10 == 0 or completed == len(urls):
                logger.info(f"📊 进度: {completed}/{len(urls)} ({completed/len(urls)*100:.1f}%)")
        
        total_time = asyncio.get_event_loop().time() - start_time
        success_count = len([r for r in results if r.status_code > 0])
        
        logger.info(f"🎉 批量爬取完成")
        logger.info(f"📈 总耗时: {total_time:.2f}秒")
        logger.info(f"📊 成功率: {success_count}/{len(urls)} ({success_count/len(urls)*100:.1f}%)")
        
        self.results.extend(results)
        return results
    
    async def save_results(self, filename: str):
        """异步保存结果到JSON文件"""
        data = {
            'total_urls': len(self.results),
            'successful': len([r for r in self.results if r.status_code > 0]),
            'failed': len(self.failed_urls),
            'results': [asdict(result) for result in self.results],
            'failed_urls': self.failed_urls
        }
        
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        
        logger.info(f"💾 结果已保存到: {filename}")

# 使用示例
async def progress_callback(result: CrawlResult, completed: int, total: int):
    """进度回调函数"""
    if result.status_code > 0:
        print(f"✅ [{completed}/{total}] {result.url} - {result.title}")
    else:
        print(f"❌ [{completed}/{total}] {result.url} - {result.error}")

async def advanced_demo():
    """高级爬虫演示"""
    config = {
        'max_concurrent': 20,  # 并发数
        'retry_times': 2,      # 重试次数
        'retry_delay': 1,      # 重试延迟
        'request_delay': 0.5,  # 请求间隔
        'timeout': 15,         # 超时时间
    }
    
    # 测试URL列表
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/json',
        'https://httpbin.org/xml',
        'https://httpbin.org/html',
        'https://httpbin.org/robots.txt',
    ] * 3  # 重复3次,模拟更多URL
    
    async with AdvancedAsyncSpider(config) as spider:
        # 批量爬取
        results = await spider.crawl_batch(urls, callback=progress_callback)
        
        # 保存结果
        await spider.save_results('crawl_results.json')
        
        # 统计信息
        successful = [r for r in results if r.status_code > 0]
        avg_response_time = (sum(r.response_time or 0 for r in successful) / len(successful)) if successful else 0
        
        print(f"\n📊 统计信息:")
        print(f"  成功: {len(successful)}")
        print(f"  失败: {len(spider.failed_urls)}")
        print(f"  平均响应时间: {avg_response_time:.2f}秒")

if __name__ == "__main__":
    asyncio.run(advanced_demo())

This advanced version adds:

  • a full retry mechanism with exponential backoff
  • randomized request headers and delays to avoid basic bot detection
  • real-time progress callbacks and detailed logging
  • structured result storage
  • better resource management and exception handling

Advanced Practice: Distributed Async Crawler Architecture 🏗️

For large-scale crawling projects we need to think about a distributed architecture. Below is a Redis-based distributed crawler (the example uses the aioredis 2.x API; in newer projects the same interface ships with redis-py as redis.asyncio):

import asyncio
import aiohttp
import aioredis
import json
import hashlib
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import logging

logger = logging.getLogger(__name__)

@dataclass
class Task:
    """爬取任务数据结构"""
    url: str
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3
    metadata: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

class DistributedSpider:
    """分布式异步爬虫"""
    
    def __init__(self, worker_id: str, redis_config: Dict[str, Any], spider_config: Dict[str, Any]):
        self.worker_id = worker_id
        self.redis_config = redis_config
        self.spider_config = spider_config
        
        # Redis连接
        self.redis = None
        
        # 队列名称
        self.task_queue = "spider:tasks"
        self.result_queue = "spider:results"
        self.failed_queue = "spider:failed"
        self.duplicate_set = "spider:duplicates"
        
        # HTTP会话
        self.session = None
        self.semaphore = asyncio.Semaphore(spider_config.get('max_concurrent', 10))
        
        # 统计信息
        self.stats = {
            'processed': 0,
            'success': 0,
            'failed': 0,
            'start_time': time.time()
        }
    
    async def __aenter__(self):
        """初始化连接"""
        # 连接Redis
        self.redis = aioredis.from_url(
            f"redis://{self.redis_config['host']}:{self.redis_config['port']}",
            password=self.redis_config.get('password'),
            db=self.redis_config.get('db', 0),
            encoding='utf-8',
            decode_responses=True
        )
        
        # 创建HTTP会话
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)
        timeout = aiohttp.ClientTimeout(total=30)
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
        
        logger.info(f"🚀 Worker {self.worker_id} 已启动")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """清理资源"""
        if self.session:
            await self.session.close()
        if self.redis:
            await self.redis.close()
        logger.info(f"🛑 Worker {self.worker_id} 已停止")
    
    def generate_task_id(self, url: str) -> str:
        """生成任务唯一ID"""
        return hashlib.md5(url.encode()).hexdigest()
    
    async def add_task(self, task: Task) -> bool:
        """添加任务到队列"""
        task_id = self.generate_task_id(task.url)
        
        # 检查是否重复
        is_duplicate = await self.redis.sismember(self.duplicate_set, task_id)
        if is_duplicate:
            logger.debug(f"⚠️ 重复任务: {task.url}")
            return False
        
        # 添加到去重集合
        await self.redis.sadd(self.duplicate_set, task_id)
        
        # 添加到任务队列(使用优先级队列)
        task_data = json.dumps(asdict(task))
        await self.redis.zadd(self.task_queue, {task_data: task.priority})
        
        logger.info(f"➕ 任务已添加: {task.url} (优先级: {task.priority})")
        return True
    
    async def get_task(self) -> Optional[Task]:
        """从队列获取任务"""
        # 使用BZPOPMAX获取最高优先级任务(阻塞式)
        result = await self.redis.bzpopmax(self.task_queue, timeout=5)
        if not result:
            return None
        
        task_data = json.loads(result[1])
        return Task(**task_data)
    
    async def process_task(self, task: Task) -> Dict[str, Any]:
        """处理单个任务"""
        async with self.semaphore:
            start_time = time.time()
            
            try:
                # 发送HTTP请求
                headers = {
                    'User-Agent': 'DistributedSpider/1.0',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
                }
                
                async with self.session.get(task.url, headers=headers) as response:
                    content = await response.text()
                    response_time = time.time() - start_time
                    
                    result = {
                        'task_id': self.generate_task_id(task.url),
                        'url': task.url,
                        'status_code': response.status,
                        'content_length': len(content),
                        'response_time': response_time,
                        'worker_id': self.worker_id,
                        'timestamp': time.time(),
                        'retry_count': task.retry_count,  # carried through so save_result can track retries
                        'content': content[:1000],  # keep only the first 1000 characters
                        'metadata': task.metadata
                    }
                    
                    # 这里可以添加内容解析逻辑
                    if response.status == 200:
                        result['success'] = True
                        # 解析页面,提取新的URL(示例)
                        new_urls = await self.extract_urls(content, task.url)
                        result['extracted_urls'] = new_urls
                    else:
                        result['success'] = False
                        result['error'] = f"HTTP {response.status}"
                    
                    return result
                    
            except Exception as e:
                response_time = time.time() - start_time
                return {
                    'task_id': self.generate_task_id(task.url),
                    'url': task.url,
                    'status_code': 0,
                    'response_time': response_time,
                    'worker_id': self.worker_id,
                    'timestamp': time.time(),
                    'retry_count': task.retry_count,
                    'success': False,
                    'error': str(e),
                    'metadata': task.metadata
                }
    
    async def extract_urls(self, content: str, base_url: str) -> List[str]:
        """从页面内容中提取URL(示例实现)"""
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(content, 'html.parser')
            
            urls = []
            for link in soup.find_all('a', href=True):
                url = urljoin(base_url, link['href'])
                # 简单过滤
                if url.startswith('http') and len(urls) < 10:
                    urls.append(url)
            
            return urls
        except Exception:
            return []
    
    async def save_result(self, result: Dict[str, Any]):
        """保存处理结果"""
        if result['success']:
            # 保存成功结果
            await self.redis.lpush(self.result_queue, json.dumps(result))
            self.stats['success'] += 1
            
            # 如果有提取的URL,添加为新任务
            if 'extracted_urls' in result:
                for url in result['extracted_urls']:
                    new_task = Task(url=url, priority=0, metadata={'parent_url': result['url']})
                    await self.add_task(new_task)
        else:
            # Handle a failed task
            task_id = result['task_id']
            
            # Retry logic: rebuild the task with an incremented retry counter
            original_task = Task(
                url=result['url'],
                retry_count=result.get('retry_count', 0) + 1,
                metadata=result.get('metadata') or {}
            )
            
            if original_task.retry_count <= original_task.max_retries:
                # Remove the fingerprint from the dedup set first, otherwise add_task would reject the retry
                await self.redis.srem(self.duplicate_set, task_id)
                # Re-queue with a lower priority
                original_task.priority = -original_task.retry_count
                await self.add_task(original_task)
                logger.info(f"🔄 Retrying task: {original_task.url} (attempt {original_task.retry_count})")
            else:
                # Max retries exceeded, push to the failed queue
                await self.redis.lpush(self.failed_queue, json.dumps(result))
                logger.error(f"💀 Task permanently failed: {original_task.url}")
            
            self.stats['failed'] += 1
    
    async def run_worker(self, max_tasks: Optional[int] = None):
        """运行工作进程"""
        logger.info(f"🏃 Worker {self.worker_id} 开始工作")
        
        processed = 0
        while True:
            if max_tasks and processed >= max_tasks:
                logger.info(f"🎯 达到最大任务数量: {max_tasks}")
                break
            
            # 获取任务
            task = await self.get_task()
            if not task:
                logger.debug("⏳ 暂无任务,等待中...")
                continue
            
            # 处理任务
            logger.info(f"🔧 处理任务: {task.url}")
            result = await self.process_task(task)
            
            # 保存结果
            await self.save_result(result)
            
            # 更新统计
            processed += 1
            self.stats['processed'] = processed
            
            # 定期输出统计信息
            if processed % 10 == 0:
                await self.print_stats()
    
    async def print_stats(self):
        """打印统计信息"""
        elapsed = time.time() - self.stats['start_time']
        rate = self.stats['processed'] / elapsed if elapsed > 0 else 0
        
        logger.info(f"📊 Worker {self.worker_id} 统计:")
        logger.info(f"  已处理: {self.stats['processed']}")
        logger.info(f"  成功: {self.stats['success']}")
        logger.info(f"  失败: {self.stats['failed']}")
        logger.info(f"  速率: {rate:.2f} tasks/sec")
        logger.info(f"  运行时间: {elapsed:.2f}秒")

# 使用示例
async def distributed_demo():
    """分布式爬虫演示"""
    redis_config = {
        'host': 'localhost',
        'port': 6379,
        'password': None,
        'db': 0
    }
    
    spider_config = {
        'max_concurrent': 5,
        'request_delay': 1,
    }
    
    worker_id = f"worker-{int(time.time())}"
    
    async with DistributedSpider(worker_id, redis_config, spider_config) as spider:
        # 添加一些初始任务
        initial_urls = [
            'https://httpbin.org/delay/1',
            'https://httpbin.org/delay/2',
            'https://httpbin.org/json',
            'https://httpbin.org/html',
        ]
        
        for url in initial_urls:
            task = Task(url=url, priority=10)  # 高优先级
            await spider.add_task(task)
        
        # 运行工作进程
        await spider.run_worker(max_tasks=20)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    asyncio.run(distributed_demo())

This distributed version implements:

  • a Redis-backed task queue and result store
  • priority-based task scheduling
  • automatic deduplication
  • retry on failure with priority demotion
  • cooperation between multiple workers (see the sketch after this list)
  • real-time statistics and monitoring
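
To scale out, you would normally start several worker processes (on one or more machines) that all point at the same Redis instance. As a quick local sketch, and assuming the DistributedSpider class, the Redis configuration, and already-queued tasks from the example above, several workers can also run concurrently inside a single process with asyncio.gather:

import asyncio
import time

async def run_one_worker(worker_id: str, redis_config: dict, spider_config: dict, max_tasks: int):
    # Each worker opens its own Redis connection and HTTP session
    async with DistributedSpider(worker_id, redis_config, spider_config) as spider:
        await spider.run_worker(max_tasks=max_tasks)

async def run_worker_pool(n_workers: int = 3):
    redis_config = {'host': 'localhost', 'port': 6379, 'password': None, 'db': 0}
    spider_config = {'max_concurrent': 5}
    
    # All workers pull from the same Redis queues, so work is shared automatically
    await asyncio.gather(*[
        run_one_worker(f"worker-{i}-{int(time.time())}", redis_config, spider_config, max_tasks=20)
        for i in range(n_workers)
    ])

if __name__ == "__main__":
    asyncio.run(run_worker_pool())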

Anti-Bot Countermeasures 🛡️

In real-world crawler development you will inevitably have to deal with anti-bot measures. Here are some commonly used counter-strategies:

import asyncio
import aiohttp
import random
import time
from typing import List, Dict, Optional
from urllib.parse import urlparse
import json

class AntiAntiSpider:
    """反反爬虫工具类"""
    
    def __init__(self):
        # 用户代理池
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
        ]
        
        # 代理池
        self.proxy_pool = [
            # 'http://proxy1:port',
            # 'http://proxy2:port',
            # 可以从代理服务商API动态获取
        ]
        
        # 请求频率控制
        self.domain_delays = {}  # 每个域名的延迟配置
        self.last_request_times = {}  # 每个域名的最后请求时间
    
    def get_random_user_agent(self) -> str:
        """获取随机User-Agent"""
        return random.choice(self.user_agents)
    
    def get_proxy(self) -> Optional[str]:
        """获取代理(如果有的话)"""
        if self.proxy_pool:
            return random.choice(self.proxy_pool)
        return None
    
    async def respect_robots_txt(self, session: aiohttp.ClientSession, url: str) -> bool:
        """检查robots.txt(可选实现)"""
        try:
            parsed_url = urlparse(url)
            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
            
            async with session.get(robots_url) as response:
                if response.status == 200:
                    robots_content = await response.text()
                    # 这里可以实现robots.txt解析逻辑
                    # 简化版本:检查是否包含Disallow
                    return 'Disallow: /' not in robots_content
        except:
            pass
        return True  # 默认允许
    
    async def rate_limit(self, url: str):
        """频率限制"""
        domain = urlparse(url).netloc
        
        # 获取该域名的延迟配置(默认1-3秒)
        if domain not in self.domain_delays:
            self.domain_delays[domain] = random.uniform(1, 3)
        
        # 检查上次请求时间
        if domain in self.last_request_times:
            elapsed = time.time() - self.last_request_times[domain]
            required_delay = self.domain_delays[domain]
            
            if elapsed < required_delay:
                sleep_time = required_delay - elapsed
                await asyncio.sleep(sleep_time)
        
        # 更新最后请求时间
        self.last_request_times[domain] = time.time()
    
    def get_headers(self, url: str, referer: Optional[str] = None) -> Dict[str, str]:
        """生成请求头"""
        headers = {
            'User-Agent': self.get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }
        
        # 添加Referer(如果提供)
        if referer:
            headers['Referer'] = referer
        
        # 根据域名添加特定头部
        domain = urlparse(url).netloc
        if 'github' in domain:
            headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
        
        return headers

class StealthSpider:
    """隐蔽爬虫实现"""
    
    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.session = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.anti_anti = AntiAntiSpider()
        
        # 会话状态管理
        self.cookies = {}
        self.session_data = {}
    
    async def __aenter__(self):
        # 创建更真实的连接器配置
        connector = aiohttp.TCPConnector(
            limit=50,
            limit_per_host=10,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=30,
            enable_cleanup_closed=True,
            # 模拟真实浏览器的连接行为
            family=0,  # 支持IPv4和IPv6
        )
        
        # 设置合理的超时
        timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=20
        )
        
        # 创建会话,支持自动重定向和cookie
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            cookie_jar=aiohttp.CookieJar(),
            # 支持自动解压
            auto_decompress=True,
            # 信任环境变量中的代理设置
            trust_env=True
        )
        
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            await asyncio.sleep(0.1)  # 确保连接完全关闭
    
    async def fetch_with_stealth(self, url: str, referer: Optional[str] = None) -> Dict:
        """隐蔽模式获取页面"""
        async with self.semaphore:
            # 频率限制
            await self.anti_anti.rate_limit(url)
            
            # 生成请求头
            headers = self.anti_anti.get_headers(url, referer)
            
            # 获取代理
            proxy = self.anti_anti.get_proxy()
            
            try:
                # 发送请求
                async with self.session.get(
                    url, 
                    headers=headers, 
                    proxy=proxy,
                    allow_redirects=True,
                    max_redirects=5
                ) as response:
                    
                    content = await response.text()
                    
                    return {
                        'url': str(response.url),
                        'status': response.status,
                        'headers': dict(response.headers),
                        'content': content,
                        'cookies': {cookie.key: cookie.value for cookie in response.cookies.values()},
                        'final_url': str(response.url),
                        'redirects': len(response.history),
                        'success': True
                    }
                    
            except Exception as e:
                return {
                    'url': url,
                    'status': 0,
                    'error': str(e),
                    'success': False
                }
    
    async def crawl_with_session_management(self, urls: List[str]) -> List[Dict]:
        """带会话管理的爬取"""
        results = []
        
        for i, url in enumerate(urls):
            # 使用前一个URL作为referer(模拟用户行为)
            referer = urls[i-1] if i > 0 else None
            
            result = await self.fetch_with_stealth(url, referer)
            results.append(result)
            
            # 模拟用户阅读时间
            if result['success']:
                reading_time = random.uniform(2, 8)
                print(f"📖 模拟阅读时间: {reading_time:.1f}秒")
                await asyncio.sleep(reading_time)
        
        return results

# 验证码处理示例(需要OCR服务)
class CaptchaHandler:
    """验证码处理器"""
    
    async def solve_captcha(self, captcha_image_url: str, session: aiohttp.ClientSession) -> Optional[str]:
        """
        解决验证码(示例实现)
        实际项目中可能需要:
        1. 第三方OCR服务
        2. 机器学习模型
        3. 人工打码平台
        """
        try:
            # 下载验证码图片
            async with session.get(captcha_image_url) as response:
                if response.status == 200:
                    image_data = await response.read()
                    
                    # 这里应该调用OCR服务
                    # 示例:使用第三方服务
                    # result = await self.call_ocr_service(image_data)
                    # return result
                    
                    # 临时返回None,表示无法处理
                    return None
        except Exception as e:
            print(f"验证码处理失败: {e}")
            return None
    
    async def handle_captcha_page(self, session: aiohttp.ClientSession, response_text: str, current_url: str):
        """处理包含验证码的页面"""
        # 检测是否包含验证码
        if 'captcha' in response_text.lower() or '验证码' in response_text:
            print("🤖 检测到验证码页面")
            
            # 提取验证码图片URL(需要根据具体网站实现)
            # 这里只是示例
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response_text, 'html.parser')
            captcha_img = soup.find('img', {'id': 'captcha'})
            
            if captcha_img:
                captcha_url = captcha_img.get('src')
                if captcha_url:
                    # 解决验证码
                    captcha_result = await self.solve_captcha(captcha_url, session)
                    if captcha_result:
                        print(f"✅ 验证码识别结果: {captcha_result}")
                        return captcha_result
            
            print("❌ 验证码处理失败")
        
        return None

# 使用示例
async def stealth_demo():
    """隐蔽爬虫演示"""
    urls = [
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers',
        'https://httpbin.org/cookies',
        'https://httpbin.org/redirect/2',
    ]
    
    async with StealthSpider(max_concurrent=2) as spider:
        results = await spider.crawl_with_session_management(urls)
        
        for result in results:
            if result['success']:
                print(f"✅ {result['url']} (状态: {result['status']})")
                if result['redirects'] > 0:
                    print(f"   重定向次数: {result['redirects']}")
            else:
                print(f"❌ {result['url']} - {result['error']}")

if __name__ == "__main__":
    asyncio.run(stealth_demo())

Performance Optimization and Monitoring 📊

Performance optimization is a key part of async crawling. Below is a fairly complete monitoring and optimization setup:

import asyncio
import aiohttp
import time
import psutil
import gc
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging
from collections import deque, defaultdict
import json

@dataclass
class PerformanceMetrics:
    """性能指标数据类"""
    timestamp: float
    requests_per_second: float
    average_response_time: float
    memory_usage_mb: float
    cpu_usage_percent: float
    active_connections: int
    queue_size: int
    success_rate: float

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self, window_size: int = 60):
        self.window_size = window_size  # 监控窗口大小(秒)
        self.metrics_history = deque(maxlen=window_size)
        self.request_times = deque()
        self.response_times = deque()
        self.success_count = 0
        self.total_count = 0
        self.start_time = time.time()
        
        # Reuse a single psutil.Process handle so cpu_percent() can measure deltas
        self.process = psutil.Process()
        
        # Connection pool monitoring
        self.active_connections = 0
        self.queue_size = 0
    
    def record_request(self, response_time: float, success: bool):
        """记录请求指标"""
        current_time = time.time()
        self.request_times.append(current_time)
        self.response_times.append(response_time)
        
        if success:
            self.success_count += 1
        self.total_count += 1
        
        # 清理过期数据
        cutoff_time = current_time - self.window_size
        while self.request_times and self.request_times[0] < cutoff_time:
            self.request_times.popleft()
            self.response_times.popleft()
    
    def get_current_metrics(self) -> PerformanceMetrics:
        """获取当前性能指标"""
        current_time = time.time()
        
        # 计算RPS
        rps = len(self.request_times) / self.window_size if self.request_times else 0
        
        # 计算平均响应时间
        avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 0
        
        # System resource usage (the first cpu_percent() call on a fresh handle returns 0.0)
        memory_usage = self.process.memory_info().rss / 1024 / 1024  # MB
        cpu_usage = self.process.cpu_percent(interval=None)
        
        # 成功率
        success_rate = self.success_count / self.total_count if self.total_count > 0 else 0
        
        metrics = PerformanceMetrics(
            timestamp=current_time,
            requests_per_second=rps,
            average_response_time=avg_response_time,
            memory_usage_mb=memory_usage,
            cpu_usage_percent=cpu_usage,
            active_connections=self.active_connections,
            queue_size=self.queue_size,
            success_rate=success_rate
        )
        
        self.metrics_history.append(metrics)
        return metrics
    
    def print_stats(self):
        """打印统计信息"""
        metrics = self.get_current_metrics()
        uptime = time.time() - self.start_time
        
        print(f"\n📊 性能监控报告 (运行时间: {uptime:.1f}s)")
        print(f"  RPS: {metrics.requests_per_second:.2f}")
        print(f"  平均响应时间: {metrics.average_response_time:.2f}s")
        print(f"  内存使用: {metrics.memory_usage_mb:.1f}MB")
        print(f"  CPU使用: {metrics.cpu_usage_percent:.1f}%")
        print(f"  活跃连接: {metrics.active_connections}")
        print(f"  队列大小: {metrics.queue_size}")
        print(f"  成功率: {metrics.success_rate:.1%}")
        print(f"  总请求数: {self.total_count}")

class OptimizedAsyncSpider:
    """优化版异步爬虫"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.session = None
        self.semaphore = None
        self.monitor = PerformanceMonitor()
        
        # 连接池优化配置
        self.connector_config = {
            'limit': config.get('max_connections', 100),
            'limit_per_host': config.get('max_connections_per_host', 30),
            'ttl_dns_cache': config.get('dns_cache_ttl', 300),
            'use_dns_cache': True,
            'keepalive_timeout': config.get('keepalive_timeout', 60),
            'enable_cleanup_closed': True,
            # Note: aiohttp's TCPConnector has no socket_options parameter,
            # so low-level TCP tuning (e.g. TCP_NODELAY) is not configured here.
        }
        
        # 自适应并发控制
        self.adaptive_concurrency = config.get('adaptive_concurrency', True)
        self.min_concurrent = config.get('min_concurrent', 5)
        self.max_concurrent = config.get('max_concurrent', 50)
        self.current_concurrent = self.min_concurrent
        
        # 请求池
        self.request_pool = asyncio.Queue(maxsize=config.get('request_queue_size', 1000))
        
        # 响应时间统计(用于自适应调整)
        self.response_time_window = deque(maxlen=100)
    
    async def __aenter__(self):
        # 创建优化的连接器
        connector = aiohttp.TCPConnector(**self.connector_config)
        
        # 设置超时
        timeout = aiohttp.ClientTimeout(
            total=self.config.get('total_timeout', 30),
            connect=self.config.get('connect_timeout', 10),
            sock_read=self.config.get('read_timeout', 20)
        )
        
        # 创建会话
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            # 启用压缩
            auto_decompress=True,
            # 设置最大响应大小
            read_bufsize=self.config.get('read_bufsize', 64 * 1024),
        )
        
        # 初始化信号量
        self.semaphore = asyncio.Semaphore(self.current_concurrent)
        
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            # 强制垃圾回收
            gc.collect()
    
    async def adjust_concurrency(self):
        """自适应并发调整"""
        if not self.adaptive_concurrency or len(self.response_time_window) < 10:
            return
        
        # 计算最近的平均响应时间
        recent_avg = sum(list(self.response_time_window)[-10:]) / 10
        overall_avg = sum(self.response_time_window) / len(self.response_time_window)
        
        # 如果最近响应时间明显增加,降低并发
        if recent_avg > overall_avg * 1.5 and self.current_concurrent > self.min_concurrent:
            self.current_concurrent = max(self.min_concurrent, self.current_concurrent - 2)
            print(f"📉 降低并发数至: {self.current_concurrent}")
        
        # 如果响应时间稳定且较快,增加并发
        elif recent_avg < overall_avg * 0.8 and self.current_concurrent < self.max_concurrent:
            self.current_concurrent = min(self.max_concurrent, self.current_concurrent + 1)
            print(f"📈 提高并发数至: {self.current_concurrent}")
        
        # 更新信号量
        self.semaphore = asyncio.Semaphore(self.current_concurrent)
    
    async def fetch_optimized(self, url: str) -> Dict:
        """优化的请求方法"""
        async with self.semaphore:
            start_time = time.time()
            
            try:
                # 更新连接数
                self.monitor.active_connections += 1
                
                async with self.session.get(url) as response:
                    content = await response.text()
                    response_time = time.time() - start_time
                    
                    # 记录响应时间
                    self.response_time_window.append(response_time)
                    self.monitor.record_request(response_time, response.status == 200)
                    
                    return {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'response_time': response_time,
                        'content_length': len(content),
                        'success': response.status == 200
                    }
                    
            except Exception as e:
                response_time = time.time() - start_time
                self.monitor.record_request(response_time, False)
                
                return {
                    'url': url,
                    'status': 0,
                    'error': str(e),
                    'response_time': response_time,
                    'success': False
                }
            finally:
                self.monitor.active_connections -= 1
    
    async def batch_crawl_optimized(self, urls: List[str]) -> List[Dict]:
        """优化的批量爬取"""
        print(f"🚀 开始优化爬取 {len(urls)} 个URL")
        
        # 分批处理,避免内存过载
        batch_size = self.config.get('batch_size', 100)
        all_results = []
        
        for i in range(0, len(urls), batch_size):
            batch_urls = urls[i:i + batch_size]
            print(f"📦 处理批次 {i//batch_size + 1}/{(len(urls)-1)//batch_size + 1}")
            
            # 创建任务
            tasks = [self.fetch_optimized(url) for url in batch_urls]
            
            # 执行任务
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 过滤异常
            valid_results = [r for r in batch_results if isinstance(r, dict)]
            all_results.extend(valid_results)
            
            # 自适应调整并发
            await self.adjust_concurrency()
            
            # 打印监控信息
            if i % (batch_size * 2) == 0:  # 每2个批次打印一次
                self.monitor.print_stats()
            
            # 批次间短暂休息,避免过载
            if i + batch_size < len(urls):
                await asyncio.sleep(0.1)
        
        return all_results
    
    async def memory_cleanup(self):
        """内存清理"""
        # 手动触发垃圾回收
        gc.collect()
        
        # 清理过期的监控数据
        current_time = time.time()
        cutoff_time = current_time - 300  # 保留5分钟的数据
        
        while (self.monitor.metrics_history and 
               self.monitor.metrics_history[0].timestamp < cutoff_time):
            self.monitor.metrics_history.popleft()

# 使用示例
async def performance_demo():
    """性能优化演示"""
    config = {
        'max_connections': 50,
        'max_connections_per_host': 15,
        'max_concurrent': 20,
        'min_concurrent': 5,
        'adaptive_concurrency': True,
        'batch_size': 50,
        'total_timeout': 15,
        'connect_timeout': 5,
    }
    
    # 生成大量测试URL
    test_urls = []
    for i in range(200):
        delay = i % 5 + 1  # 1-5秒延迟
        test_urls.append(f'https://httpbin.org/delay/{delay}')
    
    async with OptimizedAsyncSpider(config) as spider:
        # 启动监控任务
        monitor_task = asyncio.create_task(periodic_monitoring(spider.monitor))
        
        try:
            # 执行爬取
            results = await spider.batch_crawl_optimized(test_urls)
            
            # 最终统计
            print(f"\n🎉 爬取完成!")
            print(f"📊 总URL数: {len(test_urls)}")
            print(f"📊 成功数: {len([r for r in results if r.get('success')])}")
            print(f"📊 失败数: {len([r for r in results if not r.get('success')])}")
            
            spider.monitor.print_stats()
            
        finally:
            monitor_task.cancel()

async def periodic_monitoring(monitor: PerformanceMonitor):
    """定期监控任务"""
    while True:
        await asyncio.sleep(10)  # 每10秒监控一次
        try:
            metrics = monitor.get_current_metrics()
            
            # 检查异常情况
            if metrics.memory_usage_mb > 500:  # 内存超过500MB
                print("⚠️ 内存使用过高!")
            
            if metrics.requests_per_second < 1 and monitor.total_count > 0:
                print("⚠️ RPS过低,可能存在瓶颈!")
            
            if metrics.success_rate < 0.8 and monitor.total_count > 10:
                print("⚠️ 成功率过低!")
                
        except Exception as e:
            print(f"监控异常: {e}")

if __name__ == "__main__":
    asyncio.run(performance_demo())

Common Problems and Solutions ❓

1. Memory leaks

Problem: memory keeps growing during long runs
Solution:

# Correct resource management
async with aiohttp.ClientSession() as session:
    # the session is closed automatically when the block exits
    pass

# Trigger garbage collection periodically
import gc
gc.collect()

# Limit the number of concurrent tasks
semaphore = asyncio.Semaphore(10)

2. Connection pool exhaustion

Problem: heavy concurrency exhausts the connection pool
Solution:

connector = aiohttp.TCPConnector(
    limit=100,  # larger total connection pool
    limit_per_host=20,  # per-host connection limit
    ttl_dns_cache=300,  # DNS cache TTL (seconds)
    enable_cleanup_closed=True  # clean up closed connections automatically
)

3. Improper timeout handling

Problem: requests that hang or time out cause tasks to pile up
Solution:

timeout = aiohttp.ClientTimeout(
    total=30,      # overall timeout
    connect=10,    # connection timeout
    sock_read=20   # socket read timeout
)

4. Exception propagation

Problem: a single failing task breaks the whole crawl
Solution:

# Use return_exceptions=True so one failure does not cancel the others
results = await asyncio.gather(*tasks, return_exceptions=True)

# Filter out exception results
valid_results = [r for r in results if not isinstance(r, Exception)]

Best Practices Summary 🏆

After several years of building async crawlers, these are the practices I keep coming back to:

1. Architecture design principles

  • Single responsibility: each component does one specific job
  • Scalability: support horizontal scaling and configuration tuning
  • Fault tolerance: handle all kinds of failures gracefully
  • Observability: complete performance monitoring and logging

2. Performance optimization essentials

  • Sensible concurrency: tune it to the target site and your network conditions
  • Connection reuse: use a connection pool to avoid handshake overhead
  • Memory management: release resources promptly to avoid leaks
  • Batch processing: crawl large URL lists in batches to avoid overload (see the sketch after this list)
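
For the batching point, a small generic helper like the one below (a sketch, not tied to any class in this article; fetch stands for whatever coroutine function you use to download a URL) keeps memory bounded by never creating all tasks at once:

import asyncio
from typing import Awaitable, Callable, Iterable, Iterator, List

def chunked(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive fixed-size batches from an iterable of URLs."""
    batch: List[str] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

async def crawl_in_batches(urls: Iterable[str],
                           fetch: Callable[[str], Awaitable],
                           batch_size: int = 100) -> list:
    """Run fetch(url) coroutines batch by batch instead of all at once."""
    results = []
    for batch in chunked(urls, batch_size):
        results.extend(await asyncio.gather(*(fetch(u) for u in batch),
                                            return_exceptions=True))
    return results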

3. Anti-bot countermeasures

  • Header spoofing: imitate real browser behavior
  • Rate control: keep reasonable intervals between requests
  • IP rotation: spread requests across a proxy pool
  • Session management: maintain login state and cookies

4. Error handling

  • Retry strategy: retries with exponential backoff (see the sketch after this list)
  • Degradation: a fallback plan for tasks that keep failing
  • Monitoring and alerting: detect and deal with problems early
  • Logging: detailed operational logs
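
As a minimal sketch of the exponential backoff idea (the function name, delay values, and exception choices here are illustrative assumptions, not a fixed API):

import asyncio
import random

import aiohttp

async def fetch_with_backoff(session: aiohttp.ClientSession, url: str,
                             max_retries: int = 3, base_delay: float = 1.0) -> str:
    """Retry a GET request with exponential backoff plus a little jitter."""
    for attempt in range(max_retries + 1):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == max_retries:
                raise  # give up after the final attempt
            # 1s, 2s, 4s, ... plus jitter so many tasks do not retry in lockstep
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)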

5. Data quality assurance

  • Deduplication: avoid crawling the same page twice (see the sketch after this list)
  • Data validation: make sure the scraped data is complete
  • Incremental updates: only fetch what has changed
  • Backup and recovery: back up important data
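
A lightweight way to implement deduplication is to normalize each URL and keep a set of fingerprints. The in-memory sketch below assumes a single process; at larger scale you would swap the Python set for a Redis set (as in the distributed example) or a Bloom filter:

import hashlib
from urllib.parse import urlsplit, urlunsplit

seen_fingerprints = set()

def url_fingerprint(url: str) -> str:
    """Normalize a URL (lowercase scheme/host, drop the fragment) and hash it."""
    parts = urlsplit(url)
    normalized = urlunsplit((parts.scheme.lower(), parts.netloc.lower(),
                             parts.path or "/", parts.query, ""))
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()

def should_crawl(url: str) -> bool:
    """Return True only the first time a normalized URL is seen."""
    fp = url_fingerprint(url)
    if fp in seen_fingerprints:
        return False
    seen_fingerprints.add(fp)
    return True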

Following these practices will help you build an efficient, stable, and maintainable async crawling system. Remember that crawling is not purely a technical problem: you also need to think about legal compliance, the load you put on target sites, and data quality.

Asynchronous crawling is a very hands-on skill, so keep refining your setup in real projects. As Python's async ecosystem continues to mature, more and better tools and libraries will appear to make crawler development even more efficient and convenient.

I hope this guide helps you go further on your async crawling journey! 🚀

