1.初识io_uring
(1)io_uring是什么?
从名字上解读,是一个提供给用户处理IO的环形队列,实现用户程序提交完IO请求后,能执行其他代码,不会被阻塞。让内核来异步(线程池)执行真正的IO操作,当IO操作完成,内核会将操作结果存放在完成环形队列,通过如epoll的通知机制让用户程序直接取出IO操作的结果。
(2)能异步的对象有?
不仅包括传统的文件 I/O 操作(如 read、write、open、close 等),还支持网络相关的系统调用,如 send、recv、accept、connect 等,所以可以通过io_uring构建高并发高性能的网络服务器,例如,在一个文件服务器中,当有多个客户端同时请求读取文件时,使用 io_uring 可以让服务器在处理一个客户端的 I/O 请求时,同时处理其他客户端的请求,而不需要等待每个 I/O 请求都完成后再处理下一个,大大提高了服务器的响应速度和吞吐量。
(3)环形队列的机制是?
在初始化 io_uring 时,通过内存映射(mmap)机制,将提交队列(submit queue)和完成队列(complete queue)映射到用户空间和内核空间,从而避免了数据传递需要的系统调用和上下文切换。
(4)io_uring的三个系统调用的作用分别是?
⑴io_uring_setup
接受两个参数,第一个参数是期望的提交队列(SQ)的大小,即队列中可以容纳的 I/O 请求数量;第二个参数是一个结构体指针,该结构体用于返回 io_uring 实例的相关参数io_uring_params,如实际分配的 SQ 和完成队列(CQ)的大小、队列的偏移量等信息。
工作一:为 io_uring 实例分配所需的内存空间,包括 SQ、CQ 以及相关的控制结构。
工作二:返回一个fd,用于操作初始化的这个操作该 io_uring 实例。
⑵io_uring_enter
主要作用是将应用程序准备好的 I/O 请求提交给内核,并可以根据传入的参数选择 不等待直接返回 / 等待至少n个IO有结果返回 /等待全部有结果返回。
⑶io_uring_register
用于注册文件描述符或事件文件描述符到 io_uring 实例中。这样内核在IO处理的时候,可以直接访问这些预先注册的资源,而无需每次都重新设置相关信息,从而提高了 I/O 操作的效率。例如,在进行大量文件读写操作时,预先注册文件描述符可以避免每次提交 I/O 请求时都进行文件描述符的查找和验证。
(5)io_uring的应用场景有?
高性能网络服务器,无需阻塞等待操作完成,可以立即处理其他请求,从而提高了并发处理能力。
数据库系统、存储服务系统等需要频繁地进行读写操作的业务。
(6)submit queue的结点和compelete queue的结点是什么关系?
实际上是同一个结点。但是情况 compelete queue不会影响submit queue队列中的结点。因为不是删除。
(7)io_uring和epoll有哪些不同?
1)io_uring每处理完一个事件之后,需要再通过prepare再设置事件进submit队列,否则用户操作只能有一次。而epoll是持续监听fd的,不需要多次设置事件。
2)epoll 通过内核和用户空间的数据拷贝来传递事件信息,而 io_uring 基于两个共享环形缓冲区,内核和用户可以共同操作
3)epoll主要用于IO多路复用,给出的是可以操作;io_uring不仅用于事件通知,还能直接向IO操作,直接给出结果。
4)难度不同,因为io_uring在每处理完一个事件后需要思考设置新事件,因此逻辑上更困难,而epoll相对简单。
(8)如果有多个客户端同时连接uring_tcp_server如何应对
在监听的sockfd上投递多个accept请求,要并发10个就投递10个。
2.用io_uring实现一个tcp_server
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2
struct conn_info { //用于关联 连接 与 结点数据
int fd; //方便知道compelete queue中取出的结点数据 对应是哪一个连接
int event; //方便知道是哪一个事件 进行判断
};
int init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(port);
if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {
perror("bind");
return -1;
}
listen(sockfd, 10);
return sockfd;
}
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
int set_event_recv(struct io_uring *ring, int sockfd,
void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring); //创建结点
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_READ,
};
io_uring_prep_recv(sqe, sockfd, buf, len, flags); //加入submit queue 内部有io_uring_register
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); //给结点绑定信息
}
int set_event_send(struct io_uring *ring, int sockfd,
void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_WRITE,
};
io_uring_prep_send(sqe, sockfd, buf, len, flags); //提交到submitqueue
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
socklen_t *addrlen, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_ACCEPT,
};
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int main(int argc, char *argv[]) {
unsigned short port = 9999;
int sockfd = init_server(port);
struct io_uring_params params;
memset(¶ms, 0, sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);//初始化两个队列 内部有io_uring_setup系统调用
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
char buffer[BUFFER_LENGTH] = {0};
while (1) {
io_uring_submit(&ring); //提交submit queue的结点到内核 内部有io_uring_enter
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe); //等待compelete queue有结点 并且拿到compelete queue的头指针 cqe
struct io_uring_cqe *cqes[128]; //指向从compelete queue中取出的结点
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); //从compelete queue中取出结点 epoll_wait
int i = 0;
for (i = 0;i < nready;i ++) {
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == EVENT_ACCEPT) { //并不是靠事件类型驱动 这只是普通判断分三种情况
set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
//避免连接断开后无法再连接 因为最后会清空compelete queue
// 不像epoll持续监听 所以需要再重新加入submit队列
int connfd = entries->res;
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
//避免用户连接后发送数据接收不到
} else if (result.event == EVENT_READ) {
int ret = entries->res;
//printf("set_event_recv ret: %d, %s\n", ret, buffer); //
if (ret == 0) { //避免断开连接后fd = 0 也设置事件一直循环
close(result.fd);
} else if (ret > 0) {
set_event_send(&ring, result.fd, buffer, ret, 0); //收到后设置发送事件
}
} else if (result.event == EVENT_WRITE) {
//
int ret = entries->res;
//printf("set_event_send ret: %d, %s\n", ret, buffer);
set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);//发送后再设置回接收事件 用户则可以不停发送
}
}
io_uring_cq_advance(&ring, nready); //清空compelete queue
}
}
编程细节:
1.关联compelete queue结点数据到具体的连接。
2.连接断开服务端如何感应。参考Posix API文章recv()的具体工作。
3.Accept事件可以多次投递,但是recv事件只能等取出完成队列的结果数据后再投递(多个recv事件非阻塞会竞争rbuffer,导致数据覆盖或者乱序),send可以多次投递。
3.测试epoll与uring的性能
编写一个client_test,作为客户端代码,测试与两个服务器分别发送100w个包,全部接收返回需要的时间,以及qps(服务器 每秒能处理的请求数量),包的大小分别是128b、256b、512b。
client_test代码:
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
typedef struct test_info_t {
char serverip[16];
int port;
int threadnum;
int connectnum;
int requestnum;
int falied;
} test_info;
#define MESSAGE "12345\r\n" //8B (隐含\0终止符)
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define pkt_size 256 //512 1024 2048
int send_pkt(int fd) {
char wbuffer[2048] = {0}; //2KB
int i=0;
for(i=0;i<pkt_size / strlen(MESSAGE);i++){
strcpy(wbuffer + i*strlen(MESSAGE),MESSAGE);
}
int ret = send(fd, wbuffer, strlen(wbuffer), 0 );
if (ret < 0) {
exit(1);
}
return 0;
}
int recv_pkt(int fd) {
char rbuffer[2048] = {0}; //2KB
char tmp[2048] = {0};
int total_size = 0;
int recv_size;
while((recv_size = recv(fd, tmp, 2048, 0)) > 0){ //当包大小超过1500B的时候 分批接收
memcpy(rbuffer + total_size,tmp,recv_size); //不用strcopy 不依赖\0
total_size += recv_size;
memset(tmp,0,2048);
}
if (strcmp(rbuffer, wbuffer)) {
printf("recv : '%s' != send : '%s'\n", buffer, MESSAGE);
return -1;
}
return 0;
}
int connect_tcpserver(const char *ip, unsigned short port) {
int connfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(struct sockaddr_in));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(ip);
server_addr.sin_port = htons(port);
int ret = connect(connfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in));
if (ret) {
perror("connect error\n");
return -1;
}
return connfd;
}
static void *thread_callback(void *arg) {
test_info info = *(test_info *)arg;
int cnt = info.requestnum / info.threadnum;
int connfd = connect_tcpserver(info.serverip, info.port);
if (connfd < 0) {
printf("connect fail\n");
info.falied += cnt;
return NULL;
}
int i = 0;
for (i = 0; i < cnt; i++) {
int res = send_pkt(connfd);
if (res) {
printf("send fail\n");
info.falied++;
continue;
}
res = recv_pkt(connfd);
if (res) {
printf("recv fail\n");
info.falied++;
continue;
}
}
return NULL;
}
//./client_test -s 192.168.88.9 -p 2048 -t 50 -c 50 -n 1000000
//服务端ip和port 客户端线程数量 连接数量 发送数据包数量
int main(int argc, char *argv[]) {
int op;
test_info info = {0};
while ((op = getopt(argc, argv, "s:p:t:c:n:?")) != -1) {
switch (op) {
case 's':
strcpy(info.serverip, optarg);
break;
case 'p':
info.port = atoi(optarg);
break;
case 't':
info.threadnum = atoi(optarg);
break;
case 'c':
info.connectnum = atoi(optarg);
break;
case 'n':
info.requestnum = atoi(optarg);
break;
default :
return -1;
}
}
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
pthread_t *tid = malloc(info.threadnum * sizeof(pthread_t));
int i;
for (i = 0; i < info.threadnum; i++) {
pthread_create(&tid[i], NULL, thread_callback, &info);
}
for (i = 0; i < info.threadnum; i++) {
pthread_join(tid[i], NULL); //主线程等待子线程全部返回 并且会回收资源 无需free
}
struct timeval tv_end;
gettimeofday(&tv_end, NULL);
int time_used = TIME_SUB_MS(tv_end, tv_begin);
printf("success: %d, failed: %d, time_used:%d, qps: %d\n", info.requestnum - info.falied, info.falied, time_used,info.requestnum /(time_used / 1000));
return 0;
}
性能测试结果:
(1)
耗时相差1s,qps相差1.5w
(2)
耗时相差1.6s,qps相差2.2w
4.网络面试题
1.为什么tcp握手是三次,挥手是四次?
握手:如果握手是两次的话,B一收到A的请求就会进入established状态。可能会有一种情况是A发送的第一个请求被网络阻塞了,随后A又发了第二个请求,并且成功收到了B的ACK,两个人开始通信,通信后关闭连接。但是这时第一个被阻塞的请求到达了B,B进入连接状态等待A发送消息,但是A是close状态。
挥手:如果挥手是三次的话,在A收到B的释放请求后,直接就结束,那B会不知道自己的请求是否被收到,误认为对方还在等自己的释放请求,实际上A已经close了。所以需要对B再发送一次释放请求的确认,保证B到A方向的关闭。
2.udp的并发如何做?
3.tcp和udp的区别
1.前置不同:tcp基于连接,udp基于数据报
2.分包与黏包的解决方案
tcp可以采用在应用层头部加上两个字节记录包大小,比如要发送的buffer前两个数组空间用于标记长度,对方接收先收两字节,再收后面的。或者分隔符,在每固定大小的后面加上/r/n类似的分隔符,让接收方在接收后判断。
udp需要在代码层对每个包进行id标识,并且在代码层实现确认机制
3.并发的做法不一样。udp需要在代码层模拟tcp的操作实现并发,为每一个用户建立一个连接,服务器和用户通过连接并发交互。而tcp原生的accept就能接收多个连接然后epoll管理。
4.业务场景不同。udp可以做游戏的团战或者下载工具,udp必须在实时性和传输量上做出选择,实时性 = 频繁发送小包 (游戏), 传输量 = 间隔发送大包(下载)。tcp适合做http需要顺序传输或者Websocket需要长连接的业务。
5.udp适合简单交互的业务(短“连接”),比如DNS,请求一次后返回就结束了。tcp适合做长连接,存储状态。
文章参考:
(31 封私信 / 80 条消息) 深入解剖io_uring:Linux异步IO的终极武器 - 知乎
代码参考: