io_uring的使用示例及其解释

发布于:2024-05-13 ⋅ 阅读:(51) ⋅ 点赞:(0)

1 io_uring机制

1.1 io_uring机制

io_uring 是 Linux 内核提供的一种高性能的异步 I/O 操作机制,用于提高 I/O 操作的效率和吞吐量。它在 Linux 内核版本 5.1 中首次引入。

以下是 io_uring 机制的一些关键特点和优势:

  • 零拷贝:io_uring 使用了零拷贝技术,将数据从用户空间传输到内核空间,然后再传输到目标设备,避免了数据的多次复制,提高了性能。
  • 内存映射:io_uring 支持内存映射技术,允许用户空间程序直接访问内核空间中的数据,减少了数据的拷贝,提高了效率。
  • 批量提交:io_uring 允许用户在单个系统调用中提交多个 I/O 操作,减少了系统调用的开销,提高了系统的吞吐量。
  • 高性能:相比传统的异步 I/O 模型和线程池模型,io_uring 在大量 I/O 操作的场景下具有更好的性能表现,可以显著减少 CPU 的利用率和系统的开销。
  • 灵活性:io_uring 提供了丰富的系统调用接口,支持各种类型的 I/O 操作,包括文件 I/O、网络
    I/O、定时器等,可以满足不同场景下的需求。
  • 可扩展性:io_uring 可以轻松地与现有的异步编程模型和库(如 libuv、Boost.Asio
    等)集成,提供更加灵活和高效的异步编程解决方案。

io_uring 机制的实现原理涉及到了 Linux 内核的多个子系统,包括块设备、网络设备、文件系统等。它通过使用 ring buffer、completion queue、submission queue 等数据结构,实现了高效的异步 I/O 操作管理和调度机制。同时,io_uring 还提供了3个系统调用接口(io_uring_setup()、io_uring_enter()、io_uring_register()),用于配置和控制 io_uring 环,并提交和处理异步 I/O 操作。

1.2 io_uring系统调用接口功能介绍

这几个系统调用接口都在io_uring.c文件中。

1.2.1 io_uring_setup():

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
                struct io_uring_params __user *, params)                                                                                                                                                           
{
        return io_uring_setup(entries, params);
}

功能:用于初始化和配置 io_uring 。
应用用途:在使用 io_uring 之前,首先需要调用此接口初始化一个 io_uring 环,并设置其参数。

1.2.2 io_uring_enter():

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,                                                                                                                                                  
                u32, min_complete, u32, flags, const void __user *, argp,
                size_t, argsz)

功能:用于提交和处理异步 I/O 操作。
应用用途:在向 io_uring 环中提交 I/O 操作后,通过调用此接口触发内核处理这些操作,并获取完成的操作结果。

1.2.3 io_uring_register():

SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
                void __user *, arg, unsigned int, nr_args)

功能:用于注册文件描述符、缓冲区、事件文件描述符等资源到 io_uring 环中。
应用用途:在进行 I/O 操作之前,需要将相关的资源注册到 io_uring 环中,以便进行后续的异步 I/O 操作。

2 liburing

2.1 liburing简介

liburing 是io_uring的实现者Jens Axboe为了简化用户使用io_uring所实现的一个用户空间的 C 库,用于简化在 Linux 系统上使用 io_uring 的开发。它提供了一组简洁而强大的 API,使开发者可以更轻松地利用 io_uring 的高性能异步 I/O 功能,而无需深入了解 io_uring 的内部工作原理。以下是 liburing 的主要特点和功能:

  • 简单易用:liburing 提供了一组简洁、清晰的 API,方便开发者快速上手使用 io_uring,无需深入了解复杂的底层实现细节。
  • 高性能:liburing 基于 io_uring 机制实现,能够充分利用 Linux 内核提供的高性能异步 I/O
    能力,提供了比传统的异步 I/O 接口更高的性能和吞吐量。
  • 零拷贝:liburing 支持零拷贝技术,可以直接在用户空间和内核空间之间传递数据,避免了数据的多次拷贝,提高了效率。
  • 灵活性:liburing 提供了丰富的功能和选项,可以满足各种异步 I/O 操作的需求,包括文件 I/O、网络 I/O、定时器等。
  • 高级特性:除了基本的异步 I/O 操作外,liburing 还支持诸如批量提交、注册文件描述符、事件通知等高级特性,帮助开发者更好地利用 io_uring 的全部功能。

