【网络】高级IO(2)

发布于:2025-02-23 ⋅ 阅读:(18) ⋅ 点赞:(0)

或者在某些情况下,它可能是:

 typedef unsigned int nfds_t; 

前言

由于select函数有下面几个特别明显的缺点,就推演出了改进版本——poll函数

比如select监视的fd是有上限的,我的云服务器内核版本下最大上限是1024个fd,主要还是因为fd_set他是一个固定大小的位图结构,位图中的数组开辟之后不会在变化了,这是内核的数据结构,除非你修改内核参数,否则不会在变化了,所以一旦select监视的fd数量超过1024,则select会报错。
 

除此之外,select大部分的参数都是输入输出型参数,用户和内核都会不断的修改这些参数的值,导致每次调用select前,都需要重新设置fd_set位图中的内容,这在用户层面上会带来很多不必要的遍历+拷贝的成本。

  poll接口主要解决了select接口的两个问题,一个是select监视的fd有上限,另一个是select每次调用前都需要借助第三方数组,向fd_set里面重新设置关心的fd。 

        select() 和 poll() 系统调用的本质一样,poll() 的机制与 select() 类似,与 select() 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。

只要我们理解了我们的select,poll也就不在话下

    poll() 和 select() 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

一,poll函数

这个函数的功能和select一模一样。都是监视并等待多个文件描述符的属性变化

poll函数处理的还是IO过程里面的等待过程!!!

1.1.参数一:fds

这个参数就需要我们知道struct pollfd结构体是什么东西!!!

struct pollfd{
	int fd;			//文件描述符
	short events;	//等待的事件
	short revents;	//实际发生的事件
};

 poll 函数是 Unix/Linux 系统中用于监视多个文件描述符(file descriptors)状态变化的一种机制。poll 函数能够同时监视多个文件描述符,以检查其上是否发生了感兴趣的事件(如可读、可写、错误等)。为了实现这一功能,poll 函数使用了一个 pollfd 结构体数组作为输入参数,每个 pollfd 结构体代表了一个被监视的文件描述符及其相关的事件。

下面是对 pollfd 结构体各成员的详细解释:

int fd;:这个成员变量是一个整数,表示被监视的文件描述符。文件描述符是一个非负整数,它是一个索引值,指向内核中打开文件的表项。通过文件描述符,我们可以对打开的文件进行读写操作。在 poll 函数的上下文中,这个文件描述符可以是任何类型的文件、套接字(socket)或者管道(pipe)等。

short events;:这个成员变量是一个位掩码(bitmask),用于指定我们想要监视的、在该文件描述符上可能发生的事件。这些事件可以是以下几种之一(或它们的组合,通过位或操作符 | 实现):

  1. POLLIN:有数据可读。
  2. POLLOUT:写数据不会阻塞。
  3. POLLERR:发生错误。
  4. POLLHUP:挂起(hang up)。
  5. POLLNVAL:无效的文件描述符。
  6. 以及其他一些可能依赖于特定实现的标志。

相比于select,poll将输入和输出事件进行了分离 

比如我们希望内核帮我们关注 0号文件描述符上的读事件

struct pollfd rfds;    
rfds.fd = 0;              //希望内核帮我们关注0号文件描述符上的事件
rfds.events |= POLLIN;    //希望内核关注0号文件描述符上的读事件
//rfds.events = POLLIN | POLLOUT;    //希望既监听读事件又监听写事件
rfds.revents = 0;         //这个参数用于内核通知我们有事件就绪了,让我们赶紧来取

如果我们要判断读事件是否就绪

if(rfds.revents & POLLIN){
    std::cout << "读事件就绪了..." << std::endl;
}

short revents;:这个成员变量在 poll 函数调用返回时被填充,它也是一个位掩码,表示在调用 poll 期间,实际发生在文件描述符上的事件。与 events 不同,revents 是由 poll 函数自动填写的,用户不需要(也不应该)在调用 poll 之前设置它。revents 的值可能包含 events 中指定的任何事件,或者由于某种原因(如错误或挂起)而包含其他事件。

events和revents的取值都是以宏的方式进行定义的,它们的二进制序列当中有且只有一个比特位是1,且为1的比特位是各不相同的。

  • 在调用poll函数之前,可以通过“或”运算符将要检测的事件添加到events成员当中。
  • 在poll函数返回后,可以通过“与”运算符检测revents成员中是否包含特定事件,以得知对应文件描述符的特定事件是否就绪。

注意:每个结构体的 events 域是由用户来设置,告诉内核我们关注的是什么,而 revents 域是返回时内核设置的,以说明对该描述符发生了什么事件 

  • 使用 poll 函数时,你通常会创建一个 pollfd 结构体数组,每个元素代表一个你想要监视的文件描述符及其事件。
  • 然后,你调用 poll 函数,并将这个数组和数组的大小作为参数传递给它。
  • poll 函数会阻塞(或根据需要立即返回),直到一个或多个文件描述符上发生了请求的事件,或者超过了指定的超时时间。
  • 最后,你可以通过检查每个 pollfd 结构体的 revents 成员来确定哪些文件描述符上发生了哪些事件。

1.2.参数二,nfds

nfds_t类型是什么?

 nfds_t 类型是在 Unix/Linux 系统编程中,特别是在使用 poll、ppoll、select 等系统调用时,用来表示文件描述符数量的数据类型。这个类型的确切定义可能会根据不同的系统和库实现而有所不同,但通常它是一个足够大的整数类型,以容纳系统可能支持的最大文件描述符数量。

    在大多数现代 Unix-like 系统中,nfds_t 通常是 unsigned int 或 unsigned long 的别名,但这不是一个固定的规则,因此最好查看你的系统或库的文档以获取确切的定义。

例如,在 glibc(GNU C Library)中,nfds_t 的定义可能看起来像这样(尽管这取决于 glibc 的版本和配置):

 typedef unsigned long nfds_t; 

或者在某些情况下,它可能是:

 typedef unsigned int nfds_t; 

当你使用 poll 函数时,你需要将一个 nfds_t 类型的值作为第一个参数传递给函数,这个值表示 pollfd 结构体数组中的元素数量。这个值应该大于或等于数组中实际元素的数量,因为 poll 会检查这个范围内的所有 pollfd 结构体。

 #include <poll.h>  
   int main() {  
 struct pollfd fds[2];  
 // 初始化 fds 数组...  
   nfds_t nfds = sizeof(fds) / sizeof(fds[0]);  
 int timeout = -1; // 无限等待  
   int ret = poll(fds, nfds, timeout);  
 // 处理 poll 的返回值...  
   return 0;  
 } 

在这个例子中,nfds 被设置为 fds 数组的大小,这是调用 poll 时应该传递的正确值。注意,虽然在这个例子中 nfds 被显式地计算为数组的大小,但在许多情况下,你可能已经知道要监视的文件描述符数量,因此可以直接使用那个值。

1.3.参数三,timeout

这个参数是一个输入型参数,单位是毫秒,代表阻塞等待的时间,超过该时间就会变为非阻塞等待。

timeout = -1,代表永久阻塞等待
timeout = 0,代表永久非阻塞等待
timeout > 0,代表先阻塞等待 timeout 毫秒,超过这个时间变为非阻塞等待,poll函数返回

1.4.返回值

成功时,poll() 返回结构体中 revents 域不为 0 的文件描述符个数;如果在超时前没有任何事件发生,poll()返回 0;

失败时,poll() 返回 -1,并设置 errno 为下列值之一:

  1. EBADF:一个或多个结构体中指定的文件描述符无效。
  2. EFAULT:fds 指针指向的地址超出进程的地址空间。
  3. EINTR:请求的事件之前产生一个信号,调用可以重新发起。
  4. EINVAL:nfds 参数超出 PLIMIT_NOFILE 值。
  5. ENOMEM:可用内存不足,无法完成请求。

1.5.poll函数简单使用示例

#include <stdio.h>  // 引入标准输入输出库,用于printf等函数  
#include <stdlib.h> // 引入标准库,用于EXIT_FAILURE等宏定义  
#include <unistd.h> // 引入POSIX操作系统API,用于open、close等函数  
#include <fcntl.h>  // 引入文件控制选项,如O_RDONLY  
#include <poll.h>   // 引入poll函数及其相关结构体pollfd  
  
int main() {    
    // 尝试以只读模式打开名为"test.txt"的文件  
    int fd = open("test.txt", O_RDONLY);    
    if (fd < 0) {    
        // 如果文件打开失败,打印错误信息并返回失败状态  
        perror("Failed to open file");    
        return EXIT_FAILURE;    
    }    
    
    // 定义一个pollfd结构体数组,用于存储要监视的文件描述符及其事件  
    struct pollfd fds[1];    
    // 设置数组的第一个元素,指定要监视的文件描述符及其感兴趣的事件(POLLIN,表示可读)  
    fds[0].fd = fd;    
    fds[0].events = POLLIN;    
    
    // 设置poll函数的超时时间为5000毫秒(5秒)  
    int timeout = 5000; // 5 seconds    
    // 调用poll函数,监视fds数组中的文件描述符,超时时间为timeout  
    int ret = poll(fds, 1, timeout);    
    
    // 检查poll函数的返回值  
    if (ret == -1) {    
        // 如果poll调用失败,打印错误信息,关闭文件描述符,并返回失败状态  
        perror("poll failed");    
        close(fd);    
        return EXIT_FAILURE;    
    }    
    
    // 如果poll超时,没有任何事件发生  
    if (ret == 0) {    
        printf("No data within five seconds\n");    
    } else if (fds[0].revents & POLLIN) {    
        // 如果在文件描述符上发生了POLLIN事件(即可读)  
        char buf[1024];  // 定义一个缓冲区,用于存储读取的数据  
        ssize_t bytes_read = read(fd, buf, sizeof(buf) - 1);  // 从文件描述符中读取数据到缓冲区  
        if (bytes_read > 0) {  // 如果成功读取到数据  
            buf[bytes_read] = '\0';  // 在读取到的数据末尾添加字符串结束符'\0'  
            printf("Read %zd bytes: %s\n", bytes_read, buf);  // 打印读取到的数据及其长度  
        }    
    }    
    
    // 关闭文件描述符,释放资源  
    close(fd);    
    // 程序正常结束  
    return EXIT_SUCCESS;    
}

这段代码展示了如何使用poll函数来监视一个文件描述符(在这个例子中是一个打开的文件)的状态,特别是检查文件是否有数据可读。如果文件在指定的超时时间内变得可读,程序将读取并打印文件内容;如果超时,则打印一条消息表示没有数据可读。

二,poll版TCP服务器编写

2.1.编写

我们这个poll版TCP服务器和那个select版本的差不多!!!所以我们基本就是在select版本上面的基础上进行修改

PollServer.hpp初始版本

#pragma once
#include <iostream>
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <poll.h> 
 
const uint16_t default_port = 8877;       // 默认端口号
const std::string default_ip = "0.0.0.0"; // 默认IP
const int default_fd = -1;
const int fd_num_max=64;
const int non_event=0;
 
class PollServer
{
public:
    PollServer(const uint16_t port = default_port, const std::string ip = default_ip)
        : ip_(ip), port_(port)
    {
        for(int i=0;i<fd_num_max;i++)
        {
            event_fds[i].fd=default_fd;
            event_fds[i].events=non_event;//暂时不关心
            event_fds[i].revents=non_event;//暂时不关心
        }
    }
    ~PollServer()
    {
        listensock_.Close();
    }
 
    bool Init()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
 
        return true;
    }
 
    void Start()
    {
        
    }
 
private:
    uint16_t port_;          // 绑定的端口号
    Sock listensock_;        // 专门用来listen的
    std::string ip_;         // ip地址
    struct pollfd event_fds[fd_num_max];
};

接下来就是修改 Accept()和Receive()了

void Accept()
    {
        // 我们的连接事件就绪了
        std::string clientip;
        uint16_t clientport;
 
        int sockfd = listensock_.Accept(&clientip, &clientport); // 这里会返回一个新的套接字
        // 请问进程会阻塞在Accept这里吗?答案是不会的,因为上层的select已经完成的了等待部分,accept只需要完成建立连接即可
        if (sockfd < 0)
            return;
        else // 把新fd加入位图
        {
            int i = 1;
            for (; i < fd_num_max; i++) // 为什么从1开始,因为我们0号下标对应的是listen套接字,我们不要修改
            {
                if (event_fds[i].fd != default_fd) // 没找到空位
                {
                    continue;
                }
                else
                { // 找到空位,但不能直接添加
                    break;
                }
            }
            if (i != fd_num_max) // 没有满
            {
                event_fds[i].fd = sockfd; // 把新连接加入数组
                 event_fds[i].events=POLLIN ;//关心读事件
                 event_fds[i].revents = non_event;//重置一下
                 
                Printfd();
            }
            else // 满了
            {
                close(sockfd); // 处理不了了,可以直接选择关闭连接
                //当然,如果想要连接更多链接,我们可以在这里进行扩容操作,不过我们这里就不做了
            }
        }
    }
     void Printfd()
    {
        std::cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (event_fds[i].fd == default_fd)
                continue;
            else
            {
                std::cout << event_fds[i].fd << " ";
            }
        }
        std::cout << std::endl;
    }
 
 
    void Receiver(int fd, int i)
    {
        char in_buff[1024];
        int n = read(fd, in_buff, sizeof(in_buff) - 1);
        if (n > 0)
        {
            in_buff[n] = 0;
            std::cout << "get message: " << in_buff << std::endl;
        }
        else if (n == 0) // 客户端关闭连接
        {
            close(fd);               // 我服务器也要关闭
            event_fds[i].fd = default_fd; // 重置数组内的值
        }
        else
        {
            close(fd);               // 我服务器也要关闭
            event_fds[i].fd = default_fd; // 重置数组内的值
        }
    }

PollServer.hpp

#pragma once
#include <iostream>
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <poll.h> 
 
const uint16_t default_port = 8877;       // 默认端口号
const std::string default_ip = "0.0.0.0"; // 默认IP
const int default_fd = -1;
const int fd_num_max=64;
const int non_event=0;
 
class PollServer
{
public:
    PollServer(const uint16_t port = default_port, const std::string ip = default_ip)
        : ip_(ip), port_(port)
    {
        for(int i=0;i<fd_num_max;i++)
        {
            event_fds[i].fd=default_fd;
            event_fds[i].events=non_event;//暂时不关心
            event_fds[i].revents=non_event;//暂时不关心
        }
    }
    ~PollServer()
    {
        listensock_.Close();
    }
 
    bool Init()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
 
        return true;
    }
 
    void Accept()
    {
        // 我们的连接事件就绪了
        std::string clientip;
        uint16_t clientport;
 
        int sockfd = listensock_.Accept(&clientip, &clientport); // 这里会返回一个新的套接字
        // 请问进程会阻塞在Accept这里吗?答案是不会的,因为上层的select已经完成的了等待部分,accept只需要完成建立连接即可
        if (sockfd < 0)
            return;
        else // 把新fd加入位图
        {
            int i = 1;
            for (; i < fd_num_max; i++) // 为什么从1开始,因为我们0号下标对应的是listen套接字,我们不要修改
            {
                if (event_fds[i].fd != default_fd) // 没找到空位
                {
                    continue;
                }
                else
                { // 找到空位,但不能直接添加
                    break;
                }
            }
            if (i != fd_num_max) // 没有满
            {
                event_fds[i].fd = sockfd; // 把新连接加入数组
                 event_fds[i].events=POLLIN ;//关心读事件
                 event_fds[i].revents = non_event;//重置一下
                 
                Printfd();
            }
            else // 满了
            {
                close(sockfd); // 处理不了了,可以直接选择关闭连接
                //当然,如果想要连接更多链接,我们可以在这里进行扩容操作,不过我们这里就不做了
            }
        }
    }
     void Printfd()
    {
        std::cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (event_fds[i].fd == default_fd)
                continue;
            else
            {
                std::cout << event_fds[i].fd << " ";
            }
        }
        std::cout << std::endl;
    }
 
 
    void Receiver(int fd, int i)
    {
        char in_buff[1024];
        int n = read(fd, in_buff, sizeof(in_buff) - 1);
        if (n > 0)
        {
            in_buff[n] = 0;
            std::cout << "get message: " << in_buff << std::endl;
        }
        else if (n == 0) // 客户端关闭连接
        {
            close(fd);               // 我服务器也要关闭
            event_fds[i].fd = default_fd; // 重置数组内的值
        }
        else
        {
            close(fd);               // 我服务器也要关闭
            event_fds[i].fd = default_fd; // 重置数组内的值
        }
    }
 
    void HandlerEvent()
    {
        for (int n = 0; n < fd_num_max; n++)
        {
            int fd = event_fds[n].fd;
            if (fd == default_fd) // 无效的
                continue;
 
            if (event_fds[n].revents&POLLIN) // fd套接字就绪了
            {
                // 1.是listen套接字就绪了
                if (fd == listensock_.Fd()) // 如果是listen套接字就绪了!!!
                {
                    Accept();
                }
                // 2.是通信的套接字就绪了,fd不是listen套接字
                else // 读事件
                {
                    Receiver(fd,n);
                }
            }
        }
    }
    
 
    void Start()
    {
        int listensock = listensock_.Fd();
        event_fds[0].fd=listensock;//把listen放到首个数组下标里面
        event_fds[0].events=POLLIN;//只关心读事件
        //revent可以不设置
        int timeout=3000;//3s
        for (;;)
        {
            int n = poll(event_fds,fd_num_max,timeout);
 
            switch (n)
            {
            case 0:
                std::cout << "time out....." << std::endl;
                break;
            case -1:
                std::cout << "poll error" << std::endl;
                break;
            default:
                // 有事件就绪
                std::cout << "get a new link" << std::endl;
                HandlerEvent(); // 处理事件
                break;
            }
        }
    }
 
private:
    uint16_t port_;          // 绑定的端口号
    Sock listensock_;        // 专门用来listen的
    std::string ip_;         // ip地址
    struct pollfd event_fds[fd_num_max];
};

我们再链接一个看看

非常完美了啊!!!

2.2.poll的优缺点

  • poll不是也用了一个数组吗?他是怎么解决了fd有上限的问题?

凭的是这个数组的大小是我们自己设置的,而select的那个数组是已经固定了的。

  其实poll的优点就是解决了select支持的fd有上限,以及用户输入信息和内核输出信息耦合的两个问题。

 但poll的缺点其实在上面的代码已经体现出来了一部分,内核在检测fd是否就绪时,需要遍历整个结构体数组检测events的值,同样用户在处理就绪的fd事件时,也需要遍历整个结构体数组检测revents的值,当rfds结构体数组越来越大时,每次遍历数组其实就会降低服务器的效率,为此,内核提供了epoll接口来解决这样的问题。

与select相同的是,poll也需要用户自己维护一个第三方数组来存储用户需要关心的fd及事件,只不过poll不需要在每次调用前都重新设置关心的fd,因为用户的输入和内核的输出是分离的,分别在结构体的events和revents的两个字段,做到了输入和输出分离。

2.3.源代码

PollServer.hpp

#pragma once
#include <iostream>
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <poll.h> 
 
const uint16_t default_port = 8877;       // 默认端口号
const std::string default_ip = "0.0.0.0"; // 默认IP
const int default_fd = -1;
const int fd_num_max=64;
const int non_event=0;
 
class PollServer
{
public:
    PollServer(const uint16_t port = default_port, const std::string ip = default_ip)
        : ip_(ip), port_(port)
    {
        for(int i=0;i<fd_num_max;i++)
        {
            event_fds[i].fd=default_fd;
            event_fds[i].events=non_event;//暂时不关心
            event_fds[i].revents=non_event;//暂时不关心
        }
    }
    ~PollServer()
    {
        listensock_.Close();
    }
 
    bool Init()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
 
        return true;
    }
 
    void Accept()
    {
        // 我们的连接事件就绪了
        std::string clientip;
        uint16_t clientport;
 
        int sockfd = listensock_.Accept(&clientip, &clientport); // 这里会返回一个新的套接字
        // 请问进程会阻塞在Accept这里吗?答案是不会的,因为上层的select已经完成的了等待部分,accept只需要完成建立连接即可
        if (sockfd < 0)
            return;
        else // 把新fd加入位图
        {
            int i = 1;
            for (; i < fd_num_max; i++) // 为什么从1开始,因为我们0号下标对应的是listen套接字,我们不要修改
            {
                if (event_fds[i].fd != default_fd) // 没找到空位
                {
                    continue;
                }
                else
                { // 找到空位,但不能直接添加
                    break;
                }
            }
            if (i != fd_num_max) // 没有满
            {
                event_fds[i].fd = sockfd; // 把新连接加入数组
                 event_fds[i].events=POLLIN ;//关心读事件
                 event_fds[i].revents = non_event;//重置一下
                 
                Printfd();
            }
            else // 满了
            {
                close(sockfd); // 处理不了了,可以直接选择关闭连接
                //当然,如果想要连接更多链接,我们可以在这里进行扩容操作,不过我们这里就不做了
            }
        }
    }
     void Printfd()
    {
        std::cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (event_fds[i].fd == default_fd)
                continue;
            else
            {
                std::cout << event_fds[i].fd << " ";
            }
        }
        std::cout << std::endl;
    }
 
 
    void Receiver(int fd, int i)
    {
        char in_buff[1024];
        int n = read(fd, in_buff, sizeof(in_buff) - 1);
        if (n > 0)
        {
            in_buff[n] = 0;
            std::cout << "get message: " << in_buff << std::endl;
        }
        else if (n == 0) // 客户端关闭连接
        {
            close(fd);               // 我服务器也要关闭
            event_fds[i].fd = default_fd; // 重置数组内的值
        }
        else
        {
            close(fd);               // 我服务器也要关闭
            event_fds[i].fd = default_fd; // 重置数组内的值
        }
    }
 
    void HandlerEvent()
    {
        for (int n = 0; n < fd_num_max; n++)
        {
            int fd = event_fds[n].fd;
            if (fd == default_fd) // 无效的
                continue;
 
            if (event_fds[n].revents&POLLIN) // fd套接字就绪了
            {
                // 1.是listen套接字就绪了
                if (fd == listensock_.Fd()) // 如果是listen套接字就绪了!!!
                {
                    Accept();
                }
                // 2.是通信的套接字就绪了,fd不是listen套接字
                else // 读事件
                {
                    Receiver(fd,n);
                }
            }
        }
    }
    
 
    void Start()
    {
        int listensock = listensock_.Fd();
        event_fds[0].fd=listensock;//把listen放到首个数组下标里面
        event_fds[0].events=POLLIN;//只关心读事件
        //revent可以不设置
        int timeout=3000;//3s
        for (;;)
        {
            int n = poll(event_fds,fd_num_max,timeout);
 
            switch (n)
            {
            case 0:
                std::cout << "time out....." << std::endl;
                break;
            case -1:
                std::cout << "poll error" << std::endl;
                break;
            default:
                // 有事件就绪
                std::cout << "get a new link" << std::endl;
                HandlerEvent(); // 处理事件
                break;
            }
        }
    }
 
private:
    uint16_t port_;          // 绑定的端口号
    Sock listensock_;        // 专门用来listen的
    std::string ip_;         // ip地址
    struct pollfd event_fds[fd_num_max];
};

Socket.hpp

#pragma once  
  
#include <iostream>  
#include <string>  
#include <unistd.h>  
#include <cstring>  
#include <sys/types.h>  
#include <sys/stat.h>  
#include <sys/socket.h>  
#include <arpa/inet.h>  
#include <netinet/in.h>  
 
  
// 定义一些错误代码  
enum  
{  
    SocketErr = 2,    // 套接字创建错误  
    BindErr,          // 绑定错误  
    ListenErr,        // 监听错误  
};  
  
// 监听队列的长度  
const int backlog = 10;  
  
class Sock  //服务器专门使用
{  
public:  
    Sock() : sockfd_(-1) // 初始化时,将sockfd_设为-1,表示未初始化的套接字  
    {  
    }  
    ~Sock()  
    {  
        // 析构函数中可以关闭套接字,但这里选择不在析构函数中关闭,因为有时需要手动管理资源  
    }  
  
    // 创建套接字  
    void Socket()  
    {  
        sockfd_ = socket(AF_INET, SOCK_STREAM, 0);  
        if (sockfd_ < 0)  
        {  
            printf("socket error, %s: %d", strerror(errno), errno); //错误  
            exit(SocketErr); // 发生错误时退出程序  
        } 
        int opt=1;
        setsockopt(sockfd_,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)); //服务器主动关闭后快速重启
    }  
  
    // 将套接字绑定到指定的端口上  
    void Bind(uint16_t port)  
    {  
        //让服务器绑定IP地址与端口号
        struct sockaddr_in local;  
        memset(&local, 0, sizeof(local));//清零  
        local.sin_family = AF_INET;  // 网络
        local.sin_port = htons(port);  // 我设置为默认绑定任意可用IP地址
        local.sin_addr.s_addr = INADDR_ANY; // 监听所有可用的网络接口  
  
        if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)  //让自己绑定别人
        {  
            printf("bind error, %s: %d", strerror(errno), errno);  
            exit(BindErr);  
        }  
    }  
  
    // 监听端口上的连接请求  
    void Listen()  
    {  
        if (listen(sockfd_, backlog) < 0)  
        {  
            printf("listen error, %s: %d", strerror(errno), errno);  
            exit(ListenErr);  
        }  
    }  
  
    // 接受一个连接请求  
    int Accept(std::string *clientip, uint16_t *clientport)  
    {  
        struct sockaddr_in peer;  
        socklen_t len = sizeof(peer);  
        int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);  
        
        if(newfd < 0)  
        {  
            printf("accept error, %s: %d", strerror(errno), errno);  
            return -1;  
        }  
        
        char ipstr[64];  
        inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));  
        *clientip = ipstr;  
        *clientport = ntohs(peer.sin_port);  
  
        return newfd; // 返回新的套接字文件描述符  
    }  
  
    // 连接到指定的IP和端口——客户端才会用的  
    bool Connect(const std::string &ip, const uint16_t &port)  
    {  
        struct sockaddr_in peer;//服务器的信息  
        memset(&peer, 0, sizeof(peer));  
        peer.sin_family = AF_INET;  
        peer.sin_port = htons(port);
 
        inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));  
  
        int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));  
        if(n == -1)   
        {  
            std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;  
            return false;  
        }  
        return true;  
    }  
  
    // 关闭套接字  
    void Close()  
    {  
        close(sockfd_);  
    }  
  
    // 获取套接字的文件描述符  
    int Fd()  
    {  
        return sockfd_;  
    }  
  
private:  
    int sockfd_; // 套接字文件描述符  
};

main.cc

#include"PollServer.hpp"
#include<memory>
 
int main()
{
    std::unique_ptr<PollServer> svr(new PollServer());
    svr->Init();
    svr->Start();
}

makefile

poll_server:main.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -rf poll_server

三.epoll函数

我们得知道epoll是e+poll的意思,很明显epoll就是poll的升级版本!!!

        epoll 是对 select 和 poll 的改进,解决了“性能开销大”和“文件描述符数量少”这两个缺点,是性能最高的多路复用实现方式,能支持的并发量也是最大。

 epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。

 另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

一,epoll的三个系统调用接口

epoll的使用流程如下

  1. 创建 epoll 实例:通过 epoll_create 创建一个 epoll 文件描述符。
  2. 添加文件描述符到 epoll 实例:使用 epoll_ctl 将需要监视的文件描述符(如套接字)添加到 epoll 实例中,并指定关心的事件类型(如可读、可写等)
  3. 等待事件发生:通过 epoll_wait 或 epoll_pwait 等待文件描述符上发生指定的事件。这些函数会阻塞调用线程,直到有文件描述符上的事件发生,或者超时。

  4. 处理事件:根据 epoll_wait 或 epoll_pwait 返回的就绪事件列表,处理相应的文件描述符上的事件。

接下来我们就要好好认识这些接口

1.1.epoll_create函数

epoll_create函数用于创建epoll文件描述符,该文件描述符用于后续的epoll操作。

参数:

  • size:目前内核还没有实际使用,只要大于0就行

返回值:

  • 返回epoll文件描述符
1.1.1.epoll_create函数干了什么
  1. epoll需要使用一个额外的文件描述符(epoll文件描述符),来唯一标识内核中的这个事件表。 这个文件描述符使用如下epoll_create函数来创建;
  2. epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。
  3. 调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点(epoll_create创建的文件描述符),在内核cache里建了个 红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件.(概括就是:调用epoll_create方法时,内核会跟着创建一个eventpoll对象)

 eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。

struct eventpoll
{
    spin_lock_t lock;            //对本数据结构的访问
    struct mutex mtx;            //防止使用时被删除
    wait_queue_head_t wq;        //sys_epoll_wait() 使用的等待队列
    wait_queue_head_t poll_wait; //file->poll()使用的等待队列
    struct list_head rdllist;    //事件满足条件的链表
    struct rb_root rbr;          //用于管理所有fd的红黑树
    struct epitem *ovflist;      //将事件到达的fd进行链接起来发送至用户空间
}
 

在 Linux 内核中,struct eventpoll 是一个关键的数据结构,用于实现 epoll 机制。这个结构体管理了与 epoll 相关的所有资源和状态。

下面是对 struct eventpoll 结构体的详细解释:

  1. spin_lock_t lock;:这是一个自旋锁,用于保护对 eventpoll 结构体的并发访问。自旋锁在持有锁的线程(或进程)释放锁之前,会忙等待(即持续检查锁的状态),而不是像互斥锁(mutex)那样将线程阻塞。这使得自旋锁在预期锁持有时间非常短的情况下非常高效。
  2. struct mutex mtx;这是一个互斥锁,用于防止在 eventpoll 结构体被使用时被删除。与自旋锁不同,互斥锁在无法立即获取锁时会将线程阻塞,直到锁被释放。这在锁持有时间较长或等待锁释放的线程可能会进行较长时间的睡眠时更为合适。
  3. wait_queue_head_t wq;:这是一个等待队列的头节点,用于 sys_epoll_wait() 系统调用。当没有事件就绪时,调用 epoll_wait 的进程会被挂起在这个等待队列上,直到有事件发生或超时。
  4. wait_queue_head_t poll_wait;这是另一个等待队列的头节点,通常用于支持传统的 poll() 或 select() 系统调用。如果文件描述符(如套接字)同时被添加到 epoll 实例和传统的轮询机制中,这个等待队列就会被用到。
  5. struct list_head rdllist;这是一个链表头,用于存储已经准备好进行 I/O 操作(即事件已就绪)的文件描述符。这个链表在 epoll_wait 调用返回给用户空间之前被填充。
  6. struct rb_root rbr;这是红黑树的根节点,用于高效地存储和管理所有添加到 epoll 实例中的文件描述符。红黑树是一种自平衡的二叉搜索树,它能够在对数时间内完成插入、删除和查找操作。
  7. struct epitem *ovflist;:这个成员看起来是用于某种溢出处理的链表头。在某些情况下,如果 epoll 事件表满了(尽管实际上这种情况很少发生,因为 epoll 支持的文件描述符数量通常很大),可能需要一种机制来暂存额外的事件。然而,标准的 Linux epoll 实现中可能并不直接使用这个成员,或者它可能用于特定的内核版本或定制的内核模块中。

请注意,随着 Linux 内核的发展,struct eventpoll 结构体的具体实现和成员可能会发生变化

  • struct list_head rdllist; 和 struct rb_root rbr; 确实是 epoll 机制中常提及的关键组成部分,它们分别对应于 epoll 中的“就绪链表”和“红黑树”。

在epoll机制中,“就绪链表”和“红黑树”各自扮演着关键角色,它们共同协作以实现高效的事件通知和文件描述符管理。

        就绪链表是epoll中的一个关键数据结构,它主要用于存储那些已经准备好进行I/O操作(即事件已就绪)的文件描述符。当epoll监控的文件描述符(红黑树上面的)上有I/O事件发生时,相应的epoll_event会被添加到这个链表中。epoll_wait函数在调用时,会检查这个就绪链表,如果有事件存在,则将这些事件复制到用户空间提供的数组中,并返回事件的数量。就绪链表的使用大大提高了事件通知的效率,因为它避免了不必要的文件描述符扫描。

        红黑树是一种自平衡的二叉搜索树,它在epoll中用于高效地存储和管理所有添加到epoll实例中的文件描述符。每个文件描述符在红黑树中都有一个对应的节点,这些节点按照文件描述符的值进行排序。红黑树提供了快速的查找、插入和删除操作,这些操作的时间复杂度都是对数的,即O(log n),其中n是树中节点的数量。这使得epoll能够在处理大量文件描述符时保持较高的性能。

在epoll中,红黑树的主要作用包括:

  • 快速查找:当需要检查某个文件描述符是否已经被添加到epoll实例中时,可以通过红黑树快速定位到该文件描述符对应的节点。
  • 高效插入和删除:当向epoll实例中添加或删除文件描述符时,红黑树能够确保这些操作在对数时间内完成,从而保持较高的性能。
  • 有序管理:红黑树中的节点按照文件描述符的值进行排序,这有助于实现有序的文件描述符管理,虽然epoll本身并不直接依赖于文件描述符的顺序,但在某些情况下,有序性可能有助于优化性能或简化处理逻辑。

