io_uring的异步IO机制

发布于:2025-06-13 ⋅ 阅读:(26) ⋅ 点赞:(0)

io_uring 原理

io_uring 是 Linux 内核 5.1 版本引入的全新异步 I/O 接口,由 Jens Axboe 开发。它通过两个共享环形缓冲区(ring buffers)在内核和用户空间之间高效传递 I/O 请求和完成事件,避免了传统 AIO 的各种限制。

我们之前了解过,在网络高并发的场景下,epoll 在这方面的性能是独树一帜,通过 epoll 的工作模式,实现了 reactor 这种事件触发的机制,在 reactor 中异步的实现我们可以通过多线程,也可以通过协程去进行实现。但是本质上最后还是调用到 read/write/recv/send 这样的接口,来完成数据的收发工作。

理解read/write/recv/send本质

他们作为系统调用函数,其实本质上还是一种拷贝函数,如果底层缓冲区中有数据,就会调用这些接口,如果没有数据,就会等待数据就绪,或者设置为非阻塞的状态,不等就直接返回一个错误码,表示此时数据还未就绪。

IO 操作本质上就是 “等 + 拷贝” 的操作,单纯的去看待 read/write/recv/send 这些函数,其实他返回请求,接收数据和返回数据这些操作都是一起的。 在 epoll 的处理中我们可以看见,他可以一次性处理多个文件描述符,将这种等的操作重叠了起来,透过他的底层实现我们可以发现,他其实是将 “等 + 拷贝” 的操作实现了一种分离,被监听的 fd 与就绪的 fd 实则是两种不同的数据结构进行处理,构成了一种生产者消费者模型,也就是说 “等” 其实在 epoll 的接口中就已经完成了,而我们调用read/write/recv/send 函数的时候就不用再等了,数据已经就绪,由于这种可以对多个 fd 进行处理的机制,加上使用回调函数进行处理,使得epoll 的功能就很强大。

如何理解io_uring

对于 io_uring 来说,其实更倾向于将这种 IO 操作实现成异步的,在 io_uring 的实现当中,提供了两个队列结构,一个是提交队列(SQ),另一个是完成队列(CQ),注意,这两个队列都是环形队列结构。

提交队列的作用就是提交 IO 请求,完成队列的作用就是内核通知完成的 I/O 操作,假设当前有 100 个客户端发起读的请求,在 io_uring 的工作方式中,会将这 100 个 IO 请求先 push 到提交队列当中,然后进行处理,然后处理完成的在 push 到完成队列当中,返回结果,他也是由不同线程去完成的。这两个队列干的事两件不同的事情,从而产生了异步的效果。

在这里插入图片描述
由于 io_uring 的内部使用 mmap 去进行实现,这种方式是他在整个过程中也只会进行一次的数据拷贝,无异于也是对效率的一个提升,而且通过这种无锁的环形队列接口,减少了频繁进行加锁解锁的消耗,这对于高并发的场景无异于是一个巨大的提升,其实这两个流程也是一个典型的生产者消费者模型。

io_uring接口应用

io_uring 在这儿主要提供了 3 个系统调用接口:

int io_uring_setup(unsigned entries, struct io_uring_params *params);

io_uring_setup 是 Linux 内核提供的系统调用,用于初始化一个异步 I/O 上下文(io_uring 实例),参数如下:

  • entries:指定提交队列(SQ)和完成队列(CQ)的初始大小(条目数)。通常为 2 的幂次方,内核可能会调整实际大小。
  • params:指向 io_uring_params 结构的指针,用于传递配置参数并返回队列信息。结构定义如下:
struct io_uring_params {
    __u32 sq_entries;        // 内核实际分配的 SQ 大小
    __u32 cq_entries;        // 内核实际分配的 CQ 大小
    __u32 flags;             // 配置标志(如 IORING_SETUP_IOPOLL)
    __u32 sq_thread_cpu;     // 绑定 SQ 线程的 CPU
    __u32 sq_thread_idle;    // SQ 线程空闲超时(毫秒)
    __u32 features;          // 内核返回的支持特性
    __u32 resv[4];
    struct io_sqring_offsets sq_off; // SQ 环的偏移信息
    struct io_cqring_offsets cq_off; // CQ 环的偏移信息
};

