【Linux系统】进程间通信:基于匿名管道实现进程池

发布于:2025-08-06 ⋅ 阅读:(15) ⋅ 点赞:(0)

1. 进程池介绍

1.1 核心定义

进程池(Process Pool) 是一种预创建+复用式的进程管理技术,其本质是操作系统中预分配的进程资源容器。它包含两大核心组件:

  • 资源进程:池中预先创建的空闲进程,随时待命执行任务。
  • 管理进程:负责分配任务、回收进程和监控状态的中控系统 。

1.2 运作模式图解

关键特征

  • 进程数量固定(如CPU核心数的1-2倍)
  • 任务与进程解耦:任务排队等待空闲进程
  • 进程复用:任务完成后进程返回池中待命 

所以,进程池中维护了一定数量的进程,这些进程在进程池创建时被预先创建,并一直存在于系统中,等待任务的分配。当有任务提交给进程池时,它会从池中取出一个空闲进程来执行该任务,任务执行完毕后,该进程不会被销毁,而是返回进程池,等待下一个任务的分配。

1.3 为什么需要进程池

前提:

  • 普通进程的致命缺陷

    • 创建进程需复制页表、文件描述符等资源,Linux中fork()耗时可达数百微秒 
    • 销毁进程需内核回收内存、发送信号等,开销更大

进程池优化:

  • 减少进程创建和销毁的开销 :创建和销毁进程是耗时的操作,涉及操作系统的系统调用和资源分配。使用进程池预先创建一组进程,可避免频繁的进程创建和销毁,提高程序性能和效率。

  • 控制并发进程数量 :进程池能限制并发执行的进程数,防止进程过多导致系统资源耗尽。通过设置进程池大小,可根据系统处理能力和任务特性合理分配资源,确保进程数量在合理范围内。

  • 充分利用多核 CPU :进程池可以充分利用多核 CPU 的优势,通过并行执行多个进程,实现真正的并行处理,提高系统的处理能力和任务的执行效率,特别适用于 CPU 密集型任务。

  • 避免进程资源竞争和冲突 :过多的进程易导致资源竞争和冲突,如共享内存同步问题等。进程池限制了进程数量,降低了资源竞争和冲突的概率,提高了程序的稳定性和可靠性。

  • 提供任务队列和排队机制 :进程池通常与任务队列结合使用,可将任务放入队列,由进程池自动取出并处理。这避免了任务丢失和重复执行,提供了简单高效的任务调度和排队机制,使任务处理更有序可控。


2. 基于匿名管道实现进程池

下面我们会基于父进程写子进程读的匿名管道来实现进程池

注意我们大部分的封装都会在ProcessPool.hpp源文件中实现,关于.hpp源文件,我们先来介绍一下:

  • .hpp 头文件可以同时包含声明和定义。

  • 允许的定义类型有:

    • 模板定义 (必须)

    • 内联函数定义 (应该)

    • constexpr 函数/变量定义 (应该)

    • 类/结构体/联合体定义本身 (必须)

    • 类成员函数的定义 (可以,需隐式或显式 inline)

    • inline 变量定义 (C++17+, 应该)

    • 类内静态常量整型/枚举成员的初始化 (不是完整定义,C++17 前需注意)

  • 禁止的定义类型有:

    • 非内联、非模板的普通函数定义

    • 非内联、非 constexpr、非模板的全局变量或静态变量定义

2.1 框架

先来把框架给搭好

上图展示的是我们大致想要实现的框架,具体需求我们后面慢慢补充

#include <iostream>
#include <vector>


// 先描述
class Channel
{
public:
    Channel()
    {}
    ~Channel()
    {}

private:
    int _wfd;
    pid_t _subid;
};


// 再组织
class ChannelManager
{
public:
    ChannelManager()
    {}
    ~ChannelManager()
    {}

private:
    std::vector<Channel> _channels;
};

const int gdefaultnum = 5;// 默认创建5个子进程

class ProcessPool
{
public:
    ProcessPool(int num = gdefaultnum)
        :_process_num(num)
    {}
    ~ProcessPool()
    {}

private:
    ChannelManager _cm; 
    int _process_num;   
};

后面需要哪个我们再继续完善

