目录
一、ProcessPool.hpp
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
#include <iostream>
#include <cstdlib> // stdlib.h stdio.h -> cstdlib cstdio
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include "Task.hpp"
// 先描述 - 定义Channel类,用于管理子进程通信通道
class Channel
{
public:
// 构造函数,初始化写端文件描述符和子进程ID
Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
{
// 生成通道名称,格式为"channel-[wfd]-[subid]"
_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
}
~Channel()
{
}
// 向子进程发送任务码
void Send(int code)
{
int n = write(_wfd, &code, sizeof(code));
(void)n; // 忽略返回值,避免编译器警告
}
// 关闭通道的写端
void Close()
{
close(_wfd);
}
// 等待子进程结束
void Wait()
{
pid_t rid = waitpid(_subid, nullptr, 0);
(void)rid; // 忽略返回值
}
// 获取写端文件描述符
int Fd() { return _wfd; }
// 获取子进程ID
pid_t SubId() { return _subid; }
// 获取通道名称
std::string Name() { return _name; }
private:
int _wfd; // 管道写端文件描述符
pid_t _subid; // 子进程ID
std::string _name; // 通道名称
// int _loadnum; // 可扩展:负载计数
};
// 在组织 - 管理多个Channel对象
class ChannelManager
{
public:
ChannelManager() : _next(0)
{
}
// 插入一个新的Channel对象
void Insert(int wfd, pid_t subid)
{
_channels.emplace_back(wfd, subid);
// Channel c(wfd, subid);
// _channels.push_back(std::move(c));
}
// 选择下一个Channel对象(轮询负载均衡)
Channel &Select()
{
auto &c = _channels[_next];
_next++;
_next %= _channels.size(); // 循环选择
return c;
}
// 打印所有Channel信息
void PrintChannel()
{
for (auto &channel : _channels)
{
std::cout << channel.Name() << std::endl;
}
}
// 关闭所有子进程的通信通道
void StopSubProcess()
{
for (auto &channel : _channels)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
}
}
// 等待所有子进程结束
void WaitSubProcess()
{
for (auto &channel : _channels)
{
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
~ChannelManager() {}
private:
std::vector<Channel> _channels; // 存储所有Channel对象
int _next; // 下一个要选择的Channel索引
};
const int gdefaultnum = 5; // 默认子进程数量
// 进程池类
class ProcessPool
{
public:
// 构造函数,初始化进程数量并注册任务
ProcessPool(int num) : _process_num(num)
{
_tm.Register(PrintLog); // 注册打印日志任务
_tm.Register(Download); // 注册下载任务
_tm.Register(Upload); // 注册上传任务
}
// 子进程工作函数
void Work(int rfd)
{
while (true)
{
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));
if (n > 0)
{
if (n != sizeof(code))
{
continue; // 读取不完整,继续尝试
}
std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;
_tm.Execute(code); // 执行对应任务
}
else if (n == 0)
{
std::cout << "子进程退出" << std::endl;
break; // 管道关闭,子进程退出
}
else
{
std::cout << "读取错误" << std::endl;
break; // 读取错误,子进程退出
}
}
}
// 启动进程池
bool Start()
{
for (int i = 0; i < _process_num; i++)
{
// 1. 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false;
// 2. 创建子进程
pid_t subid = fork();
if (subid < 0)
return false;
else if (subid == 0)
{
// 子进程
// 3. 关闭不需要的文件描述符(写端)
close(pipefd[1]);
Work(pipefd[0]); // 开始工作,监听管道读端
close(pipefd[0]);
exit(0); // 工作结束退出
}
else
{
// 父进程
// 3. 关闭不需要的文件描述符(读端)
close(pipefd[0]); // 写端:pipefd[1];
_cm.Insert(pipefd[1], subid); // 将管道和子进程信息加入管理器
// wfd, subid
}
}
return true;
}
// 调试用:打印所有Channel信息
void Debug()
{
_cm.PrintChannel();
}
// 运行任务
void Run()
{
// 1. 选择一个任务
int taskcode = _tm.Code();
// 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务
auto &c = _cm.Select();
std::cout << "选择了一个子进程: " << c.Name() << std::endl;
// 2. 发送任务
c.Send(taskcode);
std::cout << "发送了一个任务码: " << taskcode << std::endl;
}
// 停止进程池
void Stop()
{
// 关闭父进程所有的wfd即可
_cm.StopSubProcess();
// 回收所有子进程
_cm.WaitSubProcess();
}
~ProcessPool()
{
}
private:
ChannelManager _cm; // Channel管理器
int _process_num; // 子进程数量
TaskManager _tm; // 任务管理器
};
#endif
1. 头文件保护和包含
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
这是标准的头文件保护,防止头文件被多次包含
#include <iostream>
#include <cstdlib> // stdlib.h stdio.h -> cstdlib cstdio
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include "Task.hpp"
包含必要的系统头文件和自定义头文件:
<iostream>
:输入输出流<cstdlib>
:C标准库功能<vector>
:向量容器<unistd.h>
:Unix标准函数(如fork, pipe等)<sys/wait.h>
:进程等待相关函数"Task.hpp"
:自定义任务管理器头文件
2. Channel类(通信通道管理)
class Channel
{
public:
// 构造函数
Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
{
_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
}
构造函数初始化写端文件描述符
_wfd
和子进程ID_subid
生成通道名称,格式为"channel-[wfd]-[subid]"
// 向子进程发送任务码
void Send(int code)
{
int n = write(_wfd, &code, sizeof(code));
(void)n; // 忽略返回值,避免编译器警告
}
通过管道写端
_wfd
发送整数任务码(void)n
用于忽略write返回值,避免编译器警告
// 关闭通道的写端
void Close()
{
close(_wfd);
}
关闭管道写端文件描述符
// 等待子进程结束
void Wait()
{
pid_t rid = waitpid(_subid, nullptr, 0);
(void)rid; // 忽略返回值
}
使用
waitpid
等待指定子进程结束nullptr
表示不关心退出状态,0
表示阻塞等待
// 获取写端文件描述符
int Fd() { return _wfd; }
// 获取子进程ID
pid_t SubId() { return _subid; }
// 获取通道名称
std::string Name() { return _name; }
提供获取私有成员的接口方法
private:
int _wfd; // 管道写端文件描述符
pid_t _subid; // 子进程ID
std::string _name; // 通道名称
// int _loadnum; // 可扩展:负载计数
私有成员变量:
_wfd
:管道写端文件描述符_subid
:子进程ID_name
:通道名称注释掉的
_loadnum
可用于扩展实现负载计数
3. ChannelManager类(通道管理器)
class ChannelManager
{
public:
ChannelManager() : _next(0)
{
}
构造函数初始化
_next
为0,用于轮询选择通道
// 插入一个新的Channel对象
void Insert(int wfd, pid_t subid)
{
_channels.emplace_back(wfd, subid);
// Channel c(wfd, subid);
// _channels.push_back(std::move(c));
}
使用
emplace_back
直接构造Channel对象并添加到向量中注释掉的代码展示了另一种实现方式(这个通俗易懂)
// 选择下一个Channel对象(轮询负载均衡)
Channel &Select()
{
auto &c = _channels[_next];
_next++;
_next %= _channels.size(); // 循环选择
return c;
}
使用简单的轮询算法选择下一个通道,使得子进程负载均衡!!!
_next
递增后取模确保循环选择
// 打印所有Channel信息
void PrintChannel()
{
for (auto &channel : _channels)
{
std::cout << channel.Name() << std::endl;
}
}
遍历并打印所有通道的名称
// 关闭所有子进程的通信通道
void StopSubProcess()
{
for (auto &channel : _channels)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
}
}
关闭所有通道的写端,这会通知子进程退出
// 等待所有子进程结束
void WaitSubProcess()
{
for (auto &channel : _channels)
{
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
等待所有子进程结束并回收资源
private:
std::vector<Channel> _channels; // 存储所有Channel对象
int _next; // 下一个要选择的Channel索引
私有成员:
_channels
:存储所有Channel对象的向量_next
:轮询选择索引
4. ProcessPool类(进程池)
const int gdefaultnum = 5; // 默认子进程数量
定义默认子进程数量为5
class ProcessPool
{
public:
// 构造函数
ProcessPool(int num) : _process_num(num)
{
_tm.Register(PrintLog); // 注册打印日志任务
_tm.Register(Download); // 注册下载任务
_tm.Register(Upload); // 注册上传任务
}
构造函数初始化子进程数量
注册三种任务:打印日志、下载和上传
// 子进程工作函数
void Work(int rfd)
{
while (true)
{
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));
if (n > 0)
{
if (n != sizeof(code))
{
continue; // 读取不完整,继续尝试
}
std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;
_tm.Execute(code); // 执行对应任务
}
else if (n == 0)
{
std::cout << "子进程退出" << std::endl;
break; // 管道关闭,子进程退出
}
else
{
std::cout << "读取错误" << std::endl;
break; // 读取错误,子进程退出
}
}
}
子进程工作循环:
从管道读取任务码
如果读取成功,执行对应任务
如果管道关闭(n==0),退出循环
如果读取错误,退出循环
// 启动进程池
bool Start()
{
for (int i = 0; i < _process_num; i++)
{
// 1. 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false;
// 2. 创建子进程
pid_t subid = fork();
if (subid < 0)
return false;
else if (subid == 0)
{
// 子进程
// 3. 关闭不需要的文件描述符(写端)
close(pipefd[1]);
Work(pipefd[0]); // 开始工作,监听管道读端
close(pipefd[0]);
exit(0); // 工作结束退出
}
else
{
// 父进程
// 3. 关闭不需要的文件描述符(读端)
close(pipefd[0]); // 写端:pipefd[1];
_cm.Insert(pipefd[1], subid); // 将管道和子进程信息加入管理器
}
}
return true;
}
启动进程池的主要步骤:
创建管道
创建子进程
子进程关闭写端,进入工作循环
父进程关闭读端,记录通道信息
// 调试用:打印所有Channel信息
void Debug()
{
_cm.PrintChannel();
}
调试方法,打印所有通道信息
// 运行任务
void Run()
{
// 1. 选择一个任务
int taskcode = _tm.Code();
// 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务
auto &c = _cm.Select();
std::cout << "选择了一个子进程: " << c.Name() << std::endl;
// 2. 发送任务
c.Send(taskcode);
std::cout << "发送了一个任务码: " << taskcode << std::endl;
}
运行任务的步骤:
从任务管理器获取任务码
选择子进程(轮询负载均衡)
通过管道发送任务码
// 停止进程池
void Stop()
{
// 关闭父进程所有的wfd即可
_cm.StopSubProcess();
// 回收所有子进程
_cm.WaitSubProcess();
}
停止进程池的步骤:
关闭所有管道写端(通知子进程退出)
等待所有子进程结束
private:
ChannelManager _cm; // Channel管理器
int _process_num; // 子进程数量
TaskManager _tm; // 任务管理器
};
私有成员:
_cm
:通道管理器_process_num
:子进程数量_tm
:任务管理器
5. 总结
这个进程池实现的主要特点:
父子进程通信:使用管道进行通信
负载均衡:简单的轮询算法分配任务
任务管理:通过任务码区分不同任务
资源管理:正确关闭文件描述符,避免资源泄漏
进程管理:优雅地启动和停止子进程
每个类都有明确的职责:
Channel
:管理单个子进程的通信ChannelManager
:管理所有子进程通信ProcessPool
:提供进程池的对外接口
这种设计模式在需要并行处理任务的服务器程序中非常有用,可以有效利用多核CPU资源。
二、Task.hpp
#pragma once // 防止头文件被重复包含
#include <iostream>
#include <vector>
#include <ctime> // 用于随机数种子初始化
// 定义任务函数指针类型,无参数无返回值
typedef void (*task_t)();
//////////////// 调试用任务函数 /////////////////////
// 打印日志任务
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << std::endl;
}
// 下载任务
void Download()
{
std::cout << "我是一个下载的任务" << std::endl;
}
// 上传任务
void Upload()
{
std::cout << "我是一个上传的任务" << std::endl;
}
//////////////////////////////////////
// 任务管理器类
class TaskManager
{
public:
// 构造函数,初始化随机数种子
TaskManager()
{
srand(time(nullptr)); // 使用当前时间作为随机数种子
}
// 注册任务函数到任务列表
void Register(task_t t)
{
_tasks.push_back(t); // 将任务函数指针添加到向量中
}
// 生成随机任务码
int Code()
{
// 返回0到任务数量-1之间的随机数
return rand() % _tasks.size();
}
// 执行指定编码的任务
void Execute(int code)
{
// 检查任务码是否在有效范围内
if(code >= 0 && code < _tasks.size())
{
_tasks[code](); // 调用对应的任务函数
}
}
~TaskManager()
{}
private:
std::vector<task_t> _tasks; // 存储所有注册的任务函数指针
};
1. 预处理指令和头文件包含
#pragma once // 防止头文件被重复包含
#include <iostream>
#include <vector>
#include <ctime> // 用于随机数种子初始化
#pragma once
:确保头文件只被包含一次,防止重复定义<iostream>
:提供输入输出功能<vector>
:使用向量容器存储任务<ctime>
:用于获取当前时间,初始化随机数种子
2. 任务函数指针类型定义
typedef void (*task_t)();
定义了一个名为
task_t
的函数指针类型这种函数指针指向无参数、无返回值的函数
后续所有任务函数都需要符合这个签名
3. 调试用任务函数
void PrintLog() { ... }
void Download() { ... }
void Upload() { ... }
定义了三个示例任务函数,都符合
task_t
类型每个函数执行简单的打印操作,实际应用中可替换为真实任务
4. TaskManager类
成员变量
std::vector<task_t> _tasks;
使用向量存储所有注册的任务函数指针
构造函数
TaskManager()
{
srand(time(nullptr)); // 使用当前时间作为随机数种子
}
初始化随机数生成器,用于后续生成随机任务码
成员函数
1. Register
void Register(task_t t)
{
_tasks.push_back(t); // 将任务函数指针添加到向量中
}
接收一个符合
task_t
类型的函数指针将其添加到任务列表
_tasks
中
2. Code
int Code()
{
return rand() % _tasks.size();
}
生成一个随机任务码
范围是0到任务数量-1(因为
%
取模运算)
3. Execute
void Execute(int code)
{
if(code >= 0 && code < _tasks.size())
{
_tasks[code](); // 调用对应的任务函数
}
}
根据传入的任务码执行对应的任务
先检查任务码是否有效(防止越界访问)
然后通过函数指针调用对应的任务函数
设计特点
灵活性:可以动态添加新任务,只需符合
task_t
类型安全性:执行前检查任务码有效性
随机性:可以随机选择任务执行
封装性:将任务管理功能封装在类中
三、Main.cc
#include "ProcessPool.hpp"
int main()
{
// 这个代码,有一个藏得比较深的bug --- TODO
// 创建进程池对象
ProcessPool pp(gdefaultnum);
// 启动进程池
pp.Start();
// 自动派发任务
int cnt = 10;
while(cnt--)
{
pp.Run();
sleep(1);
}
// 回收,结束进程池
pp.Stop();
return 0;
}
1. 头文件包含
#include "ProcessPool.hpp"
包含了自定义的
ProcessPool
头文件,该头文件应该定义了进程池类的实现
2. main函数流程
int main()
{
// 1. 创建进程池对象
ProcessPool pp(gdefaultnum);
// 2. 启动进程池
pp.Start();
// 3. 自动派发任务
int cnt = 10;
while(cnt--)
{
pp.Run();
sleep(1);
}
// 4. 回收,结束进程池
pp.Stop();
return 0;
}
3. 代码执行流程
创建进程池对象:使用
gdefaultnum
(全局变量)作为参数创建进程池启动进程池:调用
Start()
方法初始化并启动进程池任务派发循环:循环10次,每次调用
Run()
方法派发任务,然后休眠1秒停止进程池:调用
Stop()
方法回收资源并结束进程池
四、Makefile
process_pool:Main.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f process_pool
五、代码Bug分析
实现了一个进程池,父进程通过管道与多个子进程通信,并分配任务给子进程执行。老师指出的bug是关于文件描述符在fork时的继承问题。
1、问题所在
在ProcessPool::Start()
函数中,每次循环创建子进程时,存在文件描述符泄漏的问题。具体来说:
每次循环都会创建一个新的管道(
pipe(pipefd)
),然后fork一个子进程。子进程会继承父进程的所有打开的文件描述符,包括之前循环中创建的管道文件描述符。
虽然父进程在每次循环后会关闭当前管道的读端(
close(pipefd[0])
),但子进程仍然保留着之前所有管道的文件描述符。
2、具体表现
第一个子进程:只有第一个管道的文件描述符
第二个子进程:有第一个和第二个管道的文件描述符
第三个子进程:有第一、第二和第三个管道的文件描述符
...
第N个子进程:有所有N个管道的文件描述符
3、为什么这是个问题
资源泄漏:每个子进程保留着不需要的文件描述符,浪费系统资源。
潜在的错误:如果子进程意外尝试读取这些额外的管道,可能会导致不可预期的行为。
关闭时的混乱:当父进程关闭所有写端时,子进程可能因为持有多个管道读端而无法正确退出。
4、解决方案
在子进程代码块中,应该在开始工作前关闭所有不需要的文件描述符。具体修改如下:
else if (subid == 0)
{
// 子进程
// 关闭父进程继承下来的所有写端文件描述符
for (auto &channel : _cm._channels) {
close(channel.Fd());
}
// 关闭当前管道的写端
close(pipefd[1]);
Work(pipefd[0]); // 开始工作,监听管道读端
close(pipefd[0]);
exit(0); // 工作结束退出
}
或者更简单的方法是在fork之前设置close-on-exec
标志:
// 在pipe创建后,fork前
fcntl(pipefd[0], F_SETFD, FD_CLOEXEC);
fcntl(pipefd[1], F_SETFD, FD_CLOEXEC);
这样当子进程执行exec时,这些文件描述符会自动关闭(虽然这个代码中子进程没有exec)。
5、详细分析子进程代码块中的文件描述符处理
让我们一步一步详细分析这段子进程代码,看看它是如何解决文件描述符泄漏问题的。我们以创建3个子进程为例来说明。
初始状态
假设我们创建3个子进程(_process_num=3
),每次循环都会:
创建新管道(
pipefd[2]
)fork子进程
父进程将写端(
pipefd[1]
)存入_cm._channels
第一次循环 (i=0)
父进程状态
创建管道
pipefd0[2]
(假设值为[3,4])_cm._channels
: 空
fork后子进程代码执行
else if (subid == 0) // 第一个子进程
{
// _cm._channels是空的,for循环不执行
for (auto &channel : _cm._channels) { // 不执行
close(channel.Fd());
}
close(pipefd0[1]); // 关闭写端4
Work(pipefd0[0]); // 使用读端3
close(pipefd0[0]); // 关闭读端3
exit(0);
}
结果
子进程只持有自己管道的读端(3)
没有多余的文件描述符
变化过程
第二次循环 (i=1)
父进程状态
已有第一个管道
pipefd0[2]
=[3,4] (写端4在_channels中)创建新管道
pipefd1[2]
(假设值为[5,6])_cm._channels
: 包含[写端4, pid1]
fork后子进程代码执行
else if (subid == 0) // 第二个子进程
{
// _cm._channels包含第一个管道的写端
for (auto &channel : _cm._channels) { // 执行一次
close(channel.Fd()); // 关闭4
}
close(pipefd1[1]); // 关闭当前管道的写端6
Work(pipefd1[0]); // 使用读端5
close(pipefd1[0]); // 关闭读端5
exit(0);
}
结果
子进程关闭了第一个管道的写端(4)
子进程只持有自己管道的读端(5)
没有多余的文件描述符
变化过程
第三次循环 (i=2)
父进程状态
已有两个管道:
pipefd0[2]=[3,4] (写端4在_channels中)
pipefd1[2]=[5,6] (写端6在_channels中)
创建新管道
pipefd2[2]
(假设值为[7,8])_cm._channels
: 包含[写端4, pid1], [写端6, pid2]
fork后子进程代码执行
else if (subid == 0) // 第三个子进程
{
// _cm._channels包含前两个管道的写端
for (auto &channel : _cm._channels) { // 执行两次
close(channel.Fd()); // 第一次关闭4,第二次关闭6
}
close(pipefd2[1]); // 关闭当前管道的写端8
Work(pipefd2[0]); // 使用读端7
close(pipefd2[0]); // 关闭读端7
exit(0);
}
结果
子进程关闭了前两个管道的写端(4和6)
子进程只持有自己管道的读端(7)
没有多余的文件描述符
变化过程
关键点总结
_cm._channels
的内容:每次循环父进程都会添加新的写端fd到
_cm._channels
子进程看到的
_cm._channels
是父进程在该时刻的快照
for循环的作用:
关闭所有之前创建的管道的写端
确保子进程只保留自己管道的读端
为什么需要这样做:
如果不关闭之前的写端,子进程会持有所有之前创建的管道的文件描述符
这样会导致:
文件描述符泄漏
父进程关闭写端时,子进程可能不会正确退出(因为还持有其他管道的读端)
当前管道的处理:
close(pipefd[1])
:关闭当前管道的写端(子进程不需要)Work(pipefd[0])
:使用当前管道的读端close(pipefd[0])
:工作完成后关闭读端
最终效果
通过这种处理方式,每个子进程:
只保留自己管道的读端
不持有任何其他管道的文件描述符
当父进程关闭所有写端时,子进程的read会返回0,从而正确退出
这样就完美解决了文件描述符泄漏的问题。
六、编译和运行进程池程序
1、编译步骤
首先确保有这三个文件:
ProcessPool.hpp
(主头文件)Task.hpp
(任务管理头文件)main.cpp
(主程序)
使用g++编译命令:
g++ -o process_pool main.cpp -std=c++11
这个命令会:
将
main.cpp
编译为可执行文件process_pool
使用C++11标准
自动包含两个头文件
2、运行程序
编译成功后,直接运行生成的可执行文件:
./process_pool
3、监视子进程
可以使用类似ps
或top
命令来监视进程。
在一个终端运行程序:
./process_pool
在另一个终端使用
ps
命令监视:
watch -n 1 'ps axo pid,ppid,stat,command | grep process_pool'
这个命令会:
每秒刷新一次(
-n 1
)显示所有
process_pool
相关进程包括PID、PPID、状态和命令
4、程序说明
创建指定数量(默认5个)的子进程
父进程通过管道向子进程发送任务码
子进程接收任务码并执行相应任务
10秒后父进程关闭所有子进程