基于 Reactor 模式的高并发 WebSocket Server 实现

发布于:2025-09-11 ⋅ 阅读:(18) ⋅ 点赞:(0)

在高并发网络编程中,传统的 阻塞 I/O + 多线程模型 已经难以支撑百万级连接。为了解决这一问题,Linux 提供了 epoll 等高效 I/O 复用机制,结合 Reactor 模式,我们可以构建一个性能优越的 WebSocket/HTTP Server

一、什么是 Reactor 模式?

Reactor 模式 是一种典型的事件驱动设计模式,核心思想是:

  • Reactor:事件分发器,负责监听和分发事件(如 acceptreadwrite)。

  • Handler:事件处理器,根据事件类型执行相应的业务逻辑。

  • Demultiplexer:事件多路复用器(如 epollselectpoll),负责监控多个连接的状态。

一句话概括:

Reactor 模式就是用一个线程/少量线程,监听所有 socket 的事件,并分发给对应的处理函数。

二、代码结构

在这份实现中,我们将代码分为三个核心文件:

  • Reactor.c
    负责 epoll 事件循环,监听和分发事件。

  • Webserver.c
    负责 HTTP/WebSocket 协议解析和响应。

  • server.h
    定义连接对象 struct conn 以及回调函数指针。

1. Reactor 主循环(Reactor.c)

while(1){   // mainloop
    struct epoll_event events[1024] = {0};
    int nready = epoll_wait(epfd, events, 1024, -1);

    for(int i = 0; i < nready; i++){
        int connfd = events[i].data.fd;

        if(events[i].events & EPOLLIN){
            conn_list[connfd].r_action.recv_callback(connfd);
        }

        if(events[i].events & EPOLLOUT){
            conn_list[connfd].send_callback(connfd);
        }
    }
}

这里就是典型的 Reactor 模式

  • epoll_wait 监听所有事件。

  • 收到 可读事件EPOLLIN)时,调用 recv_callback

  • 收到 可写事件EPOLLOUT)时,调用 send_callback

2. 事件回调(accept / recv / send)

  • accept_cb:处理客户端连接建立。

  • recv_cb:读取请求(支持 HTTP / WebSocket)。

  • send_cb:发送响应(HTTP 响应 或 WebSocket 帧)。

例如 recv_cb

int recv_cb(int fd){
    int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);

    if(count <= 0){
        close(fd);
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        return 0;
    }
    conn_list[fd].rlength = count;

    // 分发给具体协议层(HTTP / WebSocket)
    ws_request(&conn_list[fd]); 

    set_event(fd, EPOLLOUT, 0);  // 切换到可写
    return count;
}

这里可以看到,Reactor 层并不关心业务逻辑,它只负责分发事件。

3. WebServer 协议层(Webserver.c)

http_response 示例:

c->wlength = sprintf(c->wbuffer,
    "HTTP/1.1 200 OK\r\n"
    "Content-Type: text/html\r\n"
    "Content-Length: 82\r\n\r\n"
    "<html><head><title>Charon.html</title></head><body><h1>Charon</h1></body></html>\r\n");
  • 如果是 HTTP 请求,直接返回 HTML 或静态文件。

  • 如果是 WebSocket 请求,则交给 ws_request/ws_response 处理握手和数据帧。

三、Reactor 模式下的 WebSocket/HTTP

1. HTTP 请求流程

  1. 客户端发起 TCP 连接 → accept_cb

  2. 客户端发送 HTTP 报文 → recv_cbhttp_request

  3. 服务器生成 HTTP 响应 → http_responsesend_cb

2. WebSocket 请求流程

  1. 客户端通过 HTTP 请求发起 WebSocket 握手

  2. 服务器返回 Sec-WebSocket-Accept 完成升级。

  3. 后续数据传输通过 WebSocket 帧,走 ws_request/ws_response

这样,整个 Reactor 框架既能处理 普通 HTTP,又能处理 WebSocket 实时通信

