一、介绍
1.为什么需要reactor网络模型
1.1 高并发支持
非阻塞I/O:Reactor模型通过非阻塞I/O操作,允许单线程处理多个连接,减少线程切换开销,提升并发能力。
事件驱动:基于事件驱动机制,系统只在有事件发生时处理,避免忙等待,提高资源利用率。
1.2 资源高效
减少线程开销:传统多线程模型中,每个连接需要一个线程,Reactor模型通过少量线程处理多个连接,降低线程创建和上下文切换的开销。
内存占用低:由于线程数减少,内存占用也相应降低,适合资源受限的环境。
1.3 可扩展性强
模块化设计:Reactor模型将事件分发与处理逻辑分离,便于扩展和维护。
支持多种I/O多路复用:如select、poll、epoll等,适应不同平台和需求。
1.4 响应迅速
低延迟:事件驱动机制确保事件能快速响应,减少等待时间,提升系统响应速度。
1.5 简化编程
统一事件处理:所有事件通过同一接口处理,简化编程模型,降低复杂度。
避免竞态条件:单线程或少量线程处理事件,减少多线程环境下的竞态问题。
1.6 适用场景广泛
高并发服务器:如Web服务器、游戏服务器等。
实时系统:如即时通讯、在线交易等低延迟场景。
分布式系统:如消息队列、RPC框架等。
2.实现思路
根据不同的io事件,调用不同的回调函数
io event callback
listenfd EPOLLIN accept_cb
clientfd EPOLLIN recv_cb
clientfd EPOLLOUT send_cb
二、代码
1.创建服务端
int initserver(int port){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){
perror("bind failed!");
printf("bind failed!");
}
listen(sockfd, 10);
return sockfd;
}
对传入的端口进行绑定
2.创建epoll
创建一个io的一个结构体,定义io的连接数量
struct conn{
int fd;
char rbuffer[BUFFERLENGTH];
int rbufferlength;
char wbuffer[BUFFERLENGTH];
int wbufferlength;
CALLBACK send_callback;
union
{
CALLBACK accept_callback;
CALLBACK recv_callback;
}r_action;
};
struct conn conn_list[CONNECTIONSIZE] = {0};
创建epoll将服务端的fd加入到epoll中,并设置事件类型
epfd = epoll_create(1);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.accept_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
int set_event(int fd, int event, int flag){
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
if(flag){
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}else{
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
3.进入mainloop,根据事件类型调用相应的回调函数
while(1){
struct epoll_event events[1024]={0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for(i = 0; i < nready; i++){
int connfd = events[i].data.fd;
printf("connfd:%d\n", connfd);
//这个地方使用两个if而不是使用if-else if是因为该io可能同时存在EPOLLIN和EPOLLOUT
if(events[i].events & EPOLLIN){
conn_list[connfd].r_action.recv_callback(connfd);
}
if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
}
}
3.1 三个回调函数
1.accept_cb:返回监听到的clientfd
int accept_cb(int fd){
struct sockaddr_in clientaddr;
int len = sizeof(clientaddr);
memset(&clientaddr, 0, sizeof(clientaddr));
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
if(clientfd < 0){
printf("err clientf < 0");
}
event_register(clientfd, EPOLLIN);
return clientfd;
}
2.recv_cb:接收相应客户端发送的数据
int recv_cb(int fd){
memset(conn_list[fd].rbuffer, 0, sizeof(conn_list[fd].rbuffer));
int count = recv(fd, conn_list[fd].rbuffer, sizeof(conn_list[fd].rbuffer), 0);
if(count == 0){
printf("client disconnect: %d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}else if(count < 0){
printf("count:%d, errno:%s\n", count, strerror(errno));
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, count);
conn_list[fd].wbufferlength = count;
printf("[%d]RECV: %s\n", count, conn_list[fd].rbuffer);
set_event(fd, EPOLLOUT, 0);
return count;
}
3.send_cb:发送数据
int send_cb(int fd){
int count = 0;
if(conn_list[fd].wbufferlength > 0){
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wbufferlength, 0);
if(count <= 0) {
printf("send failed: %s\n", strerror(errno));
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return -1;
}
conn_list[fd].wbufferlength = 0; // 清空发送缓冲区长度
}
set_event(fd, EPOLLIN, 0);
return count;
}
4.完整代码
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/epoll.h>
#include <errno.h>
#define BUFFERLENGTH 1024
#define CONNECTIONSIZE 1024
typedef int (*CALLBACK)(int fd);
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int epfd = 0;
struct conn{
int fd;
char rbuffer[BUFFERLENGTH];
int rbufferlength;
char wbuffer[BUFFERLENGTH];
int wbufferlength;
CALLBACK send_callback;
union
{
CALLBACK accept_callback;
CALLBACK recv_callback;
}r_action;
};
struct conn conn_list[CONNECTIONSIZE] = {0};
int set_event(int fd, int event, int flag){
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
if(flag){
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}else{
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event){ //将事件传入到conn_list中
if(fd < 0){
return -1;
}
conn_list[fd].fd = fd;
conn_list[fd].r_action.accept_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFERLENGTH);
conn_list[fd].rbufferlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFERLENGTH);
conn_list[fd].wbufferlength = 0;
set_event(fd, event, 1);
}
int accept_cb(int fd){
struct sockaddr_in clientaddr;
int len = sizeof(clientaddr);
memset(&clientaddr, 0, sizeof(clientaddr));
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
if(clientfd < 0){
printf("err clientf < 0");
}
event_register(clientfd, EPOLLIN);
return clientfd;
}
int recv_cb(int fd){
memset(conn_list[fd].rbuffer, 0, sizeof(conn_list[fd].rbuffer));
int count = recv(fd, conn_list[fd].rbuffer, sizeof(conn_list[fd].rbuffer), 0);
if(count == 0){
printf("client disconnect: %d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}else if(count < 0){
printf("count:%d, errno:%s\n", count, strerror(errno));
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, count);
conn_list[fd].wbufferlength = count;
printf("[%d]RECV: %s\n", count, conn_list[fd].rbuffer);
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd){
int count = 0;
if(conn_list[fd].wbufferlength > 0){
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wbufferlength, 0);
if(count <= 0) {
printf("send failed: %s\n", strerror(errno));
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return -1;
}
conn_list[fd].wbufferlength = 0; // 清空发送缓冲区长度
}
set_event(fd, EPOLLIN, 0);
return count;
}
int initserver(int port){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){
perror("bind failed!");
printf("bind failed!");
}
listen(sockfd, 10);
return sockfd;
}
int main(){
unsigned short port = 2077;
int sockfd = initserver(port);
epfd = epoll_create(1);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.accept_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
while(1){
struct epoll_event events[1024]={0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for(i = 0; i < nready; i++){
int connfd = events[i].data.fd;
printf("connfd:%d\n", connfd);
//这个地方使用两个if而不是使用if-else if是因为该io可能同时存在EPOLLIN和EPOLLOUT
if(events[i].events & EPOLLIN){
conn_list[connfd].r_action.recv_callback(connfd);
}
if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
}
}
}
分享一个学习链接,有学习需要的同学可以看一下: