网络io与select,poll,epoll

发布于:2024-06-30 ⋅ 阅读:(21) ⋅ 点赞:(0)

前言

网络 IO,会涉及到两个系统对象,一个是用户空间调用 IO 的进程或者线程,另一个是内核空间的内核系统,比如发生 IO 操作 read 时,它会经历两个阶段:

1. 等待数据准备就绪

2. 将数据从内核拷贝到进程或者线程中

因为在以上两个阶段上各有不同的情况,所以出现了多种网络 IO 模型

一、五种 IO 网络模型

1.1、阻塞 IO(blocking IO)

所谓阻塞型接口是指系统调用(一般是 IO 接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。一个典型的读操作流程如下:

除非特别指定,几乎所有的 IO 接口 ( 包括 socket 接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用 send()的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求

一个简单的改进方案是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。多线程的服务器模型似乎完美的解决了为多个客户机提供问答服务的要求,但其实并不尽然。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题

1.2、非阻塞 IO(non-blocking IO)

Linux 下,可以通过设置 socket 使其变为 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

循环调用 read()将大幅度推高 CPU 占用率;此外,在这个方案中 read()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如 select()多路复用模式,可以一次检测多个连接是否活跃。

1.3、多路复用 IO(IO multiplexing)

IO multiplexing 这个词可能有点陌生,但是提到 select/epoll,大概就都能明白了。有些地方也称这种 IO 方式为事件驱动 IO(event driven IO)。我们都知道,select/epoll 的好处就在于单个 process 就可以同时处理多个网络连接的 IO。它的基本原理就是 select/epoll 这个 function会不断的轮询所负责的所有 socket,当某个 socket 有数据到达了,就通知用户进程。它的流程如图:

当用户进程调用了 select,那么整个进程会被 block,而同时,kernel 会“监视”所有 select 负责的 socket,当任何一个 socket 中的数据准备好了,select 就会返回。这个时候用户进程再调用 read 操作,将数据从 kernel 拷贝到用户进程。

使用 select 以后最大的优势是用户可以在一个线程内同时处理多个 socket 的 IO 请求。用户可以注册多个 socket,然后不断地调用 select 读取被激活的 socket,即可达到在同一个线程内同时处理多个 IO 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的

这种模型的特征在于每一个执行周期都会探测一次或一组事件,一个特定的事件会触发某个特定的响应。我们可以将这种模型归类为“事件驱动模型”。相比其他模型,使用 select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

大部分 Unix/Linux 都支持 select 函数,该函数用于探测多个文件句柄的状态变化。

下面给出 select 接口的原型:

FD_ZERO(int fd, fd_set* fds)
FD_SET(int fd, fd_set* fds)
FD_ISSET(int fd, fd_set* fds)
FD_CLR(int fd, fd_set* fds)
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout)

在 select()函数中,readfds、writefds 和exceptfds 同时作为输入参数和输出参数

但这个模型依旧有着很多问题。首先 select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll

其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。如下例,庞大的执行体 1 的将直接导致响应事件 2 的执行体迟迟得不到执行,并在很大程度上降低了事件探测的及时性

幸运的是,有很多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有libevent 库,还有作为 libevent 替代者的 libev 库

select存在一个bug:当某个socket接收缓冲区有新数据到达,然后select报告这个socket描述符可读,但随后,协议栈检查到这个新分节检验和错误时,会丢弃这个分节,这时再调用read则无数据可读,如果socket没有被设置成nonblocking,此read将阻塞当前线程。需要将socket设置成nonblocking

在select中关闭socket时需要注意:关闭socket需要及时更新监测socket集合。当一个socket被关闭之后,我们需要将其对应的文件描述符从select的监测集合中删除。否则,会导致select函数在下一次调用时伪唤醒,也就是说,调用后立即返回,并且一直返回“有事件发生”,而实际上这个已关闭的文件描述符并没有事件可处理。

1.4、异步 IO(Asynchronous I/O)

Linux 下的 asynchronous IO 用在磁盘 IO 读写操作,不用于网络 IO,从内核 2.6 版本才开始引入。先看一下它的流程:

用户进程发起 read 操作之后,立刻就可以开始去做其它的事。而另一方面,从 kernel的角度,当它受到一个 asynchronous read 之后,首先它会立刻返回,所以不会对用户进程产生任何 block。然后,kernel 会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel 会给用户进程发送一个 signal,告诉它 read 操作完成了。

异步 IO 是真正非阻塞的,它不会对请求进程产生任何的阻塞,因此对高并发的网络服务器实现至关重要

1.5、信号驱动 IO(signal driven I/O, SIGIO)

首先我们允许套接口进行信号驱动 I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 I/O 操作函数处理数据。当数据报准备好读取时,内核就为该进程产生一个 SIGIO 信号。我们随后既可以在信号处理函数中调用 read 读取数据报,并通知主循环数据已准备好待处理,也可以立即通知主循环,让它来读取数据报。无论如何处理 SIGIO 信号,这种模型的优势在于等待数据报到达(第一阶段)期间,进程可以继续执行,不被阻塞。免去了 select 的阻塞与轮询,当有活跃套接字时,由注册的 handler 处理

经过上面的介绍,会发现 non-blocking IO 和 asynchronous IO 的区别还是很明显的。在non-blocking IO 中,虽然进程大部分时间都不会被 block,但是它仍然要求进程去主动的 check,并且当数据准备完成以后,也需要进程主动的再次调用 recvfrom 来将数据拷贝到用户内存。

而 asynchronous IO 则完全不同。它就像是用户进程将整个 IO 操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查 IO 操作的状态,也不需要主动的去拷贝数据。

总结

五种I/O模型的比较: 

除了异步I/O模型,前四种I/O模型都是按顺序分步执行,且需要通过阻塞应用进程来完成数据的复制,因此前四种I/O模型被成为同步I/O模型。

问题:blocking 和 non-blocking 的区别在哪,synchronous IO 和 asynchronous IO 的区别在哪?

blocking 与 non-blocking:前面的介绍中其实已经很明确的说明了这两者的区别。调用 blocking IO 会一直 block 住对应的进程直到操作完成,而non-blocking IO 在 kernel 还在准备数据的情况下会立刻返回。

synchronous IO 和 asynchronous IO:两者的区别就在于 synchronous IO 做”IO operation”的时候会将 process 阻塞。按照这个定义,之前所述的 blocking IO,non-blocking IO,IO multiplexing 都属于synchronous IO。有人可能会说,non-blocking IO 并没有被 block 啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的 IO 操作,就是例子中的 read 这个系统调用。non-blocking IO 在执行 read 这个系统调用的时候,如果 kernel 的数据没有准备好,这时候不会 block 进程。但是当 kernel 中数据准备好的时候,read 会将数据从 kernel 拷贝到用户内存中,这个时候进程是被 block 了,在这段时间内进程是被 block的。而 asynchronous IO 则不一样,当进程发起 IO 操作之后,就直接返回再也不理睬了,直到 kernel 发送一个信号,告诉进程说 IO 完成。在这整个过程中,进程完全没有被 block。

二、简单并发服务器实现示例

2.1 多进程或多线程方式

01-thread_process_tcpserver

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <memory.h>
#include <signal.h>
#include <time.h>

#define BUFFER_LENGTH    128
int listenfd;

void sig_handler(int signo)
{
    if(signo == SIGINT)
    {
        printf("server close\n");
        close(listenfd);
        exit(1);
    }
}

void *routine(void *arg) 
{
    int clientfd = *(int *)arg;

    while (1) 
    {
    unsigned char buffer[BUFFER_LENGTH] = {0};
    int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
    if (ret == 0) { //返回0表示客户端连接断开,服务端也需要调用close断开
        close(clientfd);
        break;
    }
    printf("buffer : %s, ret: %d\n", buffer, ret);
    
    ret = send(clientfd, buffer, ret, 0);
    }
}

// socket --> 
// bash --> execve("./server", "");
// 
// 0, 1, 2
// stdin, stdout, stderr
int main() 
{
    //注册ctrl+c的信号,用于终止服务器
    if(signal(SIGINT, sig_handler) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }
    //处理僵尸进程
    if(signal(SIGCHLD, SIG_IGN) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }
    
     /*步骤1:创建socket
     *注:socket创建在内核中,是一个结构体
     *AF_INET:IPV4
     *SOCK_STREAM:TCP协议
    */
    listenfd = socket(AF_INET, SOCK_STREAM, 0); 
    if (listenfd == -1) 
        return -1;

    /*步骤2:调用bind函数将socket和地址(ip、port)进行绑定*/
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
        return -2;
    }

#if 0 // nonblock
    int flag = fcntl(listenfd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(listenfd, F_SETFL, flag);
#endif

    /*步骤3:调用listen启动监听(指定port监听)
     *通知系统去接收来自客户端的连接请求(将接受到的客户端连接请求放置到对应的队列中)
     *第二个参数:指定队列的长度
    */
    listen(listenfd, 10);
    
    struct sockaddr_in client;
    socklen_t len = sizeof(client);
    while (1) 
    {
        /*步骤4:调用accept函数从队列中获得一个客户端的请求连接,并返回新的socket描述符*/
        int clientfd = accept(listenfd, (struct sockaddr*)&client, &len);
        
        /*步骤5:调用IO函数和连接的客户端进行双向的通信*/
        #if 0
        //(1)多线程实现多并发
        pthread_t threadid;
        pthread_create(&threadid, NULL, routine, &clientfd);
    #else
        //(2)多进程实现多并发
        pid_t pid;
        if((pid=fork()) < 0)
        {
            perror("fork error");
            exit(1);
        }
        else if (pid == 0)//子进程,处理客户端的具体消息,不需要父进程的套接字
        {
            close(listenfd);
            routine(&clientfd);
            break;
        }
        else //父进程,不需要子进程的套接字
        {
            close(clientfd);
        }
#endif
    }
    
     return 0;
}

2.2 select I/O多路复用

02_select_tcpserver

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <memory.h>
#include <signal.h>
#include <time.h>

#define BUFFER_LENGTH    128
int listenfd;

void sig_handler(int signo)
{
    if(signo == SIGINT)
    {
        printf("server close\n");
        close(listenfd);
        exit(1);
    }
}

void *routine(void *arg) 
{
    int clientfd = *(int *)arg;

    while (1) 
    {
        unsigned char buffer[BUFFER_LENGTH] = {0};
        int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
        if (ret == 0) { //返回0表示客户端连接断开,服务端也需要调用close断开
            close(clientfd);
            break;
        }
        printf("buffer : %s, ret: %d\n", buffer, ret);

        ret = send(clientfd, buffer, ret, 0);
    }
}

// socket --> 
// bash --> execve("./server", "");
// 
// 0, 1, 2
// stdin, stdout, stderr
int main() 
{
    //注册ctrl+c的信号,用于终止服务器
    if(signal(SIGINT, sig_handler) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }
    //处理僵尸进程
    if(signal(SIGCHLD, SIG_IGN) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }

    /*步骤1:创建socket
     *注:socket创建在内核中,是一个结构体
     *AF_INET:IPV4
     *SOCK_STREAM:TCP协议
    */
    listenfd = socket(AF_INET, SOCK_STREAM, 0); 
    if (listenfd == -1) 
        return -1;

    /*步骤2:调用bind函数将socket和地址(ip、port)进行绑定*/
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
        return -2;
    }

#if 0 // nonblock
    int flag = fcntl(listenfd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(listenfd, F_SETFL, flag);
#endif

    /*步骤3:调用listen启动监听(指定port监听)
     *通知系统去接收来自客户端的连接请求(将接受到的客户端连接请求放置到对应的队列中)
     *第二个参数:指定队列的长度
    */
    listen(listenfd, 10);

    fd_set rfds, wfds, rset, wset;
    FD_ZERO(&rfds);
    FD_SET(listenfd, &rfds);
    FD_ZERO(&wfds);

    int maxfd = listenfd;
    unsigned char buffer[BUFFER_LENGTH] = {0};
    int ret = 0, i;

    while (1) 
    {
        rset = rfds;
        wset = wfds;

        //调用select监听设置的可读、可写集合中的socket是否有可读活可写事件发生
        //select相当于一个秘书
        int nready = select(maxfd+1, &rset, &wset, NULL, NULL);
        
        //判断listenfd是否有可读事件发生,也就是是否有新的client连接
        if (FD_ISSET(listenfd, &rset)) 
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            int clientfd = accept(listenfd, (struct sockaddr*)&client, &len);
            
            FD_SET(clientfd, &rfds); //将新的client连接设置到可读集合中,下次一起监听

            printf("listenfd accept new client(fd = %d)--> \n", clientfd);
            
            if (clientfd > maxfd) 
                maxfd = clientfd;
        } 
        
        //处理连接的client的读写事件
        for (i = listenfd+1; i <= maxfd;i ++) 
        {
            if (FD_ISSET(i, &rset))
            { 
                ret = recv(i, buffer, BUFFER_LENGTH, 0);
                if (ret == 0) {
                    close(i);
                    FD_CLR(i, &rfds); //客户端主动close后,需要从集合中将其清除
                } else if (ret > 0) {
                    printf("buffer : %s, ret: %d\n", buffer, ret);
                    
                    //接受到client的数据后,将其设置到可写集合中,下次select监听时会立马检测到io的可写事件(因为内核的写缓存未满,就会触发可写事件)
                    //然后执行FD_ISSET(i, &wset),也就实现了读和写的分离
                    FD_SET(i, &wfds); 
                }
            } 
            else if (FD_ISSET(i, &wset)) 
            {
                ret = send(i, buffer, ret, 0); 
                FD_CLR(i, &wfds); //发送完数据后,要将其从可写集合中清除,不然会一直触发!!!!!
            }
        }
    }
}

