【Linux】I/O 多路转接:select & epoll 技术剖析

发布于:2025-03-23 ⋅ 阅读:(29) ⋅ 点赞:(0)

🌈 个人主页:Zfox_
🔥 系列专栏:Linux

前言:🔥 I/O 多路转接

💻  多路I/O转接服务器  \colorbox{cyan}{ 多路I/O转接服务器 }  多路I/O转接服务器 (或称为多任务I/O服务器)的核心思想是利用操作系统提供的多路I/O转接机制(如 selectpollepoll 等),由内核帮助应用程序高效地监视多个文件描述符(包括网络连接、管道、文件等)的状态变化,而不是让应用程序自己轮询每个连接的状态。这种方式能够显著提高服务器的性能和可扩展性,尤其是在处理大量并发连接时。

一:🔥 I/O 多路转接之 select

🦋 初识 select

💻 系统提供  select  \colorbox{pink}{ select }  select  函数来实现多路复用 输入 / 输出 模型.

  • select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
  • 程序会停在 select 这里等待, 直到被监视的文件描述符有一个或多个发生了状态改变;

🦋 select 函数原型

💤 select 的函数原型如下:

NAME
       select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing

SYNOPSIS
       /* According to POSIX.1-2001, POSIX.1-2008 */
       #include <sys/select.h>

       int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);

       void FD_CLR(int fd, fd_set *set);
       int  FD_ISSET(int fd, fd_set *set);
       void FD_SET(int fd, fd_set *set);
       void FD_ZERO(fd_set *set);

📚 参数解释:

  • nfds 是需要监视的最大的文件描述符值 +1
  • rdset, wrset, exset 分别对应于需要检测的 可读文件描述符的集合可写文件描述符的集合异常文件描述符的集合
  • timeout 为 结构体 timeval用来设置 select() 的等待时间
/* A time value that is accurate to the nearest
   microsecond but also has a range of years.  */
struct timeval
{
  __time_t tv_sec;		/* Seconds.  */
  __suseconds_t tv_usec;	/* Microseconds.  */
};

📚 参数 timeout 取值:

  • NULL则表示 select() 没有 timeoutselect 将一直被阻塞, 直到某个文件描述符上发生了事件
  • 0仅检测描述符集合的状态, 然后立即返回, 并不等待外部事件的发生 (非阻塞)
  • 特定的时间值 例如 struct timeval timeout = {10, 0} : 如果在指定的时间段里没有事件发生,select 将超时返回

🎀 关于 fd_set 结构

在这里插入图片描述
在这里插入图片描述

  • 其实这个结构就是一个 整数数组, 更严格的说, 是一个 “位图” . 使用位图中对应的位来表示要监视的文件描述符.

  • 提供了一组操作 fd_set 的接口, 来比较方便的操作位图

void FD_CLR(int fd, fd_set *set); 	// 用来清除描述词组 set 中相关 fd 的位
int  FD_ISSET(int fd, fd_set *set); // 用来测试描述词组 set 中相关 fd 的位是否为真
void FD_SET(int fd, fd_set *set); 	// 用来设置描述词组 set 中相关 fd 的位
void FD_ZERO(fd_set *set); 			// 用来清除描述词组 set 的全部位

🎀 函数返回值

  • 执行成功则返回 文件描述符状态已改变的个数
  • 如果返回 0 代表在描述符状态改变前已超过 timeout 时间
  • 当有错误发生时则返回-1, 错误原因存于 errno, 此时参数 readfds, writefds, exceptfds 和 timeout 的值变成不可预测

🙅 错误值可能为:

  • EBADF 文件描述词为无效的或该文件已关闭
  • EINTR 此调用被信号所中断
  • EINVAL 参数 n 为负值
  • ENOMEM 核心内存不足

🦋 理解 select 执行过程

🦈 理解 select 模型的关键在于理解 fd_set, 为说明方便, 取 fd_set 长度为 1 字节, fd_set 中的每一 bit 可以对应一个文件描述符 fd_set。 则 1 字节长的 fd_set 最大可以对应 8 个 fd.**

  • (1) 执行 fd_set ; FD_ZERO(&set);set 用位表示是 0000,0000
  • (2) 若 fd= 5,执行 FD_SET(fd,&set)c; 后 set 变为 0001,0000(第 5 位置为 1)
  • (3) 若再加入 fd= 2, fd=1,则 set 变为 0001,0011
  • (4) 执行 select(6,&set,0,0,0) 阻塞等待
  • (5) 若 fd=1,fd=2 上都发生可读事件, 则 select 返回, 此时 set 变为 0000,0011。 注意: 没有事件发生的 fd=5 被清空。

🎀 socket 就绪条件

读就绪
  • socket 内核中, 接收缓冲区中的字节数, 大于等于低水位标记 SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于 0;
  • socket TCP 通信中, 对端关闭连接, 此时对该 socket 读, 则返回 0;
  • 监听的 socket 上有新的连接请求;
  • socket 上有未处理的错误;
写就绪
  • socket 内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记 SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于 0;
  • socket 的写操作被关闭(close 或者 shutdown). 对一个写操作被关闭的 socket 进行写操作, 会触发 SIGPIPE 信号;
  • socket 使用非阻塞 connect 连接成功或失败之后;
  • socket 上有未读取的错误;
异常就绪(选学)
  • socket 上收到带外数据. 关于带外数据, 和 TCP 紧急模式相关(回忆 TCP 协议头中, 有一个紧急指针的字段), 自己收集相关资料

🎀 select 的特点

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值. 我这边服务器上 sizeof(fd_set)= 512, 每 bit 表示一个文件描述符, 则我服务器上支持的最大文件描述符是 512*8=4096.
  • I将 fd 加入 select 监控集的同时, 还要再使用一个数据结构 array 保存放到 select 监控集中的 fd
    • 一是用于再 select 返回后, array 作为源数据和 fd_set 进行 FD_ISSET 判断
    • 二是 select 返回后会把以前加入的但并无事件发生的 fd 清空, 则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先), 扫描 array 的同时 取得 fd 最大值 maxfd, 用于 select 的第一个参数

备注: fd_set 的大小可以调整, 可能涉及到重新编译内核.

🎀 select 缺点

  • 每次调用 select, 都需要手动设置 fd 集合, 从接口使用角度来说也非常不便.
  • 每次调用 select, 都需要把 fd 集合从用户态拷贝到内核态, 这个开销在 fd 很多时会很大
  • 同时每次调用 select 都需要在内核遍历传递进来的所有 fd, 这个开销在 fd 很多时也很大
  • select 支持的文件描述符数量太小.

🦋 select 使用示例: selectserver

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <algorithm>
#include <memory>
#include "Socket.hpp"
#include "Log.hpp"
#include <sys/select.h>

using namespace SocketModule;
using namespace LogModule;

#define NUM sizeof(fd_set) * 8

const int gdefaultfd = -1;

// 最开始的时候,tcpserver 只有一个socketfd
class SelectServer
{
public:
    SelectServer(int port)
        : _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        memset(_fd_array, -1, sizeof(_fd_array));

