【Node.js】高级主题

发布于:2025-05-25 ⋅ 阅读:(24) ⋅ 点赞:(0)

在这里插入图片描述

个人主页:Guiat
归属专栏:node.js

在这里插入图片描述

正文

1. Node.js 高级主题概览

Node.js 高级主题涵盖了深入的技术概念和实践,包括事件循环机制、内存管理、性能优化、微服务架构、实时通信等核心领域。掌握这些高级主题对于构建高性能、可扩展的企业级应用至关重要。

1.1 高级主题架构图

Node.js 高级主题
核心机制
性能优化
架构模式
实时通信
安全与认证
数据处理
部署与扩展
事件循环
内存管理
流处理
集群模式
性能分析
缓存策略
数据库优化
负载均衡
微服务
设计模式
中间件
插件系统
WebSocket
Server-Sent Events
消息队列
实时数据同步
JWT认证
OAuth2.0
加密算法
安全中间件
大数据处理
文件上传
图像处理
数据分析
容器化
服务网格
监控系统
自动扩缩容

2. 事件循环与异步编程深度解析

2.1 事件循环机制详解

JavaScript 执行
调用栈 Call Stack
调用栈是否为空?
检查微任务队列
微任务队列是否为空?
执行微任务
检查宏任务队列
宏任务队列是否为空?
执行一个宏任务
等待新任务
Timer Phase
Pending Callbacks
Idle, Prepare
Poll Phase
Check Phase
Close Callbacks

事件循环阶段详解

// event-loop-demo.js
const fs = require('fs');

console.log('=== 事件循环演示 ===');

// 1. 同步代码
console.log('1. 同步代码执行');

// 2. process.nextTick (微任务,最高优先级)
process.nextTick(() => {
    console.log('2. process.nextTick 回调');
});

// 3. Promise (微任务)
Promise.resolve().then(() => {
    console.log('3. Promise.then 回调');
});

// 4. setImmediate (Check 阶段)
setImmediate(() => {
    console.log('4. setImmediate 回调');
});

// 5. setTimeout (Timer 阶段)
setTimeout(() => {
    console.log('5. setTimeout 回调');
}, 0);

// 6. I/O 操作 (Poll 阶段)
fs.readFile(__filename, () => {
    console.log('6. fs.readFile 回调');
    
    // 在 I/O 回调中的 setImmediate 会在下一次 setTimeout 之前执行
    setImmediate(() => {
        console.log('7. setImmediate 在 I/O 回调中');
    });
    
    setTimeout(() => {
        console.log('8. setTimeout 在 I/O 回调中');
    }, 0);
});

console.log('9. 同步代码结束');

/*
输出顺序:
1. 同步代码执行
9. 同步代码结束
2. process.nextTick 回调
3. Promise.then 回调
5. setTimeout 回调
4. setImmediate 回调
6. fs.readFile 回调
7. setImmediate 在 I/O 回调中
8. setTimeout 在 I/O 回调中
*/

2.2 异步编程模式演进

异步编程演进
回调函数 Callbacks
Promise
Async/Await
生成器 Generators
流 Streams
回调地狱
错误处理困难
代码可读性差
链式调用
错误处理改善
Promise.all/race
同步风格代码
异常处理简化
调试友好
暂停/恢复执行
惰性求值
迭代器模式
背压处理
内存效率
管道操作

高级异步模式实现

// advanced-async-patterns.js

// 1. 异步迭代器
class AsyncDataProcessor {
    constructor(data) {
        this.data = data;
        this.index = 0;
    }

    async *[Symbol.asyncIterator]() {
        while (this.index < this.data.length) {
            // 模拟异步处理
            await new Promise(resolve => setTimeout(resolve, 100));
            yield this.data[this.index++];
        }
    }
}

// 使用异步迭代器
async function processDataAsync() {
    const processor = new AsyncDataProcessor([1, 2, 3, 4, 5]);
    
    for await (const item of processor) {
        console.log('处理项目:', item);
    }
}

// 2. 异步生成器与流控制
async function* asyncGenerator() {
    let i = 0;
    while (i < 10) {
        await new Promise(resolve => setTimeout(resolve, 500));
        yield i++;
    }
}

// 3. 高级 Promise 模式
class PromisePool {
    constructor(concurrency = 3) {
        this.concurrency = concurrency;
        this.running = [];
        this.queue = [];
    }

    async add(promiseFunction) {
        return new Promise((resolve, reject) => {
            this.queue.push({
                promiseFunction,
                resolve,
                reject
            });
            this.process();
        });
    }

    async process() {
        if (this.running.length >= this.concurrency || this.queue.length === 0) {
            return;
        }

        const { promiseFunction, resolve, reject } = this.queue.shift();
        const promise = promiseFunction()
            .then(resolve)
            .catch(reject)
            .finally(() => {
                this.running.splice(this.running.indexOf(promise), 1);
                this.process();
            });

        this.running.push(promise);
    }
}

// 4. 可取消的 Promise
class CancellablePromise {
    constructor(executor) {
        this.isCancelled = false;
        
        this.promise = new Promise((resolve, reject) => {
            this.cancel = () => {
                this.isCancelled = true;
                reject(new Error('Promise was cancelled'));
            };

            executor(
                (value) => {
                    if (!this.isCancelled) {
                        resolve(value);
                    }
                },
                (reason) => {
                    if (!this.isCancelled) {
                        reject(reason);
                    }
                }
            );
        });
    }

    then(onFulfilled, onRejected) {
        return this.promise.then(onFulfilled, onRejected);
    }

    catch(onRejected) {
        return this.promise.catch(onRejected);
    }
}

// 5. 异步重试机制
async function retryAsync(fn, maxRetries = 3, delay = 1000) {
    let lastError;
    
    for (let i = 0; i <= maxRetries; i++) {
        try {
            return await fn();
        } catch (error) {
            lastError = error;
            
            if (i === maxRetries) {
                throw lastError;
            }
            
            // 指数退避
            const waitTime = delay * Math.pow(2, i);
            await new Promise(resolve => setTimeout(resolve, waitTime));
        }
    }
}

module.exports = {
    AsyncDataProcessor,
    PromisePool,
    CancellablePromise,
    retryAsync,
    processDataAsync,
    asyncGenerator
};

3. 内存管理与性能优化

3.1 V8 内存管理机制

V8 内存结构
新生代 New Space
老生代 Old Space
大对象空间 Large Object Space
代码空间 Code Space
From Space
To Space
Scavenge GC
老生代指针空间
老生代数据空间
Mark-Sweep GC
Mark-Compact GC
大于8KB的对象
直接分配
编译后的代码
JIT优化代码
垃圾回收触发
Minor GC
Major GC
Incremental GC

内存监控与分析工具

// memory-profiler.js
const v8 = require('v8');
const fs = require('fs');

class MemoryProfiler {
    constructor() {
        this.snapshots = [];
        this.startTime = Date.now();
    }

    // 获取内存使用情况
    getMemoryUsage() {
        const usage = process.memoryUsage();
        const heapStats = v8.getHeapStatistics();
        
        return {
            timestamp: Date.now(),
            rss: usage.rss,
            heapTotal: usage.heapTotal,
            heapUsed: usage.heapUsed,
            external: usage.external,
            arrayBuffers: usage.arrayBuffers,
            heapSizeLimit: heapStats.heap_size_limit,
            totalHeapSize: heapStats.total_heap_size,
            usedHeapSize: heapStats.used_heap_size,
            mallocedMemory: heapStats.malloced_memory,
            peakMallocedMemory: heapStats.peak_malloced_memory
        };
    }

    // 生成堆快照
    takeHeapSnapshot(filename) {
        const snapshotStream = v8.getHeapSnapshot();
        const fileStream = fs.createWriteStream(filename || `heap-${Date.now()}.heapsnapshot`);
        
        snapshotStream.pipe(fileStream);
        
        return new Promise((resolve, reject) => {
            fileStream.on('finish', () => {
                console.log(`堆快照已保存: ${filename}`);
                resolve(filename);
            });
            fileStream.on('error', reject);
        });
    }

    // 监控内存泄漏
    startMemoryLeak Detection() {
        const interval = setInterval(() => {
            const usage = this.getMemoryUsage();
            this.snapshots.push(usage);
            
            // 保留最近100个快照
            if (this.snapshots.length > 100) {
                this.snapshots.shift();
            }
            
            // 检测内存泄漏
            if (this.snapshots.length >= 10) {
                const recent = this.snapshots.slice(-10);
                const trend = this.calculateMemoryTrend(recent);
                
                if (trend.isIncreasing && trend.rate > 1024 * 1024) { // 1MB/snapshot
                    console.warn('检测到可能的内存泄漏:', {
                        trend: trend.rate,
                        currentUsage: usage.heapUsed
                    });
                }
            }
        }, 5000);

        return () => clearInterval(interval);
    }