总结
 

在epoll机制中,“就绪链表”和“红黑树”是两个相辅相成的数据结构。

        就绪链表用于存储已经准备好进行I/O操作的文件描述符,以便epoll_wait函数能够快速返回这些事件;
        而红黑树则用于高效地存储和管理所有添加到epoll实例中的文件描述符,以确保查找、插入和删除操作的高效性。这两个数据结构的结合使得epoll能够在处理大量并发连接时保持较高的性能。

当调用 epoll_create 时,内核会执行一系列操作来创建一个新的 epoll 实例,并为其分配必要的资源。以下是内核中发生的主要步骤:

  1. 分配 eventpoll 对象内核首先会分配一个 struct eventpoll 类型的对象。这个对象将用于存储与 epoll 实例相关的所有信息,包括锁、等待队列、就绪事件列表、红黑树等。
  2. 初始化数据结构:对 struct eventpoll 对象进行初始化,包括设置锁、等待队列、就绪事件列表(rdllist)和红黑树(rbr)等成员变量的初始状态
  3. 分配文件描述符内核会分配一个未使用的文件描述符(fd),并将其与新建的 struct eventpoll 对象关联起来。这个文件描述符将作为用户空间与内核中 epoll 实例通信的接口。
  4. 创建文件对象:创建一个 struct file 类型的对象,并将其与 struct eventpoll 对象关联。这个 struct file 对象将包含对 eventpoll 对象的引用,以及一系列文件操作函数(如 file_operations),这些函数定义了针对 epoll 文件描述符的各种操作(如读、写、控制等)

  5. 注册文件操作:将 eventpoll_fops(一个包含 epoll 相关文件操作函数的结构体)设置为 struct file 对象的 f_op 成员。这样,当用户空间通过文件描述符对 epoll 实例执行操作时,内核就会调用这些预定义的函数来处理。
  6. 将文件描述符添加到进程的文件描述符表:将新分配的文件描述符添加到当前进程的文件描述符表中,以便用户空间可以通过标准的文件描述符操作(如 read、write、close 等)来访问 epoll 实例。
  7. 返回文件描述符:最后,epoll_create 系统调用将新分配的文件描述符返回给用户空间。用户空间程序可以使用这个文件描述符来调用其他 epoll 相关的系统调用(如 epoll_ctl、epoll_wait 等),以添加要监视的文件描述符、等待事件发生以及处理就绪事件。

总结来说,当调用 epoll_create 时,内核会创建一个新的 epoll 实例,并为其分配和初始化必要的资源。这个实例通过一个特殊的文件描述符与用户空间进行交互,允许用户空间程序高效地监视和处理多个文件描述符上的 I/O 事件。

我们只需要知道这个epoll_create会返回一个epoll文件描述符即可。 这个文件描述符也会占用一个fd值,在linux下如果查看/proc/进程id/fd/,能够看到这个fd,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

1.2.  epoll_ctl函数

 epoll_ctl 函数是 Linux 下 epoll 接口的一个重要组成部分,它用于向 epoll 实例注册、修改或删除文件描述符及其关联的事件。

参数解释

        int epfd:这是由 epoll_create 函数返回的文件描述符,用于标识一个 epoll 实例。通过这个文件描述符,用户空间程序可以与内核中的 epoll 实例进行通信。
        int op:这个参数指定了要执行的操作类型,它是一个宏,决定了 epoll_ctl 函数的具体行为。常见的操作类型包括:

  1. EPOLL_CTL_ADD:向 epoll 实例注册一个新的文件描述符及其事件。
  2. EPOLL_CTL_MOD:修改已经注册到 epoll 实例中的文件描述符的事件。
  3. EPOLL_CTL_DEL:从 epoll 实例中删除一个文件描述符及其事件。
  • int fd:这是要操作的目标文件描述符,即用户希望注册、修改或删除的文件描述符。这个文件描述符可以是一个已打开的套接字、管道等。
  • struct epoll_event *event:这是一个指向 struct epoll_event 结构体的指针,用于指定要注册或修改的事件信息。这个结构体包含了事件的类型(如可读、可写、错误等)和与该事件相关联的数据。如果操作是删除(EPOLL_CTL_DEL),则这个参数可以为 NULL,因为删除操作不需要指定事件信息。

结构体 epoll_event

#include <sys/epoll.h>
 
// 定义epoll_data_t为union类型  
typedef union epoll_data {    
    void *ptr;  // 可以指向任何类型的数据  
    int fd;     // 套接字文件描述符  
    uint32_t u32;  // 32位无符号整数  
    uint64_t u64;  // 64位无符号整数  
} epoll_data_t;  
  
// 定义epoll_event结构体  
struct epoll_event {  
  uint32_t events; // epoll事件,参考事件列表(如EPOLLIN, EPOLLOUT等)  
  epoll_data_t data; // 关联的数据,可以是文件描述符、指针或其他  
};

struct epoll_event 结构体通常包含以下成员:

  1. events:这是一个位掩码,用于指定事件的类型。常见的类型包括 EPOLLIN(可读事件)、EPOLLOUT(可写事件)、EPOLLERR(错误事件)等。多个事件类型可以通过位或操作符(|)组合在一起。
  2. data:这是一个联合体,可以包含不同类型的数据。在实际使用中,它通常用于存储文件描述符或与事件相关联的用户定义数据。当事件被触发时,这些信息会被原样返回给用户空间程序。
  3. epoll事件——events成员

    头文件:<sys/epoll.h>
     
    enum EPOLL_EVENTS
    {
        EPOLLIN = 0x001, //读事件
        EPOLLPRI = 0x002,
        EPOLLOUT = 0x004, //写事件
        EPOLLRDNORM = 0x040,
        EPOLLRDBAND = 0x080,
        EPOLLWRNORM = 0x100,
        EPOLLWRBAND = 0x200,
        EPOLLMSG = 0x400,
        EPOLLERR = 0x008, //出错事件
        EPOLLHUP = 0x010, //出错事件
        EPOLLRDHUP = 0x2000,
        EPOLLEXCLUSIVE = 1u << 28,
        EPOLLWAKEUP = 1u << 29,
        EPOLLONESHOT = 1u << 30,
        EPOLLET = 1u << 31 //边缘触发
      };

返回值

  1. 如果 epoll_ctl 函数执行成功,则返回 0。
  2. 如果执行失败,则返回 -1,并设置 errno 以指示错误原因

1.2.1.epoll_ctl函数函数干了什么
  1.  epoll_ctl函数用于增加,删除,修改epoll事件,epoll事件会存储于内核epoll结构体红黑树中。
  2. 这个也是epoll的事件注册函数,epoll_ctl向 epoll对象中添加、修改或者删除感兴趣的事件,返回0表示成功,否则返回–1,此时需要根据errno错误码判断错误类型。
  3. 它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

        epoll_ctl 函数是 epoll 接口中用于增加、删除或修改 epoll 监控的事件的系统调用。当你对一个 epoll 实例执行 epoll_ctl 操作时,内核会根据操作类型(增加、删除或修改)来更新内部的数据结构,特别是红黑树(RB-tree)和就绪事件列表(rdllist)。

            在 epoll 的上下文中,红黑树主要用于高效地存储和查找所有添加到 epoll 实例中的文件描述符(fd)及其对应的事件。每个文件描述符在红黑树中都有一个对应的节点(通常是 struct epitem 类型的结构体),这些节点按照文件描述符的值进行排序,以便于快速查找。

当你通过 epoll_ctl 向 epoll 实例添加一个新的文件描述符和事件时,内核会做以下几件事:

  1. 分配并初始化 epitem 结构体:为每个新添加的文件描述符分配一个 epitem 结构体,并初始化其成员变量,包括指向文件描述符的指针、事件类型、回调函数等。
  2. 将 epitem 添加到红黑树中:根据文件描述符的值,将新的 epitem 结构体插入到红黑树中。这保证了文件描述符的快速查找和排序。

  当你通过 epoll_ctl 删除一个事件时,内核会从红黑树中找到对应的 epitem 结构体,并将其从树中删除。同时,如果该事件已经在就绪事件列表中,也需要从列表中删除它。

    修改事件通常意味着更改事件的某些属性(如事件类型),这可能需要从红黑树中找到对应的 epitem 结构体,并更新其成员变量。但是,修改操作通常不会改变文件描述符在红黑树中的位置。

         就绪事件列表(rdllist)用于存储那些已经满足条件(即事件已经发生)的文件描述符。当 epoll_wait 被调用时,内核会遍历红黑树中的 epitem 结构体,检查是否有事件已经就绪,并将它们从红黑树中移动到就绪事件列表中。然后,epoll_wait 会返回这些就绪事件的列表给用户空间。

          需要注意的是,虽然红黑树是 epoll 实现中的一个关键数据结构,但 epoll 的高效性并不仅仅依赖于红黑树。epoll 还使用了其他技术,如内存映射(memory-mapped)的就绪事件列表、边缘触发(edge-triggered)和水平触发(level-triggered)事件模式等,来优化事件通知和减少系统调用的开销。

 1.3.epoll_wait函数

注意:在调用 epoll_wait 之前,必须先通过 epoll_ctl 向 epoll 实例注册文件描述符及其事件。

 等待事件的产生,类似于select()调用。

        参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。

        第1个参数 epfd是 epoll的描述符。也就是epoll_creat返回的文件描述符
        第2个参数 events则是分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。

        第3个参数指定了 events 数组的最大长度,即 epoll_wait 可以告知调用者的最大事件数量。如果同时有多个事件发生时,epoll_wait 将尽可能多地填充 events 数组,但不会超过 maxevents 指定的数量。通常 maxevents参数与预分配的events数组的大小是相等的。
        第4个参数 timeout表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待。

返回值:

  • 已经就绪的fd的个数,如返回0表示已超时。如果返回–1,则表示出现错误,需要检查 errno错误码判断错误类型。
 1.3.1.epoll_wait到底干了什么

epoll_wait 在内核中主要执行了以下操作,以实现高效的事件通知机制:

1. 等待事件发生

  • 当进程调用 epoll_wait 时,它会被阻塞(除非设置了非阻塞模式或超时时间),直到有注册的文件描述符上发生了感兴趣的事件。
  • epoll_wait 依赖于内核中维护的数据结构(主要是红黑树和就绪链表)来高效地管理这些文件描述符和它们的事件。

2. 检查就绪链表

        在内核中,epoll 使用了一个就绪链表(就是我们上面提到的struct eventpoll里面的struct list_head rdllist,通常是一个双向链表)来存储那些已经准备好(即发生了感兴趣的事件)的文件描述符。
        当 epoll_wait 被调用时,它会检查这个就绪链表。如果链表不为空,说明有事件已经发生。

3. 复制事件到用户空间

  • 如果就绪链表中有事件,epoll_wait 会将这些事件从内核空间复制到用户空间提供的 epoll_event 结构体数组中。
  • 这个过程会尽可能多地复制事件,但不超过用户指定的 maxevents 数量。