        // 添加 listensockfd到数组中
        _fd_array[0] = _listen_socket->Fd();
    }

    void Loop()
    {
        fd_set rfds; // 读文件描述符集
        _isrunning = true;
        // 根据listen套接字检测 它上面有没有新连接
        // accept() 阻塞的吗
        // 从 listensockfd 获取新连接,本质也是一种 IO
        // 我们这种 IO , input -> listensockfd 只会关心读事件就绪
        // IO = 等 + 拷贝
        // 是要把 listensockfd 添加到 select 内部

        while (_isrunning)
        {
            // 清空 rfds
            FD_ZERO(&rfds);
            struct timeval timeout = {10, 0};
            int maxfd = gdefaultfd;

            for (int i = 0; i < NUM; i++)
            {
                if (_fd_array[i] == -1)
                    continue;
                // 合法 fd
                FD_SET(_fd_array[i], &rfds);
                //  更新最大值
                maxfd = std::max(maxfd, _fd_array[i]);
            }

            // 我们不能让 accept 来阻塞检测新连接到来,而因该让 select 来负责进行 就绪事件的检测
            // 用户告诉内核,你要帮我关心 &rfds,该事件啊!!
            int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);   // 通知上层的任务!
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("select");
                break;
            default:
                // 有事件就绪了
                // rfds: 内核告诉用户,你关心的 rfds 中的 fd,有哪些已经就绪了!!!
                std::cout << "有事件就绪啦..." << timeout.tv_sec << ":" << timeout.tv_usec << std::endl;
                Dispatcher(rfds);    // 把已经就绪的 sockfd 派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter() // 回调函数
    {
        InetAddr client;
        // listensockfd 就绪了! 获取新连接不就好了吗?
        int newfd = _listen_socket->Accepter(&client); //  此时一定不会被阻塞 select 告诉我 listensockfd 已经就绪了! 只执行 “拷贝”
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新的连接: " << newfd << " client info " << client.Addr() << std::endl;
            // recv() ?? 读事件是否就绪,我们并不清楚! newfd 也托管给select 让 select 帮我关心 sockfd 上面的读事件就绪
            // 怎么把新的 newfd 托管给 select 让select帮我去关心 newfd 上面的读事件 ?? 添加到辅助数组即可

            int pos = -1;
            for (int j = 0; j < NUM; j++)
            {
                if (_fd_array[j] == gdefaultfd)
                {
                    pos = j;
                    _fd_array[j] = newfd;
                    break;
                }
            }

            if (pos == -1)
            {
                LOG(LogLevel::ERROR) << "服务器已经满了...";
                close(newfd);
            }
        }
    }

    void Recver(int who)    // 回调函数
    {
        // 合法的就绪的普通的 fd
        char buffer[1024];
        ssize_t n = recv(_fd_array[who], buffer, sizeof(buffer) - 1, 0); // 此时也不会被阻塞 就绪
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;
            // 把读到的信息, 再回显回去
            std::string message = "echo# ";
            message += buffer;
            send(_fd_array[who], message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出, sockfd: " << _fd_array[who];
            close(_fd_array[who]);
            _fd_array[who] = gdefaultfd;
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取出错, sockfd: " << _fd_array[who];
            close(_fd_array[who]);
            _fd_array[who] = gdefaultfd;
        }
    }

    void Dispatcher(fd_set &rfds) // rfds 就可能会有更多的 fd 就绪了,就不仅仅是 listenfd 就绪了
    {
        for (int i = 0; i < NUM; i++)
        {
            if (_fd_array[i] == gdefaultfd)
                continue;

            if (_fd_array[i] == _listen_socket->Fd())
            {
                // 判断 listensockfd,是否在 rfds 里
                if (FD_ISSET(_fd_array[i], &rfds))
                {
                    Accepter(); // 连接的获取
                }
            }
            else
            {
                if (FD_ISSET(_fd_array[i], &rfds))
                {
                    Recver(i);  // IO 的处理
                }
            }
        }
    }

    ~SelectServer()
    {
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    int _fd_array[NUM]; // 辅助数组
};

二:🔥 poll

🦋 初识 poll

 poll  \colorbox{pink}{ poll }  poll  作为早期的一种多路转接方案,解决了 select 函数中文件描述符数量有限和每次调用都需要重新设置的问题。然而,随着网络技术的发展和服务器负载的不断增加,poll 在某些场景下也显露出了性能瓶颈。此时,epoll 作为 Linux 2.6内核 引入的一种更为高效的 I/O多路复用机制,凭借其出色的性能和灵活性,逐渐成为高性能服务器应用的首选。

