基于匿名管道的多进程任务池实现与FD泄漏解决方案

发布于:2025-07-27 ⋅ 阅读:(10) ⋅ 点赞:(0)

目录

一、ProcessPool.hpp

1. 头文件保护和包含

2. Channel类(通信通道管理)

3. ChannelManager类(通道管理器)

4. ProcessPool类(进程池)

5. 总结

二、Task.hpp

1. 预处理指令和头文件包含

2. 任务函数指针类型定义

3. 调试用任务函数

4. TaskManager类

成员变量

构造函数

成员函数

1. Register

2. Code

3. Execute

设计特点

三、Main.cc

1. 头文件包含

2. main函数流程

3. 代码执行流程

四、Makefile

五、代码Bug分析

1、问题所在

2、具体表现

3、为什么这是个问题

4、解决方案

5、详细分析子进程代码块中的文件描述符处理

初始状态

第一次循环 (i=0)

父进程状态

fork后子进程代码执行

结果

变化过程

第二次循环 (i=1)

父进程状态

fork后子进程代码执行

结果

变化过程

第三次循环 (i=2)

父进程状态

fork后子进程代码执行

结果

变化过程

 关键点总结

最终效果

六、编译和运行进程池程序

1、编译步骤

2、运行程序

3、监视子进程

4、程序说明


一、ProcessPool.hpp

#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__

#include <iostream>
#include <cstdlib> // stdlib.h stdio.h -> cstdlib cstdio
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include "Task.hpp"

// 先描述 - 定义Channel类,用于管理子进程通信通道
class Channel
{
public:
    // 构造函数,初始化写端文件描述符和子进程ID
    Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
    {
        // 生成通道名称,格式为"channel-[wfd]-[subid]"
        _name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
    }
    ~Channel()
    {
    }
    
    // 向子进程发送任务码
    void Send(int code)
    {
        int n = write(_wfd, &code, sizeof(code));
        (void)n; // 忽略返回值,避免编译器警告
    }
    
    // 关闭通道的写端
    void Close()
    {
        close(_wfd);
    }
    
    // 等待子进程结束
    void Wait()
    {
        pid_t rid = waitpid(_subid, nullptr, 0);
        (void)rid; // 忽略返回值
    }
    
    // 获取写端文件描述符
    int Fd() { return _wfd; }
    // 获取子进程ID
    pid_t SubId() { return _subid; }
    // 获取通道名称
    std::string Name() { return _name; }

private:
    int _wfd;            // 管道写端文件描述符
    pid_t _subid;        // 子进程ID
    std::string _name;   // 通道名称
    // int _loadnum;      // 可扩展:负载计数
};

// 在组织 - 管理多个Channel对象
class ChannelManager
{
public:
    ChannelManager() : _next(0)
    {
    }
    
    // 插入一个新的Channel对象
    void Insert(int wfd, pid_t subid)
    {
        _channels.emplace_back(wfd, subid);
        // Channel c(wfd, subid);
        // _channels.push_back(std::move(c));
    }
    
    // 选择下一个Channel对象(轮询负载均衡)
    Channel &Select()
    {
        auto &c = _channels[_next];
        _next++;
        _next %= _channels.size(); // 循环选择
        return c;
    }
    
    // 打印所有Channel信息
    void PrintChannel()
    {
        for (auto &channel : _channels)
        {
            std::cout << channel.Name() << std::endl;
        }
    }
    
    // 关闭所有子进程的通信通道
    void StopSubProcess()
    {
        for (auto &channel : _channels)
        {
            channel.Close();
            std::cout << "关闭: " << channel.Name() << std::endl;
        }
    }
    
    // 等待所有子进程结束
    void WaitSubProcess()
    {
        for (auto &channel : _channels)
        {
            channel.Wait();
            std::cout << "回收: " << channel.Name() << std::endl;
        }
    }
    
    ~ChannelManager() {}

private:
    std::vector<Channel> _channels; // 存储所有Channel对象
    int _next;                      // 下一个要选择的Channel索引
};