使用 liburing 进行开发时,通常的步骤如下:

  • 引入 liburing 头文件,并链接 liburing 库到你的项目中。
  • 调用 io_uring_queue_init() 初始化一个 io_uring 环。
  • 使用 io_uring_register_files()、io_uring_register_eventfd() 等函数注册所需的文件描述符、事件通知等资源到 io_uring 环中。
  • 使用 io_uring_prep_* 系列函数准备异步 I/O 操作。
  • 调用 io_uring_submit() 提交异步 I/O 操作到 io_uring 环中。
  • 调用 io_uring_wait_cqe() 等函数等待异步操作完成,并处理结果。
  • 调用 io_uring_cqe_seen() 函数通知内核已经处理完了这些cqe,可以将其从完成队列中移除。
  • 调用 io_uring_queue_exit() 函数释放了所有与 io_uring 环相关的资源,避免资源泄漏和内存泄漏。

2.2 liburing编译

2.2.1 liburing的代码

liburing的代码链接如下所示,可以直接选择master或者特定tag的带下下载下来使用。
axboe/liburing

2.2.2 编译

通过 configure --cc=xxx --cxx=yyy 指定所需平台的编译工具或者交叉编译工具,然后编译即可。

Building liburing
-----------------

    #
    # Prepare build config (optional).
    #
    #  --cc  specifies the C   compiler.
    #  --cxx specifies the C++ compiler.
    #
    ./configure --cc=gcc --cxx=g++;

    #
    # Build liburing.
    #
    make -j$(nproc);

    #
    # Install liburing (headers, shared/static libs, and manpage).
    #
    sudo make install;

2.2.3 使用方法

在使用liburing来使用io_uring功能的程序时,可以通过指定链接liburing库即可

3 io_uring测试例

3.1 io_uring server测试程序

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <liburing.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/time.h>

#define SERVER_IP "127.0.0.1"
#define QUEUE_DEPTH 256
#define BLOCK_SZ    1024

struct io_data {
    int fd;
    struct iovec iov;
};

static int setup_listening_socket(int port) {
    int sfd, flags;
    struct sockaddr_in addr;

    sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd < 0) {
        perror("socket");
        return -1;
    }

    flags = 1;
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
        close(sfd);
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(SERVER_IP);

    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(sfd);
        return -1;
    }

    if (listen(sfd, 10) < 0) {
        perror("listen");
        close(sfd);
        return -1;
    }

    return sfd;
}

static void submit_accept(struct io_uring *ring, int fd) {
    struct io_uring_sqe *sqe;
    struct io_data *data;

    data = malloc(sizeof(struct io_data));
    data->fd = fd; // 保存监听套接字描述符

    sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        fprintf(stderr, "Could not get sqe.\n");
        exit(1);
    }
    io_uring_prep_accept(sqe, fd, NULL, NULL, 0);
    io_uring_sqe_set_data(sqe, data);
}

static void handle_connection(struct io_uring *ring, int cfd, int client_num) {
    struct io_uring_sqe *sqe;
    struct io_data *data;
    char *buffer;

    buffer = malloc(BLOCK_SZ);
    data = malloc(sizeof(struct io_data));
    data->fd = cfd; // 保存连接套接字描述符
    
    // 发送“Hello client”消息
    sprintf(buffer, "Hello client %d", client_num);
    send(cfd, buffer, strlen(buffer), 0);

    data->iov.iov_base = buffer;
    data->iov.iov_len = BLOCK_SZ;

    sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        fprintf(stderr, "Could not get sqe.\n");
        exit(1);
    }

    // 接收客户端消息
    io_uring_prep_readv(sqe, cfd, &data->iov, 1, 0);
    io_uring_sqe_set_data(sqe, data);
}

