Node.js系列(5)--数据库操作指南

发布于:2025-03-20 ⋅ 阅读:(19) ⋅ 点赞:(0)

Node.js数据库操作指南 💾

引言

数据库操作是Node.js应用开发中的关键环节。本文将深入探讨Node.js数据库操作的实现方案,包括连接管理、查询优化、事务处理等方面,帮助开发者构建高效可靠的数据访问层。

数据库操作概述

Node.js数据库操作主要包括以下方面:

  • 连接管理:连接池、故障恢复、负载均衡
  • 查询处理:SQL构建、参数绑定、结果映射
  • 事务管理:事务控制、隔离级别、一致性保证
  • 性能优化:查询优化、缓存策略、批量操作
  • 安全防护:SQL注入防护、访问控制、数据加密

数据库操作实现

数据库管理器

// 数据库管理器
class DatabaseManager {
    private static instance: DatabaseManager;
    private config: DatabaseConfig;
    private pools: Map<string, Pool>;
    private metrics: DatabaseMetrics;
    
    private constructor() {
        this.pools = new Map();
        this.metrics = {
            queries: 0,
            errors: 0,
            connections: 0,
            transactions: 0
        };
        this.config = {
            maxConnections: 10,
            idleTimeout: 30000,
            connectionTimeout: 5000,
            retryAttempts: 3
        };
    }
    
    // 获取单例实例
    static getInstance(): DatabaseManager {
        if (!DatabaseManager.instance) {
            DatabaseManager.instance = new DatabaseManager();
        }
        return DatabaseManager.instance;
    }
    
    // 初始化数据库管理器
    async init(config: DatabaseConfig): Promise<void> {
        this.config = { ...this.config, ...config };
        
        // 创建主数据库连接池
        await this.createPool('master', {
            host: config.masterHost,
            port: config.masterPort,
            database: config.database,
            user: config.user,
            password: config.password
        });
        
        // 创建从数据库连接池
        if (config.slaves) {
            for (const [name, slave] of Object.entries(config.slaves)) {
                await this.createPool(name, {
                    host: slave.host,
                    port: slave.port,
                    database: config.database,
                    user: config.user,
                    password: config.password
                });
            }
        }
        
        // 设置事件监听器
        this.setupEventListeners();
    }
    
    // 创建连接池
    private async createPool(name: string, config: PoolConfig): Promise<void> {
        const pool = new Pool({
            ...config,
            max: this.config.maxConnections,
            idleTimeoutMillis: this.config.idleTimeout,
            connectionTimeoutMillis: this.config.connectionTimeout
        });
        
        // 测试连接
        try {
            const client = await pool.connect();
            client.release();
            console.log(`Database pool ${name} created successfully`);
        } catch (error) {
            console.error(`Failed to create database pool ${name}:`, error);
            throw error;
        }
        
        this.pools.set(name, pool);
    }
    
    // 设置事件监听器
    private setupEventListeners(): void {
        for (const [name, pool] of this.pools.entries()) {
            pool.on('connect', () => {
                this.metrics.connections++;
                console.log(`New connection in pool ${name}`);
            });
            
            pool.on('error', error => {
                this.metrics.errors++;
                console.error(`Pool ${name} error:`, error);
            });
        }
    }
    
    // 执行查询
    async query<T>(
        sql: string,
        params: any[] = [],
        options: QueryOptions = {}
    ): Promise<T> {
        const poolName = options.useReplica ? this.selectReplica() : 'master';
        const pool = this.pools.get(poolName);
        
        if (!pool) {
            throw new Error(`Database pool ${poolName} not found`);
        }
        
        let client;
        let retryCount = 0;
        
        while (retryCount < this.config.retryAttempts) {
            try {
                client = await pool.connect();
                
                const startTime = Date.now();
                const result = await client.query(sql, params);
                const duration = Date.now() - startTime;
                
                this.metrics.queries++;
                this.logQuery(sql, params, duration);
                
                return result.rows as T;
                
            } catch (error) {
                retryCount++;
                this.metrics.errors++;
                
                if (retryCount === this.config.retryAttempts) {
                    throw new DatabaseError(
                        'Query failed after max retry attempts',
                        sql,
                        params,
                        error
                    );
                }
                
                await this.wait(Math.pow(2, retryCount) * 1000);
                
            } finally {
                if (client) {
                    client.release();
                }
            }
        }
        
        throw new Error('Unexpected query execution path');
    }
    
