Node.js种cluster模块详解

发布于:2025-04-13 ⋅ 阅读:(28) ⋅ 点赞:(0)

Node.js 中 cluster 模块全部 API 详解

1. 模块属性

const cluster = require('cluster');

// 1. isMaster
// 判断当前进程是否为主进程
console.log('是否为主进程:', cluster.isMaster);

// 2. isWorker
// 判断当前进程是否为工作进程
console.log('是否为工作进程:', cluster.isWorker);

// 3. schedulingPolicy
// 获取或设置调度策略
// SCHED_NONE: 由操作系统调度
// SCHED_RR: 轮询调度
console.log('当前调度策略:', cluster.schedulingPolicy);
cluster.schedulingPolicy = cluster.SCHED_RR;

// 4. workers
// 获取所有工作进程的引用
console.log('工作进程数量:', Object.keys(cluster.workers).length);

2. 主进程方法

// 1. fork()
// 创建新的工作进程
const worker = cluster.fork();

// 2. setupMaster([settings])
// 配置主进程
cluster.setupMaster({
  exec: 'worker.js',      // 工作进程文件
  args: ['--use', 'http'], // 传递给工作进程的参数
  silent: false,          // 是否将工作进程的输出重定向到主进程
  stdio: ['pipe', 'pipe', 'pipe', 'ipc'], // 标准输入输出配置
  uid: 1000,             // 用户 ID
  gid: 1000,             // 组 ID
  inspectPort: 0         // 调试端口
});

// 3. disconnect([callback])
// 断开所有工作进程的连接
cluster.disconnect(() => {
  console.log('所有工作进程已断开连接');
});

// 4. settings
// 获取当前配置
console.log('当前配置:', cluster.settings);

3. 工作进程属性

// 1. worker.id
// 获取工作进程 ID
console.log('工作进程 ID:', cluster.worker.id);

// 2. worker.process
// 获取工作进程的进程对象
console.log('进程 ID:', cluster.worker.process.pid);

// 3. worker.exitedAfterDisconnect
// 判断工作进程是否在断开连接后退出
console.log('是否在断开连接后退出:', cluster.worker.exitedAfterDisconnect);

// 4. worker.isDead()
// 判断工作进程是否已死亡
console.log('是否已死亡:', cluster.worker.isDead());

// 5. worker.isConnected()
// 判断工作进程是否已连接
console.log('是否已连接:', cluster.worker.isConnected());

4. 工作进程方法

// 1. worker.send(message[, sendHandle][, callback])
// 发送消息给主进程
cluster.worker.send('hello from worker', (err) => {
  if (err) console.error('发送消息失败:', err);
});

// 2. worker.disconnect()
// 断开工作进程连接
cluster.worker.disconnect();

// 3. worker.kill([signal])
// 终止工作进程
cluster.worker.kill('SIGTERM');

5. 事件

5.1 主进程事件

// 1. fork
// 当创建新的工作进程时触发
cluster.on('fork', (worker) => {
  console.log('工作进程已创建:', worker.id);
});

// 2. online
// 当工作进程上线时触发
cluster.on('online', (worker) => {
  console.log('工作进程已上线:', worker.id);
});

// 3. listening
// 当工作进程开始监听时触发
cluster.on('listening', (worker, address) => {
  console.log('工作进程正在监听:', worker.id, address);
});

// 4. message
// 当收到工作进程消息时触发
cluster.on('message', (worker, message, handle) => {
  console.log('收到工作进程消息:', worker.id, message);
});

// 5. disconnect
// 当工作进程断开连接时触发
cluster.on('disconnect', (worker) => {
  console.log('工作进程已断开连接:', worker.id);
});

// 6. exit
// 当工作进程退出时触发
cluster.on('exit', (worker, code, signal) => {
  console.log('工作进程已退出:', worker.id, code, signal);
});

// 7. error
// 当工作进程发生错误时触发
cluster.on('error', (worker, code, signal) => {
  console.log('工作进程错误:', worker.id, code, signal);
});

5.2 工作进程事件

// 1. message
// 当收到主进程消息时触发
cluster.worker.on('message', (message, handle) => {
  console.log('收到主进程消息:', message);
});

// 2. disconnect
// 当工作进程断开连接时触发
cluster.worker.on('disconnect', () => {
  console.log('工作进程已断开连接');
});

// 3. error
// 当工作进程发生错误时触发
cluster.worker.on('error', (code, signal) => {
  console.log('工作进程错误:', code, signal);
});

// 4. exit
// 当工作进程退出时触发
cluster.worker.on('exit', (code, signal) => {
  console.log('工作进程已退出:', code, signal);
});