4. 更新内核状态

  • 在复制事件后,epoll_wait 会更新内核中的数据结构,以反映哪些事件已经被处理。
  • 对于边缘触发(ET)模式,如果事件已经被处理并且没有新的数据到来,那么相应的文件描述符可能会被从就绪链表中移除。

5. 唤醒进程

  • 一旦有事件被复制到用户空间,epoll_wait 会唤醒调用它的进程,并返回发生的事件数量。
  • 如果在调用 epoll_wait 时设置了超时时间,并且在这段时间内没有事件发生,那么 epoll_wait 也会超时返回。

6. 高效性实现

  • epoll 之所以高效,主要是因为它避免了像 select 和 poll 那样的重复扫描和文件描述符限制。
  • 它使用红黑树来存储和快速查找文件描述符,使用就绪链表来高效地管理就绪事件。

    这些数据结构使得 epoll 能够在 O(1) 的时间复杂度内完成大部分操作,从而支持大规模的文件描述符和高效的事件通知。

  • 综上所述,epoll_wait 在内核中主要负责等待事件发生、检查就绪链表、复制事件到用户空间、更新内核状态以及唤醒进程等操作,从而实现了高效的事件通知机制。

1.4.epoll的工作过程中内核在干什么

        当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用密切相关:

struct eventpoll {
  ...
  /*红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,
  也就是这个epoll监控的事件*/
  struct rb_root rbr;
  /*双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件*/
  struct list_head rdllist;
  ...
};

          我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个rdllist双向链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个rdllist双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

         所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。

        在epoll中对于每一个事件都会建立一个epitem结构体,如下所示:

struct epitem {
  ...
  //红黑树节点
  struct rb_node rbn;
  //双向链表节点
  struct list_head rdllink;
  //事件句柄等信息
  struct epoll_filefd ffd;
  //指向其所属的eventepoll对象
  struct eventpoll *ep;
  //期待的事件类型
  struct epoll_event event;
  ...
}; // 这里包含每一个事件对应着的信息。

【总结】:

  • 一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。
  • 执行epoll_create()时,创建了红黑树和就绪链表;
  • 执行epoll_ctl()时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据;
  • 执行epoll_wait()时立刻返回准备就绪链表里的数据即可。

四,epoll高效的原理 

4.1.预备知识的储备

  • 第一步:从硬件的角度看计算机怎样接收网络数据

了解epoll本质的第一步,要从硬件的角度看计算机怎样接收网络数据。

在①阶段,网卡收到网线传来的数据;经过②阶段的硬件电路的传输;最终将数据写入到内存中的某个地址上(③阶段)。这个过程涉及到DMA传输、IO通路选择等硬件有关的知识,但我们只需知道:网卡会把接收到的数据写入内存。

通过硬件传输,网卡接收的数据存放到内存中。操作系统就可以去读取它们。

  • 问题二:操作系统是怎么知道网卡是有数据了

了解epoll本质的第二步,要从CPU的角度来看数据接收。要理解这个问题,要先了解一个概念——中断。

        计算机执行程序时,会有优先级的需求。比如,当计算机收到断电信号时(电容可以保存少许电量,供CPU运行很短的一小段时间),它应立即去保存数据,保存数据的程序具有较高的优先级。

 一般而言,由硬件产生的信号需要cpu立马做出回应(不然数据可能就丢失),所以它的优先级很高。cpu理应中断掉正在执行的程序,去做出响应;当cpu完成对硬件的响应后,再重新执行用户程序。中断的过程如下图,和函数调用差不多。只不过函数调用是事先定好位置,而中断的位置由“信号”决定。

 以键盘为例,当用户按下键盘某个按键时,键盘会给cpu的中断引脚发出一个高电平。cpu能够捕获这个信号,然后执行键盘中断程序。

        现在可以回答本节提出的问题了:当网卡把数据写入到内存后,网卡向cpu发出一个中断信号,操作系统便能得知有新数据到来,再通过网卡中断程序去处理数据。

  • 问题三——操作系统怎么知道红黑树上的哪些节点就绪了

操作系统怎么知道红黑树上的哪些节点就绪了呢?难道操作系统也要遍历整棵红黑树,检测每个节点的就绪情况?操作系统其实并不会这样做,如果这样做的话,那epoll还谈论什么高效呢?你epoll不也得遍历所有的fd吗?和我poll遍历有什么区别呢?红黑树是查找的效率高,不是遍历的效率高,如果遍历所有的节点,红黑树其实和链表遍历在效率上是差不多的,一点都不高效!
那操作系统是怎么知道红黑树上的哪个节点就绪了呢?其实是通过底层的回调机制来实现的,这也是epoll接口公认非常高效的重要的一个实现环节!

  当数据到达网卡时,我们知道数据会经过硬件中断,CPU执行中断向量表等步骤来让数据到达内存中的操作系统内部,而所有添加到epoll模型中的事件都会与网卡建立回调关系,当事件就绪时调用这个回调方法,将就绪的事件链接到就绪队列当中,这个回调方法在内核中叫做ep_poll_callback。

 2.2.内核接受网络数据的全过程

        当数据到达网络设备网卡时,会以硬件中断作为发起点,将中断信号通过中断设备发送到CPU的针脚,接下来CPU会查讯中断向量表,找到中断序号对应的驱动回调方法,在回调方法内部会将数据从硬件设备网卡拷贝到软件OS里。数据包在OS中会向上贯穿协议栈,到达传输层时,数据会被拷贝到struct file的内核缓冲区中,同时OS会执行一个叫做private_data的回调函数指针字段,在该回调函数内部会通过修改红黑树节点中的就绪队列指针的内容,将该节点链入到就绪队列,内核告知用户哪些fd就绪时,只需要将就绪队列中的节点内容拷贝到epoll_wait的输出型参数events即可,这就是epoll模型的底层回调机制!

 2.3.一些小问题

  • 1.为什么说epoll模型是高效的呢?

   因为大部分的工作操作系统都帮我们做了,比如添加节点到红黑树,我们只需要调用epoll_ctrl即可,返回就绪的fd,直接相当于返回就绪队列中的节点即可,上层直接就可以拿到就绪的fd,检测是否就绪的工作也不用遍历,而是当底层数据就绪时,会有回调机制自动将红黑树的节点链入到就绪队列中,操作系统也无须遍历红黑树进行就绪检测,上层在拿到就绪的fd后,可以确定范围的遍历输出型参数struct epoll_event数组,而不是盲目的遍历整个数组的所有元素。

2.为什么选用红黑树作为epoll模型的底层数据结构?
        因为红黑树的搜索效率非常的高,可以达到logN的时间复杂度,所以无论是epoll_ctl的插入,删除还是修改,这些工作的首要前提是先找到目标节点或目标位置,找到之后,再进行具体的操作,而找到这一步红黑树的效率就非常的高。

 有人可能会说红黑树需要旋转调整平衡啊,虽然在逻辑上我们感觉红黑树的旋转调平衡很费时间,可能会造成红黑树的效率降低,但其实并不是这样的,所谓的旋转调平衡只是在逻辑上复杂而已,在实际操作上仅仅只是修改节点内的指针而已,对红黑树的效率影响并不大。

        同时红黑树对于平衡的要求并没有AVL高,所以在旋转调平衡的次数上,红黑树要比AVL树少很多,在整体效率上是要比AVL树高的,这也是使用红黑树,不使用AVL树的原因。

  • 3.epoll_wait有哪些细节?
  1. epoll_wait会将所有就绪的fd,依次按照顺序放到输出型参数events中,用户在遍历数组处理就绪的事件时,无须遍历多余的任何一个fd,只需要遍历从0到epoll_wait的返回值个fd即可。
  2. 如果就绪队列的节点数量很多,epoll_wait的输出型参数数组一次拿不完也不用担心,因为队列是先进先出,下一次在调用epoll_wait时,再拿就绪的事件也可以。

  3. select poll在使用的时候,都需要程序员自己维护一个第三方数组来存储用户关心的fd及事件,但epoll不需要,因为内核为epoll在底层维护了一棵红黑树,用户直接通过epoll_ctl来对红黑树的节点进行增删改即可,无须自己在应用层维护第三方的数组。

五,简陋版本epoll版本TCP服务器

5.1.准备工作

 Socket.hpp

#pragma once  
  
#include <iostream>  
#include <string>  
#include <unistd.h>  
#include <cstring>  
#include <sys/types.h>  
#include <sys/stat.h>  
#include <sys/socket.h>  
#include <arpa/inet.h>  
#include <netinet/in.h>  
 
  
// 定义一些错误代码  
enum  
{  
    SocketErr = 2,    // 套接字创建错误  
    BindErr,          // 绑定错误  
    ListenErr,        // 监听错误  
};  
  
// 监听队列的长度  
const int backlog = 10;  
  
class Sock  //服务器专门使用
{  
public:  
    Sock() : sockfd_(-1) // 初始化时,将sockfd_设为-1,表示未初始化的套接字  
    {  
    }  
    ~Sock()  
    {  
        // 析构函数中可以关闭套接字,但这里选择不在析构函数中关闭,因为有时需要手动管理资源  
    }  
  
    // 创建套接字  
    void Socket()  
    {  
        sockfd_ = socket(AF_INET, SOCK_STREAM, 0);  
        if (sockfd_ < 0)  
        {  
            printf("socket error, %s: %d", strerror(errno), errno); //错误  
            exit(SocketErr); // 发生错误时退出程序  
        } 
        int opt=1;
        setsockopt(sockfd_,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)); //服务器主动关闭后快速重启
    }  
  
    // 将套接字绑定到指定的端口上  
    void Bind(uint16_t port)  
    {  
        //让服务器绑定IP地址与端口号
        struct sockaddr_in local;  
        memset(&local, 0, sizeof(local));//清零  
        local.sin_family = AF_INET;  // 网络
        local.sin_port = htons(port);  // 我设置为默认绑定任意可用IP地址
        local.sin_addr.s_addr = INADDR_ANY; // 监听所有可用的网络接口  
  
        if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)  //让自己绑定别人
        {  
            printf("bind error, %s: %d", strerror(errno), errno);  
            exit(BindErr);  
        }  
    }  
  
    // 监听端口上的连接请求  
    void Listen()  
    {  
        if (listen(sockfd_, backlog) < 0)  
        {  
            printf("listen error, %s: %d", strerror(errno), errno);  
            exit(ListenErr);  
        }  
    }  
  
    // 接受一个连接请求  
    int Accept(std::string *clientip, uint16_t *clientport)  
    {  
        struct sockaddr_in peer;  
        socklen_t len = sizeof(peer);  
        int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);  
        
        if(newfd < 0)  
        {  
            printf("accept error, %s: %d", strerror(errno), errno);  
            return -1;  
        }  
        
        char ipstr[64];  
        inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));  
        *clientip = ipstr;  
        *clientport = ntohs(peer.sin_port);  
  
        return newfd; // 返回新的套接字文件描述符  
    }  
  
    // 连接到指定的IP和端口——客户端才会用的  
    bool Connect(const std::string &ip, const uint16_t &port)  
    {  
        struct sockaddr_in peer;//服务器的信息  
        memset(&peer, 0, sizeof(peer));  
        peer.sin_family = AF_INET;  
        peer.sin_port = htons(port);
 
        inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));  
  
        int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));  
        if(n == -1)   
        {  
            std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;  
            return false;  
        }  
        return true;  
    }  
  
    // 关闭套接字  
    void Close()  
    {  
        close(sockfd_);  
    }  
  
    // 获取套接字的文件描述符  
    int Fd()  
    {  
        return sockfd_;  
    }  
  
private:  
    int sockfd_; // 套接字文件描述符  
};

main.cc

#include"EpollServer.hpp"
#include<memory>
 
int main()
{
    std::unique_ptr<EpollServer> svr(new EpollServer());
    svr->Init();
    svr->Start();
}

makefile

epoll_server:main.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -rf epoll_server

EpollServer.hpp

#pragma once
 
#include<iostream>
#include<sys/epoll.h>
#include"Socket.hpp"
 
const uint16_t default_port = 8877;       // 默认端口号
const std::string default_ip = "0.0.0.0"; // 默认IP
 
class EpollServer
{
public:
    EpollServer(const uint16_t port = default_port, const std::string ip = default_ip)
        : ip_(ip), port_(port)
    {
       
    }
    ~EpollServer()
    {
        listensock_.Close();
    }
 
    void Init()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
    }
 
    
 
    void Start()
    {
    }
 
private:
    uint16_t port_;          // 绑定的端口号
    Sock listensock_;        // 专门用来listen的
    std::string ip_;         // ip地址
};

我们这里先不在EpollServer.hpp直接调用epoll_create,epoll_ctl,eoll_wait等,我们先对epoll的各类接口进行封装,封装到Epoller.hpp里面。

首先,我们需要保证我们的Epoller对象是不能被复制的

nocopy.hpp

#pragma once  
  
class nocopy  
{  
public:  
    // 允许使用默认构造函数(由编译器自动生成)  
    nocopy() = default;   
  
    // 禁用拷贝构造函数,防止通过拷贝来创建类的实例  
    nocopy(const nocopy&) = delete;   
  
    // 禁用赋值运算符,防止类的实例之间通过赋值操作进行内容复制  
    nocopy& operator=(const nocopy&) = delete;   
};

这个是用来防止epoll被拷贝的 

Epoller.hpp

#pragma once
 
#include<iostream>
#include"nocopy.hpp"
 
class Epoller : public nocopy //Eopller是nocpy的子类
{
public:
Epoller()
{
 
}
~Epoller()
{
}
 
private: 
int epfd;
};

我们可以测试一下

main.cc

很明显有错误了!!这样子我们的Epoll对象也就不能被复制啦!

Epoller.hpp

#pragma once
 
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <cerrno>
#include "nocopy.hpp"
 
class Epoller : public nocopy
{
    static const int size = 128;
 
public:
    Epoller()
    {
        _epfd = epoll_create(size);
        if (_epfd == -1)
        {
 
            perror("epoll_creat error");
        }
        else
        {
            printf("epoll_creat successful:%d\n", _epfd);
        }
    }
    ~Epoller()
    {
        if (_epfd > 0)
        {
            close(_epfd);
        }
    }
 
private:
    int _epfd;
};

我们回到我们的EpollServer.hpp

EpollServer.hpp

#pragma once
 
#include <iostream>
#include <sys/epoll.h>
#include <memory>
#include "Socket.hpp"
#include "Epoller.hpp"
 
const uint16_t default_port = 8877;       // 默认端口号
const std::string default_ip = "0.0.0.0"; // 默认IP
 
class EpollServer
{
public:
    EpollServer(const uint16_t port = default_port, const std::string ip = default_ip)
        : ip_(ip), port_(port),listensock_ptr(new Sock()),epoller_ptr(new Epoller())
    {
    }
    ~EpollServer()
    {
        listensock_ptr->Close();
    }
 
    void Init()
    {
        listensock_ptr->Socket();
        listensock_ptr->Bind(port_);
        listensock_ptr->Listen();
    }
 
    void Start()
    {
        for(;;)
        {
            
        }
    }
 
private:
    uint16_t port_;                    // 绑定的端口号
    std::string ip_; // ip地址
    std::unique_ptr<Sock> listensock_ptr; // 专门用来listen的
    std::unique_ptr<Epoller> epoller_ptr;
};

我们编译运行一下,

5.2.EpollServer.hpp

epoll模型可是只负责IO模型里面的等待部分。

为了美观一点,我们接着封装我们的Epoller,首先我们把我们的epoll_wait函数进行封装一下

Epoller.hpp的EpollWait函数

。。。
class Epoller : public nocopy
{
   。。。
    int EpollerWait(struct epoll_event revents[],int num)
    {
        int n=epoll_wait(_epfd,revents,num,3000);
        return n;
    }
。。。
private:
    int _epfd;
};

 EpollerServer.hpp

class EpollServer
{
    const static int num =64;
。。。
    void Start()
    {
        struct epoll_event revs[num];
        for(;;)
        {
            int n=epoller_ptr->EpollerWait(revs,num);
            if(n>0)//有事件就绪
            {
 
            }
            else if(n==0)//超时了
            {
                std::cout<<"time out..."<<std::endl;
            }
            else//出错了
            {
                std::cerr<<"EpollWait error"<<std::endl;
            }
        }
    }
 
private:
    uint16_t port_;                    // 绑定的端口号
    std::string ip_; // ip地址
    std::unique_ptr<Sock> listensock_ptr; // 专门用来listen的
    std::unique_ptr<Epoller> epoller_ptr;
};

我们接着写Epoller.hpp的epoll_ctl函数的封装

class Epoller : public nocopy
{
    static const int size = 128;
 
 
    int EpollUpDate(int oper,int sock,uint16_t event)
    {
        int n;
        if(oper==EPOLL_CTL_DEL)//将该事件从epoll红黑树里面删除
        {
            n=epoll_ctl(_epfd,oper,sock,nullptr);
             if(n!=0)
            {
                perror("delete epoll_ctl error");
            }
        }
        else{//添加和修改,即EPOLL_CTL_MOD和EPOLL_CTL_ADD
            struct epoll_event ev;
            ev.events=event;
            ev.data.fd=sock;
 
            n=epoll_ctl(_epfd,oper,sock,&ev);
            if(n!=0)
            {
                perror("delete epoll_ctl error");
            }
        }
        return n;
    }
 
private:
    int _epfd;
    int _timeout{3000};
};

接下来我们就可以去写我们的代码了

EpollServer.hpp的Start函数

 void Start()
    {
        //将listen套接字添加到epoll中->将listensock和他关心的事件,添加到内核的epoll模型中的红黑树里面
        //将listensock添加到红黑树
        epoller_ptr->EpollUpDate(EPOLL_CTL_ADD,listensock_ptr->Fd(),EPOLLIN);
 
        struct epoll_event revs[num];
        for(;;)
        {
            int n=epoller_ptr->EpollerWait(revs,num);
            if(n>0)//有事件就绪
            {
               std::cout<<"event happened,fd :"<<revs[0].data.fd<<std::endl;
            }
            else if(n==0)//超时了
            {
                std::cout<<"time out..."<<std::endl;
            }
            else//出错了
            {
                std::cerr<<"EpollWait error"<<std::endl;
            }
        }
    }

我们运行一下我们的程序,然后使用telnet工具测试一下

很好!!! 然后我们很快就能写出下面这些代码

EpollerServer.hpp测试版

#pragma once
 
#include <iostream>
#include <sys/epoll.h>
#include <memory>
#include "Socket.hpp"
#include "Epoller.hpp"
 
const uint16_t default_port = 8877;       // 默认端口号
const std::string default_ip = "0.0.0.0"; // 默认IP
 
class EpollServer
{
    const static int num = 64;
 
public:
    EpollServer(const uint16_t port = default_port, const std::string ip = default_ip)
        : ip_(ip), port_(port), listensock_ptr(new Sock()), epoller_ptr(new Epoller())
    {
    }
    ~EpollServer()
    {
        listensock_ptr->Close();
    }
 
    void Init()
    {
        listensock_ptr->Socket();
        listensock_ptr->Bind(port_);
        listensock_ptr->Listen();
    }
 
    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_ptr->Accept(&clientip, &clientport);
        if (sock > 0) // 连接成功
        {
            // 获取连接成功之后,我们应该把这个连接的文件描述符加入到epoll里面,让epoll来关心对应事件
            epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, sock, EPOLLIN);
        }
    }
 
    void Receiver(int fd)
    {
        char in_buff[1024];
        int n = read(fd, in_buff, sizeof(in_buff) - 1);
        if (n > 0)
        {
            in_buff[n] = 0;
            std::cout << "get message: " << in_buff << std::endl;
 
            // 写事件
            std::string buff=in_buff;
            std::string echo_str = "server echo:" + buff;
            write(fd,echo_str.c_str(),echo_str.size());
        }
        else if (n == 0) // 客户端关闭连接
        {
            // 我们要把这个连接从epoll的红黑树里面移除掉
            epoller_ptr->EpollUpDate(EPOLL_CTL_DEL, fd, 0);
            std::cout << "client close connect,fd:" << fd << std::endl;
            close(fd); // 我服务器也要关闭连接的文件描述符
        }
        else // 出现错误
        {
            // 我们要把这个连接从epoll的红黑树里面移除掉
            epoller_ptr->EpollUpDate(EPOLL_CTL_DEL, fd, 0);
            std::cout << "recv reeor,fd:" << fd << std::endl;
            close(fd); // 我服务器也要关闭连接的文件描述符
        }
    }
 
    void HandlerEvent(struct epoll_event revs[], int num) // epoll_wait的返回值n代表有n个事件就绪
    {
        for (int i = 0; i < num; i++)
        {
            int fd = revs[i].data.fd;        // 哪个文件描述符就绪了
            uint32_t event = revs[i].events; // 什么事情就绪了
            if (event & EPOLLIN)             // 是读事件就绪了
            {
                if (fd == listensock_ptr->Fd()) // 获取了新连接
                {
                    Accepter();
                }
                else // 其他fd上的普通读事件就绪
                {
                    Receiver(fd);
                }
            }
            else if (event & EPOLLOUT) // 是写事件就绪了
            {
            }
            else // 其他事件
            {
            }
        }
    }
    void Start()
    {
        // 将listen套接字添加到epoll中->将listensock和他关心的事件,添加到内核的epoll模型中的红黑树里面
        // 将listensock添加到红黑树
        epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, listensock_ptr->Fd(), EPOLLIN);
 
        struct epoll_event revs[num];
        for (;;)
        {
            int n = epoller_ptr->EpollerWait(revs, num); // 返回值代表有n个事件就绪
            if (n > 0)                                   // 有事件就绪
            {
                std::cout << "event happened,fd :" << revs[0].data.fd << std::endl;
                HandlerEvent(revs, n); // 事件就绪的本质就是看他的文件描述符在不在就绪队列里面
            }
            else if (n == 0) // 超时了
            {
                std::cout << "time out..." << std::endl;
            }
            else // 出错了
            {
                std::cerr << "EpollWait error" << std::endl;
            }
        }
    }
 
