Node.js数据库操作指南 💾
引言
数据库操作是Node.js应用开发中的关键环节。本文将深入探讨Node.js数据库操作的实现方案，包括连接管理、查询优化、事务处理等方面，帮助开发者构建高效可靠的数据访问层。
数据库操作概述
Node.js数据库操作主要包括以下方面:
- 连接管理：连接池、故障恢复、负载均衡
- 查询处理：SQL构建、参数绑定、结果映射
- 事务管理：事务控制、隔离级别、一致性保证
- 性能优化：查询优化、缓存策略、批量操作
- 安全防护：SQL注入防护、访问控制、数据加密
数据库操作实现
数据库管理器
// Database manager
/**
 * Process-wide singleton that owns one connection pool for the primary
 * database (registered under the name `master`) plus one pool per configured
 * replica, and exposes query, transaction and batch-insert helpers on top of
 * them.
 *
 * NOTE(review): the `Pool` / `PoolClient` usage and `$n` placeholders look like
 * node-postgres; the import is outside this block — confirm.
 */
class DatabaseManager {
  /**
   * Largest number of bind parameters sent with one statement. The PostgreSQL
   * wire protocol transmits the parameter count as a 16-bit integer.
   */
  private static readonly MAX_BIND_PARAMETERS = 65535;

  /** Rows per INSERT statement when the caller does not specify a batch size. */
  private static readonly DEFAULT_BATCH_SIZE = 1000;

  private static instance: DatabaseManager;

  // Only the pool-tuning subset of DatabaseConfig is kept on the instance;
  // connection credentials are handed straight to the pools and not retained.
  private config: Pick<
    DatabaseConfig,
    'maxConnections' | 'idleTimeout' | 'connectionTimeout' | 'retryAttempts'
  >;
  private pools: Map<string, Pool>;
  private metrics: DatabaseMetrics;

  private constructor() {
    this.pools = new Map();
    this.metrics = {
      queries: 0,
      errors: 0,
      connections: 0,
      transactions: 0
    };
    // Defaults used until init() supplies explicit values.
    this.config = {
      maxConnections: 10,
      idleTimeout: 30000,
      connectionTimeout: 5000,
      retryAttempts: 3
    };
  }

  /** Returns the shared instance, creating it on first use. */
  static getInstance(): DatabaseManager {
    if (!DatabaseManager.instance) {
      DatabaseManager.instance = new DatabaseManager();
    }
    return DatabaseManager.instance;
  }

  /**
   * Creates the `master` pool and one pool per entry in `config.slaves`, then
   * attaches the metric/logging listeners.
   *
   * Pools left over from a previous init() are closed first, and pools created
   * during a failed init() are closed before the error is rethrown, so no pool
   * is left dangling.
   *
   * @param config Connection settings; tuning values that are undefined keep
   *               their current value.
   * @throws Whatever the failing test connection threw.
   */
  async init(config: DatabaseConfig): Promise<void> {
    if (this.pools.size > 0) {
      await this.close();
    }

    this.config = {
      maxConnections: config.maxConnections ?? this.config.maxConnections,
      idleTimeout: config.idleTimeout ?? this.config.idleTimeout,
      connectionTimeout: config.connectionTimeout ?? this.config.connectionTimeout,
      retryAttempts: config.retryAttempts ?? this.config.retryAttempts
    };

    try {
      // Primary pool.
      await this.createPool('master', {
        host: config.masterHost,
        port: config.masterPort,
        database: config.database,
        user: config.user,
        password: config.password
      });

      // Replica pools share database name and credentials with the primary.
      if (config.slaves) {
        for (const [name, slave] of Object.entries(config.slaves)) {
          await this.createPool(name, {
            host: slave.host,
            port: slave.port,
            database: config.database,
            user: config.user,
            password: config.password
          });
        }
      }
    } catch (error) {
      try {
        await this.close();
      } catch {
        // close() already logged the failure; the init error is the one to surface.
      }
      throw error;
    }

    this.setupEventListeners();
  }