Channel类:将我们打开的管道描述起来,父进程可以通过写端文件描述符wfd来管理,所以我们肯定需要拿到打开文件的文件描述符fd,故我们可以先有一个成员变量_wfd方便后续可以管理打开的管道文件,同样打开管道,是为了让父子进程通信,那我们肯定也需要对子进程进行管理,那对子进程管理就需要子进程pid,所以还需要定义一个成员变量_subid,至于还需要什么,我们后面需要的时候再加上。

ChannelManager类:将我们打开的所有管道统一管理起来,进行面向对象式的封装,方便后续管理,所以我们使用vector来将所有打开的管道进行管理

ProcessPool类:封装所有创建的进程为进程池类的主体,将创建的进程都管理起来,达到我们实现进程池的目的。每个进程都需要通过管道来与父进程通信,所以我们需要一个ChannelManager类型的成员变量,方便我们后续在ProcessPool类主体中将进程管理起来,同时还可以使用一个成员变量来记录我们进程池中的进程数量,那我们构造函数就可以先定义一个全局变量(默认一开始创建5个子进程)来初始化_process_num。


2.2 启动进程池

进程池,首先得要有 "进程池" ,所以我们需要做一些准备工作,和上篇文章【进程间通信:匿名管道】的代码示例一样的流程,首先需要创建管道,然后创建子进程,分别关闭父子进程的读写端,不过这里我们需要循环一样的步骤,分别创建默认的5个管道和子进程,来做到有 "进程池" 。

代码如下:

 bool Start()
 {
    for (int i = 0; i < _process_num; i++)
    {
        // 1. 创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if(n < 0) return false;

        // 2. 创建子进程
        pid_t subid = fork();
        if(subid < 0) return false;
        else if(subid == 0)
        {
            // 子进程
            // 3. 关闭不需要的读写端
            close(pipefd[1]);
            Work(pipefd[0]); //子进程需要处理父进程派发的任务
            close(pipefd[0]);
            exit(0);
        }
        else
        {
            // 父进程
            // 3. 关闭不需要的读写端
            close(pipefd[0]);
            _cm.Insert(pipefd[1], subid); // 父进程将创建的子进程和对应的管道插入到数组中管理起来
        }
    }
    return true;
}

注意:父进程完成将创建的子进程和对应的管道插入到数组中管理起来之后,不能关闭写端

要管理进程池,就等于管理数组,管理数组的增删查改

管理数组

我们需要通过父进程先将这些创建的子进程和管道插入到数组中,方便后续管理

void Insert(int wfd, int subid)
{
    _channels.emplace_back(wfd, subid);
}

关于emplace相关接口可在【C++】专栏的【C++11右值引用相关文章】详细了解

那么channel类的构造函数就需要完善,同时为了后续方便我们肉眼能够区分每个子进程和管道,再增加一个Channel类的成员变量_name,这样后面测试的时候可以打印在屏幕上,方便我们查看

代码如下:

class Channel
{
public:
    Channel(int wfd, pid_t subid)
        :_wfd(wfd)
        ,_subid(subid)
    {
        _name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
    }

    int Fd() 
    {
        return _wfd;
    }

    pid_t Subid()
    {
        return _subid;
    }

    std::string Name()
    {
        return _name;
    }
    
    ~Channel()
    {
    }

private:
    int _wfd;
    pid_t _subid;
    std::string _name;
};

方便后续在其他类中要访问Channel类中的私有成员变量,我们在实现出能获得私有变量值的公有方法


子进程接收任务

接下来就来处理子进程,子进程通过管道读取父进程派发的任务码,然后去执行对应的任务

代码如下:

    void Work(int rfd)
    {
        while (true)
        {
            int code = 0;
            ssize_t n = read(rfd, &code, sizeof(code));
            if(n > 0)
            {
                if(n != sizeof(code)) continue;
                std::cout << "子进程[" << getpid() << "]收到一个任务码" << std::endl;
                // 根据任务码执行对应任务
                // 待实现
            }
            else if(n == 0)
            {
                std::cout << "子进程退出" << std::endl;
                break;
            }
            else
            {
                std::cout << "读取错误" << std::endl;
                break;
            }
        }
    }

2.3 运行进程池

启动进程池,并完成需要的准备工作之后,就需要运行进程池,让进程池能够去执行父进程分配的任务