int main(int argc, char *argv[]) {
    int sfd;
    struct io_uring ring;
    struct io_uring_cqe *cqe;
    int client_num = 0;
    struct itimerval timer;

    if (argc < 2) {
        printf("Usage: %s <port>\n", argv[0]);
        return 1;
    }

    sfd = setup_listening_socket(atoi(argv[1]));
    if (sfd < 0)
        return 1;

    if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
        perror("io_uring_queue_init");
        return 1;
    }

    submit_accept(&ring, sfd);

    while (1) {
        if (io_uring_submit(&ring) < 0) {
            perror("io_uring_submit");
            break;
        }

        if (io_uring_wait_cqe(&ring, &cqe) < 0) {
            perror("io_uring_wait_cqe");
            break;
        }

        struct io_data *data = (struct io_data *)io_uring_cqe_get_data(cqe);
        if (cqe->res < 0) {
            fprintf(stderr, "Async read failed.\n");
            return 1;
        } else {
            if (data->fd == sfd) {
                int cfd = cqe->res;
                handle_connection(&ring, cfd, ++client_num);
                submit_accept(&ring, sfd);
                printf("New connection accepted, and the socket fd is %d\n", cfd);
            } else {
                write(STDOUT_FILENO, data->iov.iov_base, cqe->res);
                sprintf(data->iov.iov_base, "Response from server to client %d: %.*s", client_num, cqe->res, (char *)data->iov.iov_base);
                send(data->fd, data->iov.iov_base, strlen(data->iov.iov_base), 0);
            }
        }

        io_uring_cqe_seen(&ring, cqe);
        free(data->iov.iov_base);
        free(data);
    }

    io_uring_queue_exit(&ring);

    return 0;
}

3.2 编译测试程序

gcc -o io_uring_tcp_server io_uring_tcp_server.c -luring

3.3 验证方法

在这里插入图片描述

3.4 代码解释

3.4.1 setup_listening_socket

static int setup_listening_socket(int port) {
    int sfd, flags;
    struct sockaddr_in addr;

    sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd < 0) {
        perror("socket");
        return -1;
    }

    flags = 1;
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
        close(sfd);
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(SERVER_IP);

    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(sfd);
        return -1;
    }

    if (listen(sfd, 10) < 0) {
        perror("listen");
        close(sfd);
        return -1;
    }

    return sfd;
}

该函数的用途是为服务器程序创建一个监听套接字,并将其绑定到指定的端口上,以便接受客户端的连接请求。在服务器程序启动时,通常会调用此函数来初始化监听套接字,从而开始监听指定端口上的连接请求。

  • 使用 socket() 函数创建一个套接字,指定协议族为 IPv4(AF_INET)、套接字类型为流式套接字(SOCK_STREAM)。
  • 设置套接字选项 SO_REUSEADDR,使得在服务器重启后能够立即使用同一端口。这样可以避免出现"Address already in use"错误。
  • 初始化一个 sockaddr_in 结构体,并将其填充为服务器的地址信息,包括协议族、端口号和 IP 地址。
  • 调用 bind() 函数将套接字与指定的地址进行绑定,即将地址绑定到套接字上。
  • 调用 listen() 函数将套接字设置为监听状态,使其可以接受客户端的连接请求,并设置连接请求的队列长度为 10。

3.4.2 submit_accept

static void submit_accept(struct io_uring *ring, int fd) {
    struct io_uring_sqe *sqe;
    struct io_data *data;

    data = malloc(sizeof(struct io_data));
    data->fd = fd; // 保存监听套接字描述符

    sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        fprintf(stderr, "Could not get sqe.\n");
        exit(1);
    }
    io_uring_prep_accept(sqe, fd, NULL, NULL, 0);
    io_uring_sqe_set_data(sqe, data);
}