  /**
   * Builds a pool, verifies it with one test connection and registers it under
   * `name`. A pool whose test connection fails is ended and not registered.
   */
  private async createPool(name: string, config: PoolConfig): Promise<void> {
    const pool = new Pool({
      ...config,
      max: this.config.maxConnections,
      idleTimeoutMillis: this.config.idleTimeout,
      connectionTimeoutMillis: this.config.connectionTimeout
    });

    try {
      const client = await pool.connect();
      client.release();
      console.log(`Database pool ${name} created successfully`);
    } catch (error) {
      console.error(`Failed to create database pool ${name}:`, error);
      try {
        await pool.end();
      } catch {
        // The connection failure above is the error worth reporting.
      }
      throw error;
    }

    this.pools.set(name, pool);
  }

  /** Hooks `connect` and `error` events of every registered pool into metrics and logs. */
  private setupEventListeners(): void {
    for (const [name, pool] of this.pools.entries()) {
      pool.on('connect', () => {
        this.metrics.connections++;
        console.log(`New connection in pool ${name}`);
      });
      pool.on('error', error => {
        this.metrics.errors++;
        console.error(`Pool ${name} error:`, error);
      });
    }
  }

  /**
   * Runs one statement and returns its rows.
   *
   * Every failure (including failing to obtain a connection) is retried with
   * exponential backoff — 2s, 4s, ... — up to `retryAttempts` attempts; at
   * least one attempt is always made. The connection is released before the
   * backoff sleep so it is not held idle while waiting.
   *
   * @param sql     Statement text.
   * @param params  Bind parameters for the statement.
   * @param options `useReplica` routes the statement to a replica pool.
   * @returns The `rows` of the driver result, cast to `T`.
   * @throws Error if the selected pool is not registered.
   * @throws DatabaseError once all attempts have failed.
   */
  async query<T>(
    sql: string,
    params: any[] = [],
    options: QueryOptions = {}
  ): Promise<T> {
    const poolName = options.useReplica ? this.selectReplica() : 'master';
    const pool = this.pools.get(poolName);
    if (!pool) {
      throw new Error(`Database pool ${poolName} not found`);
    }

    // NaN, zero and negative settings all collapse to a single attempt.
    const configured = Math.floor(this.config.retryAttempts);
    const maxAttempts = configured >= 1 ? configured : 1;

    for (let attempt = 1; ; attempt++) {
      try {
        return await this.runOnce<T>(pool, sql, params);
      } catch (error) {
        this.metrics.errors++;
        if (attempt >= maxAttempts) {
          throw new DatabaseError(
            'Query failed after max retry attempts',
            sql,
            params,
            error instanceof Error ? error : new Error(String(error))
          );
        }
        await this.wait(Math.pow(2, attempt) * 1000);
      }
    }
  }

  /** Checks out a client, runs the statement once and always releases the client. */
  private async runOnce<T>(pool: Pool, sql: string, params: any[]): Promise<T> {
    const client = await pool.connect();
    try {
      const startTime = Date.now();
      const result = await client.query(sql, params);
      const duration = Date.now() - startTime;
      this.metrics.queries++;
      this.logQuery(sql, params, duration);
      return result.rows as T;
    } finally {
      client.release();
    }
  }