由于 poll 与之前所提到过的 select 有许多相似之处,所以我们对于 poll 将只进行简单的介绍

🦋 poll 函数原型

💤 poll 的函数原型如下:

NAME
       poll - wait for some event on a file descriptor

SYNOPSIS
       #include <poll.h>

       int poll(struct pollfd *fds, nfds_t nfds, int timeout);

// pollfd结构
struct pollfd 
{
	int fd; /* file descriptor */
	short events; /* requested events */
	short revents; /* returned events */
};

参数说明:

  • fds 是一个 poll 函数监听的结构列表,每一个元素中,包含了三部分内容:文件描述符监听的事件集合返回的事件集合
  • nfds 表示 fds 数组的长度
  • timeout 表示 poll 函数的超时时间,单位是毫秒(ms)

events 和 revents 的取值: \colorbox{cyan}{events 和 revents 的取值:} events  revents 的取值:

在这里插入图片描述

返回结果:

  • 返回值小于0,表示出错
  • 返回值等于0,表示 poll 函数等待超时
  • 返回值大于0,表示 poll 由于监听的文件描述符就绪而返回

注意:poll 中 socket 就绪条件和 select 是一样的

🦋 poll_server.hpp:

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <algorithm>
#include <memory>
#include "Socket.hpp"
#include "Log.hpp"
#include <poll.h>

using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

#define MAX 4096

