1.进程间通信⽬的
2.管道
2.1 匿名管道
-----通常用来实现 父子通信
创建子进程时,需要把父进程的进程内容全部拷贝一份,但文件管理是不需要拷贝的
但是我们把父进程的文件描述符表给拷贝下来了,文件描述符表里是一堆指针,他们仍然指向父进程打开的那些文件
- 这也是为什么之前运行子进程会在同一个屏幕上打印内容,
因为父子进程用的是同一个显示器文件
,自然在同一个屏幕上打印咯, - 和c++中遇到的
浅拷贝
十分相似
2.2 原理
2.2 管道样例
#include <iostream>
#include <unistd.h>
using namespace std;
int main()
{
int fd[2]={0};//这里使用fd模拟文件描述符表,忽略了0,1,2即标准输入stdin,标准输出stdout,标准错误stderr
int n=pipe(fd);//pipe函数需要头文件unistd.h
if(n<0)//运行失败会是n<0
{
cout<<"error"<<endl;
return 1;
}
cout<<"fd[0]:"<<fd[0]<<endl;
cout<<"fd[1]:"<<fd[1]<<endl;
return 0;
}
最终结果是:
- 因为0,1,2即标准输入,标准输出,标准错误一直在被打开,所以只能分配3,4
完整父子进程管道代码:
#include <iostream> // 标准输入输出(cout, endl)
#include <unistd.h> // 提供 pipe(), fork(), close(), read(), write(), sleep() 等系统调用
#include <cstdio> // 提供 printf() 等 C 标准 I/O 函数
#include <cstring> // 提供字符串处理函数(如 memset)
#include <sys/types.h> // 提供 pid_t 等数据类型定义
#include <sys/wait.h> // 提供 waitpid() 函数
using namespace std;
// 子进程向管道写入数据的函数
void childwrite(int wfd) {
char c = 0; // 写入的字符(这里固定为 0)
int cnt = 0; // 计数器,记录写入次数
while (true) {
write(wfd, &c, 1); // 向管道写入 1 字节(实际写入的是 '\0')
printf("child: %d\n", cnt++); // 打印写入次数
}
}
// 父进程从管道读取数据的函数
void fatherread(int rfd) {
char buffer[1024]; // 读取缓冲区
while (true) {
sleep(100); // 父进程休眠 100 秒(实际会被 read() 打断)
buffer[0] = 0; // 清空缓冲区(可选)
// 从管道读取数据(最多读 sizeof(buffer)-1 字节,预留 1 字节给 '\0')
ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
if (n > 0) { // 读取成功
buffer[n] = 0; // 手动添加字符串结束符 '\0'
std::cout << "child say: " << buffer << std::endl; // 打印读取的内容
} else if (n == 0) { // 管道写端关闭(子进程退出)
std::cout << "n : " << n << std::endl;
std::cout << "child 退出,我也退出";
break;
} else { // 读取错误
break;
}
break; // 测试时提前退出循环(实际应去掉)
}
}
int main() {
// 1. 创建管道
int fd[2] = {0}; // fd[0]:读端,fd[1]:写端
int n = pipe(fd); // 调用 pipe() 创建匿名管道
if (n < 0) { // 创建失败
cout << "error" << endl;
return 1;
}
cout << "fd[0]:" << fd[0] << endl; // 打印读端 fd
cout << "fd[1]:" << fd[1] << endl; // 打印写端 fd
// 2. 创建子进程
pid_t pid = fork(); // 调用 fork() 创建子进程
if (pid == 0) { // 子进程逻辑
close(fd[0]); // 关闭读端(子进程只写)
childwrite(fd[1]); // 调用子进程写入函数
close(fd[1]); // 关闭写端(实际不会执行到这里)
exit(0); // 子进程退出
}
sleep(5); // 父进程休眠 5 秒(等待子进程写入数据)
close(fd[1]); // 关闭写端(父进程只读)
fatherread(fd[0]); // 调用父进程读取函数
close(fd[0]); // 关闭读端
// 等待子进程退出
int status = 0;
int ret = waitpid(pid, &status, 0); // 阻塞等待子进程结束
if (ret > 0) { // 子进程已退出
// 打印子进程退出状态(高 8 位是退出码,低 7 位是终止信号)
printf("exit code: %d, exit signal: %d\n", (status>>8)&0xFF, status&0x7F);
sleep(5); // 父进程再休眠 5 秒(观察用)
}
return 0;
}
2.3 五种特性
2.4 四种通信情况
blog.csdnimg.cn/direct/eeef895593df4fd08b31442035e93198.png)
3.进程池的模拟
3.1 hpp文件的使用
#ifndef __PROCESS_POOL_HPP__ // 头文件保护宏(双下划线风格)
#define __PROCESS_POOL_HPP__
#include <iostream> // 系统头文件用尖括号<>
// 函数声明/定义
void test() {
std::cout << "test" << std::endl; // 直接使用std::前缀
}
#endif // __PROCESS_POOL_HPP__
-函数的声明和定义可以放在一块写,注意头两行和末尾一行 是格式
3.2 进程池代码实现
ProcessPool.hpp:
#ifndef __PROCESS_POOL_HPP__ // 头文件保护宏,防止重复包含
#define __PROCESS_POOL_HPP__
#include <iostream> // 标准输入输出
#include <cstdlib> // C标准库(替代stdlib.h的C++版本)
#include <vector> // 动态数组容器
#include <unistd.h> // POSIX API(pipe/fork/close等)
#include <sys/wait.h> // 进程等待相关函数
#include "Task.hpp" // 自定义任务管理头文件
// Channel类:管理单个子进程的通信通道
class Channel
{
public:
// 构造函数:初始化写端fd和子进程ID
Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
{
// 生成通道名称(格式:channel-[fd]-[pid])
_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
}
// 析构函数(空实现,资源通过Close()显式释放)
~Channel() {}
// 向子进程发送任务码
void Send(int code)
{
int n = write(_wfd, &code, sizeof(code));
(void)n; // 显式忽略返回值(避免编译器警告)
}
// 关闭写端文件描述符
void Close()
{
close(_wfd); // 关闭管道写端
}
// 等待子进程退出,回收子进程,避免僵尸进程出现
void Wait()
{
pid_t rid = waitpid(_subid, nullptr, 0); // 阻塞等待
(void)rid; // 显式忽略返回值
}
// Getter方法
int Fd() { return _wfd; } // 获取写端fd
pid_t SubId() { return _subid; } // 获取子进程PID
std::string Name() { return _name; } // 获取通道名称
private:
int _wfd; // 管道写端文件描述符
pid_t _subid; // 子进程PID
std::string _name; // 通道标识名称
};
// ChannelManager类:管理所有子进程通道
class ChannelManager
{
public:
ChannelManager() : _next(0) {} // 初始化轮询索引
// 添加新通道
void Insert(int wfd, pid_t subid)
{
_channels.emplace_back(wfd, subid); // 原地构造Channel对象,加入channel数组
}
// 轮询选择下一个通道(简单负载均衡)
Channel &Select()
{
auto &c = _channels[_next];
_next = (_next + 1) % _channels.size(); // 环形选择
return c;
}
// 打印所有通道信息
void PrintChannel()
{
for (auto &channel : _channels)
{
std::cout << channel.Name() << std::endl;
}
}
// 关闭所有子进程管道
void StopSubProcess()
{
for (auto &channel : _channels)
{
channel.Close();//关掉读
std::cout << "关闭: " << channel.Name() << std::endl;
}
}
// 回收所有子进程
void WaitSubProcess()
{
for (auto &channel : _channels)
{
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
~ChannelManager() {} // 析构函数(vector自动释放)
private:
std::vector<Channel> _channels; // 存储所有Channel对象
int _next; // 轮询索引
};
const int gdefaultnum = 5; // 默认子进程数量
// ProcessPool类:主进程池实现
class ProcessPool
{
public:
// 构造函数:初始化进程数并注册任务
ProcessPool(int num) : _process_num(num)
{
_tm.Register(PrintLog); // 注册日志任务
_tm.Register(Download); // 注册下载任务
_tm.Register(Upload); // 注册上传任务
}//把这三个函数指针全部加入函数指针数组中
// 子进程工作循环
void Work(int rfd)
{
while (true)
{
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));//从rfd中读任务吗,和channel的send函数相对应,正常一次读4字节
if (n > 0) // 成功读取
{
if (n != sizeof(code)) continue; // 数据不完整则继续读取
std::cout << "子进程[" << getpid() << "]收到任务码: " << code << std::endl;
_tm.Execute(code); // 执行对应任务,就是三个函数之一,上传,下载。。。。
}
else if (n == 0) // 管道关闭(父进程终止)
{
std::cout << "子进程退出" << std::endl;
break;
}
else // 读取错误
{
std::cerr << "读取错误" << std::endl;
break;
}
}
}
// 启动进程池
bool Start()
{
for (int i = 0; i < _process_num; i++)
{
// 1. 创建管道
int pipefd[2] = {0};
if (pipe(pipefd) < 0) return false; // 创建失败
// 2. 创建子进程
pid_t subid = fork();
if (subid < 0) return false; // fork失败
if (subid == 0) // 子进程分支
{
close(pipefd[1]); // 关闭写端
Work(pipefd[0]); // 进入工作循环
close(pipefd[0]);
exit(0); // 正常退出
}
else // 父进程分支
{
close(pipefd[0]); // 关闭读端
_cm.Insert(pipefd[1], subid); // 记录通道信息
}
}
return true;
}
// 调试用:打印所有通道
void Debug() { _cm.PrintChannel(); }
// 运行任务(主进程调用)
void Run()
{
int taskcode = _tm.Code(); // 1. 获取任务码
auto &c = _cm.Select(); // 2. 选择子进程
std::cout << "选择子进程: " << c.Name() << std::endl;
c.Send(taskcode); // 3. 发送任务
std::cout << "发送任务码: " << taskcode << std::endl;
}
// 停止进程池
void Stop()
{
_cm.StopSubProcess(); // 关闭所有管道
_cm.WaitSubProcess(); // 回收所有子进程
}
~ProcessPool() {} // 析构函数
private:
ChannelManager _cm; // 通道管理器
int _process_num; // 子进程数量
TaskManager _tm; // 任务管理器
};
Task.hpp:
// 防止头文件被重复包含的编译器指令(现代C++替代#ifndef的方式)
#pragma once
// 标准输入输出库(用于cout等)
#include <iostream>
// 动态数组容器(用于存储任务函数指针)
#include <vector>
// 时间相关函数(用于随机数种子初始化)
#include <ctime>
// 定义函数指针类型:无参数、无返回值的函数,名字是task_t!!!!!
typedef void (*task_t)();
调试用任务函数
// 打印日志任务函数
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << std::endl;
}
// 下载任务函数
void Download()
{
std::cout << "我是一个下载的任务" << std::endl;
}
// 上传任务函数
void Upload()
{
std::cout << "我是一个上传的任务" << std::endl;
}
//
// 任务管理类
class TaskManager
{
public:
// 构造函数:初始化随机数种子
TaskManager()
{
srand(time(nullptr)); // 用当前时间初始化随机数生成器
}
// 注册任务函数:将函数指针存入vector
void Register(task_t t)
{
_tasks.push_back(t); // 添加到任务列表末尾
}
// 生成随机任务码:返回[0, 任务数量-1]的随机数
int Code()
{
return rand() % _tasks.size(); // 取模保证不越界
}
// 执行任务:根据code调用对应的函数
void Execute(int code)
{
// 检查code是否合法(防御性编程)
if(code >= 0 && code < _tasks.size())
{
_tasks[code](); // 通过函数指针调用任务,就是上面三个打印,上传,下载函数
}
// 注意:未处理非法code的情况(可添加错误处理)
}
// 析构函数(当前为空实现)
~TaskManager()
{}
private:
std::vector<task_t> _tasks; // 存储所有注册的任务函数指针
};
makefile:
process_pool:Main.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f process_pool
main.cc:
#include "ProcessPool.hpp"
int main()
{
// 创建进程池对象
ProcessPool pp(gdefaultnum);
// 启动进程池
pp.Start();//刚开始就是建立5个子进程和通道,但通道内没有内容即任务码,所以子进程的work会被卡住。
// 自动派发任务
int cnt = 10;
while(cnt--)
{
pp.Run();//往子进程里去发放任务码,子进程开始work,也就是开始调用manager的Execute函数,就是在三个上传,下载函数中随机选一个来执行
sleep(1);
}
// 回收,结束进程池
pp.Stop();// 关闭所有管道-即回收父进程的wfd---使用close函数关掉所有channel中的wfd
//回收所有子进程----调用waitpid函数
return 0;
}