  /**
   * Runs `callback` between BEGIN and COMMIT on a single client from the
   * `master` pool, issuing ROLLBACK if anything throws.
   *
   * The error that caused the rollback is always the one rethrown; a failure
   * of the ROLLBACK itself is only logged.
   *
   * @returns Whatever `callback` resolved to.
   * @throws Error if the `master` pool is not registered.
   */
  async transaction<T>(
    callback: (client: PoolClient) => Promise<T>
  ): Promise<T> {
    const pool = this.pools.get('master');
    if (!pool) {
      throw new Error('Master database pool not found');
    }

    const client = await pool.connect();
    let connectionUnusable = false;
    try {
      await client.query('BEGIN');
      this.metrics.transactions++;
      const result = await callback(client);
      await client.query('COMMIT');
      return result;
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackError) {
        connectionUnusable = true;
        console.error('Transaction rollback failed:', rollbackError);
      }
      throw error;
    } finally {
      if (connectionUnusable) {
        // The transaction state is unknown, so the client must not be reused.
        // NOTE(review): relies on node-postgres destroying the client when
        // release() receives a truthy argument — confirm for the driver in use.
        client.release(true);
      } else {
        client.release();
      }
    }
  }

  /**
   * Inserts `records` into `table` using multi-row INSERT statements with bind
   * parameters, so values are never spliced into the SQL text.
   *
   * Column names come from the keys of the first record; each row's values are
   * looked up by those names, and a missing or undefined value is sent as
   * NULL. `table` and the column names are still interpolated verbatim and
   * must come from trusted code.
   *
   * @param options.batchSize Maximum rows per statement (default 1000). The
   *        effective size is lowered when needed to stay within the
   *        bind-parameter limit.
   * @throws RangeError if `batchSize` is not at least 1.
   * @throws Error if the first record has no properties.
   */
  async batchInsert<T>(
    table: string,
    records: T[],
    options: BatchOptions = {}
  ): Promise<void> {
    if (records.length === 0) {
      return;
    }

    const requestedSize = Math.floor(
      options.batchSize || DatabaseManager.DEFAULT_BATCH_SIZE
    );
    if (!(requestedSize >= 1)) {
      throw new RangeError(
        `batchSize must be a positive number, got ${options.batchSize}`
      );
    }

    const columns = Object.keys(records[0] as Record<string, unknown>);
    if (columns.length === 0) {
      throw new Error('batchInsert requires records with at least one column');
    }

    const rowsPerStatement = Math.max(
      1,
      Math.min(
        requestedSize,
        Math.floor(DatabaseManager.MAX_BIND_PARAMETERS / columns.length)
      )
    );

    for (const batch of this.chunk(records, rowsPerStatement)) {
      const { sql, params } = this.buildBatchInsert(table, columns, batch);
      await this.query(sql, params);
    }
  }

  /**
   * Picks the pool name for a replica read: a uniformly random replica, or
   * `master` when no replica pool is registered.
   */
  private selectReplica(): string {
    const replicas = Array.from(this.pools.keys())
      .filter(name => name !== 'master');
    if (replicas.length === 0) {
      return 'master';
    }
    // Random selection (not round-robin).
    const index = Math.floor(Math.random() * replicas.length);
    return replicas[index];
  }

  /**
   * Builds one multi-row INSERT with `$n` placeholders and the matching flat
   * parameter list (row by row, in `columns` order).
   */
  private buildBatchInsert<T>(
    table: string,
    columns: string[],
    records: T[]
  ): { sql: string; params: unknown[] } {
    const params: unknown[] = [];
    const tuples = records.map(record => {
      const row = record as Record<string, unknown>;
      const placeholders = columns.map(column => {
        params.push(row[column] ?? null);
        return `$${params.length}`;
      });
      return `(${placeholders.join(', ')})`;
    });
    const sql =
      `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${tuples.join(', ')}`;
    return { sql, params };
  }

  /**
   * Splits `array` into consecutive slices of at most `size` elements.
   *
   * @throws RangeError if `size` is not at least 1 (a non-positive step would
   *         never terminate).
   */
  private chunk<T>(array: T[], size: number): T[][] {
    if (!(size >= 1)) {
      throw new RangeError(`chunk size must be at least 1, got ${size}`);
    }
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }

  /** Resolves after `ms` milliseconds. */
  private wait(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Logs a completed statement with its parameters and duration.
   * NOTE(review): parameters are logged verbatim — check that this is
   * acceptable for sensitive values.
   */
  private logQuery(
    sql: string,
    params: any[],
    duration: number
  ): void {
    console.log('Query executed:', {
      sql,
      params,
      duration: `${duration}ms`
    });
  }

  /** Returns a snapshot copy of the counters. */
  getMetrics(): DatabaseMetrics {
    return { ...this.metrics };
  }

  /**
   * Ends every registered pool and forgets them all, so the manager can be
   * initialised again. All pools are attempted even if one fails.
   *
   * @throws The first error raised while ending a pool, after every pool has
   *         been attempted.
   */
  async close(): Promise<void> {
    const failures: unknown[] = [];
    for (const [name, pool] of this.pools.entries()) {
      try {
        await pool.end();
        console.log(`Database pool ${name} closed`);
      } catch (error) {
        failures.push(error);
        console.error(`Failed to close database pool ${name}:`, error);
      }
    }
    this.pools.clear();
    if (failures.length > 0) {
      throw failures[0];
    }
  }
}
// Query builder
/**
 * Fluent builder for `SELECT * FROM <table>` statements with `$n` bind
 * placeholders.
 *
 * Only condition values are parameterized. Table names, column names,
 * operators and join conditions are interpolated verbatim and must come from
 * trusted code. Conditions are emitted flat, in call order, without
 * parentheses, so mixed AND/OR chains follow SQL operator precedence.
 */