这段代码的含义是准备一个接受连接请求的操作,并将其提交到 io_uring 中等待执行。在异步 I/O 编程中,通常会通过类似的方式准备各种 I/O 操作,并将其提交到 io_uring 中,以实现高效的异步 I/O 处理。
这段代码主要的完成的工作为:

  • 分配一个 io_data 结构体的内存空间,并将监听套接字描述符 fd 存储在该结构体中,以便后续操作使用。

  • 调用 io_uring_get_sqe() 函数获取一个提交队列元素(Submission Queue Entry,SQE),用于描述待提交的 I/O 操作。

  • 检查获取的 SQE 是否有效,如果为 NULL,则打印错误信息并退出程序。

  • 调用 io_uring_prep_accept() 函数准备一个接受连接请求的操作,填充到获取的 SQE 中。此处传入的参数为监听套接字描述符 fd,其他参数为 NULL,表示接受连接请求时不需要指定对端地址信息和长度。

  • 调用 io_uring_sqe_set_data() 函数将之前分配的 io_data 结构体指针与 SQE关联,以便在完成操作时能够获取到正确的套接字描述符。

3.4.3 handle_connection

static void handle_connection(struct io_uring *ring, int cfd, int client_num) {
    struct io_uring_sqe *sqe;
    struct io_data *data;
    char *buffer;

    buffer = malloc(BLOCK_SZ);
    data = malloc(sizeof(struct io_data));
    data->fd = cfd; // 保存连接套接字描述符
    
    // 发送“Hello client”消息
    sprintf(buffer, "Hello client %d", client_num);
    send(cfd, buffer, strlen(buffer), 0);

    data->iov.iov_base = buffer;
    data->iov.iov_len = BLOCK_SZ;

    sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        fprintf(stderr, "Could not get sqe.\n");
        exit(1);
    }

    // 接收客户端消息
    io_uring_prep_readv(sqe, cfd, &data->iov, 1, 0);
    io_uring_sqe_set_data(sqe, data);
}

这段代码的作用是向客户端发送一条欢迎消息,并准备接收客户端发送的消息。通过使用 io_uring 提供的异步 I/O 操作,实现了高效的客户端连接处理。

  • 调用 send() 函数将消息发送给客户端,发送完成后,客户端将收到 “Hello client [client_num]” 的消息。
  • 调用 io_uring_get_sqe() 函数获取一个提交队列元素(Submission Queue Entry,SQE),用于描述待提交的 I/O 操作。
  • 调用 io_uring_prep_readv() 函数准备一个接收消息的操作,并填充到获取的 SQE 中。此处传入的参数为客户端连接套接字描述符 cfd、接收缓冲区 buffer 的信息,以及缓冲区长度。
  • 调用 io_uring_sqe_set_data() 函数将之前分配的 io_data 结构体指针与 SQE 关联,以便在完成操作时能够获取到正确的套接字描述符和数据缓冲区。

3.4.4 main

int main(int argc, char *argv[]) {
    int sfd;
    struct io_uring ring;
    struct io_uring_cqe *cqe;
    int client_num = 0;
    struct itimerval timer;

    if (argc < 2) {
        printf("Usage: %s <port>\n", argv[0]);
        return 1;
    }

    sfd = setup_listening_socket(atoi(argv[1]));
    if (sfd < 0)
        return 1;

    if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
        perror("io_uring_queue_init");
        return 1;
    }

    submit_accept(&ring, sfd);

    while (1) {
        if (io_uring_submit(&ring) < 0) {
            perror("io_uring_submit");
            break;
        }

        if (io_uring_wait_cqe(&ring, &cqe) < 0) {
            perror("io_uring_wait_cqe");
            break;
        }

        struct io_data *data = (struct io_data *)io_uring_cqe_get_data(cqe);
        if (cqe->res < 0) {
            fprintf(stderr, "Async read failed.\n");
            return 1;
        } else {
            if (data->fd == sfd) {
                int cfd = cqe->res;
                handle_connection(&ring, cfd, ++client_num);
                submit_accept(&ring, sfd);
                printf("New connection accepted, and the socket fd is %d\n", cfd);
            } else {
                write(STDOUT_FILENO, data->iov.iov_base, cqe->res);
                sprintf(data->iov.iov_base, "Response from server to client %d: %.*s", client_num, cqe->res, (char *)data->iov.iov_base);
                send(data->fd, data->iov.iov_base, strlen(data->iov.iov_base), 0);
            }
        }

        io_uring_cqe_seen(&ring, cqe);
        free(data->iov.iov_base);
        free(data);
    }

    io_uring_queue_exit(&ring);

    return 0;
}