四、Reactor 模式业务分层的优点

  • I/O 多路复用层(Reactor.c)

    • 负责 epoll_wait 事件分发

    • 不关心业务逻辑

  • 协议层(Webserver.c / Websocket.c)

    • HTTP / WebSocket 解析

    • 封装成 request/response 接口

  • 业务逻辑层

    • 上层业务只需要实现自己的逻辑,不用关心 epoll

    • 例如:聊天室消息分发、文件传输、心跳检测。

对比点 普通 epoll 写法 Reactor 分层写法
代码结构 事件循环里塞满协议解析和业务逻辑 事件循环只负责分发,协议层和业务层独立
可读性 逻辑混杂,代码庞大难读 清晰分层,逻辑职责单一
可维护性 改协议或改业务逻辑时容易影响主循环 主循环稳定,业务改动只影响对应层
扩展性 新增协议必须改 epoll 循环 新协议只需增加新的回调函数
复用性 协议逻辑和业务绑死在一起 I/O 层、协议层、业务层可独立复用

五、完整代码

1、Reactor.c

#include <errno.h>          // 包含错误码定义
#include <stdio.h>          // 包含标准输入输出函数
#include <sys/socket.h>     // 包含套接字相关函数和结构体
#include <netinet/in.h>     // 包含网络相关结构体定义
#include <string.h>         // 包含字符串操作函数
#include <pthread.h>        // 包含线程相关函数
#include <unistd.h>         // 包含Unix标准函数
#include <sys/epoll.h>      // 包含epoll相关函数和结构体
#include <errno.h>          // 再次包含错误码定义(可能冗余)
#include <sys/time.h>       // 包含时间相关函数和结构体
#include <stdlib.h>         // 包含标准库函数

#include "server.h"         // 包含自定义的服务器头文件

#define CONNECTION_SIZE        1024*1024  // 最大连接数定义
#define MAX_PORTS         20              // 最大监听端口数量
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)  // 计算两个时间戳的毫秒差

// 回调函数声明
int accept_cb(int fd);   // 处理新连接的回调函数
int recv_cb(int fd);     // 处理接收数据的回调函数
int send_cb(int fd);     // 处理发送数据的回调函数

int epfd = 0;            // epoll实例文件描述符
struct timeval begin;    // 用于计时的开始时间

struct conn *conn_list;  // 连接列表指针,用于存储所有连接信息

/**
 * 设置epoll事件
 * @param fd  文件描述符
 * @param event 事件类型(EPOLLIN/EPOLLOUT等)
 * @param flag  1表示添加事件,0表示修改事件
 */
int set_event(int fd, int event, int flag){
    
    if(flag){   // 非零表示添加事件
        struct epoll_event ev;
        ev.events = event;    // 设置事件类型
        ev.data.fd = fd;      // 关联文件描述符
        epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);  // 添加事件到epoll
    }else{  // 零表示修改事件
        struct epoll_event ev;
        ev.events = event;    // 设置新的事件类型
        ev.data.fd = fd;      // 关联文件描述符
        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);  // 修改epoll中的事件
    }
}

/**
 * 注册事件和连接信息
 * @param fd    文件描述符
 * @param event 要监听的事件
 */
int event_register(int fd, int event){
    if (fd < 0) return -1;  // 无效的文件描述符
    
    // 初始化连接信息
    conn_list[fd].fd = fd;
    conn_list[fd].r_action.recv_callback = recv_cb;  // 设置接收回调
    conn_list[fd].send_callback = send_cb;           // 设置发送回调

    // 初始化接收缓冲区
    memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
    conn_list[fd].rlength = 0;

    // 初始化发送缓冲区
    memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
    conn_list[fd].wlength = 0;

    // 添加事件监听
    set_event(fd, event, 1);
}

/**
 * 处理新连接的回调函数
 * @param fd 服务器监听套接字
 */