class QueryBuilder {
  private table: string;
  private conditions: string[] = [];
  private parameters: any[] = [];
  private orderClauses: string[] = [];
  private limitValue?: number;
  private offsetValue?: number;
  private joinClauses: string[] = [];

  constructor(table: string) {
    this.table = table;
  }

  /**
   * Adds `column operator $n`. When earlier conditions exist it is joined to
   * them with AND, so repeated `where` calls produce valid SQL.
   */
  where(column: string, operator: string, value: any): this {
    return this.addCondition('AND', column, operator, value);
  }

  /** Adds a condition joined to the previous ones with AND. */
  andWhere(column: string, operator: string, value: any): this {
    return this.addCondition('AND', column, operator, value);
  }

  /** Adds a condition joined to the previous ones with OR. */
  orWhere(column: string, operator: string, value: any): this {
    return this.addCondition('OR', column, operator, value);
  }

  /** Adds an inner join: `JOIN table ON condition`. */
  join(table: string, condition: string): this {
    this.joinClauses.push(`JOIN ${table} ON ${condition}`);
    return this;
  }

  /** Adds `LEFT JOIN table ON condition`. */
  leftJoin(table: string, condition: string): this {
    this.joinClauses.push(`LEFT JOIN ${table} ON ${condition}`);
    return this;
  }

  /**
   * Appends an ORDER BY term.
   *
   * @throws RangeError if `direction` is neither 'ASC' nor 'DESC' at runtime.
   */
  orderBy(column: string, direction: 'ASC' | 'DESC' = 'ASC'): this {
    if (direction !== 'ASC' && direction !== 'DESC') {
      throw new RangeError(`direction must be 'ASC' or 'DESC', got ${String(direction)}`);
    }
    this.orderClauses.push(`${column} ${direction}`);
    return this;
  }

  /**
   * Sets the LIMIT value.
   *
   * @throws RangeError unless `value` is a non-negative integer.
   */
  limit(value: number): this {
    this.limitValue = QueryBuilder.requireRowCount('limit', value);
    return this;
  }

  /**
   * Sets the OFFSET value.
   *
   * @throws RangeError unless `value` is a non-negative integer.
   */
  offset(value: number): this {
    this.offsetValue = QueryBuilder.requireRowCount('offset', value);
    return this;
  }

