Node.js 中 cluster 模块全部 API 详解
1. 模块属性
const cluster = require('cluster');
// 1. isMaster
// 判断当前进程是否为主进程
console.log('是否为主进程:', cluster.isMaster);
// 2. isWorker
// 判断当前进程是否为工作进程
console.log('是否为工作进程:', cluster.isWorker);
// 3. schedulingPolicy
// 获取或设置调度策略
// SCHED_NONE: 由操作系统调度
// SCHED_RR: 轮询调度
console.log('当前调度策略:', cluster.schedulingPolicy);
cluster.schedulingPolicy = cluster.SCHED_RR;
// 4. workers
// 获取所有工作进程的引用
console.log('工作进程数量:', Object.keys(cluster.workers).length);
2. 主进程方法
// 1. fork()
// 创建新的工作进程
const worker = cluster.fork();
// 2. setupMaster([settings])
// 配置主进程
cluster.setupMaster({
exec: 'worker.js', // 工作进程文件
args: ['--use', 'http'], // 传递给工作进程的参数
silent: false, // 是否将工作进程的输出重定向到主进程
stdio: ['pipe', 'pipe', 'pipe', 'ipc'], // 标准输入输出配置
uid: 1000, // 用户 ID
gid: 1000, // 组 ID
inspectPort: 0 // 调试端口
});
// 3. disconnect([callback])
// 断开所有工作进程的连接
cluster.disconnect(() => {
console.log('所有工作进程已断开连接');
});
// 4. settings
// 获取当前配置
console.log('当前配置:', cluster.settings);
3. 工作进程属性
// 1. worker.id
// 获取工作进程 ID
console.log('工作进程 ID:', cluster.worker.id);
// 2. worker.process
// 获取工作进程的进程对象
console.log('进程 ID:', cluster.worker.process.pid);
// 3. worker.exitedAfterDisconnect
// 判断工作进程是否在断开连接后退出
console.log('是否在断开连接后退出:', cluster.worker.exitedAfterDisconnect);
// 4. worker.isDead()
// 判断工作进程是否已死亡
console.log('是否已死亡:', cluster.worker.isDead());
// 5. worker.isConnected()
// 判断工作进程是否已连接
console.log('是否已连接:', cluster.worker.isConnected());
4. 工作进程方法
// 1. worker.send(message[, sendHandle][, callback])
// 发送消息给主进程
cluster.worker.send('hello from worker', (err) => {
if (err) console.error('发送消息失败:', err);
});
// 2. worker.disconnect()
// 断开工作进程连接
cluster.worker.disconnect();
// 3. worker.kill([signal])
// 终止工作进程
cluster.worker.kill('SIGTERM');
5. 事件
5.1 主进程事件
// 1. fork
// 当创建新的工作进程时触发
cluster.on('fork', (worker) => {
console.log('工作进程已创建:', worker.id);
});
// 2. online
// 当工作进程上线时触发
cluster.on('online', (worker) => {
console.log('工作进程已上线:', worker.id);
});
// 3. listening
// 当工作进程开始监听时触发
cluster.on('listening', (worker, address) => {
console.log('工作进程正在监听:', worker.id, address);
});
// 4. message
// 当收到工作进程消息时触发
cluster.on('message', (worker, message, handle) => {
console.log('收到工作进程消息:', worker.id, message);
});
// 5. disconnect
// 当工作进程断开连接时触发
cluster.on('disconnect', (worker) => {
console.log('工作进程已断开连接:', worker.id);
});
// 6. exit
// 当工作进程退出时触发
cluster.on('exit', (worker, code, signal) => {
console.log('工作进程已退出:', worker.id, code, signal);
});
// 7. error
// 当工作进程发生错误时触发
cluster.on('error', (worker, code, signal) => {
console.log('工作进程错误:', worker.id, code, signal);
});
5.2 工作进程事件
// 1. message
// 当收到主进程消息时触发
cluster.worker.on('message', (message, handle) => {
console.log('收到主进程消息:', message);
});
// 2. disconnect
// 当工作进程断开连接时触发
cluster.worker.on('disconnect', () => {
console.log('工作进程已断开连接');
});
// 3. error
// 当工作进程发生错误时触发
cluster.worker.on('error', (code, signal) => {
console.log('工作进程错误:', code, signal);
});
// 4. exit
// 当工作进程退出时触发
cluster.worker.on('exit', (code, signal) => {
console.log('工作进程已退出:', code, signal);
});
6. 完整示例
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`主进程 ${process.pid} 正在运行`);
// 配置主进程
cluster.setupMaster({
exec: 'worker.js',
args: ['--use', 'http'],
silent: false
});
// 启动工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// 主进程事件处理
cluster.on('fork', (worker) => {
console.log('工作进程已创建:', worker.id);
});
cluster.on('online', (worker) => {
console.log('工作进程已上线:', worker.id);
});
cluster.on('listening', (worker, address) => {
console.log('工作进程正在监听:', worker.id, address);
});
cluster.on('message', (worker, message, handle) => {
console.log('收到工作进程消息:', worker.id, message);
worker.send('消息已收到');
});
cluster.on('disconnect', (worker) => {
console.log('工作进程已断开连接:', worker.id);
});
cluster.on('exit', (worker, code, signal) => {
console.log('工作进程已退出:', worker.id, code, signal);
// 重启工作进程
cluster.fork();
});
cluster.on('error', (worker, code, signal) => {
console.log('工作进程错误:', worker.id, code, signal);
});
// 定期检查工作进程状态
setInterval(() => {
for (const id in cluster.workers) {
const worker = cluster.workers[id];
console.log(`工作进程 ${worker.id} 状态:`, {
pid: worker.process.pid,
isDead: worker.isDead(),
isConnected: worker.isConnected(),
exitedAfterDisconnect: worker.exitedAfterDisconnect
});
}
}, 5000);
// 优雅关闭
process.on('SIGTERM', () => {
console.log('收到 SIGTERM 信号,开始优雅关闭');
for (const id in cluster.workers) {
cluster.workers[id].send('shutdown');
cluster.workers[id].disconnect();
}
});
} else {
// 工作进程代码
const server = http.createServer((req, res) => {
res.writeHead(200);
res.end(`你好世界,来自工作进程 ${process.pid}\n`);
});
server.listen(8000);
// 工作进程事件处理
cluster.worker.on('message', (message) => {
console.log('收到主进程消息:', message);
if (message === 'shutdown') {
server.close(() => {
console.log(`工作进程 ${process.pid} 已关闭`);
process.exit(0);
});
}
});
cluster.worker.on('disconnect', () => {
console.log('工作进程已断开连接');
});
cluster.worker.on('error', (code, signal) => {
console.log('工作进程错误:', code, signal);
});
cluster.worker.on('exit', (code, signal) => {
console.log('工作进程已退出:', code, signal);
});
// 定期发送心跳
setInterval(() => {
cluster.worker.send('heartbeat');
}, 30000);
}
7. 高级用法
// 1. 动态调整工作进程数量
const cluster = require('cluster');
const http = require('http');
const os = require('os');
if (cluster.isMaster) {
let workerCount = os.cpus().length;
console.log(`初始工作进程数量: ${workerCount}`);
// 启动工作进程
for (let i = 0; i < workerCount; i++) {
cluster.fork();
}
// 动态调整工作进程数量
process.on('SIGUSR1', () => {
workerCount = Math.max(1, workerCount - 1);
console.log(`减少工作进程数量至: ${workerCount}`);
const workers = Object.values(cluster.workers);
if (workers.length > workerCount) {
workers[workers.length - 1].disconnect();
}
});
process.on('SIGUSR2', () => {
workerCount = Math.min(os.cpus().length * 2, workerCount + 1);
console.log(`增加工作进程数量至: ${workerCount}`);
if (Object.keys(cluster.workers).length < workerCount) {
cluster.fork();
}
});
}
// 2. 工作进程负载均衡
const cluster = require('cluster');
const http = require('http');
const os = require('os');
if (cluster.isMaster) {
const workerCount = os.cpus().length;
const workers = [];
// 启动工作进程
for (let i = 0; i < workerCount; i++) {
const worker = cluster.fork();
workers.push({
worker,
load: 0,
connections: 0
});
}
// 负载均衡
http.createServer((req, res) => {
// 选择负载最低的工作进程
const target = workers.reduce((min, w) =>
w.load < min.load ? w : min
, workers[0]);
target.connections++;
target.load = target.connections / 100; // 简单的负载计算
// 转发请求
target.worker.send('request', { url: req.url });
}).listen(8000);
// 处理工作进程响应
cluster.on('message', (worker, message) => {
if (message.type === 'response') {
const workerInfo = workers.find(w => w.worker.id === worker.id);
if (workerInfo) {
workerInfo.connections--;
workerInfo.load = workerInfo.connections / 100;
}
}
});
}
// 3. 工作进程健康检查
const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
const workers = new Map();
// 启动工作进程
for (let i = 0; i < 4; i++) {
const worker = cluster.fork();
workers.set(worker.id, {
worker,
healthy: true,
lastHeartbeat: Date.now()
});
}
// 健康检查
setInterval(() => {
const now = Date.now();
for (const [id, info] of workers) {
if (now - info.lastHeartbeat > 30000) {
console.log(`工作进程 ${id} 可能已死亡`);
info.healthy = false;
info.worker.kill();
const newWorker = cluster.fork();
workers.set(newWorker.id, {
worker: newWorker,
healthy: true,
lastHeartbeat: now
});
}
}
}, 5000);
// 处理心跳
cluster.on('message', (worker, message) => {
if (message === 'heartbeat') {
const info = workers.get(worker.id);
if (info) {
info.healthy = true;
info.lastHeartbeat = Date.now();
}
}
});
}
cluster 模块的主要特点:
- 支持多核 CPU 并行处理
- 提供完整的事件系统
- 支持进程间通信
- 支持动态调整工作进程数量
- 支持负载均衡和健康检查
使用建议:
- 根据 CPU 核心数合理设置工作进程数量
- 实现完善的错误处理和重启机制
- 使用事件系统进行进程间通信
- 实现健康检查确保系统稳定性
- 考虑使用更高级的进程管理工具(如 PM2)