    // 执行事务
    async transaction<T>(
        callback: (client: PoolClient) => Promise<T>
    ): Promise<T> {
        const pool = this.pools.get('master');
        if (!pool) {
            throw new Error('Master database pool not found');
        }
        
        const client = await pool.connect();
        
        try {
            await client.query('BEGIN');
            this.metrics.transactions++;
            
            const result = await callback(client);
            
            await client.query('COMMIT');
            return result;
            
        } catch (error) {
            await client.query('ROLLBACK');
            throw error;
            
        } finally {
            client.release();
        }
    }
    
    // 批量插入
    async batchInsert<T>(
        table: string,
        records: T[],
        options: BatchOptions = {}
    ): Promise<void> {
        const batchSize = options.batchSize || 1000;
        const batches = this.chunk(records, batchSize);
        
        for (const batch of batches) {
            const values = this.buildBatchValues(batch);
            const columns = Object.keys(batch[0]);
            
            const sql = `
                INSERT INTO ${table} (${columns.join(', ')})
                VALUES ${values}
            `;
            
            await this.query(sql);
        }
    }
    
    // 选择从库
    private selectReplica(): string {
        const replicas = Array.from(this.pools.keys())
            .filter(name => name !== 'master');
        
        if (replicas.length === 0) {
            return 'master';
        }
        
        // 简单轮询策略
        const index = Math.floor(Math.random() * replicas.length);
        return replicas[index];
    }
    
    // 构建批量插入值
    private buildBatchValues<T>(records: T[]): string {
        return records.map(record => {
            const values = Object.values(record)
                .map(value => {
                    if (value === null) return 'NULL';
                    if (typeof value === 'string') return `'${value}'`;
                    return value;
                })
                .join(', ');
            
            return `(${values})`;
        }).join(', ');
    }
    
    // 分割数组
    private chunk<T>(array: T[], size: number): T[][] {
        const chunks = [];
        for (let i = 0; i < array.length; i += size) {
            chunks.push(array.slice(i, i + size));
        }
        return chunks;
    }
    
    // 等待指定时间
    private wait(ms: number): Promise<void> {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
    
    // 记录查询日志
    private logQuery(
        sql: string,
        params: any[],
        duration: number
    ): void {
        console.log('Query executed:', {
            sql,
            params,
            duration: `${duration}ms`
        });
    }
    
    // 获取性能指标
    getMetrics(): DatabaseMetrics {
        return { ...this.metrics };
    }
    
    // 关闭所有连接池
    async close(): Promise<void> {
        for (const [name, pool] of this.pools.entries()) {
            await pool.end();
            console.log(`Database pool ${name} closed`);
        }
    }
}

// 查询构建器
class QueryBuilder {
    private table: string;
    private conditions: string[] = [];
    private parameters: any[] = [];
    private orderClauses: string[] = [];
    private limitValue?: number;
    private offsetValue?: number;
    private joinClauses: string[] = [];
    
    constructor(table: string) {
        this.table = table;
    }
    
    // 添加条件
    where(column: string, operator: string, value: any): this {
        this.conditions.push(`${column} ${operator} $${this.parameters.length + 1}`);
        this.parameters.push(value);
        return this;
    }
    
    // 添加AND条件
    andWhere(column: string, operator: string, value: any): this {
        if (this.conditions.length > 0) {
            this.conditions.push('AND');
        }
        return this.where(column, operator, value);
    }
    
    // 添加OR条件
    orWhere(column: string, operator: string, value: any): this {
        if (this.conditions.length > 0) {
            this.conditions.push('OR');
        }
        return this.where(column, operator, value);
    }
    
    // 添加JOIN
    join(table: string, condition: string): this {
        this.joinClauses.push(`JOIN ${table} ON ${condition}`);
        return this;
    }
    
    // 添加LEFT JOIN
    leftJoin(table: string, condition: string): this {
        this.joinClauses.push(`LEFT JOIN ${table} ON ${condition}`);
        return this;
    }
    
    // 添加排序
    orderBy(column: string, direction: 'ASC' | 'DESC' = 'ASC'): this {
        this.orderClauses.push(`${column} ${direction}`);
        return this;
    }
    
    // 设置限制
    limit(value: number): this {
        this.limitValue = value;
        return this;
    }
    
    // 设置偏移
    offset(value: number): this {
        this.offsetValue = value;
        return this;
    }
    