  /**
   * Assembles the statement: joins, WHERE, ORDER BY, LIMIT, OFFSET, in that
   * order.
   *
   * @returns The SQL text and a copy of the bind parameters, so later changes
   *          to the builder do not alter a result already handed out.
   */
  build(): { sql: string; params: any[] } {
    let sql = `SELECT * FROM ${this.table}`;
    if (this.joinClauses.length > 0) {
      sql += ' ' + this.joinClauses.join(' ');
    }
    if (this.conditions.length > 0) {
      sql += ' WHERE ' + this.conditions.join(' ');
    }
    if (this.orderClauses.length > 0) {
      sql += ' ORDER BY ' + this.orderClauses.join(', ');
    }
    if (this.limitValue !== undefined) {
      sql += ` LIMIT ${this.limitValue}`;
    }
    if (this.offsetValue !== undefined) {
      sql += ` OFFSET ${this.offsetValue}`;
    }
    return {
      sql,
      params: [...this.parameters]
    };
  }

  /** Records one condition, prefixing the connector unless it is the first. */
  private addCondition(
    connector: 'AND' | 'OR',
    column: string,
    operator: string,
    value: any
  ): this {
    if (this.conditions.length > 0) {
      this.conditions.push(connector);
    }
    this.parameters.push(value);
    // The placeholder index is the 1-based position of the value just pushed.
    this.conditions.push(`${column} ${operator} $${this.parameters.length}`);
    return this;
  }

  /** Rejects anything that is not a non-negative integer before it reaches the SQL text. */
  private static requireRowCount(label: string, value: number): number {
    if (!Number.isInteger(value) || value < 0) {
      throw new RangeError(`${label} must be a non-negative integer, got ${String(value)}`);
    }
    return value;
  }
}
// Database error type
/**
 * Error describing a failed database statement. It carries the SQL text, the
 * bind parameters that were sent with it, and the underlying error.
 */
class DatabaseError extends Error {
  /** SQL text of the failed statement. */
  public sql: string;
  /** Bind parameters that accompanied the statement. */
  public params: any[];
  /** The error that caused the failure. */
  public originalError: Error;

  constructor(
    message: string,
    sql: string,
    params: any[],
    originalError: Error
  ) {
    super(message);
    this.sql = sql;
    this.params = params;
    this.originalError = originalError;
    this.name = 'DatabaseError';
  }
}
// Interface definitions
/**
 * Settings accepted by `DatabaseManager.init`: where the primary and replica
 * databases live, the shared credentials, and pool/retry tuning.
 */
interface DatabaseConfig {
// Host of the primary database; used for the pool named 'master'.
masterHost: string;
// Port of the primary database.
masterPort: number;
// Database name, used for the primary pool and every replica pool.
database: string;
// User name, used for the primary pool and every replica pool.
user: string;
// Password, used for the primary pool and every replica pool.
password: string;
// Passed to each pool as its `max` option.
maxConnections: number;
// Passed to each pool as `idleTimeoutMillis`.
idleTimeout: number;
// Passed to each pool as `connectionTimeoutMillis`.
connectionTimeout: number;
// Number of attempts `DatabaseManager.query` makes before giving up.
retryAttempts: number;
// Optional replicas; each key becomes the name of that replica's pool.
slaves?: Record<string, SlaveConfig>;
}
/**
 * Address of one replica database. The database name and credentials are
 * taken from the enclosing DatabaseConfig.
 */
interface SlaveConfig {
// Replica host name.
host: string;
// Replica port.
port: number;
}
/**
 * Connection settings for a single pool; `DatabaseManager.createPool` spreads
 * them into the options passed to the `Pool` constructor.
 */
interface PoolConfig {
// Server host name.
host: string;
// Server port.
port: number;
// Database name.
database: string;
// User name.
user: string;
// Password.
password: string;
}
/** Per-call options for `DatabaseManager.query`. */
interface QueryOptions {
// When truthy the statement is routed to a replica pool (or to 'master' if
// no replica pool is registered); otherwise it runs on 'master'.
useReplica?: boolean;
}
/** Options for `DatabaseManager.batchInsert`. */
interface BatchOptions {
// Maximum number of records per INSERT statement; batchInsert uses 1000
// when this is omitted.
batchSize?: number;
}
/** Counters maintained by DatabaseManager and returned by `getMetrics`. */
interface DatabaseMetrics {
// Statements completed successfully through `query`.
queries: number;
// Failed `query` attempts plus pool 'error' events.
errors: number;
// Pool 'connect' events observed.
connections: number;
// Transactions started (counted once BEGIN has succeeded).
transactions: number;
}
// Usage example
/**
 * End-to-end walkthrough: initialise the manager, run a plain query, run a
 * query built with QueryBuilder, perform a transfer inside a transaction,
 * batch-insert rows and print the metrics.
 *
 * Everything runs inside try/finally so the pools are closed even when a step
 * fails. If close() itself throws, that error replaces the step's error.
 */