// 最开始的时候,tcpserver 只有一个socketfd
class PollServer
{
public:
    PollServer(int port)
        : _port(port), 
        _listen_socket(std::make_unique<TcpSocket>()),
        _isrunning(false)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        for(int i = 0; i < MAX; i++)
        {
            _fds[i].fd = gdefaultfd;
            _fds[i].events = 0;
            _fds[i].revents = 0;
        }
        // 先把唯一的 fd 添加到poll中
        _fds[0].fd = _listen_socket->Fd();
        _fds[0].events |= POLLIN;
    }

    void Loop()
    {
        int timeout = 1000;
        _isrunning = true;

        while (_isrunning)
        {
            // 我们不能让 accept 来阻塞检测新连接到来,而因该让 select 来负责进行 就绪事件的检测
            // 用户告诉内核,你要帮我关心 &rfds,该事件啊!!
            int n = poll(_fds, MAX, timeout);   // 通知上层的任务!
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("poll");
                break;
            default:
                // 有事件就绪了
                // rfds: 内核告诉用户,你关心的 rfds 中的 fd,有哪些已经就绪了!!!
                Dispatcher();    // 把已经就绪的 sockfd 派发给指定的模块
                TestFd();
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter() // 回调函数
    {
        InetAddr client;
        // listensockfd 就绪了! 获取新连接不就好了吗?
        int newfd = _listen_socket->Accepter(&client); //  此时一定不会被阻塞 select 告诉我 listensockfd 已经就绪了! 只执行 “拷贝”
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新的连接: " << newfd << " client info " << client.Addr() << std::endl;
            // recv() ?? 读事件是否就绪,我们并不清楚! newfd 也托管给select 让 select 帮我关心 sockfd 上面的读事件就绪
            // 怎么把新的 newfd 托管给 select 让select帮我去关心 newfd 上面的读事件 ?? 添加到辅助数组即可

            int pos = -1;
            for (int j = 0; j < MAX; j++)
            {
                if (_fds[j].fd == gdefaultfd)
                {
                    pos = j;
                    _fds[j].fd = newfd;
                    _fds[j].events = POLLIN;
                    break;
                }
            }

            if (pos == -1)
            {
                // fds进行扩容
                LOG(LogLevel::ERROR) << "服务器已经满了...";
                close(newfd);
            }
        }
    }

    void Recver(int who)    // 回调函数
    {
        // 合法的就绪的普通的 fd
        char buffer[1024];
        ssize_t n = recv(_fds[who].fd, buffer, sizeof(buffer) - 1, 0); // 此时也不会被阻塞 就绪
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;
            // 把读到的信息, 再回显回去
            std::string message = "echo# ";
            message += buffer;
            send(_fds[who].fd, message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出, sockfd: " << _fds[who].fd;
            close(_fds[who].fd);
            _fds[who].fd = gdefaultfd;
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取出错, sockfd: " << _fds[who].fd;
            close(_fds[who].fd);
            _fds[who].fd = gdefaultfd;
        }
    }

    void Dispatcher()
    {
        for (int i = 0; i < MAX; i++)
        {
            if (_fds[i].fd == gdefaultfd)
                continue;

            if (_fds[i].fd == _listen_socket->Fd())
            {
                if (_fds[i].revents & POLLIN)
                {
                    Accepter(); // 连接的获取
                }
            }
            else
            {
                if (_fds[i].revents & POLLIN)
                {
                    Recver(i);  // IO 的处理
                }
                // else if(_fds[i].revents & POLLOUT)
                // {

                // }
            }
        }
    }

    void TestFd()
    {
        std::cout << "pollfd: ";
        for(int i = 0; i < MAX; i++)
        {
            if(_fds[i].fd == gdefaultfd) continue;
            std::cout << _fds[i].fd << "[" << Evects2Str(_fds[i].events) <<  "] " << std::endl;
        }
        std::cout << '\n';
    }

    std::string Evects2Str(short events)
    {
        std::string s = (events & POLLIN) ? "POLLIN" : "";
        s +=  (events & POLLOUT) ? "POLLOUT" : "";
        return s;
    }

    ~PollServer()
    {
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    struct pollfd _fds[MAX];
};

🦋 poll 的优点:

  • 🧊 输入,输出参数分离,eventsrevents,不用在频繁的对 poll参数 进行重置
  • 🧊 poll 关心的 fd 没有上限, 可以等待多个fd,效率高

🦋 poll 的缺点:

  • 🧊 依旧需要,内核在底层进行遍历式的检测,fd是否就绪
  • 🧊 每次调用 poll,都需要把 fd集合 从用户态拷贝到内核态,这个开销在fd很多时会很大

poll 作为Linux中的多路转接技术之一,在处理多个并发连接时具有一定的优势。然而,随着网络技术的发展和服务器负载的不断增加,poll 在某些场景下可能无法满足高性能的需求。因此,在实际应用中需要根据具体场景选择合适的I/O多路复用技术

三:🔥 epoll

 epoll  \colorbox{pink}{ epoll }  epoll 是 Linux下多路复用I/O 接口 select/poll 的增强版本,旨在提高程序在大量并发连接中只有少量活跃情况下的系统 CPU 利用率。按照 man 手册的说法:是为处理大批量句柄而作了改进的 poll,但其实 epollpoll 还是有很大差别的

🦋 epoll 的相关系统调用

epoll 有3个相关的系统调用:

  • epoll_create
  • epoll_ctl
  • epoll_wait
  • epoll_create
int epoll_create(int size);

epoll_create 的功能是创建一个 epoll 的句柄 (文件描述符),自从 linux2.6.8之后,size 参数是被忽略的,注意用完之后, 必须调用 close() 关闭

  • epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll 的事件注册函数,它不同于 select() 是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型

  • 第一个参数是 epoll_create() 的返回值 (epoll的句柄)
  • 第二个参数表示动作,用三个宏来表示
  • 第三个参数是需要监听的fd
  • 第四个参数是告诉内核需要监听什么事

第二个参数的取值:

  • EPOLL_CTL_ADD注册新的fd到 epfd
  • EPOLL_CTL_MOD修改已经注册的fd的监听事件
  • EPOLL_CTL_DELepfd 中删除一个fd

struct epoll_event 结构如下:

在这里插入图片描述

✨events \colorbox{pink}{✨events} ✨events可以是以下几个宏的集合:

含义
EPOLLIN 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)
EPOLLOUT 表示对应的文件描述符可以写
EPOLLPRI 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)
EPOLLERR 表示对应的文件描述符发生错误
EPOLLHUP 表示对应的文件描述符被挂断
EPOLLET 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
EPOLLONESHOT 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
  • epoll_wait
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll_wait 的功能是收集在 epoll监控的事件中已经发送的事件

  • 参数 events 是分配好的 epoll_event 结构体数组
  • epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)
  • maxevents 告之内核这个events有多大,这个 maxevents 的值不能大于创建 epoll_create() 时的size
  • 参数 timeout 是超时时间 (毫秒,0会立即返回,-1是永久阻塞)
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败