添加任务

所以我们还需要一个任务类,封装函数指针,这样在后面我们需要执行哪个任务,就可以通过函数指针进行回调,关于回调函数可以看回调函数详解

代码如下:

#include <iostream>
#include <vector>

typedef void (*task_t)();

void PrintLog()
{
    std::cout << "我是一个打印日志的任务" << std::endl;
}

void Download()
{
    std::cout << "我是一个下载的任务" << std::endl;
}

void Upload()
{
    std::cout << "我是一个上传的任务" << std::endl;
}

class TaskManager
{
public:
    TaskManager()
    {}

    void Register(task_t t)
    {
        _tasks.push_back(t);
    }

    ~TaskManager()
    {}
private:
    std::vector<task_t> _tasks;
};

同样,这里使用数组vector来管理任务,数组中每个元素都是函数指针,我们根据任务码来执行对应的任务,我们可以将任务码与对应任务进行关联,怎么关联呢?由于我们使用数组来管理任务,所以可以使用下标来进行关联,这样就可以通过任务码来获取对应的函数指针,然后函数指针回调执行对应的任务,所以我们需要一个注册函数,将函数指针都插入到数组中,那么,下面我们还要处理任务码相关工作

    TaskManager()
    {
        srand(time(nullptr));
    }

    int Code()
    {
        return rand() % _tasks.size();
    }

    void Execute(int code)
    {
        if(code >= 0 && code < _tasks.size())
        {
            _tasks[code]();
        }
    }

我们可以使用随机数来随机生成任务码,不过任务码需要与下标对应,所以需要取模数组的大小,然后根据任务码回调具体的任务函数

那么我们在ProcessPool类中,还需要再添加一个成员变量,来管理任务

如下:

private:
    ChannelManager _cm;
    int _process_num;
    TaskManager _tm;

构造函数中,我们需要初始化,注册所有任务函数,也就是将所有任务函数插入到数组中

ProcessPool(int num = gdefaultnum)
        : _process_num(num)
    {
        _tm.Register(PrintLog);
        _tm.Register(Download);
        _tm.Register(Upload);
    }

运行

那么接下来就需要运行进程池,首先我们获取TaskManager类中随机生成的任务码

// 1. 获取任务码
int taskcode = _tm.Code();

接下来我们需要将任务码写入管道,然后让子进程从管道中读取任务码,子进程根据任务码执行任务,那么要写入管道,我们需要先选择一个管道来写,选哪个管道呢?肯定不能随便选,也不能乱选,不然后果就是进程池中的进程 “闲的闲死,忙的忙死” ,负载不均衡。那要怎么选择才可以负载均衡,这里我们采用轮询的方式来选择管道,也就是依次选择管道,大家轮着来。

那如何轮询呢?首先我们在ChannelManager类中添加一个成员变量_next,代表接下来轮到谁

private:
    std::vector<Channel> _channels;
    int _next = 0;

同时缺省值为0,表示从数组中0号下标开始轮询

那么怎么选现在就一目了然了

代码如下:

    Channel& Select()
    {
        auto& c = _channels[_next++];
        _next %= _channels.size();
        return c;
    }

管道选好了,那么现在就需要把任务码写入管道

代码如下:

    void Send(int code)
    {
        ssize_t n = write(_wfd, &code, sizeof(code));
        if(n != sizeof(code))
        {
            std::cerr << "写入错误" << std::endl;
        }
    }

完整Run()函数代码如下:

    void Run()
    {
        // 1. 获取任务码
        int taskcode = _tm.Code();

        // 2. 选择一个管道写入,让子进程读取然后执行
        auto& c = _cm.Select();
        std::cout << "选择了一个子进程:" << c.Name() << std::endl;
        c.Send(taskcode);
        std::cout << "发送了一个任务码:" << taskcode << std::endl;
        
    }

所以,待实现的子进程执行任务代码,也可以完善了

    void Work(int rfd)
    {
        while (true)
        {
            int code = 0;
            ssize_t n = read(rfd, &code, sizeof(code));
            if(n > 0)
            {
                if(n != sizeof(code)) continue;
                std::cout << "子进程[" << getpid() << "]收到一个任务码" << std::endl;
                // 根据任务码执行对应任务
                _tm.Execute(code);
            }
            else if(n == 0)
            {
                std::cout << "子进程退出" << std::endl;
                break;
            }
            else
            {
                std::cout << "读取错误" << std::endl;
                break;
            }
        }
    }