const int gdefaultnum = 5; // 默认子进程数量

// 进程池类
class ProcessPool
{
public:
    // 构造函数,初始化进程数量并注册任务
    ProcessPool(int num) : _process_num(num)
    {
        _tm.Register(PrintLog);  // 注册打印日志任务
        _tm.Register(Download);  // 注册下载任务
        _tm.Register(Upload);    // 注册上传任务
    }
    
    // 子进程工作函数
    void Work(int rfd)
    {
        while (true)
        {
            int code = 0;
            ssize_t n = read(rfd, &code, sizeof(code));
            if (n > 0)
            {
                if (n != sizeof(code))
                {
                    continue; // 读取不完整,继续尝试
                }
                std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;
                _tm.Execute(code); // 执行对应任务
            }
            else if (n == 0)
            {
                std::cout << "子进程退出" << std::endl;
                break; // 管道关闭,子进程退出
            }
            else
            {
                std::cout << "读取错误" << std::endl;
                break; // 读取错误,子进程退出
            }
        }
    }
    
    // 启动进程池
    bool Start()
    {
        for (int i = 0; i < _process_num; i++)
        {
            // 1. 创建管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                return false;

            // 2. 创建子进程
            pid_t subid = fork();
            if (subid < 0)
                return false;
            else if (subid == 0)
            {
                // 子进程
                // 3. 关闭不需要的文件描述符(写端)
                close(pipefd[1]);
                Work(pipefd[0]); // 开始工作,监听管道读端
                close(pipefd[0]);
                exit(0); // 工作结束退出
            }
            else
            {
                // 父进程
                // 3. 关闭不需要的文件描述符(读端)
                close(pipefd[0]); // 写端:pipefd[1];
                _cm.Insert(pipefd[1], subid); // 将管道和子进程信息加入管理器
                // wfd, subid
            }
        }
        return true;
    }
    
    // 调试用:打印所有Channel信息
    void Debug()
    {
        _cm.PrintChannel();
    }
    
    // 运行任务
    void Run()
    {
        // 1. 选择一个任务
        int taskcode = _tm.Code();

        // 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务
        auto &c = _cm.Select();
        std::cout << "选择了一个子进程: " << c.Name() << std::endl;
        // 2. 发送任务
        c.Send(taskcode);
        std::cout << "发送了一个任务码: " << taskcode << std::endl;
    }
    
    // 停止进程池
    void Stop()
    {
        // 关闭父进程所有的wfd即可
        _cm.StopSubProcess();
        // 回收所有子进程
        _cm.WaitSubProcess();
    }
    
    ~ProcessPool()
    {
    }

private:
    ChannelManager _cm;       // Channel管理器
    int _process_num;        // 子进程数量
    TaskManager _tm;         // 任务管理器
};

#endif

1. 头文件保护和包含

#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
  • 这是标准的头文件保护,防止头文件被多次包含

#include <iostream>
#include <cstdlib> // stdlib.h stdio.h -> cstdlib cstdio
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include "Task.hpp"
  • 包含必要的系统头文件和自定义头文件:

    • <iostream>:输入输出流

    • <cstdlib>:C标准库功能

    • <vector>:向量容器

    • <unistd.h>:Unix标准函数(如fork, pipe等)

    • <sys/wait.h>:进程等待相关函数

    • "Task.hpp":自定义任务管理器头文件

2. Channel类(通信通道管理)

class Channel
{
public:
    // 构造函数
    Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
    {
        _name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
    }
  • 构造函数初始化写端文件描述符_wfd和子进程ID_subid

  • 生成通道名称,格式为"channel-[wfd]-[subid]"

    // 向子进程发送任务码
    void Send(int code)
    {
        int n = write(_wfd, &code, sizeof(code));
        (void)n; // 忽略返回值,避免编译器警告
    }
  • 通过管道写端_wfd发送整数任务码

  • (void)n用于忽略write返回值,避免编译器警告