🦋 epoll 工作原理

在这里插入图片描述

当某一进程调用 epoll_create 方法时,Linux内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关

在这里插入图片描述

注意了:那么 epfd 这个文件描述符如何关联到 eventpoll 结构体呢,我们查看linux内核源代码发现,struct file 中的 private_data 就指向了 eventpoll 结构体,关联到了这个 epoll 模型

在这里插入图片描述

struct eventpoll {
	...
	/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件 就绪队列*/
	struct list_head rdllist;

	/*红黑树的根节点, 这颗树中存储着所有添加到 epoll 中的需要监控的事件*/
	struct rb_root rbr;
	...
};
  • 每一个 epoll 对象都有一个独立的 eventpoll 结构体, 用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件.
  • 这些事件都会挂载在红黑树中, 如此, 重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 lgn, 其中 n 为树的高度).
  • 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系, 也就是说, 当响应的事件发生时会调用这个回调方法.
  • 这个回调方法在内核中叫 ep_poll_callback, 它会将发生的事件添加到 rdlist 双链表中.
  • epoll 中, 对于每一个事件, 都会建立一个 epitem 结构体

红黑树的键值key 就是 文件描述符fd!!!

在这里插入图片描述

struct epitem{
	struct rb_node rbn;			//红黑树节点
	struct list_head rdllink;	//双向链表节点
	struct epoll_filefd ffd; 	//事件句柄信息
	struct eventpoll *ep; 		//指向其所属的 eventpoll 对象
	struct epoll_event event; 	//期待发生的事件类型
}
  • 当调用 epoll_wait 检查是否有事件发生时, 只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可.
  • 如果 rdlist 不为空, 则把发生的事件复制到用户态, 同时将事件数量返回给用户. 这个操作的时间复杂度是 O(1).

📚 关于 epoll 的使用其实就只有三步:

  • 调用 epoll_create 创建一个 epoll 句柄
  • 调用 epoll_ctl,将要监控的文件描述符进行注册
  • 调用 epoll_wait,等待文件描述符就绪

🦋 epoll 的优点 (和 select 的缺点对应)

  • 接口使用方便虽然拆分成了三个函数,反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,同时做到了输入输出参数分离开
  • 数据拷贝轻量只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中,这个操作并不频繁 ( select/poll 都是每次循环都要进行拷贝)
  • 事件回调机制避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度O(1),即使文件描述符数目很多,效率也不会受到影响
  • 没有数量限制文件描述符数目无上限

📚 注意:有人说,epoll 中使用了内存映射机制,这种说法是不准确的,我们定义的 struct epoll_event 是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中的

我们来看看内存映射机制是什么:

内存映射机制: 内核直接将就绪队列通过 mmap 的方式映射到用户态,避免了拷贝内存这样的额外性能开销

📚 大家务必对比总结 select, poll, epoll 之间的优点和缺点(重要, 面试中常见).

🦋 epoll 工作方式

🌰 我们来举个生活中例子:你在网上网购了一袋零食,快递员送到你家楼下的时候,这时候就有可能出现两种方式:

🛺 新来的快递员小王,在楼下叫你下楼取快递,你可能觉得他好欺负,就没有下楼,小王就一直在楼下喊你,一遍,两遍,三遍 (水平触发)

