Linux——进程间通信,匿名管道,进程池

发布于:2025-07-23 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、进程间通信(IPC)的理解

1.为什么进程间要通信(IPC)

首先进程之间是相互独立的,尽管是父子进程之间,它们虽然资源共享,但当子进程需要修改数据时仍然需要 进行写时拷贝,保持独立性。

而让进程间通信可以实现数据之间的交互,资源共享,事件通知,又或者是让一个进程对另一个进程进行控制。

进程间通信是操作系统中实现进程间协作和数据交换的重要机制 ,它使得多个进程能够共同完成任务,提高系统的效率和可靠性。

2.如何进行通信

进程间通信的原理其实很简单,只需要两个进程共同访问一个资源,而一个进程对资源的更改能被另一进程感知到,从而做出相应的操作。

所以通信的前提是进程之间能够访问同一个资源,而且该资源是公共的,而不是某进程内部的。

IPC 的典型方式对比

在这里插入图片描述

二、匿名管道

1.管道的理解

我们把进程之间通信的介质(资源)叫作管道。
开发者在设计管道技术时文件系统已经比较成熟,所以为了方便管理该资源就使用文件来实现, 而对文件的读写就是通信的过程 ,但它与一般的文件还是有些区别,文件都是储存到磁盘上的,而进程之间通信用的文件并不需要把它储存到磁盘上,它只是作为一个传输介质。

它比较特殊,所以起名为管道。管道其实是一个内存级的文件。

在这里插入图片描述

注意:父子进程之间的管道叫作匿名管道,顾名思义就是没有名字,也不需要名字,因为子进程能够继承下来父进程开辟的管道资源。

2.匿名管道的使用

创建匿名管道常用的接口是:

            int pipe(int pipefd[2]);

需要包含头文件:

            #include<unistd.h>
  • 返回值:创建成功返回0,失败返回-1
  • 参数:这个是一个输出型参数,传入一个int类型长度为2的数组,然后得到
    pipefd[0]:以读的方式打开的文件描述符
    pipefd[1]:以写的方式打开的文件描述符。

示例:

#include <iostream>
#include <sys/types.h>
#include <unistd.h>
int main()
{
    int pipefd[2];
    pipe(pipefd);
    int rfd = pipefd[0],wfd = pipefd[1];
    pid_t id = fork();
    if(id == 0)
    {
        close(wfd);//关闭子进程的写文件,只让它读
        int k=0;
        while(true)
        {
            read(rfd,&k,sizeof(k));
            printf("read:%d\n",k);
        }
    }
    else
    {
        close(rfd);//关闭父进程的读文件,只让它写。
        int num=0;
        while(true)
        {
            write(wfd,&num,sizeof(num));
            num++;
            sleep(1);
        }
    }
    return 0;
}

要记住pipefd[2]中哪个是读哪个是写有一个小技巧,0像嘴巴,所以下标为0的是读,1像钢笔,所以1下标是写。

3.管道的五种特性

  1. 匿名管道,只能用来进行具有血缘关系的进程间通信(用于父与子)。
  2. 管道文件,自带同步机制。如上代码示例,父进程写一次休眠一秒,而子进程是一直不断地读,快的一端会迁就于慢的一端,最后实现同步。
  3. 管道是面向字节流的。怎么读与怎么写并没有联系,比如写入“hello world”,但可能读到“hel”,这取决于你要读多少字节。
  4. 管道是单向通信的。也就是a(表示进程)写的时候b读。b写的时候a在读。而不是既在写同时也在读。
  5. 管道(文件)的生命周期是随进程的。进程结束管道也随之销毁。

4.管道的四种通信情况

  • 写慢,读快 — 读端就要阻塞(等待写端写入)。
  • 写快,读慢 —到管道容量满了后,写端就要阻塞(等待读端读取数据,然后就可以覆盖式地继续往管道写入)。
  • 写关闭,读继续 — read就会返回0,表示文件结尾。
  • 写继续,读关闭 — 写端不再有意义,系统会杀掉写端进程。

5.管道缓冲区容量

管道缓冲区容量为64kb,大家可以根据管道的性质与通信特点,自行进行测试。

三、进程池

1.进程池的理解

在程序使用内存的时候,比如vector扩容机制,会提前给你开辟一块空间供你使用,尽管现在用不到,相当于做一下预备。减少开辟空间的频次,从而达到提高效率的效果。

那么进程池也同样,给父进程提前开辟一些子进程,提供父进程使用。其中我们使用匿名管道建立联系。
在这里插入图片描述
在父进程给子进程派发任务时,为了提高效率会让每个子进程均匀地分配到任务(称为负载均匀),而不是把大部分的任务都派发到一个子进程上,通常会有以下策略:

  • 轮询:按顺序一一分配。
  • 随机:随机进行分配。
  • 负载因子:设计一个负载因子,让子进程按负载因子的大小排成一个小根堆,每次取出堆头的子进程派发任务,然后更新负载因子插回到堆中。