2.4 结束进程池

最终是需要结束进程池的,可能是正常结束,也有可能是异常结束,但不管是哪种结束,我们都需要回收资源,避免内存泄漏

代码如下:

    void Stop()
    {
        // 关闭文件描述符,回收子进程
        _cm.CloseFd();
        _cm.WaitSubid();
    }

注意:只需要关闭父进程写端文件描述符,写端关闭,管道内数据读完后,子进程再读,read会返回0,在Work()接口中就会跳出循环,子进程工作完之后就会自己关闭读端

关闭文件描述符

只需要在ChannelManager类中,循环遍历vector,调用每个Channel对象自己关闭文件描述符的方法

代码如下:

    // ChannelManager
    void CloseFd()
    {
        for(auto& channel : _channels)
        {
            channel.Close();
            std::cout << "关闭:" << channel.Name() << std::endl; 
        }
    }

    // Channel
    void Close()
    {
        close(_wfd);
    }

回收子进程

和关闭文件描述符一样,只需循环遍历数组,调用Channel对象自己的回收子进程方法

    // ChannelManager
    void WaitSubid()
    {
        for(auto& channel : _channels)
        {
            channel.Wait();
            std::cout << "回收:" << channel.Name() << std::endl;
        }
    }

    // Channel
    void Wait()
    {
        pid_t rid = waitpid(_subid, nullptr, 0);
        (void)rid;//不处理返回值,正常情况都会回收成功
    }

2.5 测试代码

在Main.cc中完成对进程池的测试

代码如下:

#include "ProcessPool.hpp"

int main()
{
    // 创建进程池
    ProcessPool pp;

    // 启动进程池
    pp.Start();

    // 自动派发任务
    int cnt = 10;
    while (cnt--)
    {
        pp.Run();
        sleep(1);
    }
    
    // 结束进程池
    pp.Stop();
    return 0;
}

这里我们循环运行10次看看效果

运行结果:

ltx@iv-ye1i2elts0wh2yp1ahah:~/Linux_system/lesson10/Processpool$ ./processpool
选择了一个子进程:channel-4-268693
发送了一个任务码:1
子进程[268693]收到一个任务码
我是一个下载的任务
选择了一个子进程:channel-5-268694
发送了一个任务码:0
子进程[268694]收到一个任务码
我是一个打印日志的任务
选择了一个子进程:channel-6-268695
发送了一个任务码:1
子进程[268695]收到一个任务码
我是一个下载的任务
选择了一个子进程:channel-7-268696
发送了一个任务码:1
子进程[268696]收到一个任务码
我是一个下载的任务
选择了一个子进程:channel-8-268697
发送了一个任务码:0
子进程[268697]收到一个任务码
我是一个打印日志的任务
选择了一个子进程:channel-4-268693
发送了一个任务码:1
子进程[268693]收到一个任务码
我是一个下载的任务
选择了一个子进程:channel-5-268694
发送了一个任务码:2
子进程[268694]收到一个任务码
我是一个上传的任务
选择了一个子进程:channel-6-268695
发送了一个任务码:0
子进程[268695]收到一个任务码
我是一个打印日志的任务
选择了一个子进程:channel-7-268696
发送了一个任务码:0
子进程[268696]收到一个任务码
我是一个打印日志的任务
选择了一个子进程:channel-8-268697
发送了一个任务码:2
子进程[268697]收到一个任务码
我是一个上传的任务
关闭:channel-4-268693
关闭:channel-5-268694
关闭:channel-6-268695
关闭:channel-7-268696
关闭:channel-8-268697
子进程退出
子进程退出
子进程退出
子进程退出
子进程退出
回收:channel-4-268693
回收:channel-5-268694
回收:channel-6-268695
回收:channel-7-268696
回收:channel-8-268697

通过ps指令查看

可以看到我们确实创建了5个子进程,进程池结束之后,5个子进程也成功被回收。