返回值:

  • 成功时返回一个文件描述符(fd),代表创建的 io_uring 实例;失败时返回 -1 并设置 errno。
int io_uring_enter(unsigned int fd, unsigned int to_submit,
 				   unsigned int min_complete, unsigned int flags, 
 				   sigset_t *sig);  

io_uring_enter 用于提交 I/O 操作请求或等待已完成事件,参数如下:

  • fd: 关联的 io_uring 实例的文件描述符。
  • to_submit: 准备提交的 I/O 操作数量。
  • min_complete: 要求内核等待至少完成的事件数(若 flags 包含 IORING_ENTER_GETEVENTS)。
  • flags: 控制行为的标志位(如 IORING_ENTER_GETEVENTS)。
  • sig: 等待时临时屏蔽的信号集(可为 NULL)。
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);

io_uring_register 是 Linux 内核提供的系统调用(syscall),用于为 io_uring 实例注册资源(如文件描述符、缓冲区等),以优化异步 I/O 操作的性能,参数如下:

  • fd: io_uring 实例的文件描述符,由 io_uring_setup 创建。
  • opcode: 注册操作的类型,如 IORING_REGISTER_BUFFERS(注册缓冲区)或 IORING_REGISTER_FILES(注册文件描述符)。
  • arg: 指向用户空间数据的指针,具体内容取决于 opcode。
  • nr_args: arg 指向的数组中的条目数。

其中,opcode 的类型有如下几种:

  • IORING_REGISTER_BUFFERS:注册固定缓冲区,用于减少 read/write 操作中的内核-用户空间数据拷贝。
  • IORING_REGISTER_FILES:注册文件描述符,避免每次 I/O 操作重复传递文件描述符。
  • IORING_REGISTER_EVENTFD:注册事件文件描述符(eventfd),用于异步通知 I/O 完成事件。
  • IORING_REGISTER_PROBE:检查内核支持的 io_uring 功能(需配合 struct io_uring_probe)。

io_uring 的库其实对这三个函数进行了封装,然后提供给我们一个使用的库。

接下来我们实际写一段代码来看一下:

#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>

#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024

#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2

struct conn_info
{
    int fd;
    int event;
};

int init_server(int port)
{
    // 创建socket连接
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
    serveraddr.sin_port = htons(port);

    // 绑定套接字
    int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));

    if (ret == -1)
    {
        perror("bind");
        return -1;
    }

    // 监听套接字
    listen(sockfd, 10);
    return sockfd;
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
    // 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = ACCEPT_EVENT};

    // 用于准备一个异步接受连接(accept)的请求
    io_uring_prep_accept(sqe, sockfd, addr, len, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));

    return 1;
}

int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = READ_EVENT};

    // 准备一个接收数据的操作请求
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);

    return 1;
}

int main()
{
    unsigned short port = 9999;
    int sockfd = init_server(port);

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));

    struct io_uring ring;
    // 先构建两个队列出来
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
#if 0
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);

#endif

    char buffer[BUFFER_LENGTH] = {0};

    while (1)
    {
        // 内部实现就是 io_uring_enter,用于提交 IO 请求
        io_uring_submit(&ring);

        // 创建一个完成队列事件结构,通过 io_uring_wait_cqe
        // 获取完成 IO 操作的事件
        struct io_uring_cqe *cqe;
        io_uring_wait_cqe(&ring, &cqe);

        // 批量获取完成 IO 操作的事件
        struct io_uring_cqe *cqes[128];
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);

        int i = 0;
        for (i = 0; i < nready; i++)
        {
            // 获取到 accept 事件的入口
            struct io_uring_cqe *entries = cqes[i];
            struct conn_info result;
            memcpy(&result, &entries->user_data, sizeof(struct conn_info));

            if (result.event == ACCEPT_EVENT)
            {
                printf("set_event_accept\n");
            }
        }
    }
    return 0;
}

接口介绍

其中,io_uring_prep_accept 接口作用就是准备一个异步接受连接(accept)的请求。

void io_uring_prep_accept(struct io_uring_sqe *sqe, 
                          int sockfd, 
                          struct sockaddr *addr,
                          socklen_t *addrlen, 
                          int flags);

参数介绍:

  • sqe: 指向 io_uring 提交队列条目(Submission Queue Entry)的指针。
  • sockfd: 监听套接字的文件描述符。
  • addr: 用于存储客户端地址信息的结构体指针(可选,可为 NULL)。
  • addrlen: 输入时为 addr 的缓冲区大小,输出时为实际地址长度。
  • flags: 额外的标志位(如 SOCK_NONBLOCK 或 SOCK_CLOEXEC)。

返回值处理:

  • 通过 io_uring_wait_cqe 等待完成事件后,cqe->res 为返回的客户端文件描述符。
  • 若返回值 < 0,表示错误(如 -EINVAL 或 -EBADF)

io_uring_sqe 结构体,用于描述一个待提交的 I/O 操作。每个 io_uring_sqe 对应一个异步 I/O 请求(如读写、网络操作等),通过填充该结构并提交到提交队列(Submission Queue, SQ)。

struct io_uring_sqe {
    __u8    opcode;      // 操作类型(如 IORING_OP_READ, IORING_OP_WRITE)
    __u8    flags;       // 请求标志(如 IOSQE_FIXED_FILE, IOSQE_IO_LINK)
    __u16   ioprio;      // I/O 优先级
    __s32   fd;          // 文件描述符
    __u64   off;         // 文件偏移量
    __u64   addr;        // 用户态缓冲区地址(读写操作)
    __u32   len;         // 操作长度
    union {
        __kernel_rwf_t   rw_flags;  // 读写标志(如 RWF_NOWAIT)
        __u32            fsync_flags;
        __u16            poll_events;
    };
    __u64   user_data;   // 用户自定义数据,用于回调识别
    union {
        __u16    buf_index;  // 固定缓冲区的索引(IORING_OP_READ_FIXED)
        __u64    __pad2[3];
    };
};

关键字段说明:

  • opcode:指定操作类型,常见值包括:
    IORING_OP_READ/IORING_OP_WRITE:文件读写。
    IORING_OP_SEND/IORING_OP_RECV:网络通信。
    IORING_OP_POLL_ADD:事件监听。
  • flags:控制请求行为,例如:
    IOSQE_FIXED_FILE:使用固定文件描述符(预先注册的文件表)。
    IOSQE_IO_LINK:链接多个请求,形成依赖链。
  • user_data:用于在完成事件(CQE)中标识请求的唯一值。

使用方法:

  1. 获取空闲 SQE:通过 io_uring_get_sqe 从提交队列中获取一个空闲条目。
  2. 设置操作参数:填充 opcode、fd、addr、len 等字段。
  3. 提交请求:调用 io_uring_submit 将请求提交到内核。

运行程序,我们就会发现一个有意思的现象,此时一直在打印 set_event_accept 这条信息。
在这里插入图片描述

原因就在于在当前这个循环中,我们并没有将已完成的队列中特定的条目给回收掉,当循环回去以后,此时又继续通知处理该条目,就会一直打印,此时就需要用到 io_uring_cq_advance 接口:

void io_uring_cq_advance(struct io_uring *ring, unsigned nr);  

io_uring_cq_advance 接口用于通知内核用户空间已处理完成队列(Completion Queue, CQ)中的特定条目,允许内核回收相关资源。

参数解析:

  • ring: 指向 io_uring 实例的指针。
  • nr: 需要推进的完成队列条目数量,通常为已处理的条目数。

它的作用就在于:

  • 推进完成队列:每次从 CQ 中取出并处理一个事件后,需调用此函数更新队列头指针,避免重复处理同一事件。
  • 资源管理:内核会回收已标记为 “完成” 的条目,释放相关资源(如内存)。

添加这个接口以后就正常了,但是此时就存在一个问题,我们将这个事件标记为完成以后,后续就不会再发送 accept 请求了,所以在这儿就需要我们每一次都发送一个 accept 请求,所以这儿也是需要进行修改的,修改后代码如下:

#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>

#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024

#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2

struct conn_info
{
    int fd;
    int event;
};