2.进程池的制作

在面向对象的编程中最重要的就是对对象的描述与组织,这里我们的核心就是对管道进行管理。那么首先需要一个类对管道进行描述。

class Channel
{
public:
    Channel(int fd,pid_t id): _wfd(fd),_subid(id)
    { }
    //... ...
    ~Channel()
    {}
private:
    int _wfd;
    int _subid;
};

_wfd是该管道对应写端的fd,_subid是该管道对应的子进程的pid。

这里我们不必把rfd(读端fd)加入,因为我们现在对管道的描述组织,目的是方便父进程管理,而rfd是给子进程用的,所以不用添加为变量。

这里我们就以轮询的方式派发任务,刚才创建的Channel相当于对管道的描述,接下来创建ChannelManage进行组织。这里选择使用数组来管理,派发任务方式选择轮询,所以需要记录下一个需要派发到的管道的下标。

class ChannelManage
{
public:
    ChannelManage():_next(0)
    {}
    //... ...
    ~ChannelManage()
    {}
private:
    vector<Channel> _channels;
    int _next;
};

接下来还需要创建一个类对整体的进程池做管理。

class ProcessPool
{
public:
    ProcessPool(int num) : __process_num(num)
    {}
    // ... ...
    ~ProcessPool()
    {}
private:
    ChannelManage _cm;
    int _process_num;
};

其中_process_num表示需要创建多少子进程,这是由使用者来决定的。

在ProcessPool中我们准备实现这些方法

  • bool Start():用于创建子进程。
    由于我们是要生成多个通道所以需要循环来进行,而单趟循环需要做以下这些操作:

    1.创建管道,然后创建子进程。(这样能让子进程继承到管道信息)

    2.关于子进程:写端关闭,然后执行Work(),最后把读端关闭,并exit退出。

    3.关于父进程:读端关闭,然后把wfd,pid存入_cm中。

  • void Work(int rfd):用于子进程读取任务码并执行命令。

  • void Run():用于获取并派发任务。

  • void Stop():用于关闭写端并回收子进程。

最后为方便测试我们还需要一个管理任务的类和方法。我们可以单独创建一个Task.hpp文件。

typedef void (*task_t)();

class TaskManage
{
public:
    TaskManage()
    {   //随机数种子
        srand((unsigned int)time(nullptr));
    }
    int Code()
    {   //随机生成任务码(数组下标)
        return rand()%_tasks.size();
    }
    void Execute(int code)
    {   //执行任务
        _tasks[code]();
    }
    // ... ...
    ~TaskManage()
    {}
private:
    vector<function<task_t>> _tasks;//用于储存任务的数组
};

然后需要在ProcessPool中放入TaskManage成员变量,并在ProcessPool的构造函数中完成对_tasks中内容的插入。具体操作参考下面源码。

四、源码

ProcessPool.hpp

#ifndef _PROCESS_POOL_HPP_
#define _PROCESS_POOL_HPP_

#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include "Task.hpp"
using namespace std;

//先描述
class Channel
{
public:
    Channel(int fd,pid_t id): _wfd(fd),_subid(id)
    {
        _name = "channel-" + to_string(_wfd) + "-" + to_string(_subid);
    }

    ~Channel()
    {

    }

    void Send(int code)
    {
        int n = write(_wfd,&code,sizeof(code));
        (void)n;
    }

    void Close()
    {
        close(_wfd);
    }

    void Wait()
    {
        pid_t rid = waitpid(_subid, nullptr, 0);
        (void)rid;
    }

    int Fd()
    {
        return _wfd;
    }

    pid_t SubId()
    {
        return _subid;
    }

    string Name()
    {
        return _name;
    }

private:
    int _wfd;
    pid_t _subid;
    string _name;
    //int _loadnum;
};

//再组织
class ChannelManager
{
public:
    ChannelManager(): _next(0)
    {

    }

    ~ChannelManager()
    {

    }

    void Insert(int wfd,pid_t subid)
    {
        _channels.emplace_back(wfd,subid);
        // Channel c(wfd,subid);
        // _channels.push_back(move(c));
    }

    Channel& Select()
    {
        auto& c = _channels[_next];
        _next++;
        _next %= _channels.size();
        return c;
    }

    void PrintChannel()
    {
        for(auto& channel : _channels)
        {
            cout << channel.Name() << endl;
        }
    }

    void CloseAll()
    {
        for(auto& channel: _channels)
        {
            channel.Close();
        }
    }

