基于Python的端口扫描器和目录扫描工具实现方案,结合机器学习进行指纹识别

发布于:2025-03-15 ⋅ 阅读:(12) ⋅ 点赞:(0)

以下是一个基于Python的端口扫描器和目录扫描工具实现方案,结合机器学习进行指纹识别,充分利用i7多核性能并考虑核显加速可能性:

import asyncio
import socket
import aiohttp
from concurrent.futures import ProcessPoolExecutor
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import joblib

# 端口扫描模块
class PortScanner:
    def __init__(self, target, ports=None, max_workers=100):
        self.target = target
        self.ports = ports or range(1, 1024)
        self.max_workers = max_workers
        self.model = joblib.load('port_model.pkl')  # 预训练模型
        self.vectorizer = joblib.load('tfidf.pkl')

    async def scan_port(self, port):
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(self.target, port),
                timeout=1.0
            )
            writer.close()
            await writer.wait_closed()
            banner = await self.get_banner(reader)
            features = self.vectorizer.transform([banner])
            service = self.model.predict(features)[0]
            return port, True, service
        except:
            return port, False, None

    async def get_banner(self, reader):
        try:
            return (await reader.read(1024)).decode().strip()
        except:
            return ""

    async def run(self):
        results = []
        sem = asyncio.Semaphore(self.max_workers)
        
        async def limited_scan(port):
            async with sem:
                return await self.scan_port(port)
                
        tasks = [limited_scan(port) for port in self.ports]
        for future in asyncio.as_completed(tasks):
            result = await future
            if result[1]:
                results.append(result)
        return results

# 目录扫描模块
class DirectoryScanner:
    def __init__(self, base_url, wordlist, max_workers=50):
        self.base_url = base_url.rstrip('/')
        self.wordlist = wordlist
        self.max_workers = max_workers
        self.classifier = joblib.load('web_model.pkl')
        self.vectorizer = joblib.load('web_tfidf.pkl')

    async def check_path(self, session, path):
        url = f"{self.base_url}/{path}"
        try:
            async with session.get(url, timeout=5) as response:
                content = await response.text()
                features = self.vectorizer.transform([content])
                framework = self.classifier.predict(features)[0]
                return (response.status, url, framework)
        except:
            return (None, url, None)

    async def run(self):
        results = []
        async with aiohttp.ClientSession() as session:
            sem = asyncio.Semaphore(self.max_workers)
            
            async def limited_check(path):
                async with sem:
                    return await self.check_path(session, path)
            
            tasks = [limited_check(path.strip()) for path in open(self.wordlist)]
            for future in asyncio.as_completed(tasks):
                status, url, framework = await future
                if status == 200:
                    results.append((url, status, framework))
        return results

# 主程序
async def main():
    target = "example.com"
    
    # 端口扫描
    port_scanner = PortScanner(target)
    ports = await port_scanner.run()
    print("开放端口和服务:")
    for p in ports:
        print(f"Port {p[0]}: {p[2]}")

    # 目录扫描
    dir_scanner = DirectoryScanner(f"http://{target}", "wordlist.txt")
    dirs = await dir_scanner.run()
    print("\n有效目录:")
    for d in dirs:
        print(f"{d[0]} - {d[1]} - {d[2]}")

if __name__ == "__main__":
    asyncio.run(main())

实现要点说明:

  1. 性能优化

    • 使用异步I/O(asyncio)实现高并发
    • 进程池处理CPU密集型任务(机器学习推理)
    • 通过信号量控制并发数量
  2. 机器学习集成

    • 端口扫描使用TF-IDF + 随机森林识别服务
    • 目录扫描使用网页内容特征识别Web框架
    • 示例模型需预先训练并保存为.pkl文件
  3. 硬件利用

    • 多核并行:通过ProcessPoolExecutor利用i7多核
    • GPU加速:可替换分类器为GPU加速版本(需安装CUDA)
  4. 扩展功能

    # 在PortScanner类中添加
    async def os_detection(self, ttl, window_size):
        # 使用机器学习进行操作系统识别
        pass
    
    # 在DirectoryScanner类中添加
    def detect_waf(self, response):
        # WAF检测逻辑
        pass
    

训练数据准备示例:

# 端口服务识别训练示例
from sklearn.pipeline import Pipeline

port_data = [
    ("SSH-2.0-OpenSSH_8.2p1", "OpenSSH"),
    ("220 ProFTPD Server", "ProFTPD"),
    # ...更多样本
]

X = [d[0] for d in port_data]
y = [d[1] for d in port_data]

pipeline = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('clf', RandomForestClassifier(n_estimators=100))
])

pipeline.fit(X, y)
joblib.dump(pipeline, 'port_model.pkl')

使用建议:

  1. 准备合适的字典文件(wordlist.txt)
  2. 收集足够训练数据并训练模型
  3. 根据实际情况调整并发参数
  4. 添加异常处理和日志记录
  5. 遵守网络安全法律法规

注意事项:

  • 扫描前务必获得合法授权
  • 合理设置超时和间隔时间
  • 避免对目标系统造成过大负载
  • 结果需要人工验证确认

这个实现充分利用了现代CPU的并行处理能力,通过异步I/O和进程池的组合实现高效扫描,同时保持代码可读性和扩展性。机器学习部分需要根据实际数据调整特征工程和模型选择。