    // 计算内存趋势
    calculateMemoryTrend(snapshots) {
        if (snapshots.length < 2) return { isIncreasing: false, rate: 0 };
        
        const first = snapshots[0].heapUsed;
        const last = snapshots[snapshots.length - 1].heapUsed;
        const rate = (last - first) / snapshots.length;
        
        return {
            isIncreasing: rate > 0,
            rate: rate
        };
    }

    // 强制垃圾回收 (需要 --expose-gc 标志)
    forceGC() {
        if (global.gc) {
            const before = this.getMemoryUsage();
            global.gc();
            const after = this.getMemoryUsage();
            
            console.log('垃圾回收效果:', {
                before: before.heapUsed,
                after: after.heapUsed,
                freed: before.heapUsed - after.heapUsed
            });
        } else {
            console.warn('垃圾回收不可用,请使用 --expose-gc 标志启动');
        }
    }

    // 生成内存报告
    generateReport() {
        const current = this.getMemoryUsage();
        const runtime = Date.now() - this.startTime;
        
        return {
            runtime: runtime,
            currentMemory: current,
            snapshots: this.snapshots.length,
            averageHeapUsed: this.snapshots.reduce((sum, s) => sum + s.heapUsed, 0) / this.snapshots.length
        };
    }
}

module.exports = MemoryProfiler;

3.2 性能优化策略

性能优化策略
代码层面优化
运行时优化
架构优化
资源优化
算法优化
数据结构选择
避免内存泄漏
减少闭包使用
JIT编译优化
V8引擎调优
垃圾回收优化
事件循环优化
缓存策略
负载均衡
数据库优化
CDN使用
静态资源压缩
图片优化
网络优化
并发控制

性能分析工具实现

// performance-analyzer.js
const { performance, PerformanceObserver } = require('perf_hooks');

class PerformanceAnalyzer {
    constructor() {
        this.metrics = new Map();
        this.observers = [];
        this.setupObservers();
    }

    // 设置性能观察器
    setupObservers() {
        // HTTP 请求性能观察
        const httpObserver = new PerformanceObserver((list) => {
            for (const entry of list.getEntries()) {
                this.recordMetric('http', {
                    name: entry.name,
                    duration: entry.duration,
                    startTime: entry.startTime
                });
            }
        });
        httpObserver.observe({ entryTypes: ['http'] });
        this.observers.push(httpObserver);

        // 函数性能观察
        const functionObserver = new PerformanceObserver((list) => {
            for (const entry of list.getEntries()) {
                this.recordMetric('function', {
                    name: entry.name,
                    duration: entry.duration,
                    startTime: entry.startTime
                });
            }
        });
        functionObserver.observe({ entryTypes: ['function', 'measure'] });
        this.observers.push(functionObserver);
    }

    // 记录指标
    recordMetric(type, data) {
        if (!this.metrics.has(type)) {
            this.metrics.set(type, []);
        }
        this.metrics.get(type).push({
            ...data,
            timestamp: Date.now()
        });
    }

    // 性能装饰器
    performanceDecorator(target, propertyKey, descriptor) {
        const originalMethod = descriptor.value;
        
        descriptor.value = async function(...args) {
            const start = performance.now();
            const markStart = `${propertyKey}-start`;
            const markEnd = `${propertyKey}-end`;
            const measureName = `${propertyKey}-duration`;
            
            performance.mark(markStart);
            
            try {
                const result = await originalMethod.apply(this, args);
                return result;
            } finally {
                performance.mark(markEnd);
                performance.measure(measureName, markStart, markEnd);
                
                const end = performance.now();
                console.log(`${propertyKey} 执行时间: ${(end - start).toFixed(2)}ms`);
            }
        };
        
        return descriptor;
    }

    // 函数性能测试
    async measureFunction(fn, iterations = 1000) {
        const results = [];
        
        for (let i = 0; i < iterations; i++) {
            const start = performance.now();
            await fn();
            const end = performance.now();
            results.push(end - start);
        }
        
        return {
            iterations,
            min: Math.min(...results),
            max: Math.max(...results),
            average: results.reduce((a, b) => a + b, 0) / results.length,
            median: results.sort((a, b) => a - b)[Math.floor(results.length / 2)],
            p95: results.sort((a, b) => a - b)[Math.floor(results.length * 0.95)],
            p99: results.sort((a, b) => a - b)[Math.floor(results.length * 0.99)]
        };
    }

    // CPU 使用率监控
    monitorCPU() {
        const startUsage = process.cpuUsage();
        const startTime = process.hrtime();
        
        return () => {
            const endUsage = process.cpuUsage(startUsage);
            const endTime = process.hrtime(startTime);
            
            const totalTime = endTime[0] * 1000000 + endTime[1] / 1000; // 微秒
            const cpuPercent = (endUsage.user + endUsage.system) / totalTime * 100;
            
            return {
                user: endUsage.user,
                system: endUsage.system,
                total: endUsage.user + endUsage.system,
                percentage: cpuPercent
            };
        };
    }

    // 生成性能报告
    generateReport() {
        const report = {
            timestamp: new Date().toISOString(),
            metrics: {}
        };

        for (const [type, data] of this.metrics) {
            const durations = data.map(d => d.duration).filter(d => d !== undefined);
            
            if (durations.length > 0) {
                report.metrics[type] = {
                    count: data.length,
                    averageDuration: durations.reduce((a, b) => a + b, 0) / durations.length,
                    minDuration: Math.min(...durations),
                    maxDuration: Math.max(...durations),
                    recentEntries: data.slice(-10)
                };
            }
        }

        return report;
    }

    // 清理资源
    cleanup() {
        this.observers.forEach(observer => observer.disconnect());
        this.metrics.clear();
    }
}

// 使用示例
const analyzer = new PerformanceAnalyzer();

// 装饰器使用示例
class DatabaseService {
    @analyzer.performanceDecorator
    async queryUsers() {
        // 模拟数据库查询
        await new Promise(resolve => setTimeout(resolve, 100));
        return [];
    }
}

module.exports = PerformanceAnalyzer;

4. 微服务架构与设计模式

4.1 微服务架构模式

微服务架构
服务拆分策略
服务通信
数据管理
服务治理
按业务领域拆分
按数据模型拆分
按团队结构拆分
按技术栈拆分
同步通信
异步通信
事件驱动
消息队列
REST API
GraphQL
gRPC
消息总线
发布订阅
事件流
数据库分离
数据一致性
分布式事务
CQRS模式
服务注册发现
负载均衡
熔断器
链路追踪

微服务基础框架实现

// microservice-framework.js
const express = require('express');
const { EventEmitter } = require('events');
const axios = require('axios');

class MicroService extends EventEmitter {
    constructor(config) {
        super();
        this.config = {
            name: 'unnamed-service',
            port: 3000,
            version: '1.0.0',
            ...config
        };
        
        this.app = express();
        this.services = new Map();
        this.middlewares = [];
        this.routes = new Map();
        this.healthChecks = [];
        
        this.setupDefaultMiddleware();
        this.setupDefaultRoutes();
    }

    // 设置默认中间件
    setupDefaultMiddleware() {
        this.app.use(express.json());
        this.app.use(express.urlencoded({ extended: true }));
        
        // 请求追踪中间件
        this.app.use((req, res, next) => {
            req.traceId = this.generateTraceId();
            req.startTime = Date.now();
            
            res.on('finish', () => {
                const duration = Date.now() - req.startTime;
                this.emit('request', {
                    traceId: req.traceId,
                    method: req.method,
                    path: req.path,
                    statusCode: res.statusCode,
                    duration
                });
            });
            
            next();
        });
    }

    // 设置默认路由
    setupDefaultRoutes() {
        // 健康检查
        this.app.get('/health', async (req, res) => {
            const health = await this.performHealthCheck();
            const statusCode = health.status === 'healthy' ? 200 : 503;
            res.status(statusCode).json(health);
        });

        // 服务信息
        this.app.get('/info', (req, res) => {
            res.json({
                name: this.config.name,
                version: this.config.version,
                uptime: process.uptime(),
                memory: process.memoryUsage(),
                pid: process.pid
            });
        });

        // 指标端点
        this.app.get('/metrics', (req, res) => {
            res.json(this.getMetrics());
        });
    }

    // 注册服务
    registerService(name, config) {
        this.services.set(name, {
            name,
            url: config.url,
            timeout: config.timeout || 5000,
            retries: config.retries || 3,
            circuitBreaker: new CircuitBreaker(config.circuitBreaker)
        });
    }

    // 调用其他服务
    async callService(serviceName, path, options = {}) {
        const service = this.services.get(serviceName);
        if (!service) {
            throw new Error(`Service ${serviceName} not registered`);
        }

        const url = `${service.url}${path}`;
        const config = {
            timeout: service.timeout,
            headers: {
                'X-Trace-ID': options.traceId || this.generateTraceId(),
                'X-Service-Name': this.config.name,
                ...options.headers
            },
            ...options
        };

        try {
            return await service.circuitBreaker.execute(() => 
                axios(url, config)
            );
        } catch (error) {
            this.emit('service-call-error', {
                service: serviceName,
                url,
                error: error.message
            });
            throw error;
        }
    }