🛺 后面换了快递员小赵来给你送快递,在楼下叫了你一次,你没下来,他也没惯着你,就离开去送下一个快递了(边缘触发)

epoll 有2种工作方式:

  • 水平触发(Level Triggered简称 LT)
  • 边缘触发(Edge Triggered简称 ET)

🎀 水平触发Level Triggered 工作模式:

epoll 默认状态下就是 LT 工作模式

  • epoll 检测到 socket 上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
  • 如上面的例子, 由于只读了 1K 数据, 缓冲区中还剩 1K 数据, 在第二次调用 epoll_wait 时, epoll_wait 仍然会立刻返回并通知 socket 读事件就绪.
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
  • 支持阻塞读写和非阻塞读写

🎀 边缘触发Edge Triggered工作模式:

如果我们在第 1 步将 socket 添加到 epoll 描述符的时候使用了 EPOLLET 标志, epoll 进入 ET 工作模式.

  • epoll 检测到 socket 上事件就绪时, 必须立刻处理.
  • 如上面的例子, 虽然只读了 1K 的数据, 缓冲区还剩 1K 的数据, 在第二次调用 epoll_wait 的时候, epoll_wait 不会再返回了.
  • 也就是说, ET 模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
  • ET 的性能比 LT 性能更高( epoll_wait 返回的次数少了很多). Nginx 默认采用 ET 模式使用 epoll .
  • 只支持非阻塞的读写

selectpoll 其实也是工作在 LT 模式下. epoll 既可以支持 LT, 也可以支持 ET

🦋 对比LT和ET:

  • LT是 epoll 的默认行为.
  • 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完.
  • 相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.
  • 另一方面, ET 的代码复杂程度更高了

🦋 理解ET模式和非阻塞文件描述符

使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞,这个不是接口上的要求, 而是 “工程实践” 上的要求

假设这样的场景:服务器接受到一个10k的请求,会向客户端返回一个应答数据,如果客户端收不到应答,不会发送第二个10k请求

在这里插入图片描述