async function main(): Promise<void> {
  // Obtain the shared database manager.
  const dbManager = DatabaseManager.getInstance();

  try {
    // Initialise the pools.
    await dbManager.init({
      masterHost: 'localhost',
      masterPort: 5432,
      database: 'myapp',
      user: 'postgres',
      password: 'secret',
      maxConnections: 10,
      idleTimeout: 30000,
      connectionTimeout: 5000,
      retryAttempts: 3,
      slaves: {
        slave1: {
          host: 'slave1.example.com',
          port: 5432
        },
        slave2: {
          host: 'slave2.example.com',
          port: 5432
        }
      }
    });

    // Run a simple parameterized query.
    const users = await dbManager.query<User[]>(
      'SELECT * FROM users WHERE age > $1',
      [18]
    );
    console.log('Adult users:', users.length);

    // Build a query with the query builder.
    const queryBuilder = new QueryBuilder('users')
      .where('age', '>', 18)
      .andWhere('status', '=', 'active')
      .orderBy('created_at', 'DESC')
      .limit(10);
    const { sql, params } = queryBuilder.build();
    const activeUsers = await dbManager.query<User[]>(sql, params);
    console.log('Active adult users:', activeUsers.length);

    // Run two updates atomically.
    await dbManager.transaction(async client => {
      await client.query('UPDATE users SET balance = balance - $1 WHERE id = $2', [100, 1]);
      await client.query('UPDATE users SET balance = balance + $1 WHERE id = $2', [100, 2]);
    });

    // Batch insert.
    const newUsers = [
      { name: 'User 1', age: 25 },
      { name: 'User 2', age: 30 }
    ];
    await dbManager.batchInsert('users', newUsers, { batchSize: 1000 });

    // Read the counters.
    const metrics = dbManager.getMetrics();
    console.log('Database metrics:', metrics);
  } finally {
    // Always release the pools, whether or not the steps above succeeded.
    await dbManager.close();
  }
}
/**
 * Row type the usage example requests from `SELECT * FROM users`.
 * NOTE(review): the column list is assumed to mirror the `users` table —
 * confirm against the schema.
 */
interface User {
id: number;
name: string;
age: number;
status: string;
created_at: Date;
}
// Run the example. A failure is logged and reflected in the exit status so
// that shells and CI do not treat a failed run as a success.
main().catch((error: unknown) => {
  console.error(error);
  process.exitCode = 1;
});
最佳实践与建议
连接管理
- 使用连接池
- 配置最大连接数
- 设置连接超时
- 实现故障转移
查询优化
- 使用参数化查询
- 优化查询语句
- 合理使用索引
- 避免N+1查询
事务处理
- 合理设置隔离级别
- 控制事务范围
- 处理并发访问
- 实现事务补偿
性能优化
- 实现读写分离
- 使用批量操作
- 配置查询缓存
- 监控性能指标
总结
Node.js数据库操作需要考虑以下方面:
- 连接管理和故障处理
- 查询优化和性能调优
- 事务管理和数据一致性
- 安全防护和访问控制
- 监控和维护支持
通过合理的数据库操作实现，可以提高应用的性能和可靠性。
学习资源
- 数据库设计模式
- SQL优化技巧
- 事务处理机制
- 性能优化指南
- 安全最佳实践
如果你觉得这篇文章有帮助，欢迎点赞收藏，也期待在评论区看到你的想法和建议！👇
终身学习,共同成长。
咱们下一期见
💻