int init_server(int port)
{
    // 创建socket连接
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
    serveraddr.sin_port = htons(port);

    // 绑定套接字
    int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));

    if (ret == -1)
    {
        perror("bind");
        return -1;
    }

    // 监听套接字
    listen(sockfd, 10);
    return sockfd;
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
    // 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = ACCEPT_EVENT};

    // 用于准备一个异步接受连接(accept)的请求
    io_uring_prep_accept(sqe, sockfd, addr, len, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));

    return 1;
}

int main()
{
    unsigned short port = 9999;
    int sockfd = init_server(port);

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));

    struct io_uring ring;
    // 先构建两个队列出来
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
#if 0
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);

#endif

    char buffer[BUFFER_LENGTH] = {0};

    while (1)
    {
        // 内部实现就是 io_uring_enter,用于提交 IO 请求
        io_uring_submit(&ring);

        // 创建一个完成队列事件结构,通过 io_uring_wait_cqe
        // 获取完成 IO 操作的事件
        struct io_uring_cqe *cqe;
        io_uring_wait_cqe(&ring, &cqe);

        // 批量获取完成 IO 操作的事件
        struct io_uring_cqe *cqes[128];
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);

        int i = 0;
        for (i = 0; i < nready; i++)
        {
            // 获取到 accept 事件的入口
            struct io_uring_cqe *entries = cqes[i];
            struct conn_info result;
            memcpy(&result, &entries->user_data, sizeof(struct conn_info));

            if (result.event == ACCEPT_EVENT)
            {
                // 保证每一次都会有 accept 请求
                set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
                printf("set_event_accept\n");
            }
        }

        // 避免重复处理同一事件
        io_uring_cq_advance(&ring, nready);
    }
    return 0;
}

运行代码,正常连接断开,再次进行连接也不会存在问题:
在这里插入图片描述
接下来就需要接收到这个信息,我们在这儿使用的也是 io_uring 库里面提供的函数 io_uring_prep_recv ,io_uring_prep_recv 用于准备一个接收数据的操作请求:

void io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd, 
					    void *buf, size_t len, int flags);

参数如下:

  • sqe: 指向 io_uring_sqe 结构的指针,表示提交队列条目(Submission Queue Entry)。
  • sockfd: 文件描述符,通常是套接字。
  • buf: 缓冲区指针,用于存储接收到的数据。
  • len: 缓冲区的长度。
  • flags: 接收操作的标志,与 recv(2) 系统调用中的 flags 参数相同。

代码改写如下:

#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>

#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024

#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2

struct conn_info
{
    int fd;
    int event;
};

int init_server(int port)
{
    // 创建socket连接
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
    serveraddr.sin_port = htons(port);

    // 绑定套接字
    int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));

    if (ret == -1)
    {
        perror("bind");
        return -1;
    }

    // 监听套接字
    listen(sockfd, 10);
    return sockfd;
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
    // 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = ACCEPT_EVENT
    };

    // 用于准备一个异步接受连接(accept)的请求
    io_uring_prep_accept(sqe, sockfd, addr, len, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));

    return 1;
}

int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = READ_EVENT
    };

    // 准备一个接收数据的操作请求
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
    
    return 1;
}

int main()
{
    unsigned short port = 9999;
    int sockfd = init_server(port);

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));

    struct io_uring ring;
    // 先构建两个队列出来
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
#if 0
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);

#endif

    char buffer[BUFFER_LENGTH] = {0};

    while (1)
    {
        // 内部实现就是 io_uring_enter,用于提交 IO 请求
        io_uring_submit(&ring);

        // 创建一个完成队列事件结构,通过 io_uring_wait_cqe
        // 获取完成 IO 操作的事件
        struct io_uring_cqe *cqe;
        io_uring_wait_cqe(&ring, &cqe);

        // 批量获取完成 IO 操作的事件
        struct io_uring_cqe *cqes[128];
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);

        int i = 0;
        for (i = 0; i < nready; i++)
        {
            // 获取到 accept 事件的入口
            struct io_uring_cqe *entries = cqes[i];
            struct conn_info result;
            memcpy(&result, &entries->user_data, sizeof(struct conn_info));

            if (result.event == ACCEPT_EVENT)
            {
                // 保证每一次都会有 accept 请求
                set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
                // printf("set_event_accept\n");

                
                int connfd = entries->res;
                set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
            }
            else if (result.event == READ_EVENT)
            {
                int ret = entries->res;
                printf("set_event_recv ret: %d, %s\n", ret, buffer);
            }
        }

        // 避免重复处理同一事件
        io_uring_cq_advance(&ring, nready);
    }

    return 0;
}