    // 关闭通道的写端
    void Close()
    {
        close(_wfd);
    }
  • 关闭管道写端文件描述符

    // 等待子进程结束
    void Wait()
    {
        pid_t rid = waitpid(_subid, nullptr, 0);
        (void)rid; // 忽略返回值
    }
  • 使用waitpid等待指定子进程结束

  • nullptr表示不关心退出状态,0表示阻塞等待

    // 获取写端文件描述符
    int Fd() { return _wfd; }
    // 获取子进程ID
    pid_t SubId() { return _subid; }
    // 获取通道名称
    std::string Name() { return _name; }
  • 提供获取私有成员的接口方法

private:
    int _wfd;            // 管道写端文件描述符
    pid_t _subid;        // 子进程ID
    std::string _name;   // 通道名称
    // int _loadnum;      // 可扩展:负载计数
  • 私有成员变量:

    • _wfd:管道写端文件描述符

    • _subid:子进程ID

    • _name:通道名称

    • 注释掉的_loadnum可用于扩展实现负载计数

3. ChannelManager类(通道管理器)

class ChannelManager
{
public:
    ChannelManager() : _next(0)
    {
    }
  • 构造函数初始化_next为0,用于轮询选择通道

    // 插入一个新的Channel对象
    void Insert(int wfd, pid_t subid)
    {
        _channels.emplace_back(wfd, subid);
        // Channel c(wfd, subid);
        // _channels.push_back(std::move(c));
    }
  • 使用emplace_back直接构造Channel对象并添加到向量中

  • 注释掉的代码展示了另一种实现方式(这个通俗易懂)

    // 选择下一个Channel对象(轮询负载均衡)
    Channel &Select()
    {
        auto &c = _channels[_next];
        _next++;
        _next %= _channels.size(); // 循环选择
        return c;
    }
  • 使用简单的轮询算法选择下一个通道,使得子进程负载均衡!!!

  • _next递增后取模确保循环选择

    // 打印所有Channel信息
    void PrintChannel()
    {
        for (auto &channel : _channels)
        {
            std::cout << channel.Name() << std::endl;
        }
    }
  • 遍历并打印所有通道的名称

    // 关闭所有子进程的通信通道
    void StopSubProcess()
    {
        for (auto &channel : _channels)
        {
            channel.Close();
            std::cout << "关闭: " << channel.Name() << std::endl;
        }
    }
  • 关闭所有通道的写端,这会通知子进程退出

    // 等待所有子进程结束
    void WaitSubProcess()
    {
        for (auto &channel : _channels)
        {
            channel.Wait();
            std::cout << "回收: " << channel.Name() << std::endl;
        }
    }
  • 等待所有子进程结束并回收资源

private:
    std::vector<Channel> _channels; // 存储所有Channel对象
    int _next;                      // 下一个要选择的Channel索引
  • 私有成员:

    • _channels:存储所有Channel对象的向量

    • _next:轮询选择索引

4. ProcessPool类(进程池)

const int gdefaultnum = 5; // 默认子进程数量
  • 定义默认子进程数量为5

class ProcessPool
{
public:
    // 构造函数
    ProcessPool(int num) : _process_num(num)
    {
        _tm.Register(PrintLog);  // 注册打印日志任务
        _tm.Register(Download);  // 注册下载任务
        _tm.Register(Upload);    // 注册上传任务
    }
  • 构造函数初始化子进程数量

  • 注册三种任务:打印日志、下载和上传

    // 子进程工作函数
    void Work(int rfd)
    {
        while (true)
        {
            int code = 0;
            ssize_t n = read(rfd, &code, sizeof(code));
            if (n > 0)
            {
                if (n != sizeof(code))
                {
                    continue; // 读取不完整,继续尝试
                }
                std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;
                _tm.Execute(code); // 执行对应任务
            }
            else if (n == 0)
            {
                std::cout << "子进程退出" << std::endl;
                break; // 管道关闭,子进程退出
            }
            else
            {
                std::cout << "读取错误" << std::endl;
                break; // 读取错误,子进程退出
            }
        }
    }
  • 子进程工作循环:

    1. 从管道读取任务码

    2. 如果读取成功,执行对应任务

    3. 如果管道关闭(n==0),退出循环

    4. 如果读取错误,退出循环

    // 启动进程池
    bool Start()
    {
        for (int i = 0; i < _process_num; i++)
        {
            // 1. 创建管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                return false;

            // 2. 创建子进程
            pid_t subid = fork();
            if (subid < 0)
                return false;
            else if (subid == 0)
            {
                // 子进程
                // 3. 关闭不需要的文件描述符(写端)
                close(pipefd[1]);
                Work(pipefd[0]); // 开始工作,监听管道读端
                close(pipefd[0]);
                exit(0); // 工作结束退出
            }
            else
            {
                // 父进程
                // 3. 关闭不需要的文件描述符(读端)
                close(pipefd[0]); // 写端:pipefd[1];
                _cm.Insert(pipefd[1], subid); // 将管道和子进程信息加入管理器
            }
        }
        return true;
    }
  • 启动进程池的主要步骤:

    1. 创建管道

    2. 创建子进程

    3. 子进程关闭写端,进入工作循环

    4. 父进程关闭读端,记录通道信息

    // 调试用:打印所有Channel信息
    void Debug()
    {
        _cm.PrintChannel();
    }
  • 调试方法,打印所有通道信息

    // 运行任务
    void Run()
    {
        // 1. 选择一个任务
        int taskcode = _tm.Code();

        // 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务
        auto &c = _cm.Select();
        std::cout << "选择了一个子进程: " << c.Name() << std::endl;
        // 2. 发送任务
        c.Send(taskcode);
        std::cout << "发送了一个任务码: " << taskcode << std::endl;
    }
  • 运行任务的步骤:

    1. 从任务管理器获取任务码

    2. 选择子进程(轮询负载均衡)

    3. 通过管道发送任务码

    // 停止进程池
    void Stop()
    {
        // 关闭父进程所有的wfd即可
        _cm.StopSubProcess();
        // 回收所有子进程
        _cm.WaitSubProcess();
    }
  • 停止进程池的步骤:

    1. 关闭所有管道写端(通知子进程退出)

    2. 等待所有子进程结束

private:
    ChannelManager _cm;       // Channel管理器
    int _process_num;        // 子进程数量
    TaskManager _tm;         // 任务管理器
};
  • 私有成员:

    • _cm:通道管理器

    • _process_num:子进程数量

    • _tm:任务管理器

5. 总结

这个进程池实现的主要特点:

  1. 父子进程通信:使用管道进行通信

  2. 负载均衡:简单的轮询算法分配任务

  3. 任务管理:通过任务码区分不同任务

  4. 资源管理:正确关闭文件描述符,避免资源泄漏

  5. 进程管理:优雅地启动和停止子进程

每个类都有明确的职责:

  • Channel:管理单个子进程的通信

  • ChannelManager:管理所有子进程通信

  • ProcessPool:提供进程池的对外接口

这种设计模式在需要并行处理任务的服务器程序中非常有用,可以有效利用多核CPU资源。


二、Task.hpp

#pragma once  // 防止头文件被重复包含

#include <iostream>
#include <vector>
#include <ctime>  // 用于随机数种子初始化

// 定义任务函数指针类型,无参数无返回值
typedef void (*task_t)();

//////////////// 调试用任务函数 /////////////////////
// 打印日志任务
void PrintLog()
{
    std::cout << "我是一个打印日志的任务" << std::endl;
}

// 下载任务
void Download()
{
    std::cout << "我是一个下载的任务" << std::endl;
}

// 上传任务
void Upload()
{
    std::cout << "我是一个上传的任务" << std::endl;
}
//////////////////////////////////////

// 任务管理器类
class TaskManager
{
public:
    // 构造函数,初始化随机数种子
    TaskManager()
    {
        srand(time(nullptr));  // 使用当前时间作为随机数种子
    }
    