    // 构建查询
    build(): { sql: string; params: any[] } {
        let sql = `SELECT * FROM ${this.table}`;
        
        if (this.joinClauses.length > 0) {
            sql += ' ' + this.joinClauses.join(' ');
        }
        
        if (this.conditions.length > 0) {
            sql += ' WHERE ' + this.conditions.join(' ');
        }
        
        if (this.orderClauses.length > 0) {
            sql += ' ORDER BY ' + this.orderClauses.join(', ');
        }
        
        if (this.limitValue !== undefined) {
            sql += ` LIMIT ${this.limitValue}`;
        }
        
        if (this.offsetValue !== undefined) {
            sql += ` OFFSET ${this.offsetValue}`;
        }
        
        return {
            sql,
            params: this.parameters
        };
    }
}

// 数据库错误类
class DatabaseError extends Error {
    constructor(
        message: string,
        public sql: string,
        public params: any[],
        public originalError: Error
    ) {
        super(message);
        this.name = 'DatabaseError';
    }
}

// 接口定义
interface DatabaseConfig {
    masterHost: string;
    masterPort: number;
    database: string;
    user: string;
    password: string;
    maxConnections: number;
    idleTimeout: number;
    connectionTimeout: number;
    retryAttempts: number;
    slaves?: Record<string, SlaveConfig>;
}

interface SlaveConfig {
    host: string;
    port: number;
}

interface PoolConfig {
    host: string;
    port: number;
    database: string;
    user: string;
    password: string;
}

interface QueryOptions {
    useReplica?: boolean;
}

interface BatchOptions {
    batchSize?: number;
}

interface DatabaseMetrics {
    queries: number;
    errors: number;
    connections: number;
    transactions: number;
}

// 使用示例
async function main() {
    // 创建数据库管理器
    const dbManager = DatabaseManager.getInstance();
    
    // 初始化配置
    await dbManager.init({
        masterHost: 'localhost',
        masterPort: 5432,
        database: 'myapp',
        user: 'postgres',
        password: 'secret',
        maxConnections: 10,
        idleTimeout: 30000,
        connectionTimeout: 5000,
        retryAttempts: 3,
        slaves: {
            slave1: {
                host: 'slave1.example.com',
                port: 5432
            },
            slave2: {
                host: 'slave2.example.com',
                port: 5432
            }
        }
    });
    
    // 执行简单查询
    const users = await dbManager.query<User[]>(
        'SELECT * FROM users WHERE age > $1',
        [18]
    );
    
    // 使用查询构建器
    const queryBuilder = new QueryBuilder('users')
        .where('age', '>', 18)
        .andWhere('status', '=', 'active')
        .orderBy('created_at', 'DESC')
        .limit(10);
    
    const { sql, params } = queryBuilder.build();
    const result = await dbManager.query(sql, params);
    
    // 执行事务
    await dbManager.transaction(async client => {
        await client.query('UPDATE users SET balance = balance - $1 WHERE id = $2', [100, 1]);
        await client.query('UPDATE users SET balance = balance + $1 WHERE id = $2', [100, 2]);
    });
    
    // 批量插入
    const newUsers = [
        { name: 'User 1', age: 25 },
        { name: 'User 2', age: 30 }
    ];
    
    await dbManager.batchInsert('users', newUsers, { batchSize: 1000 });
    
    // 获取性能指标
    const metrics = dbManager.getMetrics();
    console.log('Database metrics:', metrics);
    
    // 关闭连接
    await dbManager.close();
}

interface User {
    id: number;
    name: string;
    age: number;
    status: string;
    created_at: Date;
}

main().catch(console.error);

最佳实践与建议

  1. 连接管理

    • 使用连接池
    • 配置最大连接数
    • 设置连接超时
    • 实现故障转移
  2. 查询优化

    • 使用参数化查询
    • 优化查询语句
    • 合理使用索引
    • 避免N+1查询
  3. 事务处理

    • 合理设置隔离级别
    • 控制事务范围
    • 处理并发访问
    • 实现事务补偿
  4. 性能优化

    • 实现读写分离
    • 使用批量操作
    • 配置查询缓存
    • 监控性能指标

总结

Node.js数据库操作需要考虑以下方面:

  1. 连接管理和故障处理
  2. 查询优化和性能调优
  3. 事务管理和数据一致性
  4. 安全防护和访问控制
  5. 监控和维护支持

通过合理的数据库操作实现,可以提高应用的性能和可靠性。

学习资源

  1. 数据库设计模式
  2. SQL优化技巧
  3. 事务处理机制
  4. 性能优化指南
  5. 安全最佳实践

如果你觉得这篇文章有帮助,欢迎点赞收藏,也期待在评论区看到你的想法和建议!👇

终身学习,共同成长。

咱们下一期见

💻