运行程序,此时就可以看见我们的服务端是可以正常的接收到消息的,但是此时只能接受一次,后续客户端继续发送我们又接受不到了,而且我们也不支持回发消息:
在这里插入图片描述
我们需要解决上面的问题就需要调用回发数据的接口,回发数据以后,更新状态,当前又需要接收到客户端所发的数据,就像前面 accept 的时候一样,这儿我们每一次都是需要发起一个 recv 的请求的,否则就会被标记为已完成的事件。

完整代码如下:

#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>

#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024

#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2

struct conn_info
{
    int fd;
    int event;
};

int init_server(unsigned short port)
{
    // 创建socket连接
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
    serveraddr.sin_port = htons(port);

    // 绑定套接字
    int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));

    if (ret == -1)
    {
        perror("bind");
        return -1;
    }

    // 监听套接字
    listen(sockfd, 10);
    return sockfd;
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
    // 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = ACCEPT_EVENT,
    };

    // 用于准备一个异步接受连接(accept)的请求
    io_uring_prep_accept(sqe, sockfd, addr, len, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));

    return 1;
}

int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = READ_EVENT,
    };

    // 准备一个接收数据的操作请求
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));

    return 1;
}

int set_event_send(struct io_uring *ring, int sockfd, const void *buf, size_t len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = WRITE_EVENT,
    };

    io_uring_prep_send(sqe, sockfd, buf, len, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

int main()
{
    unsigned short port = 9999;
    int sockfd = init_server(port);

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));

    struct io_uring ring;
    // 先构建两个队列出来
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
#if 0
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
    // 建立与客户端的连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);

#endif

    char buffer[BUFFER_LENGTH] = {0};

    while (1)
    {
        // 内部实现就是 io_uring_enter,用于提交 IO 请求
        io_uring_submit(&ring);

        // 创建一个完成队列事件结构,通过 io_uring_wait_cqe
        // 获取完成 IO 操作的事件
        // struct io_uring_cqe *cqe;
        // io_uring_wait_cqe(&ring, &cqe);

        // 批量获取完成 IO 操作的事件
        struct io_uring_cqe *cqes[128];
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);

        int i = 0;
        for (i = 0; i < nready; i++)
        {
            // 获取到已完成 IO 事件的入口
            struct io_uring_cqe *entries = cqes[i];
            struct conn_info result;
            memcpy(&result, &entries->user_data, sizeof(struct conn_info));

            if (result.event == ACCEPT_EVENT)
            {
                // 保证每一次都会有 accept 请求
                set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
                printf("set_event_accept\n");

                int connfd = entries->res;
                printf("connfd: %d\n", connfd);
                set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
            }
            else if (result.event == READ_EVENT)
            {
                int ret = entries->res;
                printf("set_event_recv ret: %d, %s\n", ret, buffer);

                if (ret == 0)
                {
                    close(result.fd);
                }
                else if (ret > 0)
                {
                    set_event_send(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
                }
            }
            else if (result.event == WRITE_EVENT)
            {
                int ret = entries->res;
                printf("set_event_send ret: %d, %s\n", ret, buffer);

                set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
            }
        }

        // 避免重复处理同一事件
        io_uring_cq_advance(&ring, nready);
    }

    return 0;
}

运行代码,创建 3 个客户端,此时每个客户端都可以连接上,对于每次发送的消息,客户端也可以接收到:
在这里插入图片描述

性能测试

接下来我们编写一段客户端代码,对 io_uring 的性能进行一下测试

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>

#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>

#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TEST_MESSAGE   "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH		2048
#define WBUFFER_LENGTH		2048

typedef struct test_context_s {

	char serverip[16];
	int port;
	int threadnum;
	int connection;
	int requestion;

#if 1
	int failed;
#endif
	
} test_context_t;

int connect_tcpserver(const char *ip, unsigned short port) {

	int connfd = socket(AF_INET, SOCK_STREAM, 0);

	struct sockaddr_in tcpserver_addr;
	memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));

	tcpserver_addr.sin_family = AF_INET;
	tcpserver_addr.sin_addr.s_addr = inet_addr(ip);
	tcpserver_addr.sin_port = htons(port);

	int ret = connect(connfd, (struct sockaddr*)&tcpserver_addr, sizeof(struct sockaddr_in));
	if (ret) {
		perror("connect");
		return -1;
	}

	return connfd;
}

int send_recv_tcppkt(int fd) {


	char wbuffer[WBUFFER_LENGTH] = {0};
	int i = 0;

	for (i = 0;i < 16;i ++) {
		strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
	}

	int res = send(fd, wbuffer, strlen(wbuffer), 0);
	if (res < 0) {
		exit(1);
	}
	
	char rbuffer[RBUFFER_LENGTH] = {0};
	res = recv(fd, rbuffer, RBUFFER_LENGTH, 0);
	if (res <= 0) {
		exit(1);
	}

	if (strcmp(rbuffer, wbuffer) != 0) {
		printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);
		return -1;
	}

	return 0;
}


static void *test_qps_entry(void *arg) {

	test_context_t *pctx = (test_context_t*)arg;

	
	int connfd = connect_tcpserver(pctx->serverip, pctx->port);
	if (connfd < 0) {
		printf("connect_tcpserver failed\n");
		return NULL;
	}


	int count = pctx->requestion / pctx->threadnum;
	int i = 0;
	
	int res;

	while (i++ < count) {
		res = send_recv_tcppkt(connfd);
		if (res != 0) {
			printf("send_recv_tcppkt failed\n");
			pctx->failed ++; // 
			continue;
		}
	}

	return NULL;
}

// ./test_qps_tcpclient -s 127.0.0.1 -p 2048 -t 50 -c 100 -n 10000
int main(int argc, char *argv[]) {

	int ret = 0;
	test_context_t ctx = {0};
	

	int opt;
	while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1) {

		switch (opt) {

			case 's':
				printf("-s: %s\n", optarg);
				strcpy(ctx.serverip, optarg);
				break;

			case 'p':
				printf("-p: %s\n", optarg);

				ctx.port = atoi(optarg);
				break;

			case 't':
				printf("-t: %s\n", optarg);
				ctx.threadnum = atoi(optarg);
				break;

			case 'c':
				printf("-c: %s\n", optarg);
				ctx.connection = atoi(optarg);
				break;

			case 'n':
				printf("-n: %s\n", optarg);
				ctx.requestion = atoi(optarg);
				break;

			default:
				return -1;
		
		}
		
	}

	pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));
	int i = 0;

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);
	for (i = 0;i < ctx.threadnum;i ++) {
		pthread_create(&ptid[i], NULL, test_qps_entry, &ctx);
	}
	
	for (i = 0;i < ctx.threadnum;i ++) {
		pthread_join(ptid[i], NULL);
	}

	struct timeval tv_end;
	gettimeofday(&tv_end, NULL);

	int time_used = TIME_SUB_MS(tv_end, tv_begin);

	printf("success: %d, failed: %d, time_used: %d, qps: %d\n", ctx.requestion-ctx.failed, 
		ctx.failed, time_used, ctx.requestion * 1000 / time_used);


	free(ptid);

	return ret;
}

简单介绍一下客户端的代码,就是将一段数据写入到缓冲区当中,然后发送给服务端,我们在这儿的逻辑就是通过启动多个客户端发送请求,然后对应的服务端进行处理,看其处理时间,在这儿跟之前的 epoll 进行对比,查看两者之间的性能差距。

之前 epoll 的代码如下:

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>

#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024 * 1024
#define MAX_PORTS 1
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

typedef int (*CALLBACK)(int fd);

int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
struct timeval begin;

int epfd = 3;

struct conn
{
    // 负责IO的文件描述符
    int fd;

    // 接收缓冲区的buffer
    char rbuffer[BUFFER_LENGTH];
    int rlength;

    // 发送缓冲区的buffer
    char wbuffer[BUFFER_LENGTH];
    int wlength;

    // 三个对应的回调函数
    CALLBACK send_callback;

    union
    {
        CALLBACK accept_callback;
        CALLBACK recv_callback;
    } r_action;
};

struct conn con_list[CONNECTION_SIZE] = {0};

void send_event(int fd, int event, int flag)
{
    if (flag)
    {
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = event;

        epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
    }
    else
    {
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = event;

        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
    }
}

int event_register(int fd, int event)
{
    if (fd < 0)
    {
        return -1;
    }

    con_list[fd].fd = fd;
    con_list[fd].r_action.recv_callback = recv_cb;
    con_list[fd].send_callback = send_cb;

    memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);
    con_list[fd].rlength = 0;

    memset(con_list[fd].wbuffer, 0, BUFFER_LENGTH);
    con_list[fd].wlength = 0;

    send_event(fd, event, 1);
}

int accept_cb(int fd)
{
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
    if (clientfd < 0)
    {
        printf("accept failed !!!\n");
    }
    else
    {
        // printf("accept finished: %d\n", clientfd);
    }

    event_register(clientfd, EPOLLIN);
    if ((clientfd % 1000) == 0)
    {

        struct timeval current;
        gettimeofday(&current, NULL);

        int time_used = TIME_SUB_MS(current, begin);
        memcpy(&begin, &current, sizeof(struct timeval));

        printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);
    }

    return 0;
}

int recv_cb(int fd)
{
    // memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);
    int count = recv(fd, con_list[fd].rbuffer, BUFFER_LENGTH, 0);
    if (count == 0)
    {
        printf("client disconnect: %d\n", fd);
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        close(fd);

        return 0;
    }
    else if (count < 0)
    {
        printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        close(fd);
        return 0;
    }

    con_list[fd].rlength = count;
    // printf("recv succ: %s\n", con_list[fd].rbuffer);

#if 1
    con_list[fd].wlength = con_list[fd].rlength;
    memcpy(con_list[fd].wbuffer, con_list[fd].rbuffer, con_list[fd].rlength);
#endif
    send_event(fd, EPOLLOUT, 0);
    return count;
}

int send_cb(int fd)
{
    int count = send(fd, con_list[fd].wbuffer, BUFFER_LENGTH, 0);

    send_event(fd, EPOLLIN, 0);
    return count;
}

int init_server(unsigned short port)
{
    // 创建套接字
    int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    printf("socketfd: %d\n", socketfd);

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
    serveraddr.sin_port = htons(port);              // 0 ~ 1023

    // 绑定套接字
    int ret = bind(socketfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));
    if (ret == -1)
    {
        printf("bind failed: %s\n", strerror(errno));
    }

    // 监听套接字
    listen(socketfd, 10);

    return socketfd;
}

int main()
{
    unsigned short port = 2000;

    int epfd = epoll_create(1);
    printf("epfd: %d\n", epfd);

    int i = 0;
    for (i = 0; i < MAX_PORTS; i++)
    {
        int sockfd = init_server(port + i);
        // printf("socket fd: %d\n", sockfd);
        con_list[sockfd].fd = sockfd;
        con_list[sockfd].r_action.recv_callback = accept_cb;

        send_event(sockfd, EPOLLIN, 1);
    }

    gettimeofday(&begin, NULL);

    while (1)
    {
        struct epoll_event events[1024] = {0};
        // 将就绪事件放入到就绪队里当中
        int nready = epoll_wait(epfd, events, 1024, -1);

        for (int i = 0; i < nready; i++)
        {
            int connfd = events[i].data.fd;

#if 0
            if((events[i].events & EPOLLIN))
            {
                con_list[i].r_action.recv_callback(connfd);
            }
            else if ((events[i].events & EPOLLOUT))
            {
                con_list[i].send_callback(connfd);
            }
#else
            if ((events[i].events & EPOLLIN))
            {
                con_list[connfd].r_action.recv_callback(connfd);
            }

            if ((events[i].events & EPOLLOUT))
            {
                con_list[connfd].send_callback(connfd);
            }
#endif
        }
    }

    return 0;
}

测试的两者均创建 100 个线程,发起一百万个请求,查看其处理时间,首先来看 io_uring 的测试结果,花费时间为 16 ms左右:在这里插入图片描述

接下来再来看 epoll 的处理性能:
在这里插入图片描述
两者的差距在 2ms 左右,其实对比下来 io_uring 还是有一个提升的,对于更多的连接的时候,io_uring 的效率还是会优于 epoll 的。