6. 完整示例

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);

  // 配置主进程
  cluster.setupMaster({
    exec: 'worker.js',
    args: ['--use', 'http'],
    silent: false
  });

  // 启动工作进程
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // 主进程事件处理
  cluster.on('fork', (worker) => {
    console.log('工作进程已创建:', worker.id);
  });

  cluster.on('online', (worker) => {
    console.log('工作进程已上线:', worker.id);
  });

  cluster.on('listening', (worker, address) => {
    console.log('工作进程正在监听:', worker.id, address);
  });

  cluster.on('message', (worker, message, handle) => {
    console.log('收到工作进程消息:', worker.id, message);
    worker.send('消息已收到');
  });

  cluster.on('disconnect', (worker) => {
    console.log('工作进程已断开连接:', worker.id);
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('工作进程已退出:', worker.id, code, signal);
    // 重启工作进程
    cluster.fork();
  });

  cluster.on('error', (worker, code, signal) => {
    console.log('工作进程错误:', worker.id, code, signal);
  });

  // 定期检查工作进程状态
  setInterval(() => {
    for (const id in cluster.workers) {
      const worker = cluster.workers[id];
      console.log(`工作进程 ${worker.id} 状态:`, {
        pid: worker.process.pid,
        isDead: worker.isDead(),
        isConnected: worker.isConnected(),
        exitedAfterDisconnect: worker.exitedAfterDisconnect
      });
    }
  }, 5000);

  // 优雅关闭
  process.on('SIGTERM', () => {
    console.log('收到 SIGTERM 信号,开始优雅关闭');
    for (const id in cluster.workers) {
      cluster.workers[id].send('shutdown');
      cluster.workers[id].disconnect();
    }
  });
} else {
  // 工作进程代码
  const server = http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`你好世界,来自工作进程 ${process.pid}\n`);
  });

  server.listen(8000);

  // 工作进程事件处理
  cluster.worker.on('message', (message) => {
    console.log('收到主进程消息:', message);
    if (message === 'shutdown') {
      server.close(() => {
        console.log(`工作进程 ${process.pid} 已关闭`);
        process.exit(0);
      });
    }
  });

  cluster.worker.on('disconnect', () => {
    console.log('工作进程已断开连接');
  });

  cluster.worker.on('error', (code, signal) => {
    console.log('工作进程错误:', code, signal);
  });

  cluster.worker.on('exit', (code, signal) => {
    console.log('工作进程已退出:', code, signal);
  });

  // 定期发送心跳
  setInterval(() => {
    cluster.worker.send('heartbeat');
  }, 30000);
}

7. 高级用法

// 1. 动态调整工作进程数量
const cluster = require('cluster');
const http = require('http');
const os = require('os');

if (cluster.isMaster) {
  let workerCount = os.cpus().length;
  console.log(`初始工作进程数量: ${workerCount}`);

  // 启动工作进程
  for (let i = 0; i < workerCount; i++) {
    cluster.fork();
  }

  // 动态调整工作进程数量
  process.on('SIGUSR1', () => {
    workerCount = Math.max(1, workerCount - 1);
    console.log(`减少工作进程数量至: ${workerCount}`);
    const workers = Object.values(cluster.workers);
    if (workers.length > workerCount) {
      workers[workers.length - 1].disconnect();
    }
  });

  process.on('SIGUSR2', () => {
    workerCount = Math.min(os.cpus().length * 2, workerCount + 1);
    console.log(`增加工作进程数量至: ${workerCount}`);
    if (Object.keys(cluster.workers).length < workerCount) {
      cluster.fork();
    }
  });
}

// 2. 工作进程负载均衡
const cluster = require('cluster');
const http = require('http');
const os = require('os');

if (cluster.isMaster) {
  const workerCount = os.cpus().length;
  const workers = [];

  // 启动工作进程
  for (let i = 0; i < workerCount; i++) {
    const worker = cluster.fork();
    workers.push({
      worker,
      load: 0,
      connections: 0
    });
  }

  // 负载均衡
  http.createServer((req, res) => {
    // 选择负载最低的工作进程
    const target = workers.reduce((min, w) => 
      w.load < min.load ? w : min
    , workers[0]);

    target.connections++;
    target.load = target.connections / 100; // 简单的负载计算

    // 转发请求
    target.worker.send('request', { url: req.url });
  }).listen(8000);

  // 处理工作进程响应
  cluster.on('message', (worker, message) => {
    if (message.type === 'response') {
      const workerInfo = workers.find(w => w.worker.id === worker.id);
      if (workerInfo) {
        workerInfo.connections--;
        workerInfo.load = workerInfo.connections / 100;
      }
    }
  });
}

// 3. 工作进程健康检查
const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  const workers = new Map();

  // 启动工作进程
  for (let i = 0; i < 4; i++) {
    const worker = cluster.fork();
    workers.set(worker.id, {
      worker,
      healthy: true,
      lastHeartbeat: Date.now()
    });
  }

  // 健康检查
  setInterval(() => {
    const now = Date.now();
    for (const [id, info] of workers) {
      if (now - info.lastHeartbeat > 30000) {
        console.log(`工作进程 ${id} 可能已死亡`);
        info.healthy = false;
        info.worker.kill();
        const newWorker = cluster.fork();
        workers.set(newWorker.id, {
          worker: newWorker,
          healthy: true,
          lastHeartbeat: now
        });
      }
    }
  }, 5000);

  // 处理心跳
  cluster.on('message', (worker, message) => {
    if (message === 'heartbeat') {
      const info = workers.get(worker.id);
      if (info) {
        info.healthy = true;
        info.lastHeartbeat = Date.now();
      }
    }
  });
}

cluster 模块的主要特点:

  1. 支持多核 CPU 并行处理
  2. 提供完整的事件系统
  3. 支持进程间通信
  4. 支持动态调整工作进程数量
  5. 支持负载均衡和健康检查

使用建议:

  1. 根据 CPU 核心数合理设置工作进程数量
  2. 实现完善的错误处理和重启机制
  3. 使用事件系统进行进程间通信
  4. 实现健康检查确保系统稳定性
  5. 考虑使用更高级的进程管理工具(如 PM2)

网站公告

今日签到

点亮在社区的每一天
去签到