2.3 epoll I/O多路复用

03_epoll_tcpserver

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <memory.h>
#include <signal.h>
#include <time.h>

#define BUFFER_LENGTH    128
#define EVENTS_LENGTH    128

int listenfd;
char rbuffer[BUFFER_LENGTH] = {0};
char wbuffer[BUFFER_LENGTH] = {0};

void sig_handler(int signo)
{
    if(signo == SIGINT)
    {
        printf("server close\n");
        close(listenfd);
        exit(1);
    }
}

void *routine(void *arg) 
{
    int clientfd = *(int *)arg;

    while (1) 
    {
        unsigned char buffer[BUFFER_LENGTH] = {0};
        int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
        if (ret == 0) { //返回0表示客户端连接断开,服务端也需要调用close断开
            close(clientfd);
            break;
        }
        printf("buffer : %s, ret: %d\n", buffer, ret);

        ret = send(clientfd, buffer, ret, 0);
    }
}

// socket --> 
// bash --> execve("./server", "");
// 
// 0, 1, 2
// stdin, stdout, stderr
int main() 
{
    //注册ctrl+c的信号,用于终止服务器
    if(signal(SIGINT, sig_handler) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }
    //处理僵尸进程
    if(signal(SIGCHLD, SIG_IGN) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }

    /*步骤1:创建socket
     *注:socket创建在内核中,是一个结构体
     *AF_INET:IPV4
     *SOCK_STREAM:TCP协议
    */
    listenfd = socket(AF_INET, SOCK_STREAM, 0); 
    if (listenfd == -1) 
        return -1;

    /*步骤2:调用bind函数将socket和地址(ip、port)进行绑定*/
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
        return -2;
    }

