🔥个人主页🔥:孤寂大仙V
🌈收录专栏🌈:计算机网络
🌹往期回顾🌹:【计算机网络】非阻塞IO——epoll 编程与ET模式详解——(easy)高并发网络服务器设计
🔖流水不争,争的是滔滔不息
一、Reactor模式简介
Reactor 模式是网络编程中最为典型且最实用的一种事件处理模式,广泛应用于高速网络服务器中(比如 Nginx、Redis、Netty 等),旨在实现事件驱动下的非阻塞 I/O,从而用少量线程处理海量连接,大幅提升并发能力。
Reactor 模式整体过程
Reactor 模式主要分为以下几个环节:
事件源(Event Source)
由操作系统提供,如socket、文件、时钟等。这些事件源发生时,会准备好数据或者发生条件。事件分离器(Demultiplexer, Epoll/Select 等)
负责监控一组事件源,若其中有准备好进行I/O的数据时,会将它们分离出去,告知Reactor进行处理。
本实现中我们借助 epoll 进行事件检测,epoll为我们提供高速且灵活的I/O事件通知。事件分发器(Reactor)
Reactor根据事件发生时哪个文件描述符准备好,分发给事先为每个文件准备好的事件处理器(Handlers)进行处理。
本实现中,Reactor为每一个连接绑定了对应的handler(比如说 Connection 类),由handler进行数据读写。事件处理器(Handlers)
事件处理器负责对发生的事件进行具体的数据操作。比如说:
读缓冲区的数据。
进行业务逻辑处理。
再写回到缓冲区中。
本实现中(下面代码),Handlers 就是 Connection 的子类,如 Listener(负责接受新连接)、Channel(负责读写数据)等。
Reactor 模式的数据流
整体的数据流可以简述为:
事件源 -> epoll 等事件分离器 -> Reactor -> 事件处理器 -> 读/写缓冲区 -> 业务处理 -> 发送出去
Reactor 模式优点
- 资源消耗少: 单个Reactor中只需要少量线程即可处理数千甚至数万个并发连接。
- 扩展性强: 适用从中、小到大型网络服务。
- 解耦: 事件分离器、事件分发器、事件处理器之间高度解耦,方便代码扩展和测试。
- 非阻塞: 依赖非阻塞I/O,避免资源浪费和时间消耗。
有兴趣可以了解这篇文章——Reactor模式
英文原版——reactor模式
二、Reactor 模式网络框架实现详解
近几年来,随着并发量增加,Reactor 模式凭借非阻塞 I/O + 事件通知成为实现高速网络服务时最优且最稳定的方案。这种模式主体上依赖 Linux 的 epoll 等内核机制进行事件分发,做到用少量线程处理很多连接。 让我带你看看你实现出的这个网络框架是怎样工作的。
Reactor整体结构
Reactor(反应堆)
负责事件循环(poll)、分发和管理每条连接。
class Reactor {
// 负责:
// 1. epoll_create
// 2. epoll_wait 等待事件发生
// 3. 依序分发给每条 Connection 进行处理
// 4. 新连接时进行添加
// 5. 断线时进行清理
}
Epoller(epoll封裝)
对 epoll_create、epoll_ctl、epoll_wait 进行封裝,为 Reactor 减少实现负担
class Epoller {
// 负责:
// 1. epoll_create
// 2. epoll_ctl ADD/MOD/DEL
// 3. epoll_wait 等待事件发生
}
Listener(监听器)
负责对listen socket进行准备(绑定、listen、非阻塞设置),有新连接时为每条连接封裝为 Connection,再交给 Reactor。
class Listener : public Connection {
// 负责:
// 1. 绑定、listen
// 2. Accept 新连接
// 3. 设置为非阻塞
// 4. 交由 Reactor 管理
}
Channel(每条 TCP 连接)
封裝每条 TCP 连接,负责数据缓冲、读写和异常处理。
class Channel : public Connection {
// 负责:
// 1. recv 读数据到缓冲区
// 2. send 发送缓冲区的数据出去
// 3. EPOLLIN 就读
// 4. EPOLLOUT 就写
}
数据流转时序解读(举个例子)
比如说:
有一个Client连到Server。
Reactor 通过 epoll_wait 等到此时有一个 EPOLLIN 事件。
这时 Reactor 就会:检查是不是 Listening socket。
-> 若是,则由 Listener 进行 Accept()。
-> 若不是,则由对应 Connection (比如 Channel) 进行 recv()。
Connection 读到数据后,会进入自己的缓冲区。
再由你为每条 Connection 设置好handler进行业务处理。
处理完后若需要发送出去,则设置为 EPOLLOUT,由 Reactor 继续关注此 socket 的写事件。
Reactor 就是事件分发器,它自己不干活(比如说不存在自己去读数据、写数据),而是根据事件去通知对应的数据源(比如说 socket)由它自己进行处理。
让我举个例子,你可以把它想象为高速公路收费站:
- 收费站(Reactor):负责分流每一条道路上发生的小汽车。
- 每一条道路(socket):可以发生不同的事件(有車到达,需要付费)。
- 收费员(handler):为每一条道路提供服务(比如收钱、找零)。
Reactor 典型工作过程:
- epoll_wait 等待事件:
- Reactor 通过 epoll_wait 等待哪个 socket 有事件发生。
- 可能发生的事件包括:
有新连接到达(listen socket)
有现有连接有数据可以读(client socket)
有缓冲区可以写出去(client socket)
有异常发生(比如对方断掉)
- 分发事件:
Reactor 检查哪个文件发生了哪个事件。
若是listen socket 有 EPOLLIN 事件:
-> 就由 Listener 执行 Accept() ,获取到 新的 Connection。
-> 新 Connection 就被加入到 Reactor 的关注表中。若是现有 Connection 有 EPOLLIN 事件:
-> 就由对应 Connection 执行 recv() ,读出数据到缓冲区。
-> 再由 Connection 绑定的数据处理器(handler)进行业务逻辑处理。若是需要发送出去的数据准备好:
-> 就可以为 Connection 设置 EPOLLOUT ,由 Reactor 关注到时进行 send() 发送出去。
代码实现
Reactor.hpp
#pragma once
#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
#include "Log.hpp"
using namespace std;
using namespace LogModule;
class Reactor // 基座 Reactor反应堆模式高并发网络服务器
{
static const int size = 128;
private:
bool IsAddConnectionHelper(int sockfd) // 判断_connections中是否存在connection
{
auto iter = _connections.find(sockfd);
if (iter == _connections.end())
return false;
else
return true;
}
bool IsAddConnectionExist(const shared_ptr<Connection> &conn)
{
return IsAddConnectionHelper(conn->GetSockFd());
}
bool IsAddConnectionExist(int sockfd)
{
return IsAddConnectionHelper(sockfd);
}
bool IsConnectionEmpty() //判断_connections是否为空
{
return _connections.empty();
}
int LoopOnce(int timeout)
{
int n = _epoller_ptr->WaitEvents(_revs, size, timeout); // 获取就绪队列中的就绪事件
return n;
}
void Dispatcher(int n)
{
for (int i = 0; i < n; i++)
{
int sockfd = _revs[i].data.fd; // 拿到就绪队列中就绪的文件描述符
uint32_t events = _revs[i].events; // 拿到就绪队列中的事件
if (events & EPOLLERR) // 就绪事件这里不管出什么错都进EPOLLIN|EPOLLOUT让具体的异常处理进行处理
events |= (EPOLLIN | EPOLLOUT);
if (events & EPOLLHUP)
events |= (EPOLLIN | EPOLLOUT);
if (events & EPOLLIN)
{
// 读事件就绪
if (IsAddConnectionExist(sockfd))
_connections[sockfd]->Recver();//索引_connection中connection对象
}
if (events & EPOLLOUT)
{
// 写事件就绪
if (IsAddConnectionExist(sockfd))
_connections[sockfd]->Sender();
}
}
}
public:
Reactor()
: _epoller_ptr(make_unique<Epoller>())
,_isrunning(false)
{
memset(_revs, 0, sizeof(_revs)); // 初始化 _revs
}
void Loop()
{
if (IsConnectionEmpty())
return;
_isrunning = true;
int timeout=-1;
while (_isrunning)
{
PrintConnection();
int n=LoopOnce(timeout); //获取就绪队列就绪事件个数
Dispatcher(n); //派发,读事件||写事件
}
_isrunning = false;
}
// 把所有新连接添加到_connections并把事件写到内核
void AddConnection(shared_ptr<Connection> &conn)
{
if (IsAddConnectionExist(conn))
{
LOG(LogLevel::WARNING) << "connection is exist" << conn->GetSockFd();
return;
}
uint32_t events = conn->GetEvent(); // 一个connection中的事件
int sockfd = conn->GetSockFd(); // 一个connection中的文件描述符
_epoller_ptr->AddEvent(sockfd, events); // 写入内核
conn->SetOwner(this); //设置回指指针把reactor对象设置到connection基类
_connections[sockfd] = conn; //把connection放到_connections中
}
//关心写事件,关写事件的connection放进_connections中就又走那个派发逻辑了,然后再去发数据(妙!)
void EnableReadWrite(int sockfd,bool enableread,bool enablewrit)
{
if(!IsAddConnectionExist(sockfd))
{
LOG(LogLevel::WARNING) << "connection is not exist" << sockfd;
return;
}
uint32_t new_event=(EPOLLET | (enableread ? EPOLLIN : 0) | (enablewrit ? EPOLLOUT : 0));
_connections[sockfd]->SetEvent(new_event);
_epoller_ptr->ModEvent(sockfd,new_event);
}
//异常处理(关闭文件描述符)
void DelConnection(int sockfd)
{
_epoller_ptr->DelEvent(sockfd); //移除关心事件
_connections.erase(sockfd); //_connections移除自己
close(sockfd); //关闭文件描述符
LOG(LogLevel::DEBUG)<<"client quit";
}
void PrintConnection()
{
std::cout << "当前Reactor正在进行管理的fd List:";
for(auto &conn : _connections)
{
std::cout << conn.second->GetSockFd() << " ";
}
std::cout << "\r\n";
}
~Reactor()
{
}
private:
unique_ptr<Epoller> _epoller_ptr; // 构造epoll模型
unordered_map<int, shared_ptr<Connection>> _connections; // 管理每一个connection
bool _isrunning; // 判断是否启动
struct epoll_event _revs[size]; // 就绪队列就绪事件
};
_epoll_ptr是对epoll类型的封装,Reactor是负责统筹兼顾的事件分发器。获取就绪队列中的就绪事件,然后根据事件的不同去进行派发。
整个reactor模式有connection管理一套fd及其相对应的事件。为了在reactor中进行管理,让文件描述符和connection建立哈希映射是_connections。listen套接字或者普通套接字的关心事件都会写入connection所以在添加到_connection之前要拿到这个文件描述符关心的事件并写入内核,然后放入hash表中。后面进行事件派发的时候是根据就绪队列中的文件描述符对_connections哈希表中的对应的写事件就绪或者读事件就绪的connection(listen或者channel)进行索引。然后根据不同的事件去走不同的逻辑。
若 sockfd 对应 Listening ,则 Recv() 就是去 Accept() 新连接。
若 sockfd 对应 Client ,则 Recv() 就是去读数据。
若需要 Sender() ,则是对 Client 进行写缓冲的数据发送出去。
异常处理这里就是移除关心的事件然后在_connections中移除索引然后关闭文件描述符。
Epoller.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Common.hpp"
using namespace std;
using namespace LogModule;
class Epoller
{
public:
Epoller()
:_epfd(-1)
{
_epfd=epoll_create(256); //构造epoll模型
if(_epfd<0)
{
LOG(LogLevel::FATAL)<<"epoll_create error";
exit(EPOLL_CREATE_ERR);
}
LOG(LogLevel::INFO)<<"epoll_create success";
}
void AddEvent(int sockfd,uint32_t events) //添加关心的事件
{
struct epoll_event ev;
ev.events=events;
ev.data.fd=sockfd;
int n=epoll_ctl(_epfd,EPOLL_CTL_ADD,sockfd,&ev);
if(n<0)
{
LOG(LogLevel::FATAL)<<"epoll_ctl error";
exit(EPOLL_CTL_ERR);
}
LOG(LogLevel::INFO)<<"epoll_ctl success";
}
int WaitEvents(struct epoll_event revs[],int size,int timeout) //就绪队列中就绪事件个数
{
int n=epoll_wait(_epfd,revs,size,timeout);
if(n<0)
{
LOG(LogLevel::WARNING)<<"epoll_wait error";
return n;
}
else if(n==0)
{
LOG(LogLevel::WARNING)<<"epoll_wait timeout";
return n;
}
else
{
LOG(LogLevel::INFO)<<"读事件就绪";
return n;
}
}
void ModEvent(int sockfd,uint32_t events)
{
struct epoll_event ev;
ev.data.fd=sockfd;
ev.events=events;
int n=epoll_ctl(_epfd,EPOLL_CTL_MOD,sockfd,&ev);
if(n<0)
{
LOG(LogLevel::FATAL)<<"mov_event error";
exit(EPOLL_CTL_ERR);
}
LOG(LogLevel::INFO)<<"mov_event success";
}
void DelEvent(int sockfd)
{
int n=epoll_ctl(_epfd,EPOLL_CTL_DEL,sockfd,nullptr);
if(n<0)
{
LOG(LogLevel::FATAL)<<"del_event error";
exit(EPOLL_CTL_ERR);
}
LOG(LogLevel::INFO)<<"del_event success";
}
~Epoller()
{
}
private:
int _epfd;
};
根据epoll的函数进行封装。
Connection.hpp
#pragma once
#include <iostream>
#include <string>
#include "InetAddr.hpp"
using namespace std;
using namespace LogModule;
using handler_t=function<string(string&)>;
class Reactor;
class Connection;
//封装fd,保证每个fd一套缓冲区
//基类
class Connection
{
public:
Connection()
{
}
virtual void Recver()=0;
virtual void Sender()=0;
virtual void EXcepter()=0; //异常处理
virtual int GetSockFd() = 0;
void SetEvent(const uint32_t &events)//获取事件
{
_events=events;
}
uint32_t GetEvent()//返回事件
{
return _events;
}
void SetOwner(Reactor* owner)//设置回指指针
{
_owner=owner;
}
Reactor* GetOwner()//返回回指指针
{
return _owner;
}
void RegisterHandler(handler_t handler)
{
_handler=handler;
}
~Connection()
{
}
private:
Reactor *_owner; //回指指针
uint32_t _events; //关系的事件
public:
handler_t _handler;
};
作为基类,纯虚方法给Listener和Channel。事件管理和回指指针,为什么用回指指针后面Listener中聊。注册回调也在后面用到的时候聊。
Listener.hpp 和 Channel.hpp
Listener.hpp
#pragma once
#include <iostream>
#include <memory.h>
#include "Connection.hpp"
#include "Common.hpp"
#include "Socket.hpp"
#include "Log.hpp"
#include "Channel.hpp"
using namespace std;
using namespace SocketModule;
using namespace LogModule;
//获取新连接
class Listener : public Connection
{
public:
Listener(int port=defaultport)
:_port(port)
,_listensock(make_unique<TcpSocket>())
{
_listensock->BuildTcpSocketMethod(_port); //注意这里已经获取了listen套接字
SetEvent(EPOLLIN | EPOLLET); //关心事件设置到connection && ET模式
SetNonBlock(_listensock->Fd()); //文件描述符设置为非阻塞(ET模式)
}
void Recver() override //收
{
InetAddr client;
while(true)//ET模式循环读文件描述符
{ //这个文件描述符是正常文件描述符
int sockfd=_listensock->Accept(&client);
if(sockfd==ACCEPTDONE)
break;
else if(sockfd==ACCEPTCONTINUE)
continue;
else if(sockfd==ACCEPTERROR)
break;
else
{
//成功获取连接
shared_ptr<Connection> conn=make_shared<Channel>(sockfd,client);//构造channel对象
conn->SetEvent(EPOLLIN | EPOLLET);
if(_handler!=nullptr)
conn->RegisterHandler(_handler);
GetOwner()->AddConnection(conn);//继承基类函数 拿到reactor指针进而把channel对象添加到connection
}
}
}
void Sender() override
{
}
void EXcepter() override
{
}
int GetSockFd() override
{
return _listensock->Fd();
}
~Listener()
{
}
private:
unique_ptr<Socket> _listensock;
int _port;
};
Channel.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <memory>
#include <functional>
#include "Common.hpp"
#include "Connection.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace std;
using namespace LogModule;
#define SIZE 1024
// 普通文件描述符的IO
class Channel : public Connection
{
public:
Channel(int sockfd, InetAddr &client)
: _sockfd(sockfd), _client_addr(client)
{
SetNonBlock(sockfd);
}
void Recver() override // 收客户端数据
{
char buffer[SIZE];
while (true)
{
buffer[0] = 0;
ssize_t n = recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
_inbuffer += buffer;
}
else if (n == 0) // 读到数据为0
{
EXcepter(); // 进入异常处理
return;
}
else // 读取失败
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
EXcepter();
return;
}
}
}
LOG(LogLevel::DEBUG) << "Channel inbuffer->" << _inbuffer;
if (!_inbuffer.empty())
{
_outbuffer = _handler(_inbuffer); // 处理完的数据给outbuffer
}
if (!_outbuffer.empty()) // 发数据
{
Sender();
}
}
void Sender() override
{
while (true)
{
ssize_t n = send(_sockfd, _outbuffer.c_str(), _outbuffer.size(), 0);
if (n > 0)
{
_outbuffer.erase(0, n); //发送缓冲区收到多少数据就清空多少个
if (_outbuffer.empty())
break;
}
else if (n == 0)
{
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
EXcepter();
return;
}
}
}
//注意什么时候可以设置关系写事件,写事件默认就是就绪的只有当发送缓冲区数据满了的时候才设置关系写事件
//发送缓冲区已经满了,outbuffer的数据发不到发送缓冲区中了,所以判断outbuffer不为空
//这时候就要关系写事件,什么时候发送缓冲区有空位了outbuffer再往发送缓冲区中写
if (!_outbuffer.empty())
{
GetOwner()->EnableReadWrite(_sockfd,true,true);
}
else
{
GetOwner()->EnableReadWrite(_sockfd,true,false);
}
}
void EXcepter() override //异常处理
{
GetOwner()->DelConnection(_sockfd);
}
int GetSockFd() override // 获取文件描述符
{
return _sockfd;
}
~Channel()
{
}
private:
int _sockfd; // 套接字
string _inbuffer;
string _outbuffer;
InetAddr _client_addr;
};
Listener继承connection,构造的时候获取连接,把设置关系事件到connection中,然后因为是ET模式边缘触发要把文件描述符设置为非阻塞的。
继承connection要实现connection的虚函数,Recver()是接收连接创建普通套接字。拿到普通套接字要进行数据的收发了,这时候Channel就要出场了。设置关心事件到connection,这个Channel对象也要放到_connections中吧,不放到_connections中的话在Reactor中对文件描述符进行索引如果是派发读数据写数据那还咋读咋写,所以基类connection中的回指指针就派上用场了,通过Reactor的指针用类内的AddConnection
方法把Channel对象也放进去。
Channel主要是进行数据的收发,收数据放到_inbuffer
中读到数据为零进入异常处理读取错误进行错误处理。注册回调把_inbuffer
通过回调给另一个模块进行处理然后返回后放到_outbuffer
(发数据)。这里的回调function这些放到基类connection中,这样Listener和Channel都能继承到。Listener的if(_handler!=nullptr) conn->RegisterHandler(_handler);
让注册回调更加灵活。如果_outbuffer中不为空那么就发数据。发数据这里要注意,什么时候关心写事件呢写事件默认就是就绪的只有当发送缓冲区数据满了的时候才设置关系写事件发送缓冲区已经满了,outbuffer的数据发不到发送缓冲区中了,所以判断outbuffer不为空 这时候就要关系写事件,什么时候发送缓冲区有空位了outbuffer再往发送缓冲区中写。所以判断_outbuffer不为空说明发送缓冲区已经满了要关心写事件。
这时候要让Reactor进行统一管理了,说白了就是epoll中的epoll_ctl关系事件的方式改一下,去关心写事件,具体不多说了看代码吧。
Main.cc
#include <iostream>
#include <string>
#include "Reactor.hpp"
#include "Listener.hpp"
#include "Common.hpp"
#include "Log.hpp"
#include "NetCal.hpp"
#include "Protocol.hpp"
using namespace std;
using namespace LogModule;
static void Usage(std::string proc)
{
std::cerr << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = stoi(argv[1]);
Enable_Console_Log_Strategy(); // 启用控制台输出
// 顶层业务模块
shared_ptr<Cal> cal = make_shared<Cal>();
// 构造协议对象
shared_ptr<Protocol> protocol = make_shared<Protocol>([&cal](Request &req) -> Response
{
return cal->Execute(req);
});
// connection管理listen连接模块 间接创建channel(进行io)模块
shared_ptr<Connection> conn = make_shared<Listener>(port);
conn->RegisterHandler([&protocol](string &inbuffer) -> string
{
string response_str;
while (true)
{
string package;
if (!protocol->Decode(inbuffer, &package)) // 判断报文完不完整
break;
response_str += protocol->Execute(package);
}
LOG(LogLevel::DEBUG) << "结束匿名函数中...: " << response_str;
return response_str;
});
// rector基座 事件派发
unique_ptr<Reactor> tsvr = make_unique<Reactor>();
tsvr->AddConnection(conn);
tsvr->Loop();
return 0;
}
应用层模块就不多说了,已经做到完美解耦合了。顶层应用层业务模块,然后协议对象去调用业务模块,connection管理listen连接模块,在listener中又创建了channel对象。这里进行回调(这个回调是用RegisterHandler在这里就体现出来了,是拿Listener对象调用的然后进行回调)把channel中的_inbuffer的数据回调给协议模块去进行处理。
Reactor模式高并发网络服务器——源码