    // 添加中间件
    use(middleware) {
        this.middlewares.push(middleware);
        this.app.use(middleware);
    }

    // 添加路由
    route(method, path, handler) {
        const routeKey = `${method.toUpperCase()} ${path}`;
        this.routes.set(routeKey, handler);
        this.app[method.toLowerCase()](path, handler);
    }

    // 添加健康检查
    addHealthCheck(name, checkFunction) {
        this.healthChecks.push({ name, check: checkFunction });
    }

    // 执行健康检查
    async performHealthCheck() {
        const results = await Promise.allSettled(
            this.healthChecks.map(async ({ name, check }) => {
                try {
                    const result = await check();
                    return { name, status: 'healthy', ...result };
                } catch (error) {
                    return { name, status: 'unhealthy', error: error.message };
                }
            })
        );

        const checks = results.map(result => result.value || result.reason);
        const allHealthy = checks.every(check => check.status === 'healthy');

        return {
            status: allHealthy ? 'healthy' : 'unhealthy',
            timestamp: new Date().toISOString(),
            checks
        };
    }

    // 获取指标
    getMetrics() {
        return {
            service: this.config.name,
            version: this.config.version,
            uptime: process.uptime(),
            memory: process.memoryUsage(),
            cpu: process.cpuUsage(),
            routes: Array.from(this.routes.keys()),
            services: Array.from(this.services.keys())
        };
    }

    // 生成追踪ID
    generateTraceId() {
        return Math.random().toString(36).substr(2, 9);
    }

    // 启动服务
    async start() {
        return new Promise((resolve, reject) => {
            this.server = this.app.listen(this.config.port, (error) => {
                if (error) {
                    reject(error);
                } else {
                    console.log(`${this.config.name} started on port ${this.config.port}`);
                    this.emit('started');
                    resolve();
                }
            });
        });
    }

    // 停止服务
    async stop() {
        return new Promise((resolve) => {
            if (this.server) {
                this.server.close(() => {
                    console.log(`${this.config.name} stopped`);
                    this.emit('stopped');
                    resolve();
                });
            } else {
                resolve();
            }
        });
    }
}

// 熔断器实现
class CircuitBreaker {
    constructor(options = {}) {
        this.failureThreshold = options.failureThreshold || 5;
        this.timeout = options.timeout || 60000;
        this.monitoringPeriod = options.monitoringPeriod || 10000;
        
        this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
        this.failureCount = 0;
        this.lastFailureTime = null;
        this.successCount = 0;
    }

    async execute(operation) {
        if (this.state === 'OPEN') {
            if (Date.now() - this.lastFailureTime >= this.timeout) {
                this.state = 'HALF_OPEN';
                this.successCount = 0;
            } else {
                throw new Error('Circuit breaker is OPEN');
            }
        }

        try {
            const result = await operation();
            this.onSuccess();
            return result;
        } catch (error) {
            this.onFailure();
            throw error;
        }
    }

    onSuccess() {
        this.failureCount = 0;
        
        if (this.state === 'HALF_OPEN') {
            this.successCount++;
            if (this.successCount >= 3) {
                this.state = 'CLOSED';
            }
        }
    }

    onFailure() {
        this.failureCount++;
        this.lastFailureTime = Date.now();
        
        if (this.failureCount >= this.failureThreshold) {
            this.state = 'OPEN';
        }
    }
}

module.exports = { MicroService, CircuitBreaker };

4.2 设计模式在 Node.js 中的应用

Node.js 设计模式
创建型模式
结构型模式
行为型模式
架构模式
单例模式
工厂模式
建造者模式
原型模式
适配器模式
装饰器模式
代理模式
外观模式
观察者模式
策略模式
命令模式
中介者模式
MVC模式
中间件模式
插件模式
发布订阅模式

高级设计模式实现

// design-patterns.js

// 1. 单例模式 - 数据库连接管理
class DatabaseManager {
    constructor() {
        if (DatabaseManager.instance) {
            return DatabaseManager.instance;
        }
        
        this.connections = new Map();
        DatabaseManager.instance = this;
    }

    async getConnection(config) {
        const key = `${config.host}:${config.port}/${config.database}`;
        
        if (!this.connections.has(key)) {
            const connection = await this.createConnection(config);
            this.connections.set(key, connection);
        }
        
        return this.connections.get(key);
    }

    async createConnection(config) {
        // 模拟数据库连接创建
        return {
            host: config.host,
            port: config.port,
            database: config.database,
            connected: true
        };
    }
}

// 2. 工厂模式 - 日志记录器工厂
class LoggerFactory {
    static createLogger(type, config) {
        switch (type) {
            case 'console':
                return new ConsoleLogger(config);
            case 'file':
                return new FileLogger(config);
            case 'database':
                return new DatabaseLogger(config);
            default:
                throw new Error(`Unknown logger type: ${type}`);
        }
    }
}

class ConsoleLogger {
    constructor(config) {
        this.level = config.level || 'info';
    }

    log(level, message) {
        if (this.shouldLog(level)) {
            console.log(`[${level.toUpperCase()}] ${message}`);
        }
    }

    shouldLog(level) {
        const levels = ['debug', 'info', 'warn', 'error'];
        return levels.indexOf(level) >= levels.indexOf(this.level);
    }
}

// 3. 装饰器模式 - 缓存装饰器
function cacheDecorator(ttl = 300000) {
    const cache = new Map();
    
    return function(target, propertyKey, descriptor) {
        const originalMethod = descriptor.value;
        
        descriptor.value = async function(...args) {
            const cacheKey = `${propertyKey}:${JSON.stringify(args)}`;
            const cached = cache.get(cacheKey);
            
            if (cached && Date.now() - cached.timestamp < ttl) {
                return cached.value;
            }
            
            const result = await originalMethod.apply(this, args);
            cache.set(cacheKey, {
                value: result,
                timestamp: Date.now()
            });
            
            return result;
        };
        
        return descriptor;
    };
}

// 4. 策略模式 - 支付处理策略
class PaymentProcessor {
    constructor() {
        this.strategies = new Map();
    }

    addStrategy(name, strategy) {
        this.strategies.set(name, strategy);
    }

    async processPayment(method, amount, details) {
        const strategy = this.strategies.get(method);
        if (!strategy) {
            throw new Error(`Payment method ${method} not supported`);
        }
        
        return await strategy.process(amount, details);
    }
}

class CreditCardStrategy {
    async process(amount, details) {
        // 信用卡支付逻辑
        return {
            success: true,
            transactionId: `cc_${Date.now()}`,
            amount,
            method: 'credit_card'
        };
    }
}

class PayPalStrategy {
    async process(amount, details) {
        // PayPal 支付逻辑
        return {
            success: true,
            transactionId: `pp_${Date.now()}`,
            amount,
            method: 'paypal'
        };
    }
}

// 5. 观察者模式 - 事件系统
class EventManager {
    constructor() {
        this.listeners = new Map();
    }

    subscribe(event, callback) {
        if (!this.listeners.has(event)) {
            this.listeners.set(event, []);
        }
        
        this.listeners.get(event).push(callback);
        
        // 返回取消订阅函数
        return () => {
            const callbacks = this.listeners.get(event);
            const index = callbacks.indexOf(callback);
            if (index > -1) {
                callbacks.splice(index, 1);
            }
        };
    }

    async publish(event, data) {
        const callbacks = this.listeners.get(event) || [];
        
        // 并行执行所有回调
        await Promise.allSettled(
            callbacks.map(callback => callback(data))
        );
    }
}

// 6. 命令模式 - 任务队列
class Command {
    constructor(execute, undo) {
        this.execute = execute;
        this.undo = undo;
    }
}

class TaskQueue {
    constructor() {
        this.commands = [];
        this.currentIndex = -1;
    }

    execute(command) {
        // 移除当前位置之后的命令
        this.commands = this.commands.slice(0, this.currentIndex + 1);
        
        // 添加新命令
        this.commands.push(command);
        this.currentIndex++;
        
        // 执行命令
        return command.execute();
    }

    undo() {
        if (this.currentIndex >= 0) {
            const command = this.commands[this.currentIndex];
            this.currentIndex--;
            return command.undo();
        }
    }

    redo() {
        if (this.currentIndex < this.commands.length - 1) {
            this.currentIndex++;
            const command = this.commands[this.currentIndex];
            return command.execute();
        }
    }
}

// 7. 中介者模式 - 聊天室
class ChatRoom {
    constructor() {
        this.users = new Map();
        this.rooms = new Map();
    }

    addUser(user) {
        this.users.set(user.id, user);
        user.setChatRoom(this);
    }

    removeUser(userId) {
        this.users.delete(userId);
    }

    sendMessage(fromUserId, toUserId, message) {
        const toUser = this.users.get(toUserId);
        if (toUser) {
            toUser.receiveMessage(fromUserId, message);
        }
    }

    broadcast(fromUserId, message, roomId) {
        const room = this.rooms.get(roomId);
        if (room) {
            room.members.forEach(userId => {
                if (userId !== fromUserId) {
                    this.sendMessage(fromUserId, userId, message);
                }
            });
        }
    }
}

class User {
    constructor(id, name) {
        this.id = id;
        this.name = name;
        this.chatRoom = null;
    }

    setChatRoom(chatRoom) {
        this.chatRoom = chatRoom;
    }

    sendMessage(toUserId, message) {
        if (this.chatRoom) {
            this.chatRoom.sendMessage(this.id, toUserId, message);
        }
    }

    receiveMessage(fromUserId, message) {
        console.log(`${this.name} received message from ${fromUserId}: ${message}`);
    }
}

module.exports = {
    DatabaseManager,
    LoggerFactory,
    cacheDecorator,
    PaymentProcessor,
    CreditCardStrategy,
    PayPalStrategy,
    EventManager,
    Command,
    TaskQueue,
    ChatRoom,
    User
};

5. 实时通信与 WebSocket

5.1 实时通信架构

实时通信架构
WebSocket
Server-Sent Events
长轮询
WebRTC
双向通信
低延迟
持久连接
二进制支持
单向推送
自动重连
事件流
HTTP兼容
HTTP请求
超时等待
简单实现
资源消耗高
P2P通信
音视频传输
NAT穿透
媒体流处理
消息传递模式
点对点
发布订阅
广播
房间模式

WebSocket 服务器实现

// websocket-server.js
const WebSocket = require('ws');
const { EventEmitter } = require('events');
const jwt = require('jsonwebtoken');

class WebSocketServer extends EventEmitter {
    constructor(options = {}) {
        super();
        
        this.options = {
            port: 8080,
            verifyClient: null,
            ...options
        };
        
        this.clients = new Map();
        this.rooms = new Map();
        this.messageHandlers = new Map();
        
        this.setupServer();
        this.setupMessageHandlers();
    }

    setupServer() {
        this.wss = new WebSocket.Server({
            port: this.options.port,
            verifyClient: this.verifyClient.bind(this)
        });

        this.wss.on('connection', this.handleConnection.bind(this));
        
        console.log(`WebSocket server started on port ${this.options.port}`);
    }

    verifyClient(info) {
        if (this.options.verifyClient) {
            return this.options.verifyClient(info);
        }
        
        // 默认验证逻辑
        const token = this.extractToken(info.req);
        if (!token) {
            return false;
        }
        
        try {
            const decoded = jwt.verify(token, process.env.JWT_SECRET);
            info.req.user = decoded;
            return true;
        } catch (error) {
            return false;
        }
    }

    extractToken(req) {
        const url = new URL(req.url, `http://${req.headers.host}`);
        return url.searchParams.get('token');
    }

    handleConnection(ws, req) {
        const clientId = this.generateClientId();
        const user = req.user;
        
        const client = {
            id: clientId,
            ws,
            user,
            rooms: new Set(),
            lastPing: Date.now(),
            metadata: {}
        };
        
        this.clients.set(clientId, client);
        
        ws.on('message', (data) => this.handleMessage(client, data));
        ws.on('close', () => this.handleDisconnection(client));
        ws.on('error', (error) => this.handleError(client, error));
        ws.on('pong', () => this.handlePong(client));
        
        // 发送连接确认
        this.sendToClient(client, {
            type: 'connection',
            clientId,
            timestamp: Date.now()
        });
        
        this.emit('connection', client);
    }

    handleMessage(client, data) {
        try {
            const message = JSON.parse(data);
            const handler = this.messageHandlers.get(message.type);
            
            if (handler) {
                handler(client, message);
            } else {
                this.sendError(client, `Unknown message type: ${message.type}`);
            }
        } catch (error) {
            this.sendError(client, 'Invalid message format');
        }
    }

    setupMessageHandlers() {
        // 加入房间
        this.messageHandlers.set('join_room', (client, message) => {
            const { roomId } = message;
            this.joinRoom(client, roomId);
        });

        // 离开房间
        this.messageHandlers.set('leave_room', (client, message) => {
            const { roomId } = message;
            this.leaveRoom(client, roomId);
        });

        // 发送消息到房间
        this.messageHandlers.set('room_message', (client, message) => {
            const { roomId, content } = message;
            this.sendToRoom(roomId, {
                type: 'room_message',
                from: client.id,
                content,
                timestamp: Date.now()
            }, client.id);
        });

        // 私聊消息
        this.messageHandlers.set('private_message', (client, message) => {
            const { targetId, content } = message;
            const targetClient = this.clients.get(targetId);
            
            if (targetClient) {
                this.sendToClient(targetClient, {
                    type: 'private_message',
                    from: client.id,
                    content,
                    timestamp: Date.now()
                });
            } else {
                this.sendError(client, 'Target client not found');
            }
        });

        // 心跳
        this.messageHandlers.set('ping', (client, message) => {
            client.lastPing = Date.now();
            this.sendToClient(client, { type: 'pong' });
        });
    }

    joinRoom(client, roomId) {
        if (!this.rooms.has(roomId)) {
            this.rooms.set(roomId, {
                id: roomId,
                clients: new Set(),
                metadata: {}
            });
        }
        
        const room = this.rooms.get(roomId);
        room.clients.add(client.id);
        client.rooms.add(roomId);
        
        // 通知房间内其他用户
        this.sendToRoom(roomId, {
            type: 'user_joined',
            userId: client.id,
            roomId,
            timestamp: Date.now()
        }, client.id);
        
        // 发送房间信息给新用户
        this.sendToClient(client, {
            type: 'room_joined',
            roomId,
            users: Array.from(room.clients),
            timestamp: Date.now()
        });
    }

    leaveRoom(client, roomId) {
        const room = this.rooms.get(roomId);
        if (room) {
            room.clients.delete(client.id);
            client.rooms.delete(roomId);
            
            // 如果房间为空,删除房间
            if (room.clients.size === 0) {
                this.rooms.delete(roomId);
            } else {
                // 通知房间内其他用户
                this.sendToRoom(roomId, {
                    type: 'user_left',
                    userId: client.id,
                    roomId,
                    timestamp: Date.now()
                });
            }
        }
    }

    sendToClient(client, message) {
        if (client.ws.readyState === WebSocket.OPEN) {
            client.ws.send(JSON.stringify(message));
        }
    }

    sendToRoom(roomId, message, excludeClientId = null) {
        const room = this.rooms.get(roomId);
        if (room) {
            room.clients.forEach(clientId => {
                if (clientId !== excludeClientId) {
                    const client = this.clients.get(clientId);
                    if (client) {
                        this.sendToClient(client, message);
                    }
                }
            });
        }
    }

    broadcast(message, excludeClientId = null) {
        this.clients.forEach((client, clientId) => {
            if (clientId !== excludeClientId) {
                this.sendToClient(client, message);
            }
        });
    }

    sendError(client, error) {
        this.sendToClient(client, {
            type: 'error',
            message: error,
            timestamp: Date.now()
        });
    }

    handleDisconnection(client) {
        // 从所有房间中移除客户端
        client.rooms.forEach(roomId => {
            this.leaveRoom(client, roomId);
        });
        
        this.clients.delete(client.id);
        this.emit('disconnection', client);
    }

    handleError(client, error) {
        console.error(`WebSocket error for client ${client.id}:`, error);
        this.emit('error', { client, error });
    }

    handlePong(client) {
        client.lastPing = Date.now();
    }

    generateClientId() {
        return Math.random().toString(36).substr(2, 9);
    }

    // 健康检查 - 清理断开的连接
    startHealthCheck() {
        setInterval(() => {
            const now = Date.now();
            const timeout = 30000; // 30秒超时
            
            this.clients.forEach((client, clientId) => {
                if (now - client.lastPing > timeout) {
                    console.log(`Client ${clientId} timed out`);
                    client.ws.terminate();
                    this.handleDisconnection(client);
                }
            });
        }, 10000); // 每10秒检查一次
    }

    // 获取统计信息
    getStats() {
        return {
            totalClients: this.clients.size,
            totalRooms: this.rooms.size,
            roomDetails: Array.from(this.rooms.values()).map(room => ({
                id: room.id,
                clientCount: room.clients.size
            }))
        };
    }
}

module.exports = WebSocketServer;

5.2 实时数据同步系统

实时数据同步
数据变更检测
冲突解决
状态同步
离线支持
数据库触发器
变更日志
轮询检测
事件驱动
最后写入获胜
版本向量
操作转换
CRDT算法
全量同步
增量同步
差异同步
快照同步
本地存储
同步队列
冲突缓存
重连机制

实时协作编辑器实现