    void StopSubProcess()
    {
        for(auto& channel: _channels)
        {
            channel.Close();
            cout << "关闭: " << channel.Name() << endl;
        }
    }

    void WaitSubProcess()
    {
        for(auto& channel: _channels)
        {
            channel.Wait();
            cout << "回收: " << channel.Name() << endl;
        }
    }

    void CloseAndWait()
    {
        for(auto& channel: _channels)
        {
            channel.Close();
            cout << "关闭: " << channel.Name() << endl;
            channel.Wait();
            cout << "回收: " << channel.Name() << endl;
        }

        //解决方法1 倒着关闭
        // for(int i = _channels.size() - 1;i >= 0;i--)
        // {
        //     _channels[i].Close();
        //     cout << "关闭: " << _channels[i].Name() << endl;
        //     _channels[i].Wait();
        //     cout << "回收: " << _channels[i].Name() << endl;
        // }
    }

private:
    vector<Channel> _channels;
    int _next;
};

const int gdefaultnum = 5;

class ProcessPool
{
public:
    ProcessPool(int num): _process_num(num)
    {
        _tm.Register(PrintLog);
        _tm.Register(DownLoad);
        _tm.Register(UpLoad);
    }

    ~ProcessPool()
    {

    }

    void Work(int rfd)
    {
        while(true)
        {
            int code = 0;
            size_t n = read(rfd,&code,sizeof(code));
            if(n > 0)
            {
                if(n != sizeof(code))
                {
                    continue;
                }
                cout << "子进程[" << getpid() << "]收到一个任务码: " << code << endl;
                _tm.Execute(code);
            }
            else if(n == 0)
            {
                cout << "子进程退出" << endl;
                break;
            }
            else
            {
                cout << "读取错误" << endl;
                break;
            }
        }
    }

    bool Start()
    {
        for(int i = 0;i < _process_num;i++)
        {
            //1.创建管道
            int pipefd[2] = { 0 };
            int n = pipe(pipefd);
            if(n < 0)
            {
                return false;
            }

            //2.创建子进程
            pid_t subid = fork();
            if(subid < 0)
            {
                return false;
            }
            else if(subid == 0)
            {
                //子进程
                //关闭子进程继承的哥哥的w端
                _cm.CloseAll();

                //3.关闭不需要的文件描述符
                close(pipefd[1]);
                Work(pipefd[0]);
                close(pipefd[0]);
                exit(0);
            }
            else
            {
                //父进程
                //3.关闭不需要的文件描述符
                close(pipefd[0]);
                _cm.Insert(pipefd[1],subid);
            }
        }
        return true;
    }

    void Debug()
    {
        _cm.PrintChannel();
    }

    void Run()
    {
        //1.选择一个任务
        int taskcode = _tm.Code();
        //2.选择一个信道[子进程],负载均衡的选择一个子进程,完成任务
        auto& c = _cm.Select();
        cout << "选择了一个子进程: " << c.Name() << endl;
        //3.发送任务
        c.Send(taskcode);
        cout << "发送了一个任务码: " << taskcode << endl;
    }

    void Stop()
    {
        //关闭父进程
        //_cm.StopSubProcess();
        //回收所有的子进程
        //_cm.WaitSubProcess();

        _cm.CloseAndWait();
    }

private:
    ChannelManager _cm;
    int _process_num;
    TaskManager _tm;
};

#endif

Task.hpp

#pragma once

#include <iostream>
#include <vector>
#include <ctime>
using namespace std;

typedef void (*task_t)();

void PrintLog()
{
    cout << "我是一个打印日志的任务" << endl;
}

void DownLoad()
{
    cout << "我是一个下载的任务" << endl;
}

void UpLoad()
{
    cout << "我是一个上传的任务" << endl;
}

class TaskManager
{
public:
    TaskManager()
    {
        srand((unsigned int)time(nullptr));
    }

    ~TaskManager()
    {

    }

    void Register(task_t t)
    {
        _tasks.push_back(t);
    }

    int Code()
    {
        return rand() % _tasks.size();
    }

    void Execute(int code)
    {
        if(code >= 0 && code < _tasks.size())
        {
            _tasks[code]();
        }
    }

private:
    vector<task_t> _tasks;
};

Main.cc

#include "ProcessPool.hpp"

int main()
{
    //创建进程池对象
    ProcessPool pp(gdefaultnum);

    //启动进程池
    pp.Start();

    //自动派发任务
    int cnt = 10;
    while(cnt--)
    {
        pp.Run();
        sleep(1);
    }

    //回收,结束进程池
    pp.Stop();
    return 0;
}

网站公告

今日签到

点亮在社区的每一天
去签到