该程序实现了一个简单的基于异步 I/O 的 TCP 服务器,能够同时处理多个客户端的连接和消息交互,提供了高性能和高并发的网络服务能力。

  • main 函数首先解析命令行参数,获取服务器监听的端口号。如果命令行参数不足,则打印用法提示并退出程序。
  • 调用 setup_listening_socket 函数创建一个监听套接字,并将其绑定到指定端口上。如果创建和绑定套接字失败,则程序返回并退出。
  • 调用 io_uring_queue_init 函数初始化一个 io_uring 环,设置了队列的深度为 QUEUE_DEPTH,并将其赋值给 ring 结构体。如果初始化失败,则打印错误信息并退出程序。
  • 调用 submit_accept 函数向 io_uring 提交一个接受连接请求的操作。
  • 使用 io_uring_submit 函数将提交队列中的操作提交到 io_uring 环中进行处理。
  • 使用 io_uring_wait_cqe 函数等待一个完成事件,一旦有完成事件就绪,就会返回一个指向完成队列元素(CQE)的指针 cqe。
  • 如果是新的连接请求完成,则调用 handle_connection 函数处理新的连接,并继续提交下一个接受连接请求的操作。
  • 如果是数据读取完成,则将客户端发送的消息原样返回,并发送一条响应消息给客户端。
  • 使用 io_uring_cqe_seen 函数通知 io_uring 环,已经处理完了这个完成事件。
  • 在程序退出之前,调用 io_uring_queue_exit 函数清理和释放与 io_uring 环相关的资源。

3.5 io_uring client测试代码

下面是一个使用io_uring的client端测试代码,编译和测试方式通io_uring server类似。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <liburing.h>
#include <sys/resource.h>
#include <time.h>

#define SERVER_IP "127.0.0.1"
#define SERVER_PORT 12345
#define MESSAGE "Hello, Server!\r\n"
#define BUFFER_SIZE 1024
#define QUEUE_DEPTH 1
//#define NUM_CLIENTS 50
#define NUM_CLIENTS 1
#define STATS_INTERVAL 10

struct io_data {
    int fd;
    struct iovec iov;
};

void collect_stats(time_t start_time, int requests, pid_t pid) {
    struct rusage usage;
    double cpu_usage;
    int ret;
    FILE *fp;
    unsigned long mem_usage;
    time_t end_time;
    double elapsed_time;
    char line[128];
    double rps;

    printf("----------------------- stat pid:%d start ---------------------------\n", pid);
    // 获取CPU利用率
    ret = getrusage(RUSAGE_SELF, &usage);
    if (ret == 0) {
        cpu_usage = ((double)usage.ru_utime.tv_sec + (double)usage.ru_utime.tv_usec / 1000000.0 +
                     (double)usage.ru_stime.tv_sec + (double)usage.ru_stime.tv_usec / 1000000.0) * 100.0;
        printf("CPU Usage (PID %d): %.2f%%\n", pid, cpu_usage);
    }

    // 获取内存使用情况
    fp = fopen("/proc/self/statm", "r");
    if (fp != NULL) {
        if (fgets(line, sizeof(line), fp) != NULL) {
            sscanf(line, "%*s %lu", &mem_usage);
            printf("Memory Usage (PID %d): %lu pages\n", pid, mem_usage);
        }
        fclose(fp);
    }

    // 计算RPS
    end_time = time(NULL);
    elapsed_time = difftime(end_time, start_time);
    rps = (double)requests / elapsed_time;
    printf("Requests Per Second (RPS) (PID %d): %.2f\n", pid, rps);

    // 打印统计信息
    printf("----------------------- stat pid:%d end ---------------------------\n", pid);
}