    // 注册任务函数到任务列表
    void Register(task_t t)
    {
        _tasks.push_back(t);  // 将任务函数指针添加到向量中
    }
    
    // 生成随机任务码
    int Code()
    {
        // 返回0到任务数量-1之间的随机数
        return rand() % _tasks.size();
    }
    
    // 执行指定编码的任务
    void Execute(int code)
    {
        // 检查任务码是否在有效范围内
        if(code >= 0 && code < _tasks.size())
        {
            _tasks[code]();  // 调用对应的任务函数
        }
    }
    
    ~TaskManager()
    {}
    
private:
    std::vector<task_t> _tasks;  // 存储所有注册的任务函数指针
};

1. 预处理指令和头文件包含

#pragma once  // 防止头文件被重复包含
#include <iostream>
#include <vector>
#include <ctime>  // 用于随机数种子初始化
  • #pragma once:确保头文件只被包含一次,防止重复定义

  • <iostream>:提供输入输出功能

  • <vector>:使用向量容器存储任务

  • <ctime>:用于获取当前时间,初始化随机数种子

2. 任务函数指针类型定义

typedef void (*task_t)();
  • 定义了一个名为task_t的函数指针类型

  • 这种函数指针指向无参数、无返回值的函数

  • 后续所有任务函数都需要符合这个签名

3. 调试用任务函数

void PrintLog() { ... }
void Download() { ... }
void Upload() { ... }
  • 定义了三个示例任务函数,都符合task_t类型

  • 每个函数执行简单的打印操作,实际应用中可替换为真实任务

4. TaskManager类

成员变量

std::vector<task_t> _tasks;
  • 使用向量存储所有注册的任务函数指针

构造函数

TaskManager()
{
    srand(time(nullptr));  // 使用当前时间作为随机数种子
}
  • 初始化随机数生成器,用于后续生成随机任务码

成员函数

1. Register
void Register(task_t t)
{
    _tasks.push_back(t);  // 将任务函数指针添加到向量中
}
  • 接收一个符合task_t类型的函数指针

  • 将其添加到任务列表_tasks

2. Code
int Code()
{
    return rand() % _tasks.size();
}
  • 生成一个随机任务码

  • 范围是0到任务数量-1(因为%取模运算)

3. Execute
void Execute(int code)
{
    if(code >= 0 && code < _tasks.size())
    {
        _tasks[code]();  // 调用对应的任务函数
    }
}
  • 根据传入的任务码执行对应的任务

  • 先检查任务码是否有效(防止越界访问)

  • 然后通过函数指针调用对应的任务函数

设计特点

  1. 灵活性:可以动态添加新任务,只需符合task_t类型

  2. 安全性:执行前检查任务码有效性

  3. 随机性:可以随机选择任务执行

  4. 封装性:将任务管理功能封装在类中


三、Main.cc

#include "ProcessPool.hpp"

int main()
{
    // 这个代码,有一个藏得比较深的bug --- TODO
    // 创建进程池对象
    ProcessPool pp(gdefaultnum);

    // 启动进程池
    pp.Start();

    // 自动派发任务
    int cnt = 10;
    while(cnt--)
    {
        pp.Run();
        sleep(1);
    }

    // 回收,结束进程池
    pp.Stop();
    return 0;
}

1. 头文件包含

#include "ProcessPool.hpp"
  • 包含了自定义的ProcessPool头文件,该头文件应该定义了进程池类的实现

2. main函数流程

int main()
{
    // 1. 创建进程池对象
    ProcessPool pp(gdefaultnum);

    // 2. 启动进程池
    pp.Start();

    // 3. 自动派发任务
    int cnt = 10;
    while(cnt--)
    {
        pp.Run();
        sleep(1);
    }

    // 4. 回收,结束进程池
    pp.Stop();
    return 0;
}

3. 代码执行流程