// collaborative-editor.js
const { EventEmitter } = require('events');

// 操作转换算法实现
class OperationalTransform {
    static transform(op1, op2) {
        // 简化的操作转换实现
        if (op1.type === 'insert' && op2.type === 'insert') {
            if (op1.position <= op2.position) {
                return [op1, { ...op2, position: op2.position + op1.content.length }];
            } else {
                return [{ ...op1, position: op1.position + op2.content.length }, op2];
            }
        }
        
        if (op1.type === 'delete' && op2.type === 'delete') {
            if (op1.position + op1.length <= op2.position) {
                return [op1, { ...op2, position: op2.position - op1.length }];
            } else if (op2.position + op2.length <= op1.position) {
                return [{ ...op1, position: op1.position - op2.length }, op2];
            } else {
                // 重叠删除,需要复杂处理
                return this.handleOverlappingDeletes(op1, op2);
            }
        }
        
        if (op1.type === 'insert' && op2.type === 'delete') {
            if (op1.position <= op2.position) {
                return [op1, { ...op2, position: op2.position + op1.content.length }];
            } else if (op1.position >= op2.position + op2.length) {
                return [{ ...op1, position: op1.position - op2.length }, op2];
            } else {
                return [{ ...op1, position: op2.position }, op2];
            }
        }
        
        if (op1.type === 'delete' && op2.type === 'insert') {
            const [transformedOp2, transformedOp1] = this.transform(op2, op1);
            return [transformedOp1, transformedOp2];
        }
        
        return [op1, op2];
    }
    
    static handleOverlappingDeletes(op1, op2) {
        // 处理重叠删除的复杂逻辑
        const start1 = op1.position;
        const end1 = op1.position + op1.length;
        const start2 = op2.position;
        const end2 = op2.position + op2.length;
        
        const newStart = Math.min(start1, start2);
        const newEnd = Math.max(end1, end2);
        
        return [
            { type: 'delete', position: newStart, length: newEnd - newStart },
            { type: 'noop' }
        ];
    }
}

// 文档状态管理
class DocumentState {
    constructor(initialContent = '') {
        this.content = initialContent;
        this.version = 0;
        this.operations = [];
    }
    
    applyOperation(operation) {
        switch (operation.type) {
            case 'insert':
                this.content = 
                    this.content.slice(0, operation.position) +
                    operation.content +
                    this.content.slice(operation.position);
                break;
                
            case 'delete':
                this.content = 
                    this.content.slice(0, operation.position) +
                    this.content.slice(operation.position + operation.length);
                break;
                
            case 'replace':
                this.content = 
                    this.content.slice(0, operation.position) +
                    operation.content +
                    this.content.slice(operation.position + operation.length);
                break;
        }
        
        this.version++;
        this.operations.push({ ...operation, version: this.version });
    }
    
    getOperationsSince(version) {
        return this.operations.filter(op => op.version > version);
    }
}

// 协作编辑器服务器
class CollaborativeEditor extends EventEmitter {
    constructor() {
        super();
        this.documents = new Map();
        this.clients = new Map();
        this.clientDocuments = new Map();
    }
    
    createDocument(documentId, initialContent = '') {
        if (!this.documents.has(documentId)) {
            this.documents.set(documentId, new DocumentState(initialContent));
        }
        return this.documents.get(documentId);
    }
    
    joinDocument(clientId, documentId) {
        const document = this.createDocument(documentId);
        
        if (!this.clientDocuments.has(documentId)) {
            this.clientDocuments.set(documentId, new Set());
        }
        
        this.clientDocuments.get(documentId).add(clientId);
        
        // 发送当前文档状态给客户端
        this.sendToClient(clientId, {
            type: 'document_state',
            documentId,
            content: document.content,
            version: document.version
        });
        
        // 通知其他客户端有新用户加入
        this.broadcastToDocument(documentId, {
            type: 'user_joined',
            clientId,
            documentId
        }, clientId);
    }
    
    leaveDocument(clientId, documentId) {
        const clients = this.clientDocuments.get(documentId);
        if (clients) {
            clients.delete(clientId);
            
            if (clients.size === 0) {
                this.clientDocuments.delete(documentId);
            } else {
                this.broadcastToDocument(documentId, {
                    type: 'user_left',
                    clientId,
                    documentId
                });
            }
        }
    }
    
    handleOperation(clientId, documentId, operation) {
        const document = this.documents.get(documentId);
        if (!document) {
            return;
        }
        
        // 获取客户端版本之后的所有操作
        const serverOps = document.getOperationsSince(operation.baseVersion);
        
        // 对操作进行转换
        let transformedOp = operation;
        for (const serverOp of serverOps) {
            [transformedOp] = OperationalTransform.transform(transformedOp, serverOp);
        }
        
        // 应用转换后的操作
        document.applyOperation(transformedOp);
        
        // 广播操作给其他客户端
        this.broadcastToDocument(documentId, {
            type: 'operation',
            operation: { ...transformedOp, version: document.version },
            documentId
        }, clientId);
        
        // 确认操作给发送者
        this.sendToClient(clientId, {
            type: 'operation_ack',
            operationId: operation.id,
            version: document.version
        });
    }
    
    sendToClient(clientId, message) {
        const client = this.clients.get(clientId);
        if (client && client.send) {
            client.send(JSON.stringify(message));
        }
    }
    
    broadcastToDocument(documentId, message, excludeClientId = null) {
        const clients = this.clientDocuments.get(documentId);
        if (clients) {
            clients.forEach(clientId => {
                if (clientId !== excludeClientId) {
                    this.sendToClient(clientId, message);
                }
            });
        }
    }
    
    addClient(clientId, connection) {
        this.clients.set(clientId, connection);
    }
    
    removeClient(clientId) {
        // 从所有文档中移除客户端
        this.clientDocuments.forEach((clients, documentId) => {
            if (clients.has(clientId)) {
                this.leaveDocument(clientId, documentId);
            }
        });
        
        this.clients.delete(clientId);
    }
    
    getDocumentStats(documentId) {
        const document = this.documents.get(documentId);
        const clients = this.clientDocuments.get(documentId);
        
        return {
            documentId,
            version: document?.version || 0,
            contentLength: document?.content?.length || 0,
            activeClients: clients?.size || 0,
            operationCount: document?.operations?.length || 0
        };
    }
}

// 客户端操作队列
class ClientOperationQueue {
    constructor(sendOperation) {
        this.sendOperation = sendOperation;
        this.pendingOperations = [];
        this.acknowledgedVersion = 0;
        this.localVersion = 0;
    }
    
    addOperation(operation) {
        operation.id = this.generateOperationId();
        operation.baseVersion = this.acknowledgedVersion;
        
        this.pendingOperations.push(operation);
        this.sendOperation(operation);
    }
    
    handleAcknowledgment(operationId, serverVersion) {
        const index = this.pendingOperations.findIndex(op => op.id === operationId);
        if (index !== -1) {
            this.pendingOperations.splice(0, index + 1);
            this.acknowledgedVersion = serverVersion;
        }
    }
    
    handleServerOperation(serverOperation) {
        // 转换待处理的操作
        this.pendingOperations = this.pendingOperations.map(pendingOp => {
            const [transformed] = OperationalTransform.transform(pendingOp, serverOperation);
            return transformed;
        });
    }
    