#if 0 // nonblock
    int flag = fcntl(listenfd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(listenfd, F_SETFL, flag);
#endif

    /*步骤3:调用listen启动监听(指定port监听)
     *通知系统去接收来自客户端的连接请求(将接受到的客户端连接请求放置到对应的队列中)
     *第二个参数:指定队列的长度
    */
    listen(listenfd, 10);

    //创建epoll
    int epfd = epoll_create(1);

    struct epoll_event ev, events[EVENTS_LENGTH];
    ev.events = EPOLLIN;
    ev.data.fd = listenfd;

    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //添加listenfd

    while (1) 
    { 
        int nready = epoll_wait(epfd, events, EVENTS_LENGTH, -1); // -1, ms 
        printf("epoll_wait -------> %d ready\n", nready);
        
        int i = 0;
        for (i = 0;i < nready;i ++) 
        {
            int clientfd = events[i].data.fd;
            
            if (listenfd == clientfd) // accept
            {
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int connfd = accept(listenfd, (struct sockaddr*)&client, &len);
                if (connfd == -1) 
                    break;
                
                printf("accept: clientfd=%d\n\n", connfd);
                ev.events = EPOLLIN;
                ev.data.fd = connfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            } 
            else if (events[i].events & EPOLLIN) //clientfd
            { 
                printf("epoll_wait -------> fd=%d EPOLLIN\n", clientfd);
                int n = recv(clientfd, rbuffer, BUFFER_LENGTH, 0);
                if (n > 0) 
                {
                    printf("recv: %s, n: %d\n", rbuffer, n);
    
                    memset(wbuffer, 0, BUFFER_LENGTH);
                    memcpy(wbuffer, rbuffer, n);
                    memset(rbuffer, 0, BUFFER_LENGTH);
                    
                    //send(clientfd, wbuffer, strlen(wbuffer), 0);
                    
                    //接收完数据后,修改fd的监听事件为EPOLLOUT,这样会立即触发fd的可写事件
                    //当然这里也可以直接send
                    ev.events = EPOLLOUT;
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
                } 
            }
            else if (events[i].events & EPOLLOUT) 
            {
                printf("epoll_wait -------> fd=%d EPOLLOUT\n", clientfd);
                int sent = send(clientfd, wbuffer, strlen(wbuffer), 0);
                printf("send: %d\n\n", sent);
                
                //发送完数据后,修改fd的监听事件为EPOLLIN,这样会继续等待fd的可读事件
                ev.events = EPOLLIN;
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
            }
        }
    }
}

参考链接

epoll的本质

Linux内核编程--常见IO模型与select/poll/epoll编程

一道搜狗面试题:IO多路复用中select、poll、epoll之间的区别