如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话(read 不能保证一次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的 9k 数据就会待在缓冲区中

在这里插入图片描述

此时由于 epoll 是 ET 模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回

但是问题来了.

  • 服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.
  • 客户端要读到服务器的响应, 才会发送下一个请求
  • 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.

在这里插入图片描述

为了解决上述问题(阻塞read不一定能一下把完整的请求读完),于是就可以使用非阻塞轮训的方式来读缓冲区,保证一定能把完整的请求都读出来

如果是LT没这个问题,只要缓冲区中的数据没读完,就能够让 epoll_wait 返回文件描述符读就绪

🦋 epoll 的使用场景

epoll 的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll 的性能可能适得其反.

  • 对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用 epoll .

例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网 APP 的入口服务器, 这样的服务器就很适合 epoll .
如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用 epoll 就并不合适. 具体要根据需求和场景特点来决定使用哪种 IO 模型.

🦋 epoll_server.hpp (LT 模式):

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <algorithm>
#include <memory>
#include "Socket.hpp"
#include "Log.hpp"
#include <sys/epoll.h>

using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

#define MAX 4096

// 最开始的时候,tcpserver 只有一个socketfd
class EpollServer
{
    static const int revs_num = 64;
public:
    EpollServer(int port)
        : _port(port), 
        _epfd(gdefaultfd),
        _listen_socket(std::make_unique<TcpSocket>()),
        _isrunning(false)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        // 1. 创建epoll模型
        _epfd = epoll_create(256);
        if(_epfd < 0)
        {
            LOG(LogLevel::ERROR) << "epoll_create error";
            exit(EPOLL_CREATE_ERR);
        }
        LOG(LogLevel::DEBUG) << "epoll_create success: " << _epfd; 

        // 2. 至少要将listensock 添加到epoll模型中
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = _listen_socket->Fd();
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listen_socket->Fd(), &ev);
        if(n < 0)
        {
            LOG(LogLevel::ERROR) << "epoll_ctl error";
            exit(EPOLL_CTL_ERR);
        }
    }

    void Loop()
    {
        int timeout = 1000;
        _isrunning = true;

        while (_isrunning)
        {
            // 我们不能让 accept 来阻塞检测新连接到来,而因该让 select 来负责进行 就绪事件的检测
            // 用户告诉内核,你要帮我关心 &rfds,该事件啊!!
            int n = epoll_wait(_epfd, _revs, revs_num, timeout);   
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("epoll_wait");
                break;
            default:
                // 有事件就绪了
                //  内核告诉用户,你关心的 有哪些已经就绪了!!!
                Dispatcher(n);    // 把已经就绪的 sockfd 派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter() // 回调函数
    {
        InetAddr client;
        // listensockfd 就绪了! 获取新连接不就好了吗?
        int newfd = _listen_socket->Accepter(&client); //  此时一定不会被阻塞 select 告诉我 listensockfd 已经就绪了! 只执行 “拷贝”
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新的连接: " << newfd << " client info " << client.Addr() << std::endl;

            // 我们需要将新的 newfd添加给epoll,让epoll帮我们进行监管
            struct epoll_event ev;
            ev.data.fd = newfd;
            ev.events = EPOLLIN;
            int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, newfd, &ev);
            if(n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
                close(newfd);
                return ;
            }
            LOG(LogLevel::DEBUG) << "epoll_ctl success: " << newfd;
        }
    }

    void Recver(int who)    // 回调函数
    {
        // 合法的就绪的普通的 fd
        char buffer[1024];
        ssize_t n = recv(who, buffer, sizeof(buffer) - 1, 0); // 此时也不会被阻塞 就绪
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;
            // 把读到的信息, 再回显回去
            std::string message = "echo# ";
            message += buffer;
            send(who, message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出, sockfd: " << who;
            // 把fd从epoll中移除, 必须保证fd是一个合法的fd
            int m = epoll_ctl(_epfd, EPOLL_CTL_DEL, who, nullptr);
            if(m < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl_del error";
                return ;
            }
            LOG(LogLevel::DEBUG) << "epoll_ctl_del success: " << who;
            close(who); // 坑
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取出错, sockfd: " << who;
            // 把fd从epoll中移除
            int m = epoll_ctl(_epfd, EPOLL_CTL_DEL, who, nullptr);
            if(m < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl_del error";
                return ;
            }
            LOG(LogLevel::DEBUG) << "epoll_ctl_del success: " << who;
            close(who); // 坑
        }
    }

    void Dispatcher(int rnum)
    {
        for (int i = 0; i < rnum; i++)
        {
            int events = _revs[i].events;
            int fd = _revs[i].data.fd;
            if(fd == _listen_socket->Fd())
            {
                if(events & EPOLLIN)
                {
                    // listen sock fd 就绪
                    Accepter();
                }
            }
            else 
            {
                // 普通文件描述符就绪
                if(events & EPOLLIN)
                {
                    // 读事件就绪
                    Recver(fd);
                }
                // else if(events & EPOLLOUT)
                // {
                //     // 写事件就绪
                // }
            }
        }
    }

    std::string Evects2Str(short events)
    {
        std::string s = (events & EPOLLIN) ? "POLLIN" : "";
        s +=  (events & EPOLLOUT) ? "POLLOUT" : "";
        return s;
    }

    ~EpollServer()
    {
        _listen_socket->Close();
        if(_epfd >= 0) close(_epfd);
    }

private:
    uint16_t _port;
    int _epfd;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    struct epoll_event _revs[revs_num];
};

四:🔥 共勉

😋 以上就是我对 【Linux】I/O 多路转接:select & epoll 技术剖析 的理解, 觉得这篇博客对你有帮助的,可以点赞收藏关注支持一波~ 😉
在这里插入图片描述