private:
    uint16_t port_;                       // 绑定的端口号
    std::string ip_;                      // ip地址
    std::unique_ptr<Sock> listensock_ptr; // 专门用来listen的
    std::unique_ptr<Epoller> epoller_ptr;
};

为了 

我们运行一下,来看看

很完美啊!!! 

我们第一阶段的代码就写到这里,更深入的问题,我们留到进阶篇来讲解

六.epoll的两种工作模式——LT和ET

下面来举一个例子帮助大家理解ET和LT模式的区别(送快递的例子)

 新上任的快递员小李要给学24宿舍楼的张三送快递,张三买了很多的快递,估摸着有6个快递,小李到了学24的楼底,然后就给楼上的张三打电话,通知张三下来拿快递,但是张三正在和他的狐朋狗友开黑打游戏呢,于是张三就嘴上答应着我马上下去,但始终就不下去,老实人小李见张三迟迟不下来拿快递,又给张三打电话,让张三下来拿快递,但张三嘴上又说,我马上下去拿快递,真的马上,但过了一会儿张三依旧还是不下来,小李又只能给张三打电话,张三啊,你的快递到了,你赶快下来取快递吧,终于张三和自己的狐朋狗友推完对面的水晶了,下楼来取快递了,但是张三一个人一次只拿走了3个快递,还剩下三个快递,张三也没办法了,张三一个人一次只能拿这么多快递啊,于是张三就拿着他的三个快递上楼了,继续和他的舍友开黑打游戏。结果没一会儿,小李又给张三打电话,说张三啊,你的快递没拿完呢,你买了6样东西,你只拿了3样,还剩3个包裹你没拿呢,张三又嘴上说,好的好的,我马上下去拿,但其实又重复着前面的动作,好一会儿才下楼拿走了剩余的3个包裹,当包裹全部被拿走之后,小李才不会给张三打电话了。

 老油条快递员小王恰巧也要给学24宿舍楼的张三送快递,恰巧的是,张三这次又买了6个快递,所以小王也碰巧要给张三送6个包裹。小王到了张三楼底下,给张三打了一个电话,说 张三啊,我只给你打一次电话,你现在要是不下来取快递,我后面是不会给你打电话的,除非你又买了新的快递,我手上你的快递数量变多的时候,我才会稍微好心的再给你打一个电话,否则其他情况下,我只会打一次,你要是不下来取快递,那我就不管你了,我给其他客户送快递去了。张三一听,这不行啊,我要是现在不下来取快递,这个快递员以后就不给我打电话了,那我下楼找不到快递员,拿不到我的快递怎么办,所以张三就立马下楼取快递去了。张三一次拿不了这么多快递啊,但张三又不能漏下一些快递,因为小王下一次不会再给张三打电话了,所以张三刚到楼上放下手中的三个快递,又立马返回楼下取走剩余的三个快递了。

在上面的这两个例子中,其实小李的工作模式就是水平触发Level Triggered模式,简称LT模式,小王的工作模式就是边缘触发Edge Triggered模式,简称ET模式,也是多路转接接口高效的模式。

  • 水平触发(Level-Triggered, LT)模式

在LT模式下,当epoll检测到某个文件描述符(如socket)上有就绪的事件(如可读事件)时,epoll_wait会立即返回,并通知程序该事件已经就绪。此时,程序可以选择读取部分数据,或者完全不读取数据。如果程序没有读取完所有的数据,那么下一次调用epoll_wait时,只要该文件描述符上仍然有未读取的数据,epoll_wait仍然会返回并通知程序该事件就绪。这意味着,只要文件描述符上有数据可读,并且这些数据还没有被程序完全读取,LT模式下的epoll_wait就会持续通知程序。   这种方式允许程序在需要时逐步处理数据,而不必担心在单次操作中处理完所有数据。然而,这也可能导致epoll_wait的频繁返回,如果程序处理数据的速度跟不上数据到达的速度,可能会导致性能问题。

  • 边缘触发(Edge-Triggered, ET)模式

在ET模式下,当epoll首次检测到某个文件描述符上有数据可读时,epoll_wait会返回并通知程序。与LT模式不同的是,如果程序没有在一次epoll_wait返回后读取完所有的数据,并且后续没有新的数据到达,那么下一次调用epoll_wait时,即使文件描述符上

仍然有未读取的数据,epoll_wait也不会返回通知。只有当有新的数据到达并且触发了新的可读事件时,epoll_wait才会再次返回。

      这种方式要求程序必须在一次epoll_wait返回后,尽可能多地读取数据,直到没有更多数据可读为止这有助于减少epoll_wait的调用次数,从而提高性能。但是,这也增加了编程的复杂性,因为程序需要能够处理可能到达的任意数量的数据,并且在没有新数据到达的情况下不会再次被通知。

总之,LT模式提供了更大的灵活性,允许程序逐步处理数据;而ET模式则要求程序更高效地处理数据,以减少不必要的epoll_wait调用。在选择使用哪种模式时,需要根据具体的应用场景和需求来决定。

6.1.水平触发模式(LT,Level-Triggered)

注意:epoll默认的工作模式就是LT,不需要任何额外设置

   在水平触发模式下,当一个文件描述符上的I/O事件就绪时,epoll会立即通知应用程序,然后应用程序可以对就绪事件进行处理。即,只要文件描述符处于就绪状态,epoll就会持续通知应用程序,直到应用程序处理完所有就绪事件并且再次进入阻塞等待状态。

对于非阻塞I/O,如果一个文件描述符上有可读或可写事件发生,应用程序可以立即进行读或写操作,即使读写操作无法一次完成。如果读或写操作不能立即完成,应用程序可以再次调用epoll等待新的事件通知。

        在水平触发模式下,当epoll监控的文件描述符上的I/O事件(如可读、可写)就绪时,epoll会通知应用程序。与边缘触发(ET, Edge Triggered)模式不同的是,只要事件条件持续存在(即文件描述符仍然处于就绪状态),epoll就会不断地通知应用程序。这意味着,如果应用程序没有一次性读取完所有可读数据,或者没有处理完所有就绪的事件,epoll将在下一次调用epoll_wait时再次通知这些事件。

水平触发(LT)模式适用的情况:

需要持续处理就绪事件在水平触发模式下,只要文件描述符上的事件(如可读、可写)就绪,epoll 就会持续通知应用程序,直到应用程序明确处理了这些事件(或者事件本身不再就绪)。这特别适用于需要处理多个相关事件或一次性处理大量数据的情况。水平触发模式适用于需要不断检查和处理文件描述符上就绪事件的情况。例如,当处理网络数据时,如果一次读取操作不能接收完所有数据包,则应用程序可以在下次事件通知时继续读取。

阻塞和非阻塞I/O操作混合使用:水平触发模式适用于既有阻塞又有非阻塞I/O操作的情况,可以在阻塞操作中循环调用读取或写入操作。注意:水平触发模式本身并不直接“在阻塞操作中循环调用读取或写入操作”。相反,它是关于事件通知的机制。
 

6.2.边缘触发模式(ET,Edge-Triggered)

   在边缘触发模式下,当一个文件描述符上的状态发生变化时(例如从不可读变为可读,或者从不可写变为可写),epoll会通知应用程序。

        与水平触发模式不同的是,边缘触发模式只在状态变化的瞬间通知应用程序,通知仅发送一次。如果应用程序没有及时处理完这个事件,下次等待时将会错过该事件,即使事件仍然处于就绪状态。因此,在边缘触发模式下,应用程序需要确保尽可能完整地处理每个事件,以避免遗漏事件。

  边缘触发模式适用于需要及时响应状态变化的场景,通常可以提供更高的性能,因为它最大程度上减少了不必要的事件通知。

  在使用 ET 模式时,通常需要将文件描述符设置为非阻塞模式。这是因为如果文件描述符是阻塞的,并且程序没有读取完所有数据,那么下一次调用 read 或类似的 I/O 操作时,程序可能会阻塞在那里,等待更多的数据到达,而 epoll_wait 则不会再次被调用,因为它已经因为之前的状态变化而被通知过了。

 将文件描述符设置为非阻塞模式后,如果 read 操作没有读取到任何数据(因为缓冲区为空),它将立即返回一个错误(通常是 EAGAIN 或 EWOULDBLOCK),而不是阻塞在那里。这样,程序就可以继续执行其他任务,或者再次调用 epoll_wait 等待新的事件。

  •  怎么保证ET模式下一次性读完所有数据?

  在ET(边缘触发)模式下处理数据时,你需要确保在一个事件通知中尽可能多地读取数据,直到read调用返回EAGAIN或EWOULDBLOCK,这确实表示当前没有更多数据可读。这是ET模式的一个关键特性,也是与LT(水平触发)模式的主要区别之一。

      在LT模式下,只要文件描述符处于“就绪”状态(例如,有数据可读),epoll_wait就会持续报告事件,无论你是否已经读取了数据。这意味着,如果你没有在一个事件通知中读取所有数据,epoll_wait将在下一次调用时再次报告该事件,直到你读取了所有数据。

  然而,在ET模式下,一旦文件描述符的状态从非就绪变为就绪,并且epoll_wait报告了一个事件,那么即使还有未读取的数据,如果没有新的数据到达使文件描述符再次变为就绪状态,epoll_wait也不会再次报告该事件。因此,你必须在一个事件通知中尽可能多地读取数据,直到没有更多数据可读(即read返回EAGAIN或EWOULDBLOCK)。    

 这通常意味着你需要在一个循环中调用read,直到它返回EAGAIN或EWOULDBLOCK,或者遇到其他错误(如连接被关闭)。这样做可以确保你不会错过任何数据,并且能够在下次调用epoll_wait时及时响应新的事件。

     需要注意的是,EAGAIN和EWOULDBLOCK在大多数UNIX-like系统中是等价的,它们都表示资源暂时不可用(在这种情况下,是没有更多数据可读)。然而,在不同的系统和库实现中,它们可能会有所不同,但在这个上下文中,你可以将它们视为可互换的。

相比于LT模式,关于ET(边缘触发)模式的使用,确实需要关注这几个关键点

  1. 设置EPOLLET标志:在将文件描述符(如socket)添加到epoll实例时,你需要通过epoll_event结构体中的events字段设置EPOLLET标志,以启用边缘触发模式。这告诉epoll,你希望在该文件描述符的状态从非就绪变为就绪时只接收一次事件通知。
  2. 将socket文件描述符设置为非阻塞:由于ET模式要求你能够在一个事件通知中尽可能多地处理数据,直到没有更多数据可读,因此将socket设置为非阻塞模式是非常重要的。这允许你的read调用在没有数据可读时立即返回EAGAIN或EWOULDBLOCK,而不是阻塞等待。
  3. 循环调用read直到返回EAGAIN或EWOULDBLOCK:在接收到一个ET模式的事件通知后,你需要在一个循环中调用read函数,不断尝试从socket中读取数据,直到read返回EAGAIN或EWOULDBLOCK。这表示当前没有更多数据可读,你可以安全地继续等待下一个事件通知。

这里是一个简化的代码示例

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <sys/epoll.h>  
#include <fcntl.h>  
#include <errno.h>  
  
#define MAX_EVENTS 10  
  