    generateOperationId() {
        return `${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
    }
}

module.exports = {
    OperationalTransform,
    DocumentState,
    CollaborativeEditor,
    ClientOperationQueue
};

6. 安全与认证系统

6.1 认证与授权架构

认证授权系统
认证方式
授权模型
安全机制
会话管理
用户名密码
JWT Token
OAuth 2.0
多因素认证
生物识别
RBAC角色控制
ABAC属性控制
ACL访问列表
权限继承
密码加密
传输加密
数据签名
防重放攻击
Session存储
Token刷新
单点登录
会话超时

高级认证系统实现

// advanced-auth-system.js
const crypto = require('crypto');
const jwt = require('jsonwebtoken');
const bcrypt = require('bcrypt');
const speakeasy = require('speakeasy');
const QRCode = require('qrcode');

class AdvancedAuthSystem {
    constructor(config) {
        this.config = {
            jwtSecret: process.env.JWT_SECRET,
            jwtExpiry: '15m',
            refreshTokenExpiry: '7d',
            bcryptRounds: 12,
            maxLoginAttempts: 5,
            lockoutDuration: 15 * 60 * 1000, // 15分钟
            ...config
        };
        
        this.users = new Map();
        this.refreshTokens = new Map();
        this.loginAttempts = new Map();
        this.sessions = new Map();
    }

    // 用户注册
    async register(userData) {
        const { username, email, password, phone } = userData;
        
        // 验证用户是否已存在
        if (this.findUserByEmail(email) || this.findUserByUsername(username)) {
            throw new Error('User already exists');
        }
        
        // 密码强度验证
        this.validatePasswordStrength(password);
        
        // 加密密码
        const hashedPassword = await bcrypt.hash(password, this.config.bcryptRounds);
        
        // 生成用户ID
        const userId = this.generateUserId();
        
        // 创建用户
        const user = {
            id: userId,
            username,
            email,
            phone,
            password: hashedPassword,
            roles: ['user'],
            permissions: [],
            isActive: true,
            emailVerified: false,
            phoneVerified: false,
            twoFactorEnabled: false,
            twoFactorSecret: null,
            createdAt: new Date(),
            lastLogin: null,
            loginAttempts: 0,
            lockedUntil: null
        };
        
        this.users.set(userId, user);
        
        // 发送验证邮件
        await this.sendVerificationEmail(user);
        
        return {
            id: user.id,
            username: user.username,
            email: user.email,
            message: 'Registration successful. Please verify your email.'
        };
    }

    // 用户登录
    async login(credentials) {
        const { email, password, twoFactorCode } = credentials;
        
        const user = this.findUserByEmail(email);
        if (!user) {
            throw new Error('Invalid credentials');
        }
        
        // 检查账户锁定
        if (this.isAccountLocked(user)) {
            throw new Error('Account is locked due to too many failed attempts');
        }
        
        // 验证密码
        const isPasswordValid = await bcrypt.compare(password, user.password);
        if (!isPasswordValid) {
            await this.handleFailedLogin(user);
            throw new Error('Invalid credentials');
        }
        
        // 验证两因素认证
        if (user.twoFactorEnabled) {
            if (!twoFactorCode) {
                throw new Error('Two-factor authentication code required');
            }
            
            const isValidCode = speakeasy.totp.verify({
                secret: user.twoFactorSecret,
                encoding: 'base32',
                token: twoFactorCode,
                window: 2
            });
            
            if (!isValidCode) {
                throw new Error('Invalid two-factor authentication code');
            }
        }
        
        // 重置登录尝试
        user.loginAttempts = 0;
        user.lockedUntil = null;
        user.lastLogin = new Date();
        
        // 生成令牌
        const tokens = await this.generateTokens(user);
        
        // 创建会话
        const sessionId = this.createSession(user, tokens);
        
        return {
            user: this.sanitizeUser(user),
            tokens,
            sessionId
        };
    }

    // 生成令牌
    async generateTokens(user) {
        const payload = {
            userId: user.id,
            username: user.username,
            email: user.email,
            roles: user.roles,
            permissions: user.permissions
        };
        
        const accessToken = jwt.sign(payload, this.config.jwtSecret, {
            expiresIn: this.config.jwtExpiry,
            issuer: 'auth-service',
            audience: 'api-service'
        });
        
        const refreshToken = this.generateRefreshToken();
        
        // 存储刷新令牌
        this.refreshTokens.set(refreshToken, {
            userId: user.id,
            createdAt: new Date(),
            expiresAt: new Date(Date.now() + this.parseExpiry(this.config.refreshTokenExpiry))
        });
        
        return {
            accessToken,
            refreshToken,
            expiresIn: this.parseExpiry(this.config.jwtExpiry)
        };
    }

    // 刷新令牌
    async refreshToken(refreshToken) {
        const tokenData = this.refreshTokens.get(refreshToken);
        if (!tokenData || tokenData.expiresAt < new Date()) {
            throw new Error('Invalid or expired refresh token');
        }
        
        const user = this.users.get(tokenData.userId);
        if (!user || !user.isActive) {
            throw new Error('User not found or inactive');
        }
        
        // 删除旧的刷新令牌
        this.refreshTokens.delete(refreshToken);
        
        // 生成新令牌
        return await this.generateTokens(user);
    }

    // 启用两因素认证
    async enableTwoFactor(userId) {
        const user = this.users.get(userId);
        if (!user) {
            throw new Error('User not found');
        }
        
        const secret = speakeasy.generateSecret({
            name: `MyApp (${user.email})`,
            issuer: 'MyApp'
        });
        
        user.twoFactorSecret = secret.base32;
        
        const qrCodeUrl = await QRCode.toDataURL(secret.otpauth_url);
        
        return {
            secret: secret.base32,
            qrCode: qrCodeUrl,
            backupCodes: this.generateBackupCodes()
        };
    }

    // 验证两因素认证设置
    async verifyTwoFactorSetup(userId, token) {
        const user = this.users.get(userId);
        if (!user || !user.twoFactorSecret) {
            throw new Error('Two-factor setup not initiated');
        }
        
        const isValid = speakeasy.totp.verify({
            secret: user.twoFactorSecret,
            encoding: 'base32',
            token,
            window: 2
        });
        
        if (isValid) {
            user.twoFactorEnabled = true;
            return { success: true, message: 'Two-factor authentication enabled' };
        } else {
            throw new Error('Invalid verification code');
        }
    }

    // 权限检查中间件
    requirePermission(permission) {
        return (req, res, next) => {
            const user = req.user;
            if (!user) {
                return res.status(401).json({ error: 'Authentication required' });
            }
            
            if (this.hasPermission(user, permission)) {
                next();
            } else {
                res.status(403).json({ error: 'Insufficient permissions' });
            }
        };
    }

    // 角色检查中间件
    requireRole(role) {
        return (req, res, next) => {
            const user = req.user;
            if (!user) {
                return res.status(401).json({ error: 'Authentication required' });
            }
            
            if (user.roles.includes(role) || user.roles.includes('admin')) {
                next();
            } else {
                res.status(403).json({ error: 'Insufficient role' });
            }
        };
    }

    // 检查用户权限
    hasPermission(user, permission) {
        // 管理员拥有所有权限
        if (user.roles.includes('admin')) {
            return true;
        }
        
        // 检查直接权限
        if (user.permissions.includes(permission)) {
            return true;
        }
        
        // 检查角色权限
        return user.roles.some(role => {
            const rolePermissions = this.getRolePermissions(role);
            return rolePermissions.includes(permission);
        });
    }

    // 获取角色权限
    getRolePermissions(role) {
        const rolePermissions = {
            user: ['read:profile', 'update:profile'],
            moderator: ['read:profile', 'update:profile', 'moderate:content'],
            admin: ['*'] // 所有权限
        };
        
        return rolePermissions[role] || [];
    }

    // 密码强度验证
    validatePasswordStrength(password) {
        const minLength = 8;
        const hasUpperCase = /[A-Z]/.test(password);
        const hasLowerCase = /[a-z]/.test(password);
        const hasNumbers = /\d/.test(password);
        const hasSpecialChar = /[!@#$%^&*(),.?":{}|<>]/.test(password);
        
        if (password.length < minLength) {
            throw new Error('Password must be at least 8 characters long');
        }
        
        if (!hasUpperCase || !hasLowerCase || !hasNumbers || !hasSpecialChar) {
            throw new Error('Password must contain uppercase, lowercase, numbers, and special characters');
        }
    }

    // 处理登录失败
    async handleFailedLogin(user) {
        user.loginAttempts = (user.loginAttempts || 0) + 1;
        
        if (user.loginAttempts >= this.config.maxLoginAttempts) {
            user.lockedUntil = new Date(Date.now() + this.config.lockoutDuration);
        }
    }

    // 检查账户是否锁定
    isAccountLocked(user) {
        return user.lockedUntil && user.lockedUntil > new Date();
    }

    // 创建会话
    createSession(user, tokens) {
        const sessionId = this.generateSessionId();
        
        this.sessions.set(sessionId, {
            userId: user.id,
            tokens,
            createdAt: new Date(),
            lastActivity: new Date(),
            ipAddress: null,
            userAgent: null
        });
        
        return sessionId;
    }

    // 工具方法
    generateUserId() {
        return crypto.randomBytes(16).toString('hex');
    }

    generateRefreshToken() {
        return crypto.randomBytes(32).toString('hex');
    }

    generateSessionId() {
        return crypto.randomBytes(24).toString('hex');
    }

    generateBackupCodes() {
        return Array.from({ length: 10 }, () => 
            crypto.randomBytes(4).toString('hex').toUpperCase()
        );
    }

    findUserByEmail(email) {
        return Array.from(this.users.values()).find(user => user.email === email);
    }

    findUserByUsername(username) {
        return Array.from(this.users.values()).find(user => user.username === username);
    }

    sanitizeUser(user) {
        const { password, twoFactorSecret, ...sanitized } = user;
        return sanitized;
    }

    parseExpiry(expiry) {
        const units = { s: 1000, m: 60000, h: 3600000, d: 86400000 };
        const match = expiry.match(/^(\d+)([smhd])$/);
        if (match) {
            return parseInt(match[1]) * units[match[2]];
        }
        return 900000; // 默认15分钟
    }

    async sendVerificationEmail(user) {
        // 实现邮件发送逻辑
        console.log(`Verification email sent to ${user.email}`);
    }
}

module.exports = AdvancedAuthSystem;

7. 数据处理与分析

7.1 大数据处理架构

大数据处理系统
数据采集
数据存储
数据处理
数据分析
数据可视化
实时采集
批量采集
流式采集
API采集
关系数据库
NoSQL数据库
时序数据库
数据湖
ETL处理
流处理
批处理
实时计算
统计分析
机器学习
数据挖掘
预测分析
报表系统
仪表板
图表展示
实时监控

流数据处理系统

// stream-processor.js
const { Transform, Writable, pipeline } = require('stream');
const { EventEmitter } = require('events');

class StreamProcessor extends EventEmitter {
    constructor(options = {}) {
        super();
        this.options = {
            batchSize: 1000,
            flushInterval: 5000,
            maxMemory: 100 * 1024 * 1024, // 100MB
            ...options
        };
        
        this.processors = new Map();
        this.metrics = {
            processed: 0,
            errors: 0,
            startTime: Date.now()
        };
    }

    // 创建数据转换流
    createTransformStream(name, transformFn) {
        const transform = new Transform({
            objectMode: true,
            transform(chunk, encoding, callback) {
                try {
                    const result = transformFn(chunk);
                    if (result !== null && result !== undefined) {
                        this.push(result);
                    }
                    callback();
                } catch (error) {
                    callback(error);
                }
            }
        });
        
        this.processors.set(name, transform);
        return transform;
    }

    // 创建批处理流
    createBatchStream(batchSize = this.options.batchSize) {
        let batch = [];
        
        return new Transform({
            objectMode: true,
            transform(chunk, encoding, callback) {
                batch.push(chunk);
                
                if (batch.length >= batchSize) {
                    this.push([...batch]);
                    batch = [];
                }
                callback();
            },
            flush(callback) {
                if (batch.length > 0) {
                    this.push(batch);
                }
                callback();
            }
        });
    }

    // 创建过滤流
    createFilterStream(filterFn) {
        return new Transform({
            objectMode: true,
            transform(chunk, encoding, callback) {
                try {
                    if (filterFn(chunk)) {
                        this.push(chunk);
                    }
                    callback();
                } catch (error) {
                    callback(error);
                }
            }
        });
    }

    // 创建聚合流
    createAggregateStream(keyFn, aggregateFn, windowSize = 60000) {
        const windows = new Map();
        
        return new Transform({
            objectMode: true,
            transform(chunk, encoding, callback) {
                const key = keyFn(chunk);
                const now = Date.now();
                const windowStart = Math.floor(now / windowSize) * windowSize;
                const windowKey = `${key}:${windowStart}`;
                
                if (!windows.has(windowKey)) {
                    windows.set(windowKey, {
                        key,
                        windowStart,
                        windowEnd: windowStart + windowSize,
                        data: []
                    });
                }
                
                const window = windows.get(windowKey);
                window.data.push(chunk);
                
                // 检查是否需要输出完成的窗口
                const completedWindows = Array.from(windows.entries())
                    .filter(([_, window]) => window.windowEnd <= now)
                    .map(([windowKey, window]) => {
                        windows.delete(windowKey);
                        return {
                            key: window.key,
                            windowStart: window.windowStart,
                            windowEnd: window.windowEnd,
                            result: aggregateFn(window.data)
                        };
                    });
                
                completedWindows.forEach(result => this.push(result));
                callback();
            }
        });
    }

    // 创建输出流
    createOutputSink(outputFn) {
        return new Writable({
            objectMode: true,
            write(chunk, encoding, callback) {
                try {
                    outputFn(chunk);
                    callback();
                } catch (error) {
                    callback(error);
                }
            }
        });
    }

    // 创建处理管道
    createPipeline(streams) {
        return new Promise((resolve, reject) => {
            pipeline(...streams, (error) => {
                if (error) {
                    this.metrics.errors++;
                    this.emit('error', error);
                    reject(error);
                } else {
                    this.emit('complete');
                    resolve();
                }
            });
        });
    }

    // 实时数据分析示例
    createRealTimeAnalyzer() {
        // 数据清洗流
        const cleaningStream = this.createTransformStream('cleaning', (data) => {
            if (!data || typeof data !== 'object') return null;
            
            // 数据清洗逻辑
            return {
                ...data,
                timestamp: data.timestamp || Date.now(),
                processed: true
            };
        });

        // 数据验证流
        const validationStream = this.createFilterStream((data) => {
            return data.timestamp && 
                   data.value !== undefined && 
                   !isNaN(data.value);
        });

        // 数据聚合流
        const aggregationStream = this.createAggregateStream(
            (data) => data.category || 'default',
            (dataArray) => ({
                count: dataArray.length,
                sum: dataArray.reduce((sum, item) => sum + (item.value || 0), 0),
                avg: dataArray.reduce((sum, item) => sum + (item.value || 0), 0) / dataArray.length,
                min: Math.min(...dataArray.map(item => item.value || 0)),
                max: Math.max(...dataArray.map(item => item.value || 0))
            })
        );

        // 输出流
        const outputStream = this.createOutputSink((result) => {
            this.metrics.processed++;
            this.emit('result', result);
            console.log('Analysis result:', result);
        });

        return [cleaningStream, validationStream, aggregationStream, outputStream];
    }

    // 获取处理指标
    getMetrics() {
        const runtime = Date.now() - this.metrics.startTime;
        return {
            ...this.metrics,
            runtime,
            throughput: this.metrics.processed / (runtime / 1000)
        };
    }
}

// 时序数据分析器
class TimeSeriesAnalyzer {
    constructor(options = {}) {
        this.windowSize = options.windowSize || 60000; // 1分钟
        this.retentionPeriod = options.retentionPeriod || 24 * 60 * 60 * 1000; // 24小时
        this.data = new Map();
        this.startCleanup();
    }

    addDataPoint(series, value, timestamp = Date.now()) {
        if (!this.data.has(series)) {
            this.data.set(series, []);
        }
        
        const seriesData = this.data.get(series);
        seriesData.push({ value, timestamp });
        
        // 保持数据按时间排序
        seriesData.sort((a, b) => a.timestamp - b.timestamp);
    }

    getMovingAverage(series, windowCount = 10) {
        const seriesData = this.data.get(series);
        if (!seriesData || seriesData.length < windowCount) {
            return null;
        }
        
        const recentData = seriesData.slice(-windowCount);
        const sum = recentData.reduce((sum, point) => sum + point.value, 0);
        return sum / recentData.length;
    }

    detectAnomalies(series, threshold = 2) {
        const seriesData = this.data.get(series);
        if (!seriesData || seriesData.length < 10) {
            return [];
        }
        
        const values = seriesData.map(point => point.value);
        const mean = values.reduce((sum, val) => sum + val, 0) / values.length;
        const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;
        const stdDev = Math.sqrt(variance);
        
        return seriesData.filter(point => 
            Math.abs(point.value - mean) > threshold * stdDev
        );
    }

    getTrend(series, periods = 10) {
        const seriesData = this.data.get(series);
        if (!seriesData || seriesData.length < periods) {
            return null;
        }
        
        const recentData = seriesData.slice(-periods);
        const n = recentData.length;
        
        let sumX = 0, sumY = 0, sumXY = 0, sumXX = 0;
        
        recentData.forEach((point, index) => {
            sumX += index;
            sumY += point.value;
            sumXY += index * point.value;
            sumXX += index * index;
        });
        
        const slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
        const intercept = (sumY - slope * sumX) / n;
        
        return { slope, intercept, trend: slope > 0 ? 'increasing' : slope < 0 ? 'decreasing' : 'stable' };
    }

    startCleanup() {
        setInterval(() => {
            const cutoff = Date.now() - this.retentionPeriod;
            
            this.data.forEach((seriesData, series) => {
                const filteredData = seriesData.filter(point => point.timestamp > cutoff);
                this.data.set(series, filteredData);
            });
        }, 60000); // 每分钟清理一次
    }
}

module.exports = {
    StreamProcessor,
    TimeSeriesAnalyzer
};

7.2 机器学习集成

机器学习集成
数据预处理
模型训练
模型部署
预测服务
数据清洗
特征工程
数据标准化
数据分割
监督学习
无监督学习
强化学习
深度学习
模型序列化
容器化部署
API服务
边缘部署
实时预测
批量预测
A/B测试
模型监控

机器学习服务实现

// ml-service.js
const tf = require('@tensorflow/tfjs-node');
const fs = require('fs').promises;
const path = require('path');

class MLService {
    constructor() {
        this.models = new Map();
        this.preprocessors = new Map();
        this.metrics = new Map();
    }

    // 加载预训练模型
    async loadModel(name, modelPath) {
        try {
            const model = await tf.loadLayersModel(`file://${modelPath}`);
            this.models.set(name, {
                model,
                loadedAt: new Date(),
                predictions: 0
            });
            console.log(`Model ${name} loaded successfully`);
        } catch (error) {
            console.error(`Failed to load model ${name}:`, error);
            throw error;
        }
    }

    // 创建简单的线性回归模型
    createLinearRegressionModel(inputShape) {
        const model = tf.sequential({
            layers: [
                tf.layers.dense({
                    inputShape: [inputShape],
                    units: 64,
                    activation: 'relu'
                }),
                tf.layers.dropout({ rate: 0.2 }),
                tf.layers.dense({
                    units: 32,
                    activation: 'relu'
                }),
                tf.layers.dense({
                    units: 1,
                    activation: 'linear'
                })
            ]
        });

        model.compile({
            optimizer: tf.train.adam(0.001),
            loss: 'meanSquaredError',
            metrics: ['mae']
        });

        return model;
    }

    // 创建分类模型
    createClassificationModel(inputShape, numClasses) {
        const model = tf.sequential({
            layers: [
                tf.layers.dense({
                    inputShape: [inputShape],
                    units: 128,
                    activation: 'relu'
                }),
                tf.layers.dropout({ rate: 0.3 }),
                tf.layers.dense({
                    units: 64,
                    activation: 'relu'
                }),
                tf.layers.dropout({ rate: 0.2 }),
                tf.layers.dense({
                    units: numClasses,
                    activation: 'softmax'
                })
            ]
        });

        model.compile({
            optimizer: tf.train.adam(0.001),
            loss: 'categoricalCrossentropy',
            metrics: ['accuracy']
        });

        return model;
    }

    // 训练模型
    async trainModel(name, trainData, validationData, options = {}) {
        const model = this.models.get(name)?.model;
        if (!model) {
            throw new Error(`Model ${name} not found`);
        }

        const {
            epochs = 100,
            batchSize = 32,
            validationSplit = 0.2,
            callbacks = []
        } = options;

        // 添加早停回调
        const earlyStopping = tf.callbacks.earlyStopping({
            monitor: 'val_loss',
            patience: 10,
            restoreBestWeights: true
        });

        // 添加学习率调度
        const reduceLROnPlateau = tf.callbacks.reduceLROnPlateau({
            monitor: 'val_loss',
            factor: 0.5,
            patience: 5,
            minLr: 0.0001
        });

        const allCallbacks = [earlyStopping, reduceLROnPlateau, ...callbacks];

        const history = await model.fit(trainData.xs, trainData.ys, {
            epochs,
            batchSize,
            validationData: validationData ? [validationData.xs, validationData.ys] : undefined,
            validationSplit: validationData ? undefined : validationSplit,
            callbacks: allCallbacks,
            verbose: 1
        });

        // 保存训练历史
        this.metrics.set(name, {
            history: history.history,
            trainedAt: new Date()
        });

        return history;
    }

    // 进行预测
    async predict(modelName, inputData) {
        const modelInfo = this.models.get(modelName);
        if (!modelInfo) {
            throw new Error(`Model ${modelName} not found`);
        }

        const { model } = modelInfo;
        
        // 预处理输入数据
        const preprocessedData = await this.preprocessData(modelName, inputData);
        
        // 进行预测
        const prediction = model.predict(preprocessedData);
        
        // 更新预测计数
        modelInfo.predictions++;
        
        // 转换为JavaScript数组
        const result = await prediction.data();
        
        // 清理内存
        prediction.dispose();
        preprocessedData.dispose();
        
        return Array.from(result);
    }

    // 批量预测
    async batchPredict(modelName, inputDataArray) {
        const results = [];
        
        for (const inputData of inputDataArray) {
            const result = await this.predict(modelName, inputData);
            results.push(result);
        }
        
        return results;
    }

    // 数据预处理
    async preprocessData(modelName, data) {
        const preprocessor = this.preprocessors.get(modelName);
        
        if (preprocessor) {
            return preprocessor(data);
        }
        
        // 默认预处理:转换为张量
        if (Array.isArray(data)) {
            return tf.tensor2d([data]);
        } else if (typeof data === 'object') {
            // 假设是特征对象
            const features = Object.values(data);
            return tf.tensor2d([features]);
        }
        
        return tf.tensor2d([[data]]);
    }

    // 注册预处理器
    registerPreprocessor(modelName, preprocessorFn) {
        this.preprocessors.set(modelName, preprocessorFn);
    }

    // 模型评估
    async evaluateModel(modelName, testData) {
        const modelInfo = this.models.get(modelName);
        if (!modelInfo) {
            throw new Error(`Model ${modelName} not found`);
        }

        const { model } = modelInfo;
        const evaluation = await model.evaluate(testData.xs, testData.ys);
        
        const metrics = {};
        const metricNames = model.metricsNames;
        
        for (let i = 0; i < metricNames.length; i++) {
            metrics[metricNames[i]] = await evaluation[i].data();
        }
        
        // 清理内存
        evaluation.forEach(tensor => tensor.dispose());
        
        return metrics;
    }

    // 保存模型
    async saveModel(modelName, savePath) {
        const modelInfo = this.models.get(modelName);
        if (!modelInfo) {
            throw new Error(`Model ${modelName} not found`);
        }

        const { model } = modelInfo;
        await model.save(`file://${savePath}`);
        console.log(`Model ${modelName} saved to ${savePath}`);
    }

    // 获取模型信息
    getModelInfo(modelName) {
        const modelInfo = this.models.get(modelName);
        if (!modelInfo) {
            return null;
        }

        const { model, loadedAt, predictions } = modelInfo;
        
        return {
            name: modelName,
            loadedAt,
            predictions,
            inputShape: model.inputs[0].shape,
            outputShape: model.outputs[0].shape,
            trainableParams: model.countParams(),
            layers: model.layers.length
        };
    }

    // 获取所有模型统计
    getAllModelsStats() {
        const stats = {};
        
        this.models.forEach((modelInfo, name) => {
            stats[name] = this.getModelInfo(name);
        });
        
        return stats;
    }

    // 清理模型内存
    disposeModel(modelName) {
        const modelInfo = this.models.get(modelName);
        if (modelInfo) {
            modelInfo.model.dispose();
            this.models.delete(modelName);
            this.preprocessors.delete(modelName);
            this.metrics.delete(modelName);
            console.log(`Model ${modelName} disposed`);
        }
    }

    // 清理所有模型
    disposeAllModels() {
        this.models.forEach((_, name) => {
            this.disposeModel(name);
        });
    }
}

// 特征工程工具
class FeatureEngineering {
    // 数据标准化
    static normalize(data) {
        const tensor = tf.tensor(data);
        const normalized = tf.div(
            tf.sub(tensor, tf.mean(tensor, 0)),
            tf.add(tf.moments(tensor, 0).variance.sqrt(), 1e-7)
        );
        
        const result = normalized.arraySync();
        tensor.dispose();
        normalized.dispose();
        
        return result;
    }

    // 最小-最大缩放
    static minMaxScale(data, min = 0, max = 1) {
        const tensor = tf.tensor(data);
        const minVal = tf.min(tensor, 0);
        const maxVal = tf.max(tensor, 0);
        
        const scaled = tf.add(
            tf.mul(
                tf.div(tf.sub(tensor, minVal), tf.sub(maxVal, minVal)),
                max - min
            ),
            min
        );
        
        const result = scaled.arraySync();
        tensor.dispose();
        minVal.dispose();
        maxVal.dispose();
        scaled.dispose();
        
        return result;
    }

    // 独热编码
    static oneHotEncode(labels, numClasses) {
        const tensor = tf.tensor1d(labels, 'int32');
        const oneHot = tf.oneHot(tensor, numClasses);
        
        const result = oneHot.arraySync();
        tensor.dispose();
        oneHot.dispose();
        
        return result;
    }