  1. 创建进程池对象:使用gdefaultnum(全局变量)作为参数创建进程池

  2. 启动进程池:调用Start()方法初始化并启动进程池

  3. 任务派发循环:循环10次,每次调用Run()方法派发任务,然后休眠1秒

  4. 停止进程池:调用Stop()方法回收资源并结束进程池


四、Makefile

process_pool:Main.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -f process_pool

五、代码Bug分析

        实现了一个进程池,父进程通过管道与多个子进程通信,并分配任务给子进程执行。老师指出的bug是关于文件描述符在fork时的继承问题。

1、问题所在

        在ProcessPool::Start()函数中,每次循环创建子进程时,存在文件描述符泄漏的问题。具体来说:

  1. 每次循环都会创建一个新的管道(pipe(pipefd)),然后fork一个子进程。

  2. 子进程会继承父进程的所有打开的文件描述符,包括之前循环中创建的管道文件描述符。

  3. 虽然父进程在每次循环后会关闭当前管道的读端(close(pipefd[0])),但子进程仍然保留着之前所有管道的文件描述符。

2、具体表现

  • 第一个子进程:只有第一个管道的文件描述符

  • 第二个子进程:有第一个和第二个管道的文件描述符

  • 第三个子进程:有第一、第二和第三个管道的文件描述符

  • ...

  • 第N个子进程:有所有N个管道的文件描述符

3、为什么这是个问题

  1. 资源泄漏:每个子进程保留着不需要的文件描述符,浪费系统资源。

  2. 潜在的错误:如果子进程意外尝试读取这些额外的管道,可能会导致不可预期的行为。

  3. 关闭时的混乱:当父进程关闭所有写端时,子进程可能因为持有多个管道读端而无法正确退出。

4、解决方案

在子进程代码块中,应该在开始工作前关闭所有不需要的文件描述符。具体修改如下:

else if (subid == 0)
{
    // 子进程
    // 关闭父进程继承下来的所有写端文件描述符
    for (auto &channel : _cm._channels) {
        close(channel.Fd());
    }
    // 关闭当前管道的写端
    close(pipefd[1]);
    Work(pipefd[0]); // 开始工作,监听管道读端
    close(pipefd[0]);
    exit(0); // 工作结束退出
}

或者更简单的方法是在fork之前设置close-on-exec标志:

// 在pipe创建后,fork前
fcntl(pipefd[0], F_SETFD, FD_CLOEXEC);
fcntl(pipefd[1], F_SETFD, FD_CLOEXEC);

这样当子进程执行exec时,这些文件描述符会自动关闭(虽然这个代码中子进程没有exec)。

5、详细分析子进程代码块中的文件描述符处理

        让我们一步一步详细分析这段子进程代码,看看它是如何解决文件描述符泄漏问题的。我们以创建3个子进程为例来说明。

初始状态

假设我们创建3个子进程(_process_num=3),每次循环都会:

  1. 创建新管道(pipefd[2])

  2. fork子进程

  3. 父进程将写端(pipefd[1])存入_cm._channels

第一次循环 (i=0)

父进程状态

  • 创建管道pipefd0[2] (假设值为[3,4])

  • _cm._channels: 空

fork后子进程代码执行

else if (subid == 0)  // 第一个子进程
{
    // _cm._channels是空的,for循环不执行
    for (auto &channel : _cm._channels) {  // 不执行
        close(channel.Fd());
    }
    
    close(pipefd0[1]);  // 关闭写端4
    Work(pipefd0[0]);   // 使用读端3
    close(pipefd0[0]);  // 关闭读端3
    exit(0);
}

结果

  • 子进程只持有自己管道的读端(3)

  • 没有多余的文件描述符

变化过程

第二次循环 (i=1)

父进程状态

  • 已有第一个管道pipefd0[2]=[3,4] (写端4在_channels中)

  • 创建新管道pipefd1[2] (假设值为[5,6])