static void send_message(struct io_uring *ring, int sockfd, const char *msg) {
    struct io_uring_sqe *sqe;
    struct io_data *data;

    data = malloc(sizeof(struct io_data));

    data->iov.iov_base = (void *)msg;
    data->iov.iov_len = strlen(msg);

    sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        fprintf(stderr, "Could not get sqe.\n");
        exit(EXIT_FAILURE);
    }

    // io_uring_prep_writev(sqe, sockfd, &data->iov, 1, 0);
    io_uring_prep_send(sqe, sockfd, msg, strlen(msg), 0);
    io_uring_sqe_set_data(sqe, data);

    if (io_uring_submit(ring) < 0) {
        perror("io_uring_submit");
        exit(EXIT_FAILURE);
    }
}

void client_func() {
    struct sockaddr_in server_addr;
    int sock_fd, ret;
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    char buffer[BUFFER_SIZE];
    int requests = 0;
    time_t start_time = time(NULL);
    pid_t pid = getpid();

    printf("The cur task id is: %d\r\n", pid);

    // 创建socket
    sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_fd == -1) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    // 初始化服务器地址结构
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(SERVER_IP);
    server_addr.sin_port = htons(SERVER_PORT);

    // 连接到服务器
    if (connect(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("connect");
        close(sock_fd);
        exit(EXIT_FAILURE);
    }

    // 初始化io_uring
    if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
        perror("io_uring_queue_init");
        close(sock_fd);
        exit(EXIT_FAILURE);
    }

    while (1) {
        // 准备发送消息的操作
        sqe = io_uring_get_sqe(&ring);
        if (!sqe) {
            perror("io_uring_get_sqe");
            close(sock_fd);
            io_uring_queue_exit(&ring);
            exit(EXIT_FAILURE);
        }
        io_uring_prep_send(sqe, sock_fd, MESSAGE, strlen(MESSAGE), 0);
        io_uring_sqe_set_data(sqe, NULL);

        // 提交发送消息的操作
        if (io_uring_submit(&ring) != QUEUE_DEPTH) {
            perror("io_uring_submit");
            close(sock_fd);
            io_uring_queue_exit(&ring);
            exit(EXIT_FAILURE);
        }

        // 等待发送完成
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            perror("io_uring_wait_cqe");
            close(sock_fd);
            io_uring_queue_exit(&ring);
            exit(EXIT_FAILURE);
        }

        // 接收服务器的响应
        ret = recv(sock_fd, buffer, BUFFER_SIZE, 0);
        if (ret == -1) {
            perror("recv");
            printf("recv error.\n");
            close(sock_fd);
            io_uring_cqe_seen(&ring, cqe);
            io_uring_queue_exit(&ring);
            exit(EXIT_FAILURE);
        } else if (ret == 0) {
            io_uring_cqe_seen(&ring, cqe);
            printf("Connection closed by server\n");
            break;
        } else {
            io_uring_cqe_seen(&ring, cqe);
            printf("Received message from server: %.*s\r\n", ret, buffer);

            // 统计请求数
            requests++;

            printf(" ============================= pid:%d requests:%d sleep =============================\r\n", pid, requests);
            // 每隔1秒钟进行一次通信
            sleep(1);
            printf(" ============================= pid:%d requests:%d weekup =============================\r\n", pid, requests);

            // 每隔一定时间打印一次统计信息
            if (requests % STATS_INTERVAL == 0) {
                printf(" ############################################ 1 ##################################################\r\n");
                collect_stats(start_time, requests, pid);
                printf(" ############################################ 2 ##################################################\r\n");
            }

            send_message(&ring, sock_fd, "Reply from client\r\n");
        }
    }

    // 关闭socket
    close(sock_fd);
    io_uring_queue_exit(&ring);
}


int main() {
    int i;

    for (i = 0; i < NUM_CLIENTS; i++) {
        pid_t pid = fork();
        if (pid == -1) {
            perror("fork");
            exit(EXIT_FAILURE);
        } else if (pid == 0) {
            client_func();
            exit(EXIT_SUCCESS);
        }
    }

    // 等待所有子进程结束
    while (wait(NULL) > 0);

    return 0;
}