    // 创建时间特征
    static createTimeFeatures(timestamps) {
        return timestamps.map(timestamp => {
            const date = new Date(timestamp);
            return {
                hour: date.getHours(),
                dayOfWeek: date.getDay(),
                dayOfMonth: date.getDate(),
                month: date.getMonth(),
                quarter: Math.floor(date.getMonth() / 3),
                isWeekend: date.getDay() === 0 || date.getDay() === 6 ? 1 : 0
            };
        });
    }

    // 多项式特征
    static polynomialFeatures(data, degree = 2) {
        const features = [];
        
        for (let i = 0; i < data.length; i++) {
            const row = data[i];
            const polyRow = [...row];
            
            // 添加多项式特征
            for (let d = 2; d <= degree; d++) {
                for (let j = 0; j < row.length; j++) {
                    polyRow.push(Math.pow(row[j], d));
                }
            }
            
            // 添加交互特征
            if (degree >= 2) {
                for (let j = 0; j < row.length; j++) {
                    for (let k = j + 1; k < row.length; k++) {
                        polyRow.push(row[j] * row[k]);
                    }
                }
            }
            
            features.push(polyRow);
        }
        
        return features;
    }
}

module.exports = {
    MLService,
    FeatureEngineering
};

结语
感谢您的阅读!期待您的一键三连!欢迎指正!

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到