int accept_cb(int fd){
    struct sockaddr_in  clientaddr;  // 客户端地址结构体
    socklen_t len = sizeof(clientaddr);  // 地址长度

    // 接受新连接
    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    if(clientfd < 0){  // 接受连接失败
        printf("accept errno: %d --> %s\n", errno, strerror(errno));
        return -1;
    }

    // 注册新连接,监听读事件
    event_register(clientfd, EPOLLIN);

    // 每1000个连接打印一次信息,用于性能统计
    if((clientfd % 1000) == 0){
        struct timeval current;
        gettimeofday(&current, NULL);  // 获取当前时间

        // 计算时间差
        int time_used = TIME_SUB_MS(current,begin);
        memcpy(&begin, &current,sizeof(struct timeval));  // 更新开始时间

        printf("accept finished: %d, time_used: %d\n", clientfd, time_used);
    }

    return 0;
}

/**
 * 处理接收数据的回调函数
 * @param fd 客户端连接套接字
 */
int recv_cb(int fd){
    // 清空接收缓冲区
    memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);
    // 接收数据
    int count = recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);
    
    if(count == 0){  // 客户端断开连接
        printf("client disconnect: %d\n",fd);
        close(fd);   // 关闭连接
        epoll_ctl(epfd,EPOLL_CTL_DEL, fd, NULL);  // 从epoll中移除事件
        return 0;
    }else if(count < 0) {  // 接收数据出错
        printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
        close(fd);   // 关闭连接
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);  // 从epoll中移除事件
        return 0;
    }
    
    // 记录接收数据长度
    conn_list[fd].rlength = count;

#if 0  // 回声模式(当前禁用)
    // 将接收缓冲区数据复制到发送缓冲区
    conn_list[fd].wlength = conn_list[fd].rlength;
    memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
    printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);
#elif 0  // HTTP请求处理(当前禁用)
    http_request(&conn_list[fd]);
#else  // WebSocket请求处理(当前启用)
    ws_request(&conn_list[fd]);
#endif

    // 将事件切换为监听可写事件,准备发送数据
    set_event(fd, EPOLLOUT, 0);
    return count;
}

/**
 * 处理发送数据的回调函数
 * @param fd 客户端连接套接字
 */
int send_cb(int fd){
#if 0  // HTTP响应处理(当前禁用)
    http_response(&conn_list[fd]);
#else  // WebSocket响应处理(当前启用)
    ws_response(&conn_list[fd]);
#endif
    
    int count = 0;  // 发送字节数
    
    // 根据连接状态处理发送
    if(conn_list[fd].status == 1){
        // 状态1:发送数据并继续监听可写事件
        count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
        set_event(fd, EPOLLOUT, 0);
    } else if (conn_list[fd].status == 2){
        // 状态2:继续监听可写事件
        set_event(fd, EPOLLOUT, 0);
    } else if (conn_list[fd].status == 0){
        // 状态0:发送数据(如果有)并切换到监听可读事件
        if(conn_list[fd].wlength != 0){
            count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
        }
        set_event(fd, EPOLLIN, 0);
    }

    return count;
}

/**
 * 初始化服务器监听套接字
 * @param port 监听端口
 * @return 成功返回套接字描述符,失败返回-1
 */
int init_server(unsigned short port){
    // 创建TCP套接字
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    // 初始化服务器地址结构体
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;          // IPv4协议
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);  // 监听所有网卡
    servaddr.sin_port = htons(port);        // 设置端口号

    // 绑定套接字到端口
    if(-1 == bind(sockfd,(struct sockaddr*)&servaddr,sizeof(struct sockaddr))){
        printf("bind failed: %s\n",strerror(errno));
    }

    // 开始监听,最大等待队列长度为10
    listen(sockfd,10);

    return sockfd;  // 返回监听套接字
}

/**
 * 主函数
 */