  • _cm._channels: 包含[写端4, pid1]

fork后子进程代码执行

else if (subid == 0)  // 第二个子进程
{
    // _cm._channels包含第一个管道的写端
    for (auto &channel : _cm._channels) {  // 执行一次
        close(channel.Fd());  // 关闭4
    }
    
    close(pipefd1[1]);  // 关闭当前管道的写端6
    Work(pipefd1[0]);   // 使用读端5
    close(pipefd1[0]);  // 关闭读端5
    exit(0);
}

结果

  • 子进程关闭了第一个管道的写端(4)

  • 子进程只持有自己管道的读端(5)

  • 没有多余的文件描述符

变化过程

第三次循环 (i=2)

父进程状态

  • 已有两个管道:

    • pipefd0[2]=[3,4] (写端4在_channels中)

    • pipefd1[2]=[5,6] (写端6在_channels中)

  • 创建新管道pipefd2[2] (假设值为[7,8])

  • _cm._channels: 包含[写端4, pid1], [写端6, pid2]

fork后子进程代码执行

else if (subid == 0)  // 第三个子进程
{
    // _cm._channels包含前两个管道的写端
    for (auto &channel : _cm._channels) {  // 执行两次
        close(channel.Fd());  // 第一次关闭4,第二次关闭6
    }
    
    close(pipefd2[1]);  // 关闭当前管道的写端8
    Work(pipefd2[0]);   // 使用读端7
    close(pipefd2[0]);  // 关闭读端7
    exit(0);
}

结果

  • 子进程关闭了前两个管道的写端(4和6)

  • 子进程只持有自己管道的读端(7)

  • 没有多余的文件描述符

变化过程

 关键点总结

  1. _cm._channels的内容

    • 每次循环父进程都会添加新的写端fd到_cm._channels

    • 子进程看到的_cm._channels是父进程在该时刻的快照

  2. for循环的作用

    • 关闭所有之前创建的管道的写端

    • 确保子进程只保留自己管道的读端

  3. 为什么需要这样做

    • 如果不关闭之前的写端,子进程会持有所有之前创建的管道的文件描述符

    • 这样会导致:

      • 文件描述符泄漏

      • 父进程关闭写端时,子进程可能不会正确退出(因为还持有其他管道的读端)

  4. 当前管道的处理

    • close(pipefd[1]):关闭当前管道的写端(子进程不需要)

    • Work(pipefd[0]):使用当前管道的读端

    • close(pipefd[0]):工作完成后关闭读端

最终效果

通过这种处理方式,每个子进程:

  • 只保留自己管道的读端

  • 不持有任何其他管道的文件描述符

  • 当父进程关闭所有写端时,子进程的read会返回0,从而正确退出

这样就完美解决了文件描述符泄漏的问题。


六、编译和运行进程池程序

1、编译步骤

  1. 首先确保有这三个文件:

    • ProcessPool.hpp (主头文件)

    • Task.hpp (任务管理头文件)

    • main.cpp (主程序)

  2. 使用g++编译命令:

g++ -o process_pool main.cpp -std=c++11

这个命令会:

  • main.cpp编译为可执行文件process_pool

  • 使用C++11标准

  • 自动包含两个头文件

2、运行程序

编译成功后,直接运行生成的可执行文件:

./process_pool

3、监视子进程

可以使用类似pstop命令来监视进程。

  1. 在一个终端运行程序:

    ./process_pool

  2. 在另一个终端使用ps命令监视:

watch -n 1 'ps axo pid,ppid,stat,command | grep process_pool'

这个命令会:

  • 每秒刷新一次(-n 1)

  • 显示所有process_pool相关进程

  • 包括PID、PPID、状态和命令

4、程序说明

  1. 创建指定数量(默认5个)的子进程

  2. 父进程通过管道向子进程发送任务码

  3. 子进程接收任务码并执行相应任务

  4. 10秒后父进程关闭所有子进程


网站公告

今日签到

点亮在社区的每一天
去签到