其实我们的进程池还有一个小bug,这里我们是先关闭所有子进程文件描述符,再回收所有子进程;那如果我们一边关闭一边回收呢?

如下代码:

    void CloseAndWait()
    {
        for(auto& channel : _channels)
        {
            channel.Close();
            std::cout << "关闭:" << channel.Name() << std::endl; 
            channel.Wait();
            std::cout << "回收:" << channel.Name() << std::endl;
        }
    }

运行结果:

我们可以看到只有父进程关闭了一个文件描述符,然后就阻塞住了

这是什么原因呢?

2.6 关于bug

如下图分析:

由于每次循环创建子进程时,都会拷贝父进程的文件描述符,所以原来父进程的写端指向的管道,也会被拷贝到子进程的文件描述符,所以之前的管道,也会被后创建的子进程的写端指向。

交叉操作(边关边等)的问题

如果改为循环内交替执行 channel.Close() 和 channel.Wait()

for (auto& channel : _channels) {
    channel.Close(); // 关闭一个写端
    channel.Wait();  // 等待对应子进程
}

会导致:

  • 死锁风险

    • 这里我们有子进程 1、2、3、4、5。

    • 关闭 父进程 的写端后,子进程1 的管道引用计数-1。

    • 但  子进程2、3、4、5  的写端任然指向子进程1的管道,尚未关闭,写端不写,读端进程因无数据可读而进入阻塞状态(挂起)。

    • 父进程在关闭 子进程1 的文件描述符后,会继续阻塞等待(waitpid阻塞等)回收子进程1,而子进程又因为read读被阻塞挂起,导致一直卡在这

    • 问题:子进程1 的写端还未全部关闭!父进程卡在 Wait() 等待 子进程1 退出,但 子进程1 仍在阻塞读(因为它的写端未被关闭,read 不返回)。

    • 父进程和子进程 1 互相等待:父进程等 子进程1 退出,子进程1 等父进程关闭写端或发送数据 → 死锁

 正确顺序(先关fd,再wait)的工作原理

  • 关闭所有写端(CloseFd()

    • 父进程关闭所有管道的写端文件描述符(_wfd)。

    • 当某个管道的所有写端都被关闭时,子进程在 read 调用中会收到 EOF(返回 n=0)。

    • 子进程检测到 n=0 后跳出循环,执行 close(pipefd[0]) 并 exit(0) 退出。

  • 等待子进程(WaitSubid()

    • 父进程通过 waitpid 回收已退出的子进程。

    • 由于所有子进程已收到 EOF 并退出,waitpid 会立即返回,不会阻塞。

关键点:子进程的退出条件

  • 子进程通过 read 阻塞等待任务。只有父进程关闭写端,子进程的 read 才会返回 0,从而触发退出。

  • 如果父进程不先关闭所有写端,部分子进程会永远阻塞在 read,导致 waitpid 无限等待。

虽然先关闭所有文件描述符,再回收所有子进程,不会引起这样的问题,但是这个bug依然存在,并没有解决。那有什么解决方法呢?

首先我们可以倒着从最后一个管道开始边关闭写端边回收子进程,虽然这样不会出现死锁的问题,但bug依然没有解决

所以我们可以采取在创建子进程时,关闭子进程的 "哥哥进程" 的写端方法,来解决这个bug的根本所在

    void CloseAll()
    {
        for(int i = _channels.size()-1; i >= 0; i--)
        {
            _channels[i].Close();
        }
    }

子进程创建时拷贝父进程的文件描述符,子进程遍历数组,将已经拷贝自父进程的 ”哥哥进程" 的写端关闭,这里你可能会问:父子进程会并发进行创建子进程之后的代码,那父进程在把当前子进程插入数组后,那当前子进程遍历数组关闭所有写端,会不会把父进程新插入指向当前子进程的管道的写端也给关闭了呢?答案是不会的,因为有写时拷贝(COW)的存在

    bool Start()
    {
        for (int i = 0; i < _process_num; i++)
        {
            // 1. 创建管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                return false;

            // 2. 创建子进程
            pid_t subid = fork();
            if (subid < 0)
                return false;
            else if (subid == 0)
            {
                // 子进程
                // 3. 关闭不需要的读写端
                close(pipefd[1]);
                _cm.CloseAll(); // 关闭除了父进程的写端
                Work(pipefd[0]); // 子进程需要处理父进程派发的任务
                close(pipefd[0]);
                exit(0);
            }
            else
            {
                // 父进程
                // 3. 关闭不需要的读写端
                close(pipefd[0]);
                _cm.Insert(pipefd[1], subid); // 父进程将创建的子进程和对应的管道插入到数组中管理起来make
            }
        }
        return true;
    }

现在我们再运行一下查看效果:

ltx@iv-ye1i2elts0wh2yp1ahah:~/Linux_system/lesson10/Processpool$ make
g++ -o processpool Main.cc -std=c++11
ltx@iv-ye1i2elts0wh2yp1ahah:~/Linux_system/lesson10/Processpool$ ./processpool
选择了一个子进程:channel-4-274187
发送了一个任务码:2
子进程[274187]收到一个任务码
我是一个上传的任务
选择了一个子进程:channel-5-274188
发送了一个任务码:0
子进程[274188]收到一个任务码
我是一个打印日志的任务
选择了一个子进程:channel-6-274189
发送了一个任务码:2
子进程[274189]收到一个任务码
我是一个上传的任务
选择了一个子进程:channel-7-274190
发送了一个任务码:1
子进程[274190]收到一个任务码
我是一个下载的任务
选择了一个子进程:channel-8-274191
发送了一个任务码:2
子进程[274191]收到一个任务码
我是一个上传的任务
选择了一个子进程:channel-4-274187
子进程[274187]收到一个任务码
我是一个打印日志的任务
发送了一个任务码:0
选择了一个子进程:channel-5-274188
发送了一个任务码:0
子进程[274188]收到一个任务码
我是一个打印日志的任务
选择了一个子进程:channel-6-274189
子进程[274189]收到一个任务码
我是一个打印日志的任务
发送了一个任务码:0
选择了一个子进程:channel-7-274190
发送了一个任务码:1
子进程[274190]收到一个任务码
我是一个下载的任务
选择了一个子进程:channel-8-274191
发送了一个任务码:1
子进程[274191]收到一个任务码
我是一个下载的任务
关闭:channel-4-274187
子进程退出
回收:channel-4-274187
关闭:channel-5-274188
子进程退出
回收:channel-5-274188
关闭:channel-6-274189
子进程退出
回收:channel-6-274189
关闭:channel-7-274190
子进程退出
回收:channel-7-274190
关闭:channel-8-274191
子进程退出
回收:channel-8-274191

成功解决!

源码:

Main.cc

#include "ProcessPool.hpp"

int main()
{
    // 创建进程池
    ProcessPool pp;

    // 启动进程池
    pp.Start();

    // 自动派发任务
    int cnt = 10;
    while (cnt--)
    {
        pp.Run();
        sleep(1);
    }
    
    // 结束进程池
    pp.Stop();
    return 0;
}

ProcessPool.hpp

#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__

#include <iostream>
#include <vector>
#include <sys/types.h>
#include <unistd.h>
#include <cstdlib>
#include <cstdio>
#include <sys/wait.h>
#include "Task.hpp"

// 先描述
class Channel
{
public:
    Channel(int wfd, pid_t subid)
        : _wfd(wfd), _subid(subid)
    {
        _name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
    }

    void Send(int code)
    {
        ssize_t n = write(_wfd, &code, sizeof(code));
        // if(n != sizeof(code))
        // {
        //     std::cerr << "写入错误" << std::endl;
        // }
        (void)n;
    }

    void Close()
    {
        close(_wfd);
    }

    void Wait()
    {
        pid_t rid = waitpid(_subid, nullptr, 0);
        (void)rid;//不处理返回值,正常情况都会回收成功
    }

    int Fd()
    {
        return _wfd;
    }

    pid_t Subid()
    {
        return _subid;
    }

    std::string Name()
    {
        return _name;
    }

    ~Channel() {}

private:
    int _wfd;
    pid_t _subid;
    std::string _name;
};

// 再组织
class ChannelManager
{
public:
    ChannelManager() {}

    void Insert(int wfd, int subid)
    {
        _channels.emplace_back(wfd, subid);
    }

    Channel& Select()
    {
        auto& c = _channels[_next++];
        _next %= _channels.size();
        return c;
    }

    void CloseFd()
    {
        for(auto& channel : _channels)
        {
            channel.Close();
            std::cout << "关闭:" << channel.Name() << std::endl; 
        }
    }

    void WaitSubid()
    {
        for(auto& channel : _channels)
        {
            channel.Wait();
            std::cout << "回收:" << channel.Name() << std::endl;
        }
    }

    void CloseAndWait()
    {
        for(auto& channel : _channels)
        {
            channel.Close();
            std::cout << "关闭:" << channel.Name() << std::endl; 
            channel.Wait();
            std::cout << "回收:" << channel.Name() << std::endl;
        }
    }

    void CloseAll()
    {
        for(int i = _channels.size()-1; i >= 0; i--)
        {
            _channels[i].Close();
        }
    }

    ~ChannelManager() {}

private:
    std::vector<Channel> _channels;
    int _next = 0;
};

const int gdefaultnum = 5; // 默认创建5个子进程

class ProcessPool
{
public:
    ProcessPool(int num = gdefaultnum)
        : _process_num(num)
    {
        _tm.Register(PrintLog);
        _tm.Register(Download);
        _tm.Register(Upload);
    }

    void Work(int rfd)
    {
        while (true)
        {
            int code = 0;
            ssize_t n = read(rfd, &code, sizeof(code));
            if(n > 0)
            {
                if(n != sizeof(code)) 
                    continue;
                std::cout << "子进程[" << getpid() << "]收到一个任务码" << std::endl;
                // 根据任务码执行对应任务
                _tm.Execute(code);
            }
            else if(n == 0)
            {
                std::cout << "子进程退出" << std::endl;
                break;
            }
            else
            {
                std::cout << "读取错误" << std::endl;
                break;
            }
        }
    }

    bool Start()
    {
        for (int i = 0; i < _process_num; i++)
        {
            // 1. 创建管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                return false;

            // 2. 创建子进程
            pid_t subid = fork();
            if (subid < 0)
                return false;
            else if (subid == 0)
            {
                // 子进程
                // 3. 关闭不需要的读写端
                close(pipefd[1]);
                _cm.CloseAll(); // 关闭除了父进程的写端
                Work(pipefd[0]); // 子进程需要处理父进程派发的任务
                close(pipefd[0]);
                exit(0);
            }
            else
            {
                // 父进程
                // 3. 关闭不需要的读写端
                close(pipefd[0]);
                _cm.Insert(pipefd[1], subid); // 父进程将创建的子进程和对应的管道插入到数组中管理起来make
            }
        }
        return true;
    }

    void Run()
    {
        // 1. 获取任务码
        int taskcode = _tm.Code();

        // 2. 选择一个管道写入,让子进程读取然后执行
        auto& c = _cm.Select();
        std::cout << "选择了一个子进程:" << c.Name() << std::endl;
        c.Send(taskcode);
        std::cout << "发送了一个任务码:" << taskcode << std::endl;
        
    }

    void Stop()
    {
        // 关闭文件描述符,回收子进程
        // _cm.CloseFd();
        // _cm.WaitSubid();
        _cm.CloseAndWait();
    }

    ~ProcessPool() {}

private:
    ChannelManager _cm;
    int _process_num;
    TaskManager _tm;
};

#endif

Task.hpp

#pragma once

#include <iostream>
#include <vector>
#include <ctime>

typedef void (*task_t)();

void PrintLog()
{
    std::cout << "我是一个打印日志的任务" << std::endl;
}

void Download()
{
    std::cout << "我是一个下载的任务" << std::endl;
}

void Upload()
{
    std::cout << "我是一个上传的任务" << std::endl;
}

class TaskManager
{
public:
    TaskManager()
    {
        srand(time(nullptr));
    }

    int Code()
    {
        return rand() % _tasks.size();
    }

    void Execute(int code)
    {
        if(code >= 0 && code < _tasks.size())
        {
            _tasks[code]();
        }
    }

    void Register(task_t t)
    {
        _tasks.push_back(t);
    }

    ~TaskManager()
    {}
private:
    std::vector<task_t> _tasks;
};

Makefile

processpool:Main.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -f processpool


网站公告

今日签到

点亮在社区的每一天
去签到