// 设置文件描述符为非阻塞  
int set_non_blocking(int fd) {  
    int flags = fcntl(fd, F_GETFL, 0);  
    if (flags == -1) {  
        perror("fcntl: get flags");  
        return -1;  
    }  
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {  
        perror("fcntl: set non-blocking");  
        return -1;  
    }  
    return 0;  
}  
  
int main() {  
    int epoll_fd = epoll_create1(0);  
    if (epoll_fd == -1) {  
        perror("epoll_create1");  
        exit(EXIT_FAILURE);  
    }  
  
    // 假设我们有一个socket fd,这里为了简化,我们使用stdin作为示例  
    // 注意:在实际应用中,stdin通常不设置为非阻塞,因为标准输入的行为可能不是你所期望的  
    // 但为了演示ET模式,我们仍然这样做  
    int fd = 0; // stdin的文件描述符  
    if (set_non_blocking(fd) == -1) {  
        exit(EXIT_FAILURE);  
    }  
  
    struct epoll_event event, events[MAX_EVENTS];  
    event.events = EPOLLIN | EPOLLET; // 关心读事件和使用ET模式  
    event.data.fd = fd;  
  
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {  //将事件添加到epoll的红黑树里面去
        perror("epoll_ctl: add fd");  
        exit(EXIT_FAILURE);  
    }  
  
    char buffer[1024];  
    ssize_t num_bytes;  
  
    while (1) {  
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);  
        if (num_events == -1) {  
            if (errno == EINTR) {  
                continue; // 处理中断,例如信号处理  
            }  
            perror("epoll_wait");  
            exit(EXIT_FAILURE);  
        }  
  
        for (int i = 0; i < num_events; i++) {  
            if (events[i].data.fd == fd) {  
                // 在ET模式下,需要循环读取直到EAGAIN,表示没有更多数据可读  
                while ((num_bytes = read(fd, buffer, sizeof(buffer) - 1)) > 0) {  
                    buffer[num_bytes] = '\0'; // 确保字符串以null结尾  
                    printf("Received: %s", buffer);  
  
                    // 注意:这里没有处理EAGAIN/EWOULDBLOCK,因为对于stdin来说,  
                    // 在非阻塞模式下,如果没有数据可读,read会立即返回-1并设置errno为EAGAIN  
                    // 但对于套接字等,你需要检查并适当处理  
                }  
  
                if (num_bytes == -1) {  
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {  
                        // 真正的错误处理  
                        perror("read");  
                    }  
                    // EAGAIN或EWOULDBLOCK时,不需要做任何处理,只是表示没有数据可读  
                }  
            }  
        }  
    }  
  
    // 程序实际上不会到达这里,因为有一个无限循环  
    close(epoll_fd);  
    return 0;  
}  
  
// 注意:上面的代码将stdin设置为非阻塞,这在实际应用中是不常见的,  
// 因为它会改变标准输入的行为,可能导致不期望的结果。  
// 通常,ET模式用于套接字等文件描述符,它们可以很好地与非阻塞模式一起工作。

 在这个示例中,我们尝试将 stdin 设置为非阻塞,但这在实际应用中可能不是最佳实践,因为标准输入的行为可能会因此变得复杂且难以预测。ET模式更常用于套接字编程,其中文件描述符(套接字)可以很好地与非阻塞模式结合使用。

     请注意,在ET模式下处理数据时,你需要确保在一个事件通知中尽可能多地读取数据,直到 read 调用返回 EAGAIN 或 EWOULDBLOCK,这表示没有更多数据可读。如果忽略这一点,并且只读取部分数据,那么当新数据到达时,你可能不会收到新的 epoll 事件通知,直到再次有数据可读并且状态从非就绪变为就绪。

6.3.见一见LT和ET

  • 见见LT

首先我们要明白,LT模式的持续通知是什么?

        所谓持续通知就是,我某个关心的事件就绪了,那么我们只要调用epoll_wait函数,每次都会立马返回,通知我们。

例如,下面这个

        for (;;)
        {
            int n = epoller_ptr->EpollerWait(revs, num); // 返回值代表有n个事件就绪
            if (n > 0)                                   // 有事件就绪
            {
                std::cout << "event happened,fd :" << revs[0].data.fd << std::endl;
               // HandlerEvent(revs, n); // 事件就绪的本质就是看他的文件描述符在不在就绪队列里面
            }
            else if (n == 0) // 超时了
            {
                std::cout << "time out..." << std::endl;
            }
            else // 出错了
            {
                std::cerr << "EpollWait error" << std::endl;
            }
        }

    如果有关心的事件就绪了,由于循环的存在,会一直调用epoll_wait函数,然后每次都会立马返回通知我们(由于我们不处理),返回值n肯定大于一,所以会一直打印event happend.....

   在前一篇文章中我们写过epoll_server,当然epoll_server的默认工作模式也是LT模式,在下面的代码中我将处理就绪事件的接口HandlerEvent( )屏蔽掉了,当客户端连接到来时,服务器的epoll_wait一定会检测到listensock上的读事件就绪了,所以epoll_wait会返回,告知程序员要处理数据了,但如果程序员一直不处理数据的话,那epoll_wait每次都会告知程序员要处理数据了,所以从显示器的输出结果来看,epoll_wait返回后,根据返回值n,一定是进入到了default分支中,并且每次epoll_wait都会告知程序员事件就绪,所以显示器会一直疯狂打印have events ready,因为只要底层有事件就绪,对于listensock来说,只要内核监听队列有就绪的连接,那就是就绪,epoll_wait就会一直通知程序员事件就绪了,赶快处理吧。(就像小李一样,只要张三不拿走快递,小李就会一直给张三打电话)

  • 见见ET 

首先我们要明白,ET模式的通知是什么?

        所谓通知一次就是,我某个关心的事件就绪了,那么我们循环调用epoll_wait函数,只有第一次调用会立马返回,通知我们。其他调用都会阻塞

例如,下面这个

        for (;;)
        {
            int n = epoller_ptr->EpollerWait(revs, num); // 返回值代表有n个事件就绪
            if (n > 0)                                   // 有事件就绪
            {
                std::cout << "event happened,fd :" << revs[0].data.fd << std::endl;
               // HandlerEvent(revs, n); // 事件就绪的本质就是看他的文件描述符在不在就绪队列里面
            }
            else if (n == 0) // 超时了
            {
                std::cout << "time out..." << std::endl;
            }
            else // 出错了
            {
                std::cerr << "EpollWait error" << std::endl;
            }
        }

  如果有关心的事件就绪了,由于循环的存在,会一直调用epoll_wait函数,只有第一次调用会返回,后续调用会阻塞掉

2.ET模式高效的原因(fd必须是非阻塞的)
这是非常重要的一个面试题,许多的面试官在问到网络环节时,都会让我们讲一下select poll epoll各自的用法,epoll的底层原理,三个接口的优缺点,还有就是epoll的两种工作模式,以及ET模式高效的原因,ET模式高效的原因也是一个高频的问题。

   ET模式下,只有底层数据从无到有,从有到多的时候,才会通知上层一次,通知的机制就是rbtree+ready_queue+cb,所以ET这种通知机制就会倒逼程序员一次将底层的数据全部读走,如果不一次读走,就可能造成数据丢失,你无法保证对方一定会继续给你发数据啊,如果无法保证这点,那就无法保证epoll_wait还会通知你下一次,如果无法保证这一点,那就有可能你只读取了sock的部分数据,但后续epoll_wait可能不会再通知你了,从而导致后续的数据你永远都读不上来了,所以你必须一次将底层的数据全部读走。

如何保证一次将底层的数据全部读走呢?

 那就只能循环读取了,如果只调用recv一次,是无法保证一次将底层的数据全部读走的。所以我们可以打个while循环一直读sock接收缓冲区中的数据,直到读取不上来数据,但这里其实就又有一个问题了,如果sock是阻塞的,循环读读到最后一定会没数据,而此时由于sock是阻塞的,那么服务器就会阻塞在最后一次的recv系统调用处,直到有数据到来,而此时服务器就会被挂起,服务器一旦被挂起,那就完蛋了~

      服务器被挂起,那就无法运行了,无法给客户提供服务了,这就很有可能造成很多公司盈利上的损失,所以服务器一定不能停下来,更不能被挂起,需要一直运行,以便给客户提供服务。而如果使用非阻塞文件描述符,当recv读取完接收缓冲区中的所有数据时,recv会返回-1,同时错误码被设置为EAGAIN和EWOULDBLOCK,这俩错误码的值是一样的,此时我们就在ET模式下读取完毕了所有的数据了,而我们一次读取完毕所有数据其实本身就是ET模式高效性的一种体现。
所以在工程实践上,epoll以ET模式工作时,文件描述符必须设置为非阻塞,防止服务器由于等待某种资源就绪从而被挂起

解释完ET模式下fd必须是非阻塞的原因后,那为什么ET模式是高效的呢?

可能有人会说,因为ET模式只会通知一次,倒逼程序员将数据一次全部读走,所以ET模式就是高效的,如果这个问题满分100分的话,你这样的回答只能得到20分,因为你的回答其实仅仅只是答案的引线,真正最重要的部分你还是没说出来。

  倒逼程序员一次将数据全部读走,那不就是让上层尽快取走数据吗?尽快取走数据后,就可以给对方发送一个更大的16位窗口大小,让对方更新出更大的滑动窗口大小,提高底层数据发送的效率,更好的使用TCP延迟应答,滑动窗口等策略!!!这才是ET模式高效的最本质的原因!!!

   因为ET模式可以更好的利用TCP提高数据发送效率的种种策略,例如延迟应答,滑动窗口等。

        之前在讲TCP的时候,TCP报头有个字段叫做PSH,其实这个字段如果被设置的话,epoll_wait就会将此字段转换为通知机制,再通知一次上层,让其尽快读走数据。

3.LT和ET模式使用时的读取方式

   在LT模式下,如果fd是阻塞的,那么上次只能读一次,这是出于工程需求,因为我们不能让服务器阻塞挂起,而在文件描述符是阻塞的情况下,如果我们进行循环读,则最后一次肯定会读取不到数据,那么此时服务器进程就会阻塞住,等待fd的skbuff中到来数据,但服务器是不能被阻塞挂起的,所以我们只能读取一行。

       如果fd是非阻塞的,那其实就不用担心了,我们进行循环读就可以,这样是比较高效的,因为在非阻塞且是LT工作模式的情况下,无论我们是一行读还是循环读服务器都是不会被阻塞挂起的。对于读一次来说,在LT模式下也是不会出问题的,因为只要skbuff中有数据,那么epoll_wait就会一直通知程序员来尽快取走数据,我们不用担心丢失数据的情况发生。

    在ET模式下,fd必须是非阻塞的,因为出于工程实践的角度考虑,为了让数据被程序员完整的拿到,我们只能进行循环读,而只要你进行循环读,fd万万就不能是阻塞的,因为循环读的最后一次读取一定会读不到数据,只要读不到数据,且fd是阻塞的,那么服务器就被挂起了,这并不是我们想要看到的结果,所以在ET模式下,没得商量,fd必须是非阻塞的,同时程序员在应用层读取数据的方式也必须是循环读,不可以读一行。