[Website Content Security Checks] Part 2: Extracting All External and Internal Domain Names from Every URL Page of a Site

Published: 2025-06-27

The Go version isn't written yet, so use the Python one for now; it is a bit slower.

Dependencies (install with: pip install -r requirements.txt)

requirements.txt

aiohttp
beautifulsoup4==4.12.2
tqdm==4.66.1
redis==5.2.1
motor==3.3.1
pymongo==4.6.0
chardet

The domain extraction program
domain_extractor.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import re
import sqlite3
import asyncio
import aiohttp
import chardet
import ssl
import os
from urllib.parse import urlparse, urlunparse, urljoin
from typing import Set, Dict, Tuple, Optional
import time
from tqdm import tqdm
from bs4 import BeautifulSoup

class DomainExtractor:
    def __init__(self, db_path: str = 'domains.db', max_connections: int = 100):
        self.db_path = db_path
        self.domain_pattern = re.compile(r'[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z]{2,})+')  # allow short first labels such as qq.com
        self.max_connections = max_connections
        self.semaphore = asyncio.Semaphore(max_connections)
        self.session = None
        self.tasks = []
        # File extensions to filter out
        self.file_extensions = {
            # Web pages
            '.html', '.htm', '.shtml', '.jhtml', '.asp', '.aspx', '.php', '.jsp', '.cgi',
            # Documents
            '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf',
            # Images
            '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg',
            # Archives
            '.zip', '.rar', '.7z', '.tar', '.gz',
            # Audio
            '.mp3', '.wav', '.ogg', '.m4a',
            # Video
            '.mp4', '.avi', '.mov', '.wmv', '.flv',
            # Other common files
            '.exe', '.dll', '.so', '.dylib', '.class', '.jar', '.war',
            '.css', '.js', '.json', '.xml', '.rss', '.atom'
        }
        self.init_database()
        
    def init_database(self):
        """Initialize the SQLite database"""
        # If the database file already exists, delete it
        if os.path.exists(self.db_path):
            os.remove(self.db_path)
            
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS domains (
                domain TEXT PRIMARY KEY,
                first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                count INTEGER DEFAULT 1,
                source_url TEXT
            )
        ''')
        conn.commit()
        conn.close()
        
    async def init_session(self):
        """Initialize the HTTP session"""
        if not self.session:
            # Create an SSL context with certificate verification disabled
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
            
            # Create the TCP connector
            connector = aiohttp.TCPConnector(
                ssl=ssl_context,
                force_close=True,
                enable_cleanup_closed=True,
                limit=self.max_connections
            )
            
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=30),
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate',  # 'br' omitted: decoding Brotli needs the extra brotli package
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                    'Cache-Control': 'max-age=0'
                },
                connector=connector
            )
        
    async def close_session(self):
        """Close the HTTP session"""
        if self.session and not self.session.closed:
            await self.session.close()
            self.session = None
            
    def is_valid_domain(self, domain: str) -> bool:
        """Check whether a domain string is valid"""
        # Strip punctuation and other noise characters
        domain = re.sub(r'[()()【】\[\]《》<>,。、;:""''!?…—]', '', domain)
        domain = re.sub(r'[^\x00-\x7F]+', '', domain)  # strip all non-ASCII characters
        
        # Reject strings that end with a known file extension
        if any(domain.lower().endswith(ext) for ext in self.file_extensions):
            return False
            
        # Check the domain format
        if not self.domain_pattern.match(domain):
            return False
            
        # Reject IP addresses
        if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', domain):
            return False
            
        # Reject local hostnames
        if domain in ('localhost', '127.0.0.1', '0.0.0.0'):
            return False
            
        return True
        
    def extract_domain(self, url: str) -> str:
        """Extract the domain from a URL"""
        try:
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            # Strip the port number
            if ':' in domain:
                domain = domain.split(':')[0]
            # Strip punctuation and other noise characters
            domain = re.sub(r'[()()【】\[\]《》<>,。、;:""''!?…—]', '', domain)
            domain = re.sub(r'[^\x00-\x7F]+', '', domain)  # strip all non-ASCII characters
            return domain
        except Exception:
            return ''
            
    def is_html_content(self, content_type: str) -> bool:
        """Check whether the content type is HTML"""
        html_types = ['text/html', 'application/xhtml+xml', 'application/xml']
        return any(html_type in content_type.lower() for html_type in html_types)
        
    def convert_to_https(self, url: str) -> str:
        """Convert an HTTP URL to HTTPS"""
        parsed = urlparse(url)
        if parsed.scheme == 'http':
            return urlunparse(('https', parsed.netloc, parsed.path, parsed.params, parsed.query, parsed.fragment))
        return url
        
    async def fetch_page(self, url: str) -> Optional[str]:
        """Fetch page content asynchronously"""
        async with self.semaphore:
            try:
                if not self.session or self.session.closed:
                    await self.init_session()
                    
                # Simple retry mechanism
                max_retries = 3
                for retry in range(max_retries):
                    try:
                        # Try HTTPS first
                        https_url = self.convert_to_https(url)
                        try:
                            async with self.session.get(https_url, allow_redirects=True) as response:
                                if response.status == 200:
                                    return await self._process_response(response)
                        except Exception as e:
                            print(f"HTTPS request failed {https_url}: {e}")
                            
                        # If HTTPS fails, fall back to HTTP
                        async with self.session.get(url, allow_redirects=True) as response:
                            if response.status == 200:
                                return await self._process_response(response)
                            elif response.status == 301 or response.status == 302:
                                # Handle the redirect manually
                                redirect_url = response.headers.get('Location')
                                if redirect_url:
                                    if not redirect_url.startswith(('http://', 'https://')):
                                        redirect_url = urljoin(url, redirect_url)
                                    async with self.session.get(redirect_url, allow_redirects=True) as redirect_response:
                                        if redirect_response.status == 200:
                                            return await self._process_response(redirect_response)
                            elif response.status == 404:
                                print(f"Page not found: {url}")
                                return None
                            elif response.status == 403:
                                print(f"Access denied: {url}")
                                return None
                            else:
                                print(f"HTTP error {response.status}: {url}")
                                if retry < max_retries - 1:
                                    await asyncio.sleep(1)  # wait 1 second before retrying
                                    continue
                                return None
                    except aiohttp.ClientSSLError as e:
                        print(f"SSL certificate error {url}: {e}")
                        if retry < max_retries - 1:
                            await asyncio.sleep(1)
                            continue
                        return None
                    except aiohttp.ClientConnectorError as e:
                        print(f"Connection error {url}: {e}")
                        if retry < max_retries - 1:
                            await asyncio.sleep(1)
                            continue
                        return None
                    except Exception as e:
                        print(f"Failed to fetch page {url}: {e}")
                        if retry < max_retries - 1:
                            await asyncio.sleep(1)
                            continue
                        return None
                return None
            except Exception as e:
                print(f"Failed to fetch page {url}: {e}")
                return None
                
    async def _process_response(self, response: aiohttp.ClientResponse) -> Optional[str]:
        """Process the HTTP response and return the decoded HTML text"""
        # Check the content type
        content_type = response.headers.get('Content-Type', '')
        if not self.is_html_content(content_type):
            return None
            
        # Read the raw bytes
        content = await response.read()
        
        # Detect the encoding
        encoding = chardet.detect(content)['encoding']
        if not encoding:
            # If detection fails, try the charset from the response headers
            if 'charset=' in content_type:
                encoding = content_type.split('charset=')[-1].strip()
            else:
                # Fall back to UTF-8
                encoding = 'utf-8'
                
        try:
            # Try decoding with the detected encoding
            return content.decode(encoding)
        except UnicodeDecodeError:
            # If that fails, try other common encodings
            for enc in ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5']:
                try:
                    return content.decode(enc)
                except UnicodeDecodeError:
                    continue
            # If every encoding fails, decode as UTF-8 and ignore errors
            return content.decode('utf-8', errors='ignore')
                
    def extract_domains_from_text(self, text: str, source_url: str) -> Set[str]:
        """Extract all domains from plain text"""
        domains = set()
        # Extract domains from any URLs in the text
        urls = re.findall(r'https?://[^\s<>"]+|www\.[^\s<>"]+', text)
        for url in urls:
            domain = self.extract_domain(url)
            if domain and self.is_valid_domain(domain):
                domains.add(domain)
                
        # Extract bare domains from the remaining text
        text_domains = self.domain_pattern.findall(text)
        for domain in text_domains:
            domain = domain.lower()
            if self.is_valid_domain(domain):
                domains.add(domain)
                
        return domains
        
    async def process_url(self, url: str) -> Tuple[str, Set[str]]:
        """Process a single URL and extract domains from it"""
        try:
            html = await self.fetch_page(url)
            if not html:
                return url, set()
                
            # Parse the HTML with BeautifulSoup
            try:
                soup = BeautifulSoup(html, 'html.parser')
            except Exception as e:
                print(f"Failed to parse HTML {url}: {e}")
                return url, set()
            
            # Extract domains from all link-like attributes
            domains = set()
            for tag in soup.find_all(['a', 'link', 'script', 'img']):
                for attr in ['href', 'src']:
                    if tag.get(attr):
                        domain = self.extract_domain(tag[attr])
                        if domain and self.is_valid_domain(domain):
                            domains.add(domain)
                            
            # Extract domains from the page text
            text_domains = self.extract_domains_from_text(soup.get_text(), url)
            domains.update(text_domains)
            
            return url, domains
        except asyncio.CancelledError:
            print(f"Task cancelled: {url}")
            return url, set()
        except Exception as e:
            print(f"Error processing URL {url}: {e}")
            return url, set()
        
    def save_domains(self, domain_data: Dict[str, Set[str]]):
        """Save the extracted domains to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Use a single transaction for better performance
        cursor.execute('BEGIN TRANSACTION')
        
        for source_url, domains in domain_data.items():
            for domain in domains:
                cursor.execute('''
                    INSERT INTO domains (domain, last_seen, count, source_url)
                    VALUES (?, CURRENT_TIMESTAMP, 1, ?)
                    ON CONFLICT(domain) DO UPDATE SET
                        last_seen = CURRENT_TIMESTAMP,
                        count = count + 1
                ''', (domain, source_url))
                
        conn.commit()
        conn.close()
        
    async def process_file(self, input_file: str):
        """Process the input file and extract domains"""
        print(f"Processing file: {input_file}")
        start_time = time.time()
        
        # Read all URLs
        with open(input_file, 'r', encoding='utf-8') as f:
            urls = [line.strip() for line in f if line.strip()]
            
        total_urls = len(urls)
        print(f"Read {total_urls} URLs")
        
        # Initialize the HTTP session
        await self.init_session()
        
        try:
            # Create the task list
            self.tasks = []
            for url in urls:
                task = asyncio.create_task(self.process_url(url))
                self.tasks.append(task)
                
            # Show progress with tqdm
            domain_data = {}
            for completed_task in tqdm(asyncio.as_completed(self.tasks), total=len(self.tasks), desc="Progress"):
                url, domains = await completed_task
                if domains:
                    domain_data[url] = domains
                    
            # Save the domains to the database
            print("\nSaving domains to the database...")
            self.save_domains(domain_data)
            
            elapsed_time = time.time() - start_time
            print("\nDone!")
            print(f"Total time: {elapsed_time:.2f} s")
            print(f"Average speed: {total_urls/elapsed_time:.2f} URLs/s")
            
        except Exception as e:
            print(f"Error during processing: {e}")
        finally:
            # Cancel any unfinished tasks
            for task in self.tasks:
                if not task.done():
                    task.cancel()
            # Wait for all tasks to finish
            await asyncio.gather(*self.tasks, return_exceptions=True)
            await self.close_session()
        
    def get_domain_stats(self):
        """Print domain statistics"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('SELECT COUNT(*) FROM domains')
        total_domains = cursor.fetchone()[0]
        
        cursor.execute('''
            SELECT domain, count, first_seen, last_seen, source_url 
            FROM domains 
            ORDER BY count DESC 
            LIMIT 10
        ''')
        top_domains = cursor.fetchall()
        
        conn.close()
        
        print("\nDomain statistics:")
        print(f"Total domains: {total_domains}")
        print("\nTop 10 domains:")
        for domain, count, first_seen, last_seen, source_url in top_domains:
            print(f"Domain: {domain}")
            print(f"Count: {count}")
            print(f"First seen: {first_seen}")
            print(f"Last seen: {last_seen}")
            print(f"Source URL: {source_url}")
            print("-" * 50)

async def main():
    import sys
    if len(sys.argv) < 2:
        print("Usage: python domain_extractor.py <URL file> [max connections]")
        print("Example: python domain_extractor.py example.com_links.txt 100")
        return
        
    input_file = sys.argv[1]
    max_connections = int(sys.argv[2]) if len(sys.argv) > 2 else 100
    
    extractor = DomainExtractor(max_connections=max_connections)
    await extractor.process_file(input_file)
    extractor.get_domain_stats()

if __name__ == "__main__":
    asyncio.run(main()) 

Usage

python domain_extractor.py <URL file> [max connections]
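
The input file is a plain-text list with one URL per line. For example, a hypothetical example.com_links.txt (the URLs below are placeholders) might contain:

https://example.com/
https://example.com/about.html
https://example.com/news/index.html

and a run with up to 100 concurrent connections would then be:

python domain_extractor.py example.com_links.txt 100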

When it finishes, a SQLite database file named domains.db is created in the current directory. Export the domain information from it with a database management tool and keep it for the later content security identification step.
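
If you would rather not use a database GUI, the minimal sketch below (a hypothetical helper script, export_domains.py) uses Python's built-in sqlite3 module to dump the domains table that domain_extractor.py creates; the output file name domains.txt is only an assumption for illustration.

export_domains.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Minimal sketch: export all extracted domains from domains.db to a text file.

import sqlite3

conn = sqlite3.connect('domains.db')
cursor = conn.cursor()
# The domains table is created by domain_extractor.py (see init_database above)
cursor.execute('SELECT domain, count, source_url FROM domains ORDER BY count DESC')
rows = cursor.fetchall()
conn.close()

# domains.txt is a hypothetical output file name
with open('domains.txt', 'w', encoding='utf-8') as f:
    for domain, count, source_url in rows:
        f.write(f"{domain}\t{count}\t{source_url}\n")

print(f"Exported {len(rows)} domains to domains.txt")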