int main(){
    unsigned short port = 2000;  // 起始监听端口

    // 创建epoll实例
    epfd = epoll_create(1);

    // 分配连接列表内存
    conn_list = malloc(sizeof(struct conn) * CONNECTION_SIZE);
    if (!conn_list) {  // 内存分配失败
        perror("malloc conn_list failed");
        return -1;
    }
    // 初始化连接列表
    memset(conn_list, 0, sizeof(struct conn) * CONNECTION_SIZE);

    // 初始化多个监听端口
    int i = 0;
    for(i = 0;i< MAX_PORTS;i++){
        // 初始化服务器,监听从port开始的多个端口
        int sockfd = init_server(port + i);
        // 设置监听套接字的回调函数(接收新连接)
        conn_list[sockfd].fd = sockfd;
        conn_list[sockfd].r_action.recv_callback =accept_cb;

        // 添加监听套接字到epoll,监听可读事件
        set_event(sockfd, EPOLLIN, 1);
    }

    // 记录开始时间
    gettimeofday(&begin, NULL);

    // 主事件循环
    while(1){
        struct epoll_event events[1024] = {0};  // 存储就绪事件
        // 等待事件发生,超时时间为-1(无限等待)
        int nready = epoll_wait(epfd, events, 1024, -1);

        // 处理所有就绪事件
        int i = 0;
        for(i = 0;i< nready;i++){
            int connfd = events[i].data.fd;  // 获取事件对应的文件描述符

            // 处理可读事件
            if(events[i].events & EPOLLIN){
                conn_list[connfd].r_action.recv_callback(connfd);
            }

            // 处理可写事件
            if(events[i].events & EPOLLOUT){
                conn_list[connfd].send_callback(connfd);
            }
        }
    }
    
    // 释放连接列表内存(理论上不会执行到这里)
    free(conn_list);
    return 0;
}

2、Webserver.c

#include <stdio.h>          // 标准输入输出库,用于printf等函数
#include <unistd.h>         // Unix标准库,提供文件操作、进程控制等功能
#include <sys/stat.h>       // 提供文件状态相关结构体和函数
#include <fcntl.h>          // 文件控制相关函数,如open等
#include <string.h>         // 字符串操作函数库
#include <sys/sendfile.h>   // 提供sendfile函数,用于高效文件传输
#include <errno.h>          // 错误处理相关定义

#include "server.h"         // 自定义服务器头文件,包含连接结构体等定义

#define WEBSERVER_ROOTDIR   "./"  // 定义Web服务器的根目录为当前目录

/**
 * 处理HTTP请求
 * @param c 指向连接结构体的指针,包含客户端连接信息和缓冲区
 * @return 函数目前未实现具体功能,返回值未定义
 */
int http_request(struct conn *c){
    // 清空发送缓冲区,准备存储响应数据
    memset(c->wbuffer, 0, BUFFER_LENGTH);
}

/**
 * 生成HTTP响应
 * @param c 指向连接结构体的指针,用于存储响应数据
 * @return 生成的响应数据长度
 */
int http_response(struct conn *c){
    // 构建HTTP响应报文并存储到发送缓冲区
    // 格式化字符串包含HTTP头部和简单的HTML内容
    c->wlength = sprintf(c->wbuffer,
        "HTTP/1.1 200 OK\r\n"               // HTTP版本和状态码(200表示成功)
        "Content-Type: text/html\r\n"       // 响应内容类型为HTML
        "Accept-Ranges: bytes\r\n"          // 支持字节范围请求
        "Content-Length: 82\r\n"            // 响应体长度(字节数)
        "Date: Tue, 30 Apr 2024 13:16:46 GMT\r\n\r\n"  // 响应日期
        "<html><head><title>Charon.html</title></head><boby><h1>Charon</h1></body></html>\r\n\r\n");  // HTML响应体
    
    return c->wlength;  // 返回响应数据的长度
}

3、server.h



#ifndef __SERVER_H__
#define __SERVER_H__

#define BUFFER_LENGTH     1024

typedef int (*RCALLBACK)(int fd);

struct conn{
    int fd;

    char rbuffer[BUFFER_LENGTH];
    int rlength;

    char wbuffer[BUFFER_LENGTH];
    int wlength;

    int status;

    RCALLBACK send_callback;

    union{
        RCALLBACK recv_callback;
        RCALLBACK accept_callback;
    } r_action;
    
};

int http_request(struct conn *c);
int http_response(struct conn *c);

int ws_request(struct conn *c);
int ws_response(struct conn *c);

#endif

0voice · GitHub


网站公告

今日签到

点亮在社区的每一天
去签到