目录
项目链接:HighConcurrencyServer · 周不才/cpp_linux study - 码云 - 开源中国
一、Reactor模式简介
Reactor模式是一种事件驱动的高性能网络编程模型,通过多路复用IO统一监听多个客户端的连接请求,再以非阻塞的方式将就绪事件派发给对应的处理线程/进程去处理。
Reactoe模式是构建高并发服务器的核心技术。
1.单Reactor单线程
事件监听、事件派发、事件处理、业务处理都在同一个线程中完成。一个Reactor监听所有的事件(连接事件,IO事件),事件就绪后再由该Reactor派发给对应的事件处理器处理事件,如果是IO事件还要将事件处理后的数据交给业务处理逻辑。
2.单Reactor多线程
主线程负责事件监听(连接事件、IO事件)事件处理和事件派发,线程池负责业务处理。一个Reactor监听所有的事件(连接事件、IO事件),事件就绪后进行事件处理,如果是IO事件就绪,处理后得到的数据再派发给线程池,由线程池对数据进行业务处理。
3.多Reactor多线程(主从Reactor模型)
主线程中的主Reactor负责连接事件监听、连接事件处理、连接事件派发,从Reactor负责IO事件监听和IO事件处理,线程池负责业务处理。主线程中的主Reactor监听连接事件,连接事件就绪后将IO事件交给从从Reactor,从Reactor监听的IO事件就绪后处理IO事件,将处理的数据交给线程池,由线程池对数据进行业务处理。(注:从Reactor不只有一个,而是一个从Reactor池)
二、项目采用的模式
当前项目采用的是简化版的主从Reactor模型,去除线程池部分,直接由从Reactor进行业务处理。主线程中的主Reactor负责连接事件监听和处理,从Reactor负责IO事件监听和IO事件处理以及业务处理。主线程中的Reactor监听连接事件,连接事件就绪后将IO事件交给从Reactor,从Reactor监听IO事件,IO事件就绪后处理IO事件,并对得到的数据进行业务处理。
三、Buffer模块
1.功能
Buffer是一个缓冲区模块,一个连接有两个Buffer对象,即发送缓冲区sendBuffer和接收缓冲区receiveBuffer。不同于TCP通信时传输层中的内核级缓冲区的发送缓冲区和接收缓冲区,此处的Buffer对象缓冲区是应用层缓冲区。当服务器调用read接收内核级接收缓冲区中的数据时,会将接收到的数据存放到接收缓冲区receiveBuffer,再根据协议调用业务处理函数进行数据处理,得到处理后的数据存放到发送缓冲区中,服务器调用write函数给客户端发送发送缓冲区中的数据。
(此处以Http协议为例,接收缓冲区中的数据是Http请求报文,调用业务处理函数对Http请求报文进行解析得到HttpRequest对象,再根据HttpRequest对象构建HttpResponse对象,由HttpResponse对象得到Http响应报文,存放到发送缓冲区中)
2.设计思想
使用vector<char>作为缓冲区,接收数据和发送数据在同一个vector<char>空间内处理,使用read和write指针标记读和写的位置。类似于循环队列。
当写的空间不足时,首先要看read指针前的剩余空间加上write后的剩余空间之和是否满足写的要求,满足则将缓冲区现有的数据整体前移,不满足则对vector进行扩容。
//Buffer.hpp
//缓冲区模块
//功能:用于IO操作时存放数据
//设计思想:
//使用vector<char>作为缓冲区,接收缓冲区和发送缓冲区在同一个vector<char>空间内,使用read和write指针标记读和写的位置
//当写的空间不足时,首先要看read指针前的剩余空间加上write后的剩余空间之和是否满足写的要求,满足则将缓冲区现有的数据整体前移,不满足则对缓冲区进行扩容
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <assert.h>
#include <algorithm>
static const uint64_t BUFFERSIZE=1024;
class Buffer
{
private:
std::vector<char> _buffer;//缓冲区
uint64_t _read=0;//读指针
uint64_t _write=0;//写指针
public:
//构造函数
Buffer():_buffer(BUFFERSIZE)
{}
//析构函数
~Buffer()
{}
public:
//获取读取位置地址
char* GetReadAddress()
{
return _buffer.data()+_read;
}
//获取写入位置地址
char* GetWriteAddress()
{
return _buffer.data()+_write;
}
//获取末尾空闲空间大小
uint64_t GetTailFreeSize()
{
return _buffer.size()-_write;
}
//获取头部空闲空间大小
uint64_t GetHeadFreeSize()
{
return _read;
}
//获取可读数据大小
uint64_t GetReadableSize()
{
return _write-_read;
}
//读指针后移
void MoveRead(uint64_t len)
{
assert(len<=GetReadableSize());//读取的数据大小不能超过可读数据大小
_read+=len;
}
//写指针后移
void MoveWrite(uint64_t len)
{
assert(len<=GetTailFreeSize());//写入的数据大小不能超过尾部剩余空间大小
_write+=len;
}
//确保足够的可写空间大小(移动数据/扩容缓冲区)
void EnsureWiritableSize(uint64_t len)
{
//uint64_t len是要写入的数据大小
if(len>GetTailFreeSize()&&len<=GetTailFreeSize()+GetHeadFreeSize())//数据整体前移
{
uint64_t n=GetReadableSize();
// for(uint64_t i=0;i<n;i++)
// {
// _buffer[i]=_buffer[i+_read];
// }
std::copy(GetReadAddress(),GetReadAddress()+n,_buffer.data());
_read=0;
_write=n;
}
else//扩容
{
_buffer.resize(_write+len);
}
}
//读取数据
void Read(void* buffer,uint64_t len)
{
//void* buffer是存储读取到的数据
//uint64_t len是指定读取数据长度
assert(len<=GetReadableSize());//保证可读的数据大小小于len
std::copy(GetReadAddress(),GetReadAddress()+len,(char*)buffer);//读取数据并存入到buffer中
MoveRead(len);//移动读指针
}
//写入数据
void Write(const void* data,uint64_t len)
{
//void* data是要写入的数据
//uint64_t len是指定写入数据的长度
EnsureWiritableSize(len);//写入之前先保证可写空间足够
std::copy((char*)data,(char*)data+len,GetWriteAddress());//写入数据
MoveWrite(len);//移动写指针
}
//清空缓冲区
void Clear()
{
_read=_write=0;
}
//写入字符串
void WriteString(const std::string& str)
{
Write(str.c_str(),str.size());
}
//写入Buffer缓冲区
void WriteBuffer(Buffer& buffer)
{
Write(buffer.GetReadAddress(),buffer.GetReadableSize());
}
//读取一个字符串
std::string ReadString(uint64_t len)
{
assert(len<=GetReadableSize());
std::string buffer;
buffer.resize(len);
Read(&buffer[0],len);
return buffer;
}
//读取一行数据(读取到有\n的位置)(适用于HTTP协议)
std::string ReadLine()
{
uint64_t pos=_write;//没有\n则全部读取完
for(uint64_t i=_read;i<_write;i++)
{
if(_buffer[i]=='\n')
{
pos=i;
break;
}
}
if(pos==_write)//没找到\n,返回空串
{
//std::cout<<"Debug: 没找到\\n"<<std::endl;
return "";
}
else
return ReadString(pos-_read+1);// \n也读取出来
}
};
四、Socket模块
1.功能
封装TCP通信的Socket接口,简化Socket操作
//Socket.hpp
//Socket模块
//功能:Socket模块用于管理连接的套接字操作(主要是发送数据、接收数据、发起连接、获取连接)
//设计思想:封装socket编程接口
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include "Log.hpp"
#define MAX_LISTEN 1024 //连接队列最大值
class Socket
{
private:
int _sockfd;//套接字文件描述符
public:
//无参构造函数
Socket():_sockfd(-1)
{}
//有参构造函数
Socket(int sockfd):_sockfd(sockfd)
{}
//析构函数
~Socket()
{
Close();//关闭套接字
_sockfd = -1;
}
//获取套接字文件描述符
int GetSockfd()
{
return _sockfd;
}
//创建套接字
bool CreateSocket()
{
_sockfd=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(_sockfd<0)
{
LOG(ERROR,"Create socket failed!\n");
return false;
}
return true;
}
//绑定地址信息
bool Bind(const std::string& ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family=AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=inet_addr(ip.c_str());
int ret=bind(_sockfd,(struct sockaddr*)&addr,sizeof(addr));
if(ret<0)
{
LOG(ERROR,"Bind address failed!\n");
return false;
}
return true;
}
//开始监听(服务器设置为监听模式)
bool Listen(int backlog=MAX_LISTEN)
{
//backlog为最大连接数
int ret=listen(_sockfd,backlog);
if(ret<0)
{
LOG(ERROR,"Listen failed!\n");
return false;
}
return true;
}
//向服务器发起连接
bool Connect(const std::string& ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family=AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=inet_addr(ip.c_str());
int ret=connect(_sockfd,(struct sockaddr*)&addr,sizeof(addr));
if(ret<0)
{
LOG(ERROR,"Connect server failed!\n");
return false;
}
return true;
}
//获取新连接
int Accept()
{
//返回值是新连接的套接字文件描述符
int newSockfd=accept(_sockfd,nullptr,nullptr);
if(newSockfd<0)
{
LOG(ERROR,"Accept failed!\n");
return -1;
}
return newSockfd;
}
//接收数据
ssize_t Receive(void* buffer,size_t len,int flag=0)
{
//返回值ssize_t是有符号数
//buffer是缓冲区,存放接收到的数据
//len是接收的数据的长度
//flag用于设置 阻塞/非阻塞读取,默认为0阻塞模式
ssize_t ret=recv(_sockfd,buffer,len,flag);
if(ret<=0)//ret=0表示对端正常关闭,ret=-1表示读取出错(errno==EAGAIN表示接收缓冲区中没有数据了;errno==EINTR表示被信号中断了)
{
if(errno==EAGAIN||errno==EINTR)
{
return 0;//接收缓冲区中没有数据了/被信号中断了
}
else
{
LOG(ERROR,"Socket recv failed!");
return -1;
}
}
return ret;//实际接收到的数据大小
}
//非阻塞式接收数据
ssize_t NonBlockReceive(void* buffer,size_t len)
{
return Receive(buffer,len,MSG_DONTWAIT);//MSG_DONTWAIT表示非阻塞模式
}
//发送数据
ssize_t Send(const void* buffer,size_t len,int flag=0)
{
//返回值ssize_t是有符号数
//buffer是要发送的数据
//len是发送数据的长度
//flag用于设置 阻塞/非阻塞读取,默认为0阻塞模式
ssize_t ret=send(_sockfd,buffer,len,flag);
if(ret<0)
{
if(errno==EAGAIN||errno==EINTR)
{
return 0;//发送缓冲区中没有数据了/被信号中断了
}
else
{
LOG(ERROR,"Socket send fialed!\n");
return -1;
}
}
return ret;//发送成功返回发送的数据大小
}
//非阻塞式发送数据
ssize_t NonBlockSend(const void* buffer,size_t len)
{
return Send(buffer,len,MSG_DONTWAIT);
}
//关闭套接字
void Close()
{
//LOG(DEBUG,"关闭fd:"<<_sockfd);
if(_sockfd!=-1)
{
close(_sockfd);
_sockfd=-1;
}
}
//创建一个服务端连接(服务端开始监听)
bool CreateServer(uint16_t port,const std::string& ip="0.0.0.0",bool flag=false)
{
//1.创建套接字
//2.设置为非阻塞模式
//3.绑定地址
//4.开始监听
//5.开启地址端口重用
if(CreateSocket()==false) return false;
ReuseAddress();
if(flag==true) SetNonBlock();
if(Bind(ip,port)==false) return false;
if(Listen()==false) return false;
return true;
}
//创建一个客户端连接(客户端开始连接)
bool CreateClient(uint16_t port,const std::string& ip)
{
//创建套接字
//开始连接服务器
if(CreateSocket()==false) return false;
if(Connect(ip,port)==false) return false;
return true;
}
//开启地址端口重用
void ReuseAddress()
{
int val=1;
setsockopt(_sockfd,SOL_SOCKET,SO_REUSEADDR,(void*)&val,sizeof(int));//重用IP地址
val=1;
setsockopt(_sockfd,SOL_SOCKET,SO_REUSEPORT,(void*)&val,sizeof(int));//重用端口号
}
//设置套接字文件描述符为非阻塞模式
void SetNonBlock()
{
int flag=fcntl(_sockfd,F_GETFL,0);
fcntl(_sockfd,F_SETFL,flag|O_NONBLOCK);
}
};
五、Channel模块
1.功能
管理一个文件描述符,即一个连接需要监控的事件以及事件触发后的对应的处理函数。提供了对监控事件的添加、修改、删除、判断接口,并且是直接在Channel模块绑定的epoll模型进行操作。(epoll模型在EventLoop模块中)
//Channel.hpp
//Channel模块
//功能:管理文件描述符及其要监控的事件,以及事件触发后相应处理的回调函数
//本质是封装文件描述符及其要监控的事件为一个Channel对象,再将Channel对象交给Poller模块中的epoll监控
//Poller模块监控到事件就绪后会将就绪的事件返还到Channel对象中
//总结:Channel模块用于管理连接(文件描述符)的事件,包括设置要监控的事件、事件就绪后的回调函数
//事件主要包括(epoll):
//EPOLLIN可读事件:描述符可读(使用可读事件回调函数处理)
//EPOLLOUT可写事件:描述符可写(使用可写回调函数处理)
//EPOLLERR错误事件:套接字出错(使用错误回调函数处理)
//EPOLLHUP挂断事件:连接异常终止(使用挂断回调函数处理)
//EPOLLRDHUP对端关闭事件:对端关闭连接(使用可读事件回调函数处理)
//EPOLLPRI紧急数据事件:要优先读取紧急数据(使用可读事件回调函数处理)
//回调函数说明:
//可读事件回调函数:读取成功添加可写监控并正常返回;读取失败调用挂断事件回调函数(移除该套接字文件描述符的监控)
//可写事件回调函数:写入成功关闭可写监控并正常返回;写入失败调用挂断事件回调函数(移除该套接字文件描述符的监控)
//错误事件回调函数:调用挂断事件回调函数(移除该套接字文件描述符的监控)
//挂断事件回调函数:调用挂断事件回调函数(移除该套接字文件描述符的监控)
//任意事件回调函数:只要有事件触发,就可以调用该函数,具体实现根据需求决定
//说明:将 可读相关事件(EPOLLIN、EPOLLRDHUP、EPOLLPRI)合并到同一个回调函数,简化处理流程。
//例如,对端关闭连接(EPOLLRDHUP)时,读取操作会返回 0,可在 _readCallBack 中统一处理。
#pragma once
#include <iostream>
#include <functional>
#include <sys/epoll.h>
#include "Log.hpp"
//#include "Poller.hpp"
class Poller;//向前声明,解决头文件循环依赖问题
class EventLoop;//向前声明
using EventCallBack=std::function<void()>;//回调函数类型
class Channel
{
private:
//Poller* _poller;//Poller对象,本质是epoll
//使用EventLoop代替Poller,因为EventLoop中封装了Poller
EventLoop* _eventLoop;
int _sockfd;//文件描述符
uint32_t _events;//当前文件描述符监控的事件集合
uint32_t _revents;//当前文件描述符触发的事件集合
EventCallBack _readCallBack;//可读事件触发的回调函数
EventCallBack _writeCallBack;//可写事件触发的回调函数
EventCallBack _errorCallBack;//错误事件触发的回调函数
EventCallBack _closeCallBack;//挂断事件触发的回调函数
EventCallBack _anyCallBack;//任意事件触发的回调函数
public:
//构造函数
Channel(EventLoop* eventLoop, int sockfd)
:_eventLoop(eventLoop),
_sockfd(sockfd),
_events(0),
_revents(0)
{}
//获取文件描述符
int GetSockfd()
{
return _sockfd;
}
public:
//设置可读事件触发的回调函数
void SetReadCallBack(const EventCallBack& callBack)
{
_readCallBack=callBack;
}
//设置可写事件触发的回调函数
void SetWriteCallBack(const EventCallBack& callBack)
{
_writeCallBack=callBack;
}
//设置错误事件触发的回调函数
void SetErrorCallBack(const EventCallBack& callBack)
{
_errorCallBack=callBack;
}
//设置挂断事件触发的回调函数
void SetCloseCallBack(const EventCallBack& callBack)
{
_closeCallBack=callBack;
}
//设置任意事件触发的回调函数
void SetAnyCallBack(const EventCallBack& callBack)
{
_anyCallBack=callBack;
}
//设置实际就绪的事件(在Poller模块中,epoll监控到文件描述符的事件就绪后,调用Channel对象的该函数,将就绪的事件设置给_revents)
void SetRevents(uint32_t revents)
{
_revents=revents;
}
//获取文件描述符要监控的事件
uint32_t GetEvents()
{
return _events;
}
public:
//事件处理(事件触发后,由该函数调用相应的回调函数处理)
void HandleEvent();
public:
//检测是否监控了可读事件
bool CheckRead()
{
return _events&EPOLLIN;
}
//检测是否监控了可写事件
bool CheckWrite()
{
return _events&EPOLLOUT;
}
//添加监控读事件
void AddRead()
{
_events|=EPOLLIN;
UpdateMonitor();//Channel中修改文件描述符的事件后,还要同步更新到Poller模块中,即epoll中
}
//添加监控写事件
void AddWrite()
{
_events|=EPOLLOUT;
UpdateMonitor();//Channel中修改文件描述符的事件后,还要同步更新到Poller模块中,即epoll中
}
//移除读事件的监控
void RemoveRead()
{
_events&=~EPOLLIN;
UpdateMonitor();//Channel中修改文件描述符的事件后,还要同步更新到Poller模块中,即epoll中
}
//移除写事件的监控
void RemoveWrite()
{
_events&=~EPOLLOUT;
UpdateMonitor();//Channel中修改文件描述符的事件后,还要同步更新到Poller模块中,即epoll中
}
//移除所有事件的监控
void RemoveAll()
{
_events=0;
UpdateMonitor();//Channel中修改文件描述符的事件后,还要同步更新到Poller模块中,即epoll中
}
/*以下两个函数是声明,函数定义在Channel.cpp中*/
//更新文件描述符事件(调用Poller模块中的UpdateEvent实现,本质是将文件描述符事件添加到epoll的红黑树中或者更新文件描述符的监控事件)
void UpdateMonitor();
//取消文件描述符的监控(调用Poller模块中的RemoveEvent实现,本质是将该文件描述符事件从epoll的红黑树中移除)
void CancelMonitor();
};
// Channel.cpp
// 功能:解决Channel.hpp和Poller.hpp头文件循环依赖的问题
// 通过在Channel类和Poller类前进行向前声明后,向前声明仅仅是声明,而Channel类中还调用了Poller类中的成员函数,由于只有声明而没有定义,因此编译器无法识别
// 通过新建Channel.cpp文件,在该文件中包含Poller.hpp和Channel.hpp
// 将Channel类中调用了Poller类函数的成员函数定义在这里,而Channel.hpp中的函数仅改为声明
#include "Channel.hpp"
#include "Poller.hpp"
#include "EventLoop.hpp"
// 更新文件描述符事件(调用Poller模块中的UpdateEvent实现,本质是将文件描述符事件添加到epoll的红黑树中或者更新文件描述符的监控事件)
void Channel::UpdateMonitor()
{
_eventLoop->UpdateEvent(this);
}
// 取消文件描述符的监控(调用Poller模块中的RemoveEvent实现,本质是将该文件描述符事件从epoll的红黑树中移除)
void Channel::CancelMonitor()
{
_eventLoop->RemoveEvent(this);
}
// 事件处理(事件触发后,由该函数调用相应的回调函数处理)
void Channel::HandleEvent()
{
// 可读事件、对端关闭事件、紧急数据事件统一交给可读回调函数处理
// 因为对端关闭事件触发后,调用read读取会返回0,表示连接正常关闭;紧急数据事件触发后,本质上也是数据可读
if (_revents & EPOLLIN || _revents & EPOLLRDHUP || _revents & EPOLLPRI)
{
// // 如果是定时器和事件通知文件描述符就绪,无需日志输出
// if (_sockfd != _eventLoop->GetTimerfd() && _sockfd != _eventLoop->GetEventfd())
// {
// if (_revents & EPOLLIN)
// LOG(INFO, "连接" << _sockfd << ":可读事件触发");
// if (_revents & EPOLLRDHUP)
// LOG(INFO, "连接" << _sockfd << ":对端关闭事件触发");
// if (_revents & EPOLLPRI)
// LOG(INFO, "连接" << _sockfd << ":紧急数据事件触发");
// }
// 调用可读事件回调函数统一处理
if (_readCallBack)
{
_readCallBack();
}
}
// 说明:可写事件处理过程中如果出现错误或者连接挂断,会调用错误事件或者挂断事件处理,而错误事件、挂断事件处理时会释放连接
// 它们都属于可能会释放连接的操作,因此只能三选一进行处理,否则连接会被多次释放
if (_revents & EPOLLOUT) // 可写事件交给可写回调函数处理
{
//LOG(INFO, "连接" << _sockfd << ":可写事件触发");
if (_writeCallBack)
_writeCallBack();
}
else if (_revents & EPOLLERR) // 错误事件交给错误回调函数处理
{
//LOG(INFO, "连接" << _sockfd << ":错误事件触发");
if (_errorCallBack)
_errorCallBack();
}
else if (_revents & EPOLLHUP) // 挂断事件交给挂断回调函数处理
{
//LOG(INFO, "连接" << _sockfd << ":挂断事件触发");
if (_closeCallBack)
_closeCallBack();
}
//只要有事件就绪,就调用任意事件的回调函数
if(_anyCallBack) _anyCallBack();
}
六、Poller模块
1.功能
封装epoll,主要就是三个接口:更新监控事件(添加/修改)、取消监控事件(删除)、开始监控。Poller模块又被EventLoop模块封装,所以包含EventLoop对象,本质也就是包含epoll,可以直接操作监控事件。
//Poller.hpp
//Poller模块
//功能:本质是封装epoll,封装epoll的接口将Channel对象管理的文件描述符及其事件添加/修改到epoll中,或者通过epoll中删除
//通过epoll实现对Channel对象管理的文件描述符进行监控,监控的事件就绪后通过Channel对象的SetREvents将就绪事件返回到Channel对象的_revents中
//通过哈希表建立文件描述符与Channel对象的映射关系
//功能接口:
//1.向epoll中添加/修改要监控的文件描述符及其对应事件(更新)
//2.移除epoll正在监控的文件描述符
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unordered_map>
#include <vector>
#include <errno.h>
#include <string.h>
#include <sys/epoll.h>
#include <assert.h>
#include <utility>
//#include "Channel.hpp"
#include "Log.hpp"
class Channel;//向前声明,解决头文件循环依赖问题
#define MAXEVENTS 1024//就绪事件数组的最大容量
class Poller
{
private:
int _epfd;//epoll实例文件描述符
struct epoll_event _readyEvents[MAXEVENTS];//就绪事件数组,作为epoll_wait函数的参数。当epoll wait监控到就绪事件后,会将该就绪事件存放到其中
std::unordered_map<int,Channel*> _channels;// fd->Channel 管理所有已监控的Channel对象,即epoll中监控的文件描述符
private:
// 添加/更新/删除文件描述符监控的事件(根据op选项的不同确定不同的操作,UpdateEvent和RemoveEvent都是调用Update来实现)
void Update(Channel* channel,int op);
//判断某个Channel对象是否被监控了(即Channel对象是否被Poller模块的_channels哈希表管理)
//即判断Channel对象管理的文件描述符是否被添加到epoll中
bool HasChannel(Channel* channel);
public:
//构造函数
Poller()
{
//创建epoll实例
_epfd=epoll_create(1);
if(_epfd<0)
{
LOG(ERROR,"Epoll create error!");
abort();//退出程序
}
}
//向epoll中添加监控文件描述符/更新已有的监控文件描述符的事件
void UpdateEvent(Channel* channel);
//取消监控指定的文件描述符
void RemoveEvent(Channel* channel);
//开始监控事件
void Poll(std::vector<Channel*>* active);
};
//Poller.cpp
#include "Poller.hpp"
#include "Channel.hpp"
// 添加/更新/删除文件描述符监控的事件(根据op选项的不同确定不同的操作,UpdateEvent和RemoveEvent都是调用Update来实现)
void Poller::Update(Channel *channel, int op)
{
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int fd = channel->GetSockfd();
struct epoll_event ev;
ev.data.fd = fd;
ev.events = channel->GetEvents();
int ret = epoll_ctl(_epfd, op, fd, &ev);
if (ret < 0)
{
LOG(ERROR, "Epoll ctl failed!");
LOG(DEBUG, "fd is " << channel->GetSockfd() << ", error: " << strerror(errno));
}
}
// 判断某个Channel对象是否被监控了(即Channel对象是否被Poller模块的_channels哈希表管理)
// 即判断Channel对象管理的文件描述符是否被添加到epoll中
bool Poller::HasChannel(Channel *channel)
{
auto it = _channels.find(channel->GetSockfd());
if (it != _channels.end())
return true;
else
return false;
}
// 向epoll中添加监控文件描述符/更新已有的监控文件描述符的事件
void Poller::UpdateEvent(Channel *channel)
{
bool ret = HasChannel(channel);
if (ret == false) // epoll中没有该文件描述符
{
// epoll中没有该文件描述符,首先要在Poller类中的_channels中为channel对象和文件描述符建立映射关系
_channels.insert(std::make_pair(channel->GetSockfd(), channel));
Update(channel, EPOLL_CTL_ADD);
}
else // epoll已经监控了该文件描述符,更新其事件
{
Update(channel, EPOLL_CTL_MOD);
}
}
// 取消监控指定的文件描述符
void Poller::RemoveEvent(Channel *channel)
{
// 不仅是要删除epoll中管理的文件描述符
Update(channel, EPOLL_CTL_DEL);
// 还要删除_channels中管理的文件描述符,即Channel对象
auto it = _channels.find(channel->GetSockfd());
if (it != _channels.end())
_channels.erase(it);
}
// 开始监控事件
void Poller::Poll(std::vector<Channel *> *active)
{
// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// vector<Channel*>* active 是就绪的文件描述符,即Channel对象
// (根据就绪事件数组找到对应的文件描述符,并将该文件描述符的Channel对象存放到active中)
int n = epoll_wait(_epfd, _readyEvents, MAXEVENTS, -1); // 永久阻塞模式,直到有事件就绪才返回
// 返回值n是就绪事件的数量
if (n < 0)
{
if (errno == EINTR) // 被信号中断
{
return;
}
else // epoll wait错误
{
LOG(ERROR, "Epoll wait failed: " << strerror(errno));
}
}
else // 成功监控到就绪事件
{
for (int i = 0; i < n; i++)
{
// 通过就绪事件的data数据确定是哪一个文件描述符的就绪事件
auto it = _channels.find(_readyEvents[i].data.fd);
// 将就绪的事件存储到Channel对象的_revents中
it->second->SetRevents(_readyEvents[i].events);
// 将有事件就绪的文件描述符添加到active中,即Channel对象添加到active中
active->push_back(it->second);
}
}
}
七、EventLoop模块
1.功能
EventLoop模块即Reactor模块。EventLoop模块封装了Poller模块,即可以更新监控事件(添加/修改)、取消监控事件(删除)、开始监控。后续使用EventLoop模块时,是一个线程对应一个EventLoop对象,主线程中的EventLoop对象就是主Reactor,负责监听连接,其他每个子线程中也有一个EventLoop对象,即从属Reactor,负责通信连接的数据通信。
另外EventLoop模块还添加了任务队列、事件通知eventfd、时间轮定时器。
任务队列的作用是当出现多个线程交互时,避免加锁。例如当主Reactor监听到新连接后,要在主线程中初始化连接、启动超时销毁功能等,而此时新连接已经创建并且分配到其子线程中,主线程中的操作就是多线程交互。通过将初始化连接、启动超时销毁功能等涉及到跨线程的操作都存放到新连接所属线程的任务队列中,即从属Reactor的任务队列中,这些操作会在从属Reactor中执行,就避免了跨线程调用时的加锁,提高效率。
事件通知eventfd的作用是防止epoll_wait没有事件就绪导致阻塞(epoll_wait采用永久阻塞模式)。eventfd也交给epoll监控其读事件,当有任务插入任务队列时,向eventfd写入数据即可唤醒eventfd,epoll_wait监控的eventfd读事件就绪,不会阻塞。
时间轮定时器的作用是定时执行任务,当前项目中只涉及超时连接销毁。设置timerfd为每秒触发一次,并将timerfd交给epoll监控其读事件,也就实现了每秒钟timerfd的读事件都会就绪。每当timerfd读事件就绪时,秒指针+1并执行对应时间的定时任务。如果一个连接开启了超时连接销毁功能并设置超时10秒销毁,10秒内如果该连接没有任何事件就绪,则在第10秒时就会销毁该连接;如果在10秒内有任意事件就绪,则刷新超时销毁任务,即将该销毁任务置后10秒再执行。
//EventLoop.hpp
// EventLoop模块,即Reactor模块
// 功能:对Poller模块进行封装,进行事件监控。
// 一个EventLoop对应一个线程
// 使用Poller模块对文件描述符进行事件监控,当事件就绪后再调用相应的回调函数进行事件处理。要保证所有的事件处理都要在EventLoop模块的线程中
#pragma once
#include <assert.h>
#include <vector>
#include <functional>
#include <mutex>
#include <thread>
#include <memory>
#include <sys/eventfd.h>
#include <unistd.h>
#include "Poller.hpp"
#include "Channel.hpp"
#include "TimeWheelTimer.hpp"
using Functor = std::function<void()>; // 任务函数
class EventLoop
{
private:
std::thread::id _threadId; // 线程id,用于记录EventLoop创建时所在的线程
int _eventfd; // 事件通知描述符
std::shared_ptr<Channel> _eventfdChannel; // 用Channel对象管理事件通知文件描述符
Poller _poller; // poller对象,监控文件描述符
std::vector<Functor> _tasks; // 任务队列
std::mutex _mutex; // 任务队列锁
TimerWheel _timerWheel; // 时间轮定时器
public:
// 构造函数
EventLoop()
: _threadId(std::this_thread::get_id()),
_eventfd(CreateEventfd()),
_eventfdChannel(new Channel(this, _eventfd)),
_timerWheel(this)
{
// 开启eventfd的读事件监控
_eventfdChannel->AddRead();
// 为eventfd设置可读事件回调函数
_eventfdChannel->SetReadCallBack(std::bind(&EventLoop::ReadEventfd, this));
}
// 开始循环
void Start()
{
while(true)
{
//LOG(DEBUG,"开始一轮事件监控---------------");
// 1.事件监控
std::vector<Channel *> actives; // epoll_wait返回的是就绪事件数组,将这些就绪事件的文件描述符重新封装成Channel对象添加到actives中
_poller.Poll(&actives);
//LOG(DEBUG,"结束一轮事件监控---------------");
//LOG(DEBUG,"开始一轮事件处理---------------");
// 2.事件处理
for (auto &channel : actives)
{
channel->HandleEvent(); // 事件处理
}
//LOG(DEBUG,"结束一轮事件处理---------------");
//LOG(DEBUG,"开始一轮任务队列执行---------------");
// 3.执行任务队列中的所有任务
RunTasks();
//LOG(DEBUG,"结束一轮任务队列执行---------------");
}
}
// 任务是在当前EventLoop线程中则直接执行,不在EventLoop线程中则压入任务队列
void RunOrPush(const Functor &func)
{
if (IsInLoop()) // 任务在当前EventLoop线程中。直接执行
{
func();
}
else // 任务不在当前EventLoop线程中,压入任务队列中
{
PushInTasks(func);
}
}
// 将任务压入任务队列中(涉及到任务队列,要加锁)
void PushInTasks(const Functor &func)
{
{
std::unique_lock<std::mutex> lock(_mutex);
_tasks.push_back(func);
}
// 唤醒eventfd,防止因为没有事件就绪而导致事件监控_poller.Poll(&actives)阻塞
WeakupEventfd();
}
// 判断当前线程是否在EventLoop线程中
bool IsInLoop()
{
return _threadId == std::this_thread::get_id();
}
//断言当前线程是否在EventLoop线程中(有的接口必须是在eventloop线程中调用,就需要使用该接口去断言)
void AssertInLoop()
{
assert(_threadId == std::this_thread::get_id());
}
// 更新文件描述符的事件监控(添加/修改)
void UpdateEvent(Channel *channel)
{
_poller.UpdateEvent(channel);
}
// 取消监控指定的文件描述符
void RemoveEvent(Channel *channel)
{
_poller.RemoveEvent(channel);
}
// ---------------------------------定时器相关接口-------------------------------------------
// 添加定时任务
void AddTimerTask(uint64_t id, uint32_t timeout, const TaskFunc &task)
{
_timerWheel.AddTimerTask(id, timeout, task);
}
// 刷新定时任务
void RefreshTimerTask(uint64_t id)
{
_timerWheel.RefreshTimerTask(id);
}
// 取消定时任务
void CancelTimerTask(uint64_t id)
{
_timerWheel.CancelTimerTask(id);
}
// 检查指定定时任务是否存在
bool CheckTimerTask(uint64_t id)
{
return _timerWheel.CheckTimerTask(id);
}
//获取定时器文件描述符
int GetTimerfd()
{
return _timerWheel.GetTimerfd();
}
//获取事件通知文件描述符
int GetEventfd()
{
return _eventfd;
}
private:
// 执行任务队列中的所有任务
void RunTasks()
{
std::vector<Functor> tasks; // 临时任务队列,用于交换_tasks任务队列
// 交换任务队列时要加锁
{
std::unique_lock<std::mutex> lock(_mutex);
_tasks.swap(tasks); // 交换后任务队列为空
}
// 执行任务队列中的所有任务
for (auto &func : tasks)
{
func();
}
}
// 创建事件通知eventfd
static int CreateEventfd()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
LOG(ERROR, "Create eventfd failed!");
abort(); // 程序退出
}
return efd;
}
// eventfd事件通知的回调函数
void ReadEventfd()
{
uint64_t ret;
int n = read(_eventfd, &ret, sizeof(ret));
if (n < 0)
{
if (errno == EINTR || errno == EAGAIN) // EINTR表示读取被信号中断,EAGAIN表示无数据可读
{
return;
}
LOG(ERROR, "Read eventfd failed!");
abort();
}
return;
}
// 唤醒eventfd(本质是给eventfd写入一个数据,触发eventfd的可读事件)
// 防止因为没有事件就绪而导致事件监控_poller.Poll(&actives)阻塞
void WeakupEventfd()
{
uint64_t val = 1;
int n = write(_eventfd, &val, sizeof(val));
if (n < 0)
{
if (errno == EINTR)
{
return;
}
LOG(ERROR, "Write eventfd failed!");
abort();
}
}
};
八、TimeWheelTimer模块
1.功能
秒级定时器,支持为每个连接设置定时任务,在当前项目中的定时任务是超时连接销毁功能。
2.设计思想
根据Linux系统中的定时器timerfd,设计的一个秒级定时器。创建一个二维vector容器,即vector<vector<SharedTask>> _wheels,容量为60并由一个秒级指针tick,timerfd设置为每秒触发一次,每当timerfd触发时,(tick+1)%60并执行当前所在位置的vector容器中所有的定时任务。而定时任务是设计在析构函数中执行的,所以实际上只需要清空当前所在位置的vector容器即可。
对于定时任务使用的是shared_ptr管理,因此如果要刷新定时任务,只需要在其延后的某个位置拷贝定时任务的shared_ptr即可,引用计数+1,即使某个定时任务前面的shared_ptr被销毁了,但是由于引用计数并不是1,并不会执行析构函数,也就不会执行定时任务,实现刷新定时任务的功能。
//TimeWheelTimer.hpp
//时间轮定时器模块
//功能:秒级定时器,通过EventLoop监控timerfd定时器,实现每秒钟都去检查定时器任务队列中的任务,如果有
//包含定时器任务TimerTask类和时间轮TimerWheel类,还需要加上timerfdding
#pragma once
#include <iostream>
#include <sys/timerfd.h>
#include <functional>
#include <vector>
#include <unordered_map>
#include <memory>
#include <unistd.h>
#include "Channel.hpp"
#include "Log.hpp"
//#include "EventLoop.hpp"
using TaskFunc=std::function<void()>;
using ReleaseFunc=std::function<void()>;
//定时器任务:时间到了定时任务就要被触发,执行
class TimerTask
{
private:
TaskFunc _task;//定时任务,析构时执行
uint64_t _id;//任务序号
uint32_t _timeout;//定时任务触发时间(超时时间)
ReleaseFunc _release;//当定时任务被触发时,删除TimerWheel时间轮中的定时任务对象,析构时执行
bool _isCancel;//取消定时任务,false表示没有被取消,true表示被取消
public:
//构造函数
TimerTask(uint64_t id,uint32_t timeout,const TaskFunc& task)
:_id(id),
_timeout(timeout),
_task(task),
_isCancel(false)
{}
//析构函数
~TimerTask()
{
if(_isCancel==false) _task();//在析构函数中执行定时任务
_release();//在析构函数中删除时间轮中的定时任务对象
}
//设置删除定时任务的函数
void SetRelease(ReleaseFunc release)
{
_release=release;
}
//获取定时任务的时间
uint32_t GetTimeout()
{
return _timeout;
}
//取消定时任务(只能考虑在定时任务本身取消,而不是在时间轮中释放定时任务,因为释放定时任务等于提前执行定时任务,而不是取消)
void Cancel()
{
_isCancel=true;
}
};
using SharedTask=std::shared_ptr<TimerTask>;
using WeakTask=std::weak_ptr<TimerTask>;
//-------------------------------------------------------------------------------------------------------------------//
class EventLoop;//向前声明
//时间轮:存储定时任务,并使用秒针来确定定时任务何时被执行
class TimerWheel
{
private:
int _tick;//秒针
int _capacity;//时间轮容量,本质是时间轮的最大延迟时间
std::vector<std::vector<SharedTask>> _wheels;//二维数组存放定时任务,并且定时任务使用shared_ptr智能指针封装
std::unordered_map<uint64_t,WeakTask> _timers;// id->TimerTask 建立定时任务序号和定时任务weak_ptr的映射关系,所有的定时任务都使用weak_ptr管理
//因为当需要延迟定时任务时,增添相同的定时任务要使用shared_ptr针对shared_tr拷贝才会共享引用计数
//如果直接对原始对象构造shared_ptr不会共享引用计数,因此使用weak_ptr管理原始对象,
//所有的shared_ptr都是针对weak_ptr的拷贝,共享引用计数,但是weak_ptr本身没有引用计数
int _timerfd;//timerfd定时器文件描述符
EventLoop* _eventLoop;//事件监控
std::shared_ptr<Channel> _timerfdChannel;//使用Channel对象管理timerfd定时器文件描述符
public:
//构造函数
TimerWheel(EventLoop* eventLoop)
:_tick(0),
_capacity(60),
_wheels(_capacity),
_timerfd(CreateTimerfd()),
_eventLoop(eventLoop),
_timerfdChannel(new Channel(_eventLoop,_timerfd))
{
//设置timerfd定时器可读事件的回调函数,是OnTime
_timerfdChannel->SetReadCallBack(std::bind(&TimerWheel::OnTime,this));
//启动EventLoop对timerfd的读事件监控
_timerfdChannel->AddRead();
}
//析构函数
~TimerWheel()
{}
//由EventLoop添加定时任务(定时器的操作可能会在多线程中进行,为了避免加锁提高效率,将定时器的操作都放在同一个线程EventLoop中进行)
void AddTimerTask(uint64_t id,uint32_t timeout,const TaskFunc& task);
//添加定时任务
void AddTimerTaskInLoop(uint64_t id,uint32_t timeout,const TaskFunc& task)
{
//构建定时任务TimerTask对象
SharedTask st(new TimerTask(id,timeout,task));
st->SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id));
//在哈希表中建立定时任务序号和定时任务weak_ptr对象的映射关系
_timers[id]=WeakTask(st);
//在时间轮中添加定时任务
_wheels[(_tick+st->GetTimeout())%_capacity].emplace_back(st);
}
//由EventLoop刷新定时任务(定时器的操作可能会在多线程中进行,为了避免加锁提高效率,将定时器的操作都放在同一个线程EventLoop中进行)
void RefreshTimerTask(uint64_t id);
//延迟/刷新定时任务:通过WeakTask构建出新的SharedTask智能指针,再将新的智能指针添加到时间轮中
void RefreshTimerTaskInLoop(uint64_t id)
{
//通过WeakTask的lock函数构建出新的SharedTask智能指针
auto it=_timers.find(id);
if(it==_timers.end())
{
return;
}
SharedTask st=it->second.lock();
//将新的智能指针添加到时间轮中
_wheels[(_tick+st->GetTimeout())%_capacity].emplace_back(st);
}
//由EventLoop取消定时任务(定时器的操作可能会在多线程中进行,为了避免加锁提高效率,将定时器的操作都放在同一个线程EventLoop中进行)
void CancelTimerTask(uint64_t id);
//取消定时任务
void CancelTimerTaskInLoop(uint64_t id)
{
auto it=_timers.find(id);
if(it==_timers.end())
{
return;
}
SharedTask st=it->second.lock();//通过weak_ptr构建出shared_ptr
if(st!=nullptr)
st->Cancel();//取消定时任务
}
//检查指定定时任务是否存在(注意此接口存在线程安全问题,只可以在EventLoop线程中调用)
bool CheckTimerTask(uint64_t id)
{
if(_timers.find(id)==_timers.end())
{
return false;
}
else
{
return true;
}
}
//获取定时器文件描述符(注意此接口存在线程安全问题,只可以在EventLoop线程中调用)
int GetTimerfd()
{
return _timerfd;
}
private:
//删除时间轮中的定时任务
void RemoveTimer(uint64_t id)
{
auto it=_timers.find(id);
if(it!=_timers.end())
{
_timers.erase(id);
}
}
//创建并设置timerfd定时器(阻塞模式)
static int CreateTimerfd()
{
//创建timerfd定时器
int timerfd=timerfd_create(CLOCK_REALTIME,0);//此处的0表示timerfd为阻塞模式
if(timerfd<0)
{
LOG(ERROR,"Create timerfd fialed!");
abort();
}
// 设置定时器的触发时间
struct itimerspec its = {{1, 0}, {1, 0}}; // 首次1秒触发,后续间隔1秒触发(实际上就是每秒钟触发一次)
int n = timerfd_settime(timerfd, 0, &its, nullptr);
if (n < 0)
{
perror("Set timerfd failed!");
abort();
}
return timerfd;
}
//读取timerfd定时器中的数据(读取一次即可)
uint64_t ReadTimerfd()
{
uint64_t times;//存放timerfd定时器的数据(即距离上一次读取数据已经触发多少次)
ssize_t ret=read(_timerfd,×,sizeof(times));
if(ret<0)
{
LOG(ERROR,"Read timerfd fialed!");
abort();
}
return times;//实际超时的次数,即实际超时时间(单位秒)
}
//执行定时任务:时间轮的秒针指向哪里,哪里的定时任务就要被触发执行(本质是释放此处的SharedTask,引用计数-1)
void RunTimerTask()
{
//秒针+1
_tick=(_tick+1)%_capacity;
//执行任务,即销毁shared_ptr对象(具体是否销毁取决于引用计数)
_wheels[_tick].clear();
}
//定时器时间到
void OnTime()
{
//读取定时器数据,即超时时间
uint64_t times = ReadTimerfd();
//超时多长时间,就要执行多少次定时任务,因为每执行一次定时任务,秒指针+1,否则如果超时n秒,但是秒指针只走1秒,超时关闭连接就会出错
for(int i=0;i<times;i++)
{
RunTimerTask();//执行到期的定时任务
}
}
};
//TimeWheelTimer.cpp
#include "TimeWheelTimer.hpp"
#include "EventLoop.hpp"
// 由EventLoop添加定时任务(定时器的操作可能会在多线程中进行,为了避免加锁提高效率,将定时器的操作都放在同一个线程EventLoop中进行)
void TimerWheel::AddTimerTask(uint64_t id, uint32_t timeout, const TaskFunc &task)
{
_eventLoop->RunOrPush(std::bind(&TimerWheel::AddTimerTaskInLoop, this, id, timeout, task));
}
// 由EventLoop刷新定时任务(定时器的操作可能会在多线程中进行,为了避免加锁提高效率,将定时器的操作都放在同一个线程EventLoop中进行)
void TimerWheel::RefreshTimerTask(uint64_t id)
{
_eventLoop->RunOrPush(std::bind(&TimerWheel::RefreshTimerTaskInLoop, this, id));
}
// 由EventLoop取消定时任务(定时器的操作可能会在多线程中进行,为了避免加锁提高效率,将定时器的操作都放在同一个线程EventLoop中进行)
void TimerWheel::CancelTimerTask(uint64_t id)
{
_eventLoop->RunOrPush(std::bind(&TimerWheel::CancelTimerTaskInLoop, this, id));
}
九、Connection模块
1.功能
Connection模块即通信连接模块,管理监听到的新连接,本质是整合了Socket模块、Channel模块和Buffer模块,并添加了连接ID、连接状态和协议上下文。
设计了事件就绪的5个回调函数(读事件就绪、写事件就绪、错误事件就绪、挂断事件就绪、任意事件就绪),并新增4个由用户设置的回调函数(连接成功后的回调函数、业务处理函数、连接关闭后的回调函数、任意事件回调函数),以及发送/接收数据功能、协议切换功能、开/关非活跃连接销毁功能。
// Connection.hpp
// Connection模块
// 功能:管理一个通信套接字(通信连接),包括:
// 1.发送/接收数据(本质是将数据存放到缓冲区中);
// 2.管理发送/接收缓冲区;
// 3.管理协议切换(协议上下文);
// 4.开/关非活跃连接销毁;
// 5.关闭连接
#pragma once
#include <iostream>
#include <memory>
#include <stdint.h>
#include <functional>
#include "Channel.hpp"
#include "Socket.hpp"
#include "Buffer.hpp"
#include "Any.hpp"
#include "EventLoop.hpp"
#include "Log.hpp"
// 连接状态
typedef enum
{
DISCONNECTED, // 连接关闭
CONNECTING, // 正在连接
CONNECTED, // 已连接
DISCONNECTING // 连接待关闭
} ConnectionStatus;
class Connection;
using SPtrConnection = std::shared_ptr<Connection>;
using ConnectedCallback = std::function<void(const SPtrConnection &)>;
using ProcessBusinessCallback = std::function<void(const SPtrConnection &, Buffer*)>;
using ClosedCallback = std::function<void(const SPtrConnection &)>;
using AnyEventCallback = std::function<void(const SPtrConnection &)>;
// 连接类
class Connection: public std::enable_shared_from_this<Connection>
{
private:
uint64_t _connectionID; // 连接ID,唯一标识符(如果开启非活跃连接销毁功能,用到定时器,该ID也会作为定时任务ID)
int _sockfd; // 连接的文件描述符
Channel _channel; // 管理连接的监控事件(监控哪些事件和如何处理就绪事件)
Socket _socket; // 管理连接的套接字操作
bool _isReleaseInactiveConnection; // 是否启动非活跃连接销毁功能
Buffer _receiveBuffer; // 接收缓冲区
Buffer _sendBuffer; // 发送缓冲区
Any _protocolContext; // 协议上下文
ConnectionStatus _status; // 连接状态
EventLoop *_eventLoop; // 关联EventLoop线程,使Connection固定工作在同一个线程中(todo)
// 这四个回调函数是让用户设置的
ConnectedCallback _connectedCallback;//连接成功后的回调函数
ProcessBusinessCallback _processBusinessCallback; // 业务处理回调函数
ClosedCallback _closedCallback;//连接关闭后的回调函数
AnyEventCallback _anyEventCallback;//任意事件回调函数
//移除服务器中管理的连接信息(在关闭连接后调用)
ClosedCallback _removeConnectionFromServerCallback;
public:
// 构造函数
Connection(EventLoop* eventLoop, uint64_t connectionID, int sockfd)
:_connectionID(connectionID),
_sockfd(sockfd),
_channel(eventLoop, sockfd),
_socket(sockfd),
_isReleaseInactiveConnection(false),
//_receiveBuffer(),
//_sendBuffer(),
//_protocolContext(),
_status(CONNECTING),
_eventLoop(eventLoop)
{
_channel.SetReadCallBack(std::bind(&Connection::HandleRead,this));
_channel.SetWriteCallBack(std::bind(&Connection::HandleWrite,this));
_channel.SetCloseCallBack(std::bind(&Connection::HandleClose,this));
_channel.SetErrorCallBack(std::bind(&Connection::HandleError,this));
_channel.SetAnyCallBack(std::bind(&Connection::HandleAny,this));
}
// 析构函数
~Connection()
{
LOG(INFO,"Close connection:"<<_connectionID);
}
// 获取连接的文件描述符
int GetSockfd()
{
return _sockfd;
}
// 获取连接ID
int GetConnectionID()
{
return _connectionID;
}
// 判断连接是否是已连接状态
bool IsConnected()
{
return _status==CONNECTED;
}
// 设置连接协议上下文
void SetProtocolContext(const Any& protocolContext)
{
_protocolContext=protocolContext;
}
// 获取连接协议上下文
Any *GetProtocolContext()
{
return &_protocolContext;
}
// 设置连接成功后的回调函数
void SetConnectedCallback(const ConnectedCallback &callback)
{
_connectedCallback=callback;
}
// 设置业务处理函数
void SetProcessBusinessCallback(const ProcessBusinessCallback &callback)
{
_processBusinessCallback=callback;
}
// 设置连接关闭后的回调函数
void SetClosedCallback(const ClosedCallback &callback)
{
_closedCallback=callback;
}
// 设置任意事件回调函数
void SetAnyEventCallback(const AnyEventCallback &callback)
{
_anyEventCallback=callback;
}
// 设置移除服务器中管理的连接信息的回调函数
void SetRemoveConnectionFromServerCallback(const ClosedCallback& callback)
{
_removeConnectionFromServerCallback=callback;
}
//------------------------------------------------------------------------------
//初始化连接(正在连接状态转变到已连接状态)
void InitializeConnection()
{
_eventLoop->RunOrPush(std::bind(&Connection::InitializeConnectionInLoop,this));
}
// 发送数据
void Send(const char *data, size_t len)
{
//由于Send操作要被压入到任务队列,不是立即执行。如果当用户传入的是一个临时空间,那么可能出现数据还没有发送临时空间已经被销毁的情况
//因此此处要创建一个Buffer缓冲区存储data的数据
Buffer buffer;
buffer.Write(data,len);
_eventLoop->RunOrPush(std::bind(&Connection::SendInLoop,this,buffer));
}
// 即将关闭连接(并非真正关闭连接,关闭前要处理接收/发送缓冲区的数据)
void AboutToCloseConnection()
{
_eventLoop->RunOrPush(std::bind(&Connection::AboutToCloseConnectionInLoop,this));
}
// 实际关闭连接
void CloseConnection()
{
_eventLoop->PushInTasks(std::bind(&Connection::CloseConnectionInLoop,this));
//_eventLoop->RunOrPush(std::bind(&Connection::CloseConnectionInLoop,this));//错误的
}
// 启动非活跃连接销毁功能
void EnableReleaseInactiveConnection(int second)
{
_eventLoop->RunOrPush(std::bind(&Connection::EnableReleaseInactiveConnectionInLoop,this,second));
}
// 关闭非活跃连接销毁功能
void CancelReleaseInactiveConnection()
{
_eventLoop->RunOrPush(std::bind(&Connection::CancelReleaseInactiveConnectionInLoop,this));
}
// 切换协议(必须在EVentLoop线程中立即执行)
//(不是线程安全的,因为其必须在eventloop线程中调用。虽然通过RunOrPush函数调用,使得其如果是在EventLoop以外的线程中调用时,会被添加到任务队列
//但是任务队列中的任务执行可能会有延迟,导致协议还未及时切换,就已经有新数据到来,因此SwitchProtocol函数必须是在EventLoop线程中调用,才会立即执行)
void SwitchProtocol(const Any& protocolContext,
const ConnectedCallback& connectedCallback,
const ProcessBusinessCallback& processBusinessCallback,
const ClosedCallback& closedCallback,
const AnyEventCallback& anyEventCallback)
{
_eventLoop->AssertInLoop();//必须保证在EventLoop线程中调用
_eventLoop->RunOrPush(std::bind(&Connection::SwitchProtocolInLoop,this,
protocolContext,connectedCallback,processBusinessCallback,closedCallback,anyEventCallback));
}
//---------------------------------------------------------------------------------------------------//
private:
//初始化连接(正在连接状态转变到已连接状态)
void InitializeConnectionInLoop()
{
assert(_status==CONNECTING);
//1.修改连接状态
_status=CONNECTED;
//2.启动读事件监控
_channel.AddRead();
//3.调用用户设置的已连接的回调函数
if(_connectedCallback)
{
_connectedCallback(shared_from_this());
}
//LOG(DEBUG,"new connection's fd:"<<_sockfd);
}
// 真正关闭连接
void CloseConnectionInLoop()
{
//1.修改连接状态
_status=DISCONNECTED;
//2.移除连接的所有事件监控
//_channel.RemoveAll();
_channel.CancelMonitor();
//3.关闭socket文件描述符
_socket.Close();
//4.取消定时器中的定时任务(非活跃连接销毁)
if(_eventLoop->CheckTimerTask(_connectionID))
{
CancelReleaseInactiveConnectionInLoop();
}
//5.调用用户设置的关闭连接回调函数
if(_closedCallback)
{
_closedCallback(shared_from_this());
}
//6.移除服务器中管理的连接信息
if(_removeConnectionFromServerCallback)
{
_removeConnectionFromServerCallback(shared_from_this());
}
}
// 即将关闭连接(关闭连接前还需要处理接收/发送缓冲区中的数据)
void AboutToCloseConnectionInLoop()
{
//1.设置连接状态为 正在关闭连接
_status=DISCONNECTING;
//2.处理接收缓冲区中的数据(调用业务处理函数去处理)
if(_receiveBuffer.GetReadableSize()>0)
{
_processBusinessCallback(shared_from_this(),&_receiveBuffer);
}
//3.处理发送缓冲区中的数据(直接开启写事件监控即可)
if(_sendBuffer.GetReadableSize()>0)
{
if(_channel.CheckWrite()==false)
{
_channel.AddWrite();
}
}
//4.关闭连接
if(_sendBuffer.GetReadableSize()==0)
{
CloseConnection();
}
}
//发送数据(并非真正的发送数据,而是将数据放到连接的发送缓冲区_sendBuffer中)
void SendInLoop(Buffer buffer)
{
if(_status==DISCONNECTED)
{
return;
}
//1.将数据buffer放到_sendBuffer中
_sendBuffer.WriteBuffer(buffer);
//2.启动写事件监控
if(_channel.CheckWrite()==false)
{
_channel.AddWrite();
}
}
//开启非活跃连接销毁功能
void EnableReleaseInactiveConnectionInLoop(int second)
{
//1.判断标志置为true
_isReleaseInactiveConnection=true;
//2.如果定时器已存在销毁任务,则刷新销毁任务
if(_eventLoop->CheckTimerTask(_connectionID))
{
_eventLoop->RefreshTimerTask(_connectionID);
}
else//3.如果定时器中没有销毁任务,直接添加
{
_eventLoop->AddTimerTask(_connectionID,second,std::bind(&Connection::CloseConnection,this));
}
}
//关闭非活跃连接销毁功能
void CancelReleaseInactiveConnectionInLoop()
{
//1.判断标志置为false
_isReleaseInactiveConnection=false;
//2.从定时器中删除销毁任务
if(_eventLoop->CheckTimerTask(_connectionID))
{
_eventLoop->CancelTimerTask(_connectionID);
}
}
//切换协议
void SwitchProtocolInLoop(const Any& protocolContext,
const ConnectedCallback& connectedCallback,
const ProcessBusinessCallback& processBusinessCallback,
const ClosedCallback& closedCallback,
const AnyEventCallback& anyEventCallback)
{
_protocolContext=protocolContext;//切换协议上下文
_connectedCallback=connectedCallback;
_processBusinessCallback=processBusinessCallback;
_closedCallback=closedCallback;
_anyEventCallback=anyEventCallback;
}
private:
//---------------------------------------连接事件触发的5个回调函数----------------------------------------------------
// 读事件就绪回调函数
void HandleRead()
{
// 读事件触发时,即服务端接收到了客户端的数据,首先要将接收到的数据存入接收缓冲区中,再调用业务处理函数处理这些数据
// 1.接收数据,存入接收缓冲区中
char buffer[65536]; // 临时缓冲区,64KB
ssize_t len = _socket.NonBlockReceive(buffer, sizeof(buffer) - 1); // 非阻塞式接收数据放到buffer中,返回实际接收到的数据长度
if (len < 0) // receive出错
{
return AboutToCloseConnection(); // 即将关闭连接
}
else
{
_receiveBuffer.Write(buffer, len); // 存放到接收缓冲区中
}
// 2.调用用户设置的业务处理函数,处理数据
if (_receiveBuffer.GetReadableSize() > 0)
{
_processBusinessCallback(shared_from_this(), &_receiveBuffer);
}
}
// 写事件就绪回调函数
void HandleWrite()
{
// 写事件触发时,说明socket的写缓冲区是不满的,此时应用层可以调用send/write发送数据,本质是将数据拷贝到socket的发送缓冲区。
//如果发送缓冲区的数据全部发送完成之后,要关闭写事件监控
// 1.将发送缓冲区中的可读数据发送给客户端(本质是将数据拷贝到socket的发送缓冲区中)
ssize_t len = _socket.NonBlockSend(_sendBuffer.GetReadAddress(), _sendBuffer.GetReadableSize()); // 返回实际发送数据的长度
if (len < 0) //发送失败
{
//如果此时接收缓冲区中还有未被处理的数据,则要先处理一下
if (_receiveBuffer.GetReadableSize() > 0)
{
// 调用用户设置的业务处理函数,处理接收缓冲区中未被处理的数据
_processBusinessCallback(shared_from_this(), &_receiveBuffer);
}
// 接收缓冲区的数据处理完毕后直接关闭连接
return CloseConnection();
}
else
{
// 发送缓冲区的可读指针向后移动,因为数据已经发送出去了
_sendBuffer.MoveRead(len);
}
//2.关闭写事件监控
if(_sendBuffer.GetReadableSize()==0)
{
_channel.RemoveWrite();
//如果当前连接是正在关闭状态,此处可以直接关闭连接
if(_status==DISCONNECTING)
{
return CloseConnection();
}
}
}
// 挂断事件就绪回调函数
void HandleClose()
{
//挂断事件触发时,直接关闭连接即可,但是关闭前要先处理接收缓冲区中的数据
//1.处理接收缓冲区的数据(调用用户设置的业务处理函数)
if(_receiveBuffer.GetReadableSize()>0)
{
_processBusinessCallback(shared_from_this(), &_receiveBuffer);
}
//2.立即关闭连接
CloseConnection();
}
// 错误事件就绪回调函数
void HandleError()
{
//错误事件触发时,也要关闭连接,但是关闭前要先处理接收缓冲区中的数据,逻辑和挂断事件一样,直接调用挂断事件回调函数
HandleClose();
}
// 任意事件就绪回调函数
void HandleAny()
{
//任意事件触发时,即连接中有事件触发(不论什么事件)。
//1.检查是否开启非活跃连接销毁功能,开启了就要刷新连接
if(_isReleaseInactiveConnection)
{
_eventLoop->RefreshTimerTask(_connectionID);
}
//2.调用用户设置的任意事件回调函数
if(_anyEventCallback)
{
_anyEventCallback(shared_from_this());
}
}
};
十、Acceptor模块
1.功能
Acceptor模块即监听连接,和Connection本质是相同的(Connection是通信连接)。用于管理服务端的监听套接字,并将该监听文件描述符交给epoll监控其读事件,每当读事件就绪时,就表示有新连接到来。
//Acceptor.hpp
//Acceptor模块
//功能:管理监听套接字,负责连接的获取(Acceptor模块和Connection模块本质是一样的,都是管理连接,只不过一个是监听连接,一个是通信连接)
//1.创建监听套接字
//2.启动读事件监控
//3.获取新连接
#pragma once
#include <functional>
#include "EventLoop.hpp"
#include "Socket.hpp"
#include "Channel.hpp"
using AcceptCallback=std::function<void(int)>;
class Acceptor
{
private:
Socket _listenSocket;//监听套接字
EventLoop* _eventLoop;//对监听套接字的事件监控
Channel _channel;//管理监听套接字的事件(读事件)
AcceptCallback _acceptCallback;//监听到新连接到来后的处理函数(由用户设置)
public:
//构造函数
Acceptor(EventLoop* eventLoop, int port)
:_listenSocket(CreateServer(port)),
_eventLoop(eventLoop),
_channel(eventLoop,_listenSocket.GetSockfd())
{
//设置读事件就绪处理函数(监听到新连接后的处理)
_channel.SetReadCallBack(std::bind(&Acceptor::HandleRead,this));
}
//设置回调函数
void SetAcceptCallback(AcceptCallback acceptCallback)
{
_acceptCallback=acceptCallback;
}
//添加监听
void AddListen()
{
_channel.AddRead();
}
private:
//处理获取到的新连接(读事件就绪)
void HandleRead()
{
do
{
//将新连接的文件描述符交给用户设置的回调函数处理(本质是交给服务端处理)
int commfd=_listenSocket.Accept();
if(commfd<0)
{
break;
}
if(_acceptCallback)
{
_acceptCallback(commfd);
}
}while(false);
}
//创建服务端监听连接(提供给_listenSocket初始化使用)
int CreateServer(int port)
{
bool ret=_listenSocket.CreateServer(port);//创建服务端监听连接
assert(ret);
return _listenSocket.GetSockfd();
}
};
十一、LoopThread模块
1.功能
LoopThread模块即循环线程,创建LoopThread对象,同时会创建子线程,并在子线程函数中创建EventLoop对象,即每个EventLoop对象都在独立的线程中,这些EventLoop对象就是从属Reactor,用于监控通信连接的事件并处理就绪的事件。
//LoopThread.hpp
//LoopThread模块(循环线程)
//功能:将EventLoop绑定到线程中,每个EventLoop都又独属于自己的线程。(先创建线程,再在线程中创建EventLoop对象,并提供给外部返回EventLoop对象的接口)
//注意:必须先创建线程,再实例化EventLoop对象,因为创建EventLoop的构造函数中要绑定其所属的线程ID
#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
#include "EventLoop.hpp"
class LoopThread
{
private:
std::mutex _mutex;//互斥锁(用于实现EventLoop获取的同步,避免出现线程创建好了,但是未实例化EventLoop对象时,就要获取EventLoop对象的情况)
std::condition_variable _cv;//条件变量(用于实现EventLoop获取的同步,避免出现线程创建好了,但是未实例化EventLoop对象时,就要获取EventLoop对象的情况)
EventLoop* _eventLoop;//EventLoop对象指针(线程创建好之后,再实例化)
std::thread _thread;//EventLoop对象所属的线程
public:
//构造函数
LoopThread()
:_eventLoop(nullptr),
_thread(std::bind(&LoopThread::ThreadExecute,this))
{}
//析构函数
~LoopThread()
{
delete _eventLoop;
}
//获取EventLoop对象指针
EventLoop* GetEventLoop()
{
EventLoop* tmpEventLoop=nullptr;//使用临时副本,在加锁范围内将_eventLoop地址交给tmpEventLoop,防止解锁后被其他线程修改
{
std::unique_lock<std::mutex> ulock(_mutex);
_cv.wait(ulock, [this]{ return _eventLoop != nullptr; });
tmpEventLoop=_eventLoop;
}
return tmpEventLoop;
}
private:
//线程启动函数(实例化EventLoop对象,执行EventLoop的事件循环)
void ThreadExecute()
{
EventLoop eventLoop;
{
std::unique_lock<std::mutex> ulock(_mutex);
//_eventLoop=new EventLoop;//实例化EventLoop对象
_eventLoop=&eventLoop;
_cv.notify_all();
}
eventLoop.Start();//事件循环
}
};
十二、LoopThreadPool模块
1.功能
LoopThreadPool模块即从属Reactor池,可以创建指定的数量的从属Reactor,并使用vector管理,当有新连接到来时,采用RR轮转思想分配从属Reactor给新连接,即将新连接交给其被分配到的从属Reactor进行事件监控和处理。
//LoopThreadPool.hpp
//循环线程池模块(从属Reactor池)
//功能:
//管理和分配所有的循环线程:
//1.配置循环线程的数量,0个或多个。如果是0个,则主Reactor同时负责监听连接和通信连接的事件监控和处理,即单Reactor服务器;
//如果是多个,则主Reactor只负责监听连接的事件监控和处理,从属Reactor负责通信连接的事件监控和处理
//2.管理所有的循环线程
//3.线程分配,如果有多个循环线程,则采用RR轮转思想去进行线程分配
#pragma once
#include <vector>
#include "EventLoop.hpp"
#include "LoopThread.hpp"
class LoopThreadPool
{
private:
int _loopThreadCount;//循环线程(从属Reactor)数量
int _loopThreadIndex;//循环线程(从属Reactor)下标
std::vector<LoopThread*> _loopThreads;//管理循环线程(从属Reactor)
std::vector<EventLoop*> _eventLoops;//管理循环线程中的Reactor
EventLoop* _mainEventLoop;//主Reactor(如果从属Reactor数量为0,那么新连接的事件监控和处理都交给主Reactor)
public:
//构造函数
LoopThreadPool(EventLoop* mainEventLoop)
:_loopThreadCount(0),
_loopThreadIndex(0),
_mainEventLoop(mainEventLoop)
{}
//设置循环线程数量
void SetLoopThreadCount(int loopThreadCount)
{
_loopThreadCount=loopThreadCount;
}
//创建所有的循环线程
void CreateAllLoopThread()
{
if(_loopThreadCount>0)
{
_loopThreads.resize(_loopThreadCount);
_eventLoops.resize(_loopThreadCount);
for(int i=0;i<_loopThreadCount;i++)
{
_loopThreads[i]=new LoopThread;
_eventLoops[i]=_loopThreads[i]->GetEventLoop();
}
}
}
//分配循环线程(采用RR轮转思想)
EventLoop* AssignLoopThread()
{
//如果只有主Reactor,则新连接的事件监控和处理都交给主Reactor
if(_loopThreadCount==0)
{
return _mainEventLoop;
}
else//有从属Reactor,将新连接交给从属Reactor
{
_loopThreadIndex=(_loopThreadIndex+1)%_loopThreadCount;
return _eventLoops[_loopThreadIndex];
}
}
};
十三、TcpServer模块
1.功能
TcpServer模块即高并发服务器本体,之后需要搭建任意服务器只需要包含TcpServer对象,再根据具体的服务器需求添加功能即可。
TcpServer类中有
- EventLoop对象:即主Reactor,只负责监听新连接
- Acceptor对象:绑定的是主Reactor,用于监听新连接,TcpServer类中设计了Acceptor对象监听到新连接,即读事件就绪的处理函数Accept,通过Accept根据新获取到的通信套接字创建通信连接,并分配从属Reactor给通信连接(如果从属Reactor池设置的从属Reactor数量为0,则将新连接绑定到主Reactor,那么主Reactor既要负责监听新连接,又要负责通信连接的数据处理)
- LoopThreadPool对象:从属Reactor池,用于分配从属Reactor
除此以外,TcpServer还管理所有的通信连接,并为它们分配连接ID,并且提供四个对外接口让用户为新连接设置Connection模块中的4个回调函数(连接成功后的回调函数、业务处理函数、连接关闭后的回调函数、任意事件回调函数),以及是否开启超时连接销毁功能的接口
//TcpServer.hpp
//服务器模块
//功能:
//1.管理一个主事件循环EventLoop和一个监听模块对象Acceptor
//2.管理所有的连接
//3.设置连接的各种回调函数
//4.管理从属线程池并设置从属线程池数量
//5.选择是否开启超时连接销毁功能
//6.添加定时任务
//7.启动服务器
#pragma once
#include <cstdint>
#include <unordered_map>
#include "EventLoop.hpp"
#include "Acceptor.hpp"
#include "LoopThreadPool.hpp"
#include "Connection.hpp"
#include "Log.hpp"
using ConnectedCallback = std::function<void(const SPtrConnection &)>;
using ProcessBusinessCallback = std::function<void(const SPtrConnection &, Buffer*)>;
using ClosedCallback = std::function<void(const SPtrConnection &)>;
using AnyEventCallback = std::function<void(const SPtrConnection &)>;
using Functor = std::function<void()>; // 任务函数
class TcpServer
{
private:
uint64_t _connectionID;//连接的自增ID(每次有新连接到来,+1)
int _port;//服务器监听的端口号
EventLoop _mainEventLoop;//主Reactor
Acceptor _acceptor;//监听对象(内部绑定的是主Reactor)
LoopThreadPool _loopThreadPool;//从属Reactor线程池
std::unordered_map<uint64_t,SPtrConnection> _connections;//管理所有连接的哈希表
bool _isReleaseInactiveConnection;//是否启动非活跃连接销毁功能
int _timeout;//超时时间(非活跃连接销毁功能开启的前提下设置)
ConnectedCallback _connectedCallback;//连接成功后的回调函数
ProcessBusinessCallback _processBusinessCallback; // 业务处理回调函数
ClosedCallback _closedCallback;//连接关闭后的回调函数
AnyEventCallback _anyEventCallback;//任意事件回调函数(通常设置为刷新连接)
public:
//构造函数
TcpServer(int port)
:_connectionID(0),
_port(port),
_acceptor(&_mainEventLoop,_port),
_loopThreadPool(&_mainEventLoop),
_isReleaseInactiveConnection(false)
{
//监听连接设置新连接处理函数
_acceptor.SetAcceptCallback(std::bind(&TcpServer::Accept,this,std::placeholders::_1));
//监听连接添加监听
_acceptor.AddListen();
}
//设置连接成功后的回调函数
void SetConnectedCallback(std::function<void(const SPtrConnection &)> callback)
{
_connectedCallback=callback;
}
//设置业务处理回调函数
void SetProcessBusinessCallback(std::function<void(const SPtrConnection &, Buffer*)> callback)
{
_processBusinessCallback=callback;
}
//设置连接关闭后的回调函数
void SetClosedCallback(std::function<void(const SPtrConnection &)> callback)
{
_closedCallback=callback;
}
//设置任意事件回调函数
void SetAnyEventCallback(std::function<void(const SPtrConnection &)> callback)
{
_anyEventCallback=callback;
}
//设置从属线程池中的线程数量
void SetLoopThreadCount(int loopThreadCount)
{
_loopThreadPool.SetLoopThreadCount(loopThreadCount);
}
// 启动非活跃连接销毁功能
void EnableReleaseInactiveConnection(int timeout)
{
_isReleaseInactiveConnection=true;
_timeout=timeout;
}
//添加定时任务
void AddTimedTask(const Functor& task, int timeout)
{
_mainEventLoop.RunOrPush(std::bind(&TcpServer::AddTimedTaskInLoop,this,task,timeout));
}
//启动服务器
void StartTcpServer()
{
//从属Reactor线程池创建所有线程
_loopThreadPool.CreateAllLoopThread();
//主Reactor开始监听新连接
_mainEventLoop.Start();
}
private:
//接收连接(监听连接的可读事件处理函数)
void Accept(int commfd)
{
//连接ID自增
_connectionID++;
LOG(INFO,"监听到新连接"<<_connectionID);
//LOG(DEBUG,"文件描述符是"<<commfd);
//创建通信连接
SPtrConnection newConnection(new Connection(_loopThreadPool.AssignLoopThread(),_connectionID,commfd));
//设置通信连接的4个回调函数
newConnection->SetConnectedCallback(_connectedCallback);
newConnection->SetProcessBusinessCallback(_processBusinessCallback);
newConnection->SetClosedCallback(_closedCallback);
newConnection->SetAnyEventCallback(_anyEventCallback);
//设置通信连接的...
newConnection->SetRemoveConnectionFromServerCallback(std::bind(&TcpServer::RemoveConnectionFromServer,this,std::placeholders::_1));
//启动非活跃链接销毁功能
if(_isReleaseInactiveConnection)
{
newConnection->EnableReleaseInactiveConnection(_timeout);
}
//初始化连接
newConnection->InitializeConnection();
//将新连接添加到服务器的哈希表中
_connections.insert(std::make_pair(_connectionID,newConnection));
}
//从服务器中移除连接()
void RemoveConnectionFromServer(const SPtrConnection& connection)
{
_mainEventLoop.RunOrPush(std::bind(&TcpServer::RemoveConnectionFromServerInLoop,this,connection));
}
//从服务器中移除连接
void RemoveConnectionFromServerInLoop(const SPtrConnection& connection)
{
auto it=_connections.find(connection->GetConnectionID());
if(it!=_connections.end())
{
_connections.erase(it);
}
}
//添加定时任务
void AddTimedTaskInLoop(const Functor& task, int timeout)
{
_connectionID++;//???????????????
_mainEventLoop.AddTimerTask(_connectionID,timeout,task);
}
};
十四、使用TcpServer搭建Http服务器
1.Util工具类模块
只介绍几个核心功能:
(1)字符串分割函数
按照指定分割字符对字符串进行分割,分割结果存放在vector容器中。在解析Http请求时会用到该函数。
static size_t SplitString(const std::string& str, const std::string& sep, std::vector<std::string>* array)
{
//功能:将字符串str按分隔符sep进行分割,结果存放在array中
//返回值:最终分割得到的字符串数量
//分隔符不能为空串"",因为空串被认为存在于字符串的任何位置,find立即返回0,SplitString会进入死循环
assert(!sep.empty() && array!=nullptr);
//str为空串直接返回0
if(str.empty())
{
return 0;
}
size_t startPos=0;//开始查找的起始位置
size_t retPos=str.find(sep,startPos);//查找结果
while(retPos!=std::string::npos)
{
if(0!=retPos-startPos)//跳过多个分隔符连续的情况,不插入(否则插入的是空串,没意义)
{
array->emplace_back(str.substr(startPos,retPos-startPos));
}
startPos=retPos+sep.size();
retPos=str.find(sep,startPos);//注意:startPos大于字符串长度会直接返回npos
}
//将最后一个分隔符之后的字符串加入array中(或者没有字符串中没有分隔符)
if(startPos<str.size())//跳过末尾是分隔符的情况,不插入(否则插入的是空串,没意义)
{
array->emplace_back(str.substr(startPos));
}
//返回分割得到的字串数量
return array->size();
}
(2)文件读取/写入函数
文件读取和文件写入采用的是fstream实现。根据不同的Http请求可能会用到这两个函数,例如处理Http静态资源请求时就需要读取指定资源数据,并将数据存放到Http响应的正文中;处理Http的PUT和POST方法时,上传文件就需要用到文件写入。
//1.读取文件内容
static bool ReadFile(const std::string& filePath, std::string* buffer)
{
//功能:读取路径为filePath的文件内容,并存储到buffer中
bool ret=false;
do
{
//以二进制方式打开文件
std::fstream file(filePath,std::ios::in | std::ios::binary);
//打开文件失败
if(file.is_open()==false)
{
LOG(ERROR,"打开文件"<<filePath<<"失败");
break;
}
//获取文件大小
file.seekg(0,file.end);//文件指针定位到文件末尾(从文件末尾开始向后偏移0个位置,即文件末尾)
size_t fileSize=file.tellg();//文件大小(从文件开头算起,获取文件指针的当前位置,即文件大小)
//读取数据
file.seekg(0,file.beg);//文件指针回到文件开头
buffer->resize(fileSize);//扩充buffer大小
file.read(buffer->data(),fileSize);//一次性读取全部数据,并存放在buffer中
if(file.good()==false)
{
LOG(ERROR,"读取文件"<<filePath<<"失败:"<<strerror(errno));
file.close();//关闭文件
break;
}
ret=true;
}while(false);
return ret;
}
//2.写入文件
static bool WriteFile(const std::string& filePath, const std::string& buffer)
{
//功能:将buffer的数据写入到文件filePath中
//返回值
bool ret=false;
//打开文件
std::fstream file(filePath,std::ios::out | std::ios::binary | std::ios::trunc);//trunc表示文件不存在则创建,文件已存在则清空内容
do
{
//打开文件失败
if(file.is_open()==false)
{
LOG(ERROR,"打开文件"<<filePath<<"失败");
break;
}
//写入数据
file.write(buffer.c_str(),buffer.size());
//判断是否全部写入
if(file.good()==false)
{
LOG(ERROR,"写入文件"<<filePath<<"失败");
file.close();
break;
}
ret=true;
} while (false);
return ret;
}
(3)URL编码/解码函数
URL是HTTP协议中用于定位和访问网络资源的地址,即对网络资源进行唯一标识。
URL只能使用字母、数字和部分特殊字符(如- _ . ~ ! * ' ( ) ; : @ & = + $ , / ? # [ ]),对于其他的字符(如中文、空格、日文等)直接出现在URL中会被视为非法,因此需要对它们进行编码。
编码方法:将字符的ASCII值转换为16进制,并在前面加上%(对于不是ASCII字符的字符,将其通过其他字符集,如UTF-8,转换为字节序列,再进行编码)
注意:URL编码和解码只针对URL中的资源路径和查询字符串
此处的URL编码很简洁,对于字母、数字和- _ . ~不进行编码,对于空格可选编码为+,其他所有字符都需要进行编码
// 3.URL编码
static std::string URLEncode(const std::string& URL, bool isSpecialEncodeSpace)
{
//功能:对URL进行编码,并通过isSpecialEncodeSpace提供可选项:是否对空格编码为+
//编码方法:对于字母、数字和- _ . ~不进行编码,对于空格可选编码为+,其他所有字符都编码为16进制并在前面加上%
//注意:URL编码和解码只针对URL中的资源路径和查询字符串,其他的协议版本和请求方法中的符号都是不需要编码的
std::string result;//编码结果
for(auto& c:URL)
{
if(isalnum(c) || c=='-' || c=='_' || c=='.' || c=='~')//字母、数字和- _ . ~不进行编码(isalnum函数用于判断单个字符是否为字母或数字)
{
result+=c;
}
else if(c==' ' && isSpecialEncodeSpace)//空格编码为+
{
result+='+';
}
else//其他字符进行百分号编码
{
char hex[4]={0};//前3个字节存放%xx,最后一个字节存放'\0'
snprintf(hex,4,"%%%02X",static_cast<unsigned char>(c));//%%表示输出一个%,%02X表示将c转换为16进制形式,宽度为2,不足2位则在前面补0
//注意:要将有符号数(-128~127)c转换为无符号数(范围是0~255),因为UTF-8的编码可能会超过127,被作为负数去再去转换为16进制,导致转换出错
result+=hex;
}
}
return result;
}
// 4.URL解码
static std::string URLDecode(const std::string& URL, bool isSpecialDecodeSpace)
{
//功能:对URL进行解码,并通过isSpecialDecodeSpace提供是否对空格符号进行特殊解码的选项(即+号转换为空格)
std::string result;//解码结果
for(int i=0;i<URL.size();i++)
{
if(URL[i]=='+'&&isSpecialDecodeSpace)//+解码为空格
{
result+=' ';
}
else if(URL[i]=='%' && (i+2)<URL.size())
{
char hex[3]={URL[i+1],URL[i+2],'\0'};
char tmp=std::strtol(hex,nullptr,16);//tsrtol函数是将hex字符串以16进制为基准,转换为10进制数
result+=tmp;
i+=2;//跳过已解码的符号
}
else//无需解码的符号
{
result+=URL[i];
}
}
return result;
}
(4)完整代码
//Util.hpp
//概述:提供给Http服务器使用的一些公用接口
//功能:
//1.读取文件内容
//2.写入文件
//3.URL编码
//4.URL解码
//5.根据HTTP状态码获取描述信息
//6.根据文件后缀获取mime(文件类型,指导客户端如何解析和展示资源)
//7.判断一个文件是否为目录
//8.判断一个文件是否为普通文件
//9.判断HTTP资源路径是否有效
//10.按指定字符对字符串分割
#pragma once
#include <string>
#include <vector>
#include <assert.h>
#include <fstream>
#include <cerrno>
#include <cstring>
#include <unordered_map>
#include <sys/types.h>
#include <sys/stat.h>
#include "Log.hpp"
#include "Buffer.hpp"
std::unordered_map<int, std::string> statusDescription = {
{100, "Continue"},
{101, "Switching Protocol"},
{102, "Processing"},
{103, "Early Hints"},
{200, "OK"},
{201, "Created"},
{202, "Accepted"},
{203, "Non-Authoritative Information"},
{204, "No Content"},
{205, "Reset Content"},
{206, "Partial Content"},
{207, "Multi-Status"},
{208, "Already Reported"},
{226, "IM Used"},
{300, "Multiple Choice"},
{301, "Moved Permanently"},
{302, "Found"},
{303, "See Other"},
{304, "Not Modified"},
{305, "Use Proxy"},
{306, "unused"},
{307, "Temporary Redirect"},
{308, "Permanent Redirect"},
{400, "Bad Request"},
{401, "Unauthorized"},
{402, "Payment Required"},
{403, "Forbidden"},
{404, "Not Found"},
{405, "Method Not Allowed"},
{406, "Not Acceptable"},
{407, "Proxy Authentication Required"},
{408, "Request Timeout"},
{409, "Conflict"},
{410, "Gone"},
{411, "Length Required"},
{412, "Precondition Failed"},
{413, "Payload Too Large"},
{414, "URL Too Long"},
{415, "Unsupported Media Type"},
{416, "Range Not Satisfiable"},
{417, "Expectation Failed"},
{418, "I'm a teapot"},
{421, "Misdirected Request"},
{422, "Unprocessable Entity"},
{423, "Locked"},
{424, "Failed Dependency"},
{425, "Too Early"},
{426, "Upgrade Required"},
{428, "Precondition Required"},
{429, "Too Many Requests"},
{431, "Request Header Fields Too Large"},
{451, "Unavailable For Legal Reasons"},
{501, "Not Implemented"},
{502, "Bad Gateway"},
{503, "Service Unavailable"},
{504, "Gateway Timeout"},
{505, "HTTP Version Not Supported"},
{506, "Variant Also Negotiates"},
{507, "Insufficient Storage"},
{508, "Loop Detected"},
{510, "Not Extended"},
{511, "Network Authentication Required"}
};
std::unordered_map<std::string, std::string> mime = {
{".aac", "audio/aac"},
{".abw", "application/x-abiword"},
{".arc", "application/x-freearc"},
{".avi", "video/x-msvideo"},
{".azw", "application/vnd.amazon.ebook"},
{".bin", "application/octet-stream"},
{".bmp", "image/bmp"},
{".bz", "application/x-bzip"},
{".bz2", "application/x-bzip2"},
{".csh", "application/x-csh"},
{".css", "text/css"},
{".csv", "text/csv"},
{".doc", "application/msword"},
{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
{".eot", "application/vnd.ms-fontobject"},
{".epub", "application/epub+zip"},
{".gif", "image/gif"},
{".htm", "text/html"},
{".html", "text/html"},
{".ico", "image/vnd.microsoft.icon"},
{".ics", "text/calendar"},
{".jar", "application/java-archive"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".js", "text/javascript"},
{".json", "application/json"},
{".jsonld", "application/ld+json"},
{".mid", "audio/midi"},
{".midi", "audio/x-midi"},
{".mjs", "text/javascript"},
{".mp3", "audio/mpeg"},
{".mpeg", "video/mpeg"},
{".mpkg", "application/vnd.apple.installer+xml"},
{".odp", "application/vnd.oasis.opendocument.presentation"},
{".ods", "application/vnd.oasis.opendocument.spreadsheet"},
{".odt", "application/vnd.oasis.opendocument.text"},
{".oga", "audio/ogg"},
{".ogv", "video/ogg"},
{".ogx", "application/ogg"},
{".otf", "font/otf"},
{".png", "image/png"},
{".pdf", "application/pdf"},
{".ppt", "application/vnd.ms-powerpoint"},
{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
{".rar", "application/x-rar-compressed"},
{".rtf", "application/rtf"},
{".sh", "application/x-sh"},
{".svg", "image/svg+xml"},
{".swf", "application/x-shockwave-flash"},
{".tar", "application/x-tar"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".ttf", "font/ttf"},
{".txt", "text/plain"},
{".vsd", "application/vnd.visio"},
{".wav", "audio/wav"},
{".weba", "audio/webm"},
{".webm", "video/webm"},
{".webp", "image/webp"},
{".woff", "font/woff"},
{".woff2", "font/woff2"},
{".xhtml", "application/xhtml+xml"},
{".xls", "application/vnd.ms-excel"},
{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
{".xml", "application/xml"},
{".xul", "application/vnd.mozilla.xul+xml"},
{".zip", "application/zip"},
{".3gp", "video/3gpp"},
{".3g2", "video/3gpp2"},
{".7z", "application/x-7z-compressed"}
};
class Util
{
public:
//10.按指定字符对字符串分割
static size_t SplitString(const std::string& str, const std::string& sep, std::vector<std::string>* array)
{
//功能:将字符串str按分隔符sep进行分割,结果存放在array中
//返回值:最终分割得到的字符串数量
//分隔符不能为空串"",因为空串被认为存在于字符串的任何位置,find立即返回0,SplitString会进入死循环
assert(!sep.empty() && array!=nullptr);
//str为空串直接返回0
if(str.empty())
{
return 0;
}
size_t startPos=0;//开始查找的起始位置
size_t retPos=str.find(sep,startPos);//查找结果
while(retPos!=std::string::npos)
{
if(0!=retPos-startPos)//跳过多个分隔符连续的情况,不插入(否则插入的是空串,没意义)
{
array->emplace_back(str.substr(startPos,retPos-startPos));
}
startPos=retPos+sep.size();
retPos=str.find(sep,startPos);//注意:startPos大于字符串长度会直接返回npos
}
//将最后一个分隔符之后的字符串加入array中(或者没有字符串中没有分隔符)
if(startPos<str.size())//跳过末尾是分隔符的情况,不插入(否则插入的是空串,没意义)
{
array->emplace_back(str.substr(startPos));
}
//返回分割得到的字串数量
return array->size();
}
//1.读取文件内容
static bool ReadFile(const std::string& filePath, std::string* buffer)
{
//功能:读取路径为filePath的文件内容,并存储到buffer中
bool ret=false;
do
{
//以二进制方式打开文件
std::fstream file(filePath,std::ios::in | std::ios::binary);
//打开文件失败
if(file.is_open()==false)
{
LOG(ERROR,"打开文件"<<filePath<<"失败");
break;
}
//获取文件大小
file.seekg(0,file.end);//文件指针定位到文件末尾(从文件末尾开始向后偏移0个位置,即文件末尾)
size_t fileSize=file.tellg();//文件大小(从文件开头算起,获取文件指针的当前位置,即文件大小)
//读取数据
file.seekg(0,file.beg);//文件指针回到文件开头
buffer->resize(fileSize);//扩充buffer大小
file.read(buffer->data(),fileSize);//一次性读取全部数据,并存放在buffer中
if(file.good()==false)
{
LOG(ERROR,"读取文件"<<filePath<<"失败:"<<strerror(errno));
file.close();//关闭文件
break;
}
ret=true;
}while(false);
return ret;
}
//2.写入文件
static bool WriteFile(const std::string& filePath, const std::string& buffer)
{
//功能:将buffer的数据写入到文件filePath中
//返回值
bool ret=false;
//打开文件
std::fstream file(filePath,std::ios::out | std::ios::binary | std::ios::trunc);//trunc表示文件不存在则创建,文件已存在则清空内容
do
{
//打开文件失败
if(file.is_open()==false)
{
LOG(ERROR,"打开文件"<<filePath<<"失败");
break;
}
//写入数据
file.write(buffer.c_str(),buffer.size());
//判断是否全部写入
if(file.good()==false)
{
LOG(ERROR,"写入文件"<<filePath<<"失败");
file.close();
break;
}
ret=true;
} while (false);
return ret;
}
// 3.URL编码
static std::string URLEncode(const std::string& URL, bool isSpecialEncodeSpace)
{
//功能:对URL进行编码,并通过isSpecialEncodeSpace提供可选项:是否对空格编码为+
//编码方法:对于字母、数字和- _ . ~不进行编码,对于空格可选编码为+,其他所有字符都编码为16进制并在前面加上%
//注意:URL编码和解码只针对URL中的资源路径和查询字符串,其他的协议版本和请求方法中的符号都是不需要编码的
std::string result;//编码结果
for(auto& c:URL)
{
if(isalnum(c) || c=='-' || c=='_' || c=='.' || c=='~')//字母、数字和- _ . ~不进行编码(isalnum函数用于判断单个字符是否为字母或数字)
{
result+=c;
}
else if(c==' ' && isSpecialEncodeSpace)//空格编码为+
{
result+='+';
}
else//其他字符进行百分号编码
{
char hex[4]={0};//前3个字节存放%xx,最后一个字节存放'\0'
snprintf(hex,4,"%%%02X",static_cast<unsigned char>(c));//%%表示输出一个%,%02X表示将c转换为16进制形式,宽度为2,不足2位则在前面补0
//注意:要将有符号数(-128~127)c转换为无符号数(范围是0~255),因为UTF-8的编码可能会超过127,被作为负数去再去转换为16进制,导致转换出错
result+=hex;
}
}
return result;
}
// 4.URL解码
static std::string URLDecode(const std::string& URL, bool isSpecialDecodeSpace)
{
//功能:对URL进行解码,并通过isSpecialDecodeSpace提供是否对空格符号进行特殊解码的选项(即+号转换为空格)
std::string result;//解码结果
for(int i=0;i<URL.size();i++)
{
if(URL[i]=='+'&&isSpecialDecodeSpace)//+解码为空格
{
result+=' ';
}
else if(URL[i]=='%' && (i+2)<URL.size())
{
char hex[3]={URL[i+1],URL[i+2],'\0'};
char tmp=std::strtol(hex,nullptr,16);//tsrtol函数是将hex字符串以16进制为基准,转换为10进制数
result+=tmp;
i+=2;//跳过已解码的符号
}
else//无需解码的符号
{
result+=URL[i];
}
}
return result;
}
// 5.根据HTTP状态码获取描述信息
static std::string GetStatusDescription(int status)
{
auto ret=statusDescription.find(status);
if(ret==statusDescription.end())
{
return "Unknown";
}
else
{
return ret->second;
}
}
// 6.根据文件后缀获取mime(文件类型,指导客户端如何解析和展示资源)
static std::string GetMime(const std::string& fileName)
{
//提取文件后缀
size_t pos=fileName.find_last_of('.');
if(pos==std::string::npos)
{
return "application/octet-stream";//默认为二进制流,即当作二进制文件进行下载处理
}
std::string suffix=fileName.substr(pos);//文件后缀
//获取mime
auto ret=mime.find(suffix);
if(ret==mime.end())
{
return "application/octet-stream";//默认为二进制流,即当作二进制文件进行下载处理
}
else
{
return ret->second;
}
}
// 7.判断一个文件是否为目录
static bool IsDirectory(const std::string& filePath)
{
struct stat st;
int ret=stat(filePath.c_str(),&st);//获取文件状态信息,并存储到st中
if(ret<0)//获取失败
{
return false;
}
else
{
return S_ISDIR(st.st_mode);//通过宏定义判断是否为目录
}
}
// 8.判断一个文件是否为普通文件
static bool IsRegularFile(const std::string& filePath)
{
struct stat st;
int ret=stat(filePath.c_str(),&st);//获取文件状态信息,并存储到st中
if(ret<0)//获取失败
{
return false;
}
else
{
return S_ISREG(st.st_mode);//通过宏定义判断是否为目录
}
}
// 9.判断HTTP请求的资源路径是否有效
static bool IsValidResourcePath(const std::string& filePath)
{
//功能:判断请求的资源路径是否有效
//思想:通过字符串分割函数对请求资源路径按‘/’分隔符进行分割,通过目录深度来判断路径是否有效(深度是相对于Web根目录的)
//正常的请求资源路径是“/a/b/c/index.html”
//对资源路径按/分割
std::vector<std::string> array;
SplitString(filePath,"/",&array);
//检查目录深度(只要出现目录深度为负数,即跳到Web根目录的上层目录,就判断为无效)
int depth=0;//目录深度
for(auto& str:array)
{
if(str=="..")
{
depth--;
}
else
{
depth++;
}
if(depth<0)
{
return false;
}
}
return true;
}
};
2.HttpRequest模块
解析接收缓冲区中的Http请求,构建HttpRequest对象
//HttpRequest.hpp
//HttpRequest模块
//概述:解析Http请求,将Http请求的内容格式化到HttpRequest对象中
//功能:
//1.设置/获取Http请求的属性(请求方法、请求的资源路径、Http版本、请求正文)(通过公开这些成员变量实现)
//2.插入、查询、获取请求头部
//3.插入、查询、获取查询字符串参数
//4.获取正文长度
//5.判断Http请求是否为短连接
//6.重置HttpRequest
#pragma once
#include <string>
#include <unordered_map>
#include <regex>
class HttpRequest
{
public:
std::string _requestMethod;//请求方法
std::string _resourcePath;//请求的资源路径
std::string _protocolVersion;//协议版本
std::string _content;//请求正文
std::unordered_map<std::string,std::string> _requestHeaders;//请求报头
std::unordered_map<std::string,std::string> _parameters;//查询字符串的参数
std::smatch _matches;//存储正则表达式的匹配结果
public:
//构造函数
HttpRequest()
:_protocolVersion("HTTP/1.1")
{}
//重置HttpRequest
void ResetHttpRequest()
{
_requestMethod.clear();
_resourcePath.clear();
_protocolVersion="HTTP/1.1";
_content.clear();
_requestHeaders.clear();
_parameters.clear();
std::smatch matches;
swap(_matches,matches);
}
//插入请求报头
void InsertRequestHeader(const std::string& key, const std::string& value)
{
if(HasRequestHeader(key))//已有该请求头部,不插入
{
return;
}
else
{
_requestHeaders.insert(std::make_pair(key,value));
}
}
//检查是否存在请求报头
bool HasRequestHeader(const std::string& key)
{
auto ret=_requestHeaders.find(key);
if(ret==_requestHeaders.end())
{
return false;
}
else
{
return true;
}
}
//获取指定的请求报头
std::string GetRequestHeader(const std::string& key)
{
if(HasRequestHeader(key)==false)//没有该请求头部,返回空串
{
return "";
}
else
{
return _requestHeaders[key];
}
}
//插入查询字符串参数
void InsertParameter(const std::string& key, const std::string& value)
{
if(HasParameter(key))//已有该请求头部,不插入
{
return;
}
else
{
_parameters.insert(std::make_pair(key,value));
}
}
//检查是否存在查询字符串参数
bool HasParameter(const std::string& key)
{
auto ret=_parameters.find(key);
if(ret==_parameters.end())
{
return false;
}
else
{
return true;
}
}
//获取指定的查询字符串参数
std::string GetParameter(const std::string& key)
{
if(HasParameter(key)==false)//没有该请求头部,返回空串
{
return "";
}
else
{
return _parameters[key];
}
}
//获取正文长度
size_t GetContentLength()
{
//思想:请求头部中有Content-Length:xx ,直接通过该请求头部来获取正文长度
bool ret=HasRequestHeader("Content-Length");
if(ret==false)//没有正文
{
return 0;
}
else
{
return std::stol(_requestHeaders["Content-Length"]);
}
}
//判断是否为短连接
bool IsShortConnection()
{
//思想:请求头部中有Connection:xxx ,如果是长连接则是Connection:keep-alive,如果是短连接则是Connection:close
//如果没有Connection请求报头也是短连接
if(HasRequestHeader("Connection")==false)//没有Connection请求报头是短连接
{
return true;
}
else
{
if(_requestHeaders["Connection"]=="close")
{
return true;
}
else
{
return false;
}
}
}
};
3.HttpResponse模块
根据Http请求处理结果和HttpRequest对象,构建HttpResponse对象
//HttpResponse.hpp
//HttpResponse模块
//概述:根据HttpRequest和业务处理结果来填充HttpResponse
#pragma once
#include <string>
#include <unordered_map>
class HttpResponse
{
public:
//std::string _protocolVersion;//协议版本?
//std::string _statusDescription;//状态码描述?
int _status;//状态码
std::string _content;//响应正文
bool _isRedirect;//重定向标志
std::string _redirectURL;//重定向地址
std::unordered_map<std::string,std::string> _responseHeaders;//响应报头
public:
//无参构造函数
HttpResponse()
:_status(200),
_isRedirect(false)
{}
//有参构造函数
HttpResponse(int status)
:_status(status),
_isRedirect(false)
{}
//重置HttpResponse
void ResetHttpResponse()
{
_status=200;//默认为OK
_isRedirect=false;//默认不重定向
_content.clear();
_redirectURL.clear();
_responseHeaders.clear();
}
//插入响应报头
void InsertResponseHeader(const std::string& key, const std::string& value)
{
if(HasResponseHeader(key))//已有该请求头部,不插入
{
return;
}
else
{
_responseHeaders.insert(std::make_pair(key,value));
}
}
//检查是否存在响应报头
bool HasResponseHeader(const std::string& key)
{
auto ret=_responseHeaders.find(key);
if(ret==_responseHeaders.end())
{
return false;
}
else
{
return true;
}
}
//获取指定的响应报头
std::string GetResponseHeader(const std::string& key)
{
if(HasResponseHeader(key)==false)//没有该请求头部,返回空串
{
return "";
}
else
{
return _responseHeaders[key];
}
}
//设置响应正文
void SetContent(const std::string& content, const std::string& type="text/html")
{
_content=content;
InsertResponseHeader("Content-Type",type);//设置响应正文的同时,响应报头中的正文类型也要设置,默认为text/html
}
//设置重定向
void SetRedirect(const std::string& redirectURL, int status=302)
{
//如果有重定向,则Http响应的状态码必须为3xx,具体的重定向类型取决于具体的状态码,例如302表示临时重定向,301表示永久重定向
_isRedirect=true;
_status=status;//默认为302,临时重定向
_redirectURL=redirectURL;
}
//判断是否为短连接
bool IsShortConnection()
{
//思想:请求头部中有Connection:xxx ,如果是长连接则是Connection:keep-alive,如果是短连接则是Connection:close
//如果没有Connection请求报头也是短连接
if(HasResponseHeader("Connection")==false)//没有Connection请求报头是短连接
{
return true;
}
else
{
if(_responseHeaders["Connection"]=="close")
{
return true;
}
else
{
return false;
}
}
}
};
4.HttpContext模块
HttpContext即Http上下文,主要用于解析Http请求,由于接收缓冲区中的Http请求数据可能不完整,因此需要分批次接收解析Http请求,先接收解析请求行,再接收解析请求报头,并且根据接收解析进度设置接收解析状态,HttpContext的作用就是当再一次接收解析Http请求时,根据接收解析状态直接跳转到上一次未解析完的地方开始解析。每当一个完整的Http请求解析完毕后,需要重置上下文,因为一个连接会接收到很多Http请求,但是只有一个HttpContext,因此每次解析完毕都要重置。
//HttpContext.hpp
//请求接收上下文模块
#pragma once
#include <string>
#include <vector>
#include <cctype>
#include "HttpRequest.hpp"
#include "Util.hpp"
#include "Buffer.hpp"
#define MAX_LINE 8*1024 //请求行的最大字节数,8KB
//当前Http请求的接收状态
typedef enum
{
RECV_HTTP_ERROR, //接收Http请求错误
RECV_HTTP_LINE, //正在接收Http请求行
RECV_HTTP_HEADER, //正在接收Http请求报头
RECV_HTTP_CONTENT, //正在接收Http请求正文
RECV_HTTP_OVER //接收Http请求完毕
}HttpRecvStatus;
class HttpContext
{
private:
int _responseStatus;//响应状态码(对应的是Http响应中的状态码,在解析Http请求时会出现许多错误,不同的错误对应不同的响应状态码)
HttpRecvStatus _httpReceiveStatus;//当前的Http请求的接收状态
HttpRequest _httpRequest;//Http请求对象,所有已经接收并解析好的数据都存放在Http请求对象中
public:
//构造函数
HttpContext()
:_responseStatus(200),//默认响应状态码为OK
_httpReceiveStatus(RECV_HTTP_LINE)//默认当前Http请求接收状态为“正在接收请求行”
{}
//重置上下文
void ResetContext()
{
_responseStatus=200;//默认为OK
_httpReceiveStatus=RECV_HTTP_LINE;//初始时为正在接收Http请求行
_httpRequest.ResetHttpRequest();//HttpRequest重置
}
//获取响应状态码
int GetResponseStatus()
{
return _responseStatus;
}
//获取
HttpRecvStatus GetHttpRecvStatus()
{
return _httpReceiveStatus;
}
//获取Http请求对象
HttpRequest& GetHttpRequest()
{
return _httpRequest;
}
//接收并解析Http请求
void ReceiveAndParseHttpRequest(Buffer* buffer)
{
//思想:根据http请求的接收状态,确定从哪一步开始接收,不需要break,因为每一步接收函数前都有接收状态判断,如果状态不对,是不会进行接受的
//如果在接收过程中,ReceiveHttpxxx函数返回false,接收状态会被设置为RECV_HTTP_ERROR,不会进行任何接收
switch(_httpReceiveStatus)
{
case RECV_HTTP_LINE:ReceiveHttpRequestLine(buffer);
case RECV_HTTP_HEADER:ReceiveHttpRequestHeaders(buffer);
case RECV_HTTP_CONTENT:ReceiveHttpRequestContent(buffer);
}
}
private:
//---------------------下面的函数都为ReceiveAndParseHttpRequest函数根据不同的Http请求接收状态去调用-----------------------------
//接收Http请求行
bool ReceiveHttpRequestLine(Buffer* buffer)
{
//功能:从接收缓冲区中读取一行数据(即Http请求行),并分析有无问题,如果没有问题调用ParseHttpRequestLine函数解析Http请求行
if (_httpReceiveStatus != RECV_HTTP_LINE)
{
return false;
}
//从接收缓冲区中获取请求行(读取到\n位置)
std::string line=buffer->ReadLine();//读取一行的时候,读指针会后移(不必担心等待下一次再接收会有问题,因为一旦接收出错,直接关闭连接)
//未获取到请求行(接收缓冲区中不足一行数据,没有\n)
if(line.size()==0)
{
if(buffer->GetReadableSize()>MAX_LINE)//接收缓冲区中不足一行数据,但是已有的数据已经超过8KB,显然是错误的
{
_httpReceiveStatus=RECV_HTTP_ERROR;//设置Http请求的接收状态为“接收错误”
_responseStatus=414;//设置响应状态码为414,即URL Too Long(URL 过长)
LOG(DEBUG,"接收缓冲区中不足一行数据但是已有的数据已经超过8KB,显然是错误的");
return false;
}
else//接收缓冲区不足一行数据,继续等待新数据到来
{
return true;
}
}
else//成功获取到请求行
{
if(line.size()>MAX_LINE)//请求行的长度太长,超过8KB,显然错误
{
_httpReceiveStatus=RECV_HTTP_ERROR;//设置Http请求的接收状态为“接收错误”
_responseStatus=414;//设置响应状态码为414,即URL Too Long(URL 过长)
LOG(DEBUG,"成功获取到请求行,但是请求行的长度太长,超过8KB,显然错误");
return false;
}
else//请求行没有问题
{
bool ret=ParseHttpRequestLine(line);//解析Http请求行
if(ret==false)
{
return false;
}
else
{
_httpReceiveStatus=RECV_HTTP_HEADER;//设置http请求接收状态为“正在接收请求头部”(说明,请求行已经接收完毕并且解析完毕了)
return true;
}
}
}
}
//解析Http请求行
bool ParseHttpRequestLine(const std::string& requestLine)
{
//思想:使用正则则表达式对请求行进行解析
std::smatch matches;//存储正则表达式解析的结果
//正则表达式解析规则(不捕获末尾的\r\n)
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);
bool ret=std::regex_match(requestLine,matches,e);//正则表达式解析
if(ret==false)//解析失败
{
_httpReceiveStatus=RECV_HTTP_ERROR;//设置Http请求的接收状态为“接收错误”
_responseStatus=400;//设置响应状态码为400,即Bad Request,表示服务器无法理解客户端发送的请求
return false;
}
else//解析成功,将matches中的数据存放到HttpRequest对象中
{
//根据正则表达式解析规则:
//matches[0]是完整的请求行
//matches[1]是请求方法
//matches[2]是资源路径
//matches[3]是查询字符串
//matches[4]协议版本
//将解析到的数据存放到HttpRequest对象中
_httpRequest._requestMethod=matches[1];
//将请求方法都转换为大写(例如get->GET)
std::transform(_httpRequest._requestMethod.begin(),_httpRequest._requestMethod.end(),_httpRequest._requestMethod.begin(),::toupper);
_httpRequest._resourcePath=Util::URLDecode(matches[2],false);//要对资源路径进行解码,资源路径中可能会出现特殊符号,如中文字符
_httpRequest._protocolVersion=matches[4];
//对于解析到的查询字符串,需要进一步解析:调用字符串分割函数,按照&符号进行分割(user=zz&password=123)
std::vector<std::string> array1;
std::string queryStr=matches[3];//查询字符串
Util::SplitString(queryStr,"&",&array1);//按照&符号对查询字符串进行分割
for(auto& parameter:array1)//如果没有查询字符串,会直接跳过for循环,返回true
{
std::vector<std::string> array2;
Util::SplitString(parameter,"=",&array2);//按照=符号对参数进行分割
if(array2.size()<=1)//http请求的参数错误(说明没有=符号,未进行分割)
{
_httpReceiveStatus=RECV_HTTP_ERROR;//设置Http请求的接收状态为“接收错误”
_responseStatus=400;//设置响应状态码为400,即Bad Request,表示服务器无法理解客户端发送的请求
return false;
}
else
{
//对于查询字符串中的参数需要解码,参数中可能会出现特殊符号
std::string key=Util::URLDecode(array2[0],true);
std::string value=Util::URLDecode(array2[1],true);
//将解析到的参数存放到HttpRequest对象中
_httpRequest.InsertParameter(key,value);
}
}
return true;
}
}
//接收Http请求报头
bool ReceiveHttpRequestHeaders(Buffer* buffer)
{
//思想:从接收缓冲区中一行一行的读取数据,直到遇到换行\n,即空行
if(_httpReceiveStatus!=RECV_HTTP_HEADER)
{
return false;
}
while(true)
{
// 从接收缓冲区中获取一个请求报头(读取到\n位置)
std::string line = buffer->ReadLine(); // 读取一行的时候,读指针会后移(不必担心等待下一次再接收会有问题,因为一旦接收出错,直接关闭连接)
// 未获取到一个请求报头(接收缓冲区中不足一行数据,没有\n)
if (line.size() == 0)
{
if (buffer->GetReadableSize() > MAX_LINE) // 接收缓冲区中不足一行数据,但是已有的数据已经超过8KB,显然是错误的
{
_httpReceiveStatus = RECV_HTTP_ERROR; // 设置Http请求的接收状态为“接收错误”
_responseStatus = 431; // 设置响应状态码为431,即Request Header Fields Too Large,表示客户端发送的请求头字段过长,服务器无法或不愿处理该请求
return false;
}
else // 接收缓冲区不足一行数据,继续等待新数据到来
{
return true;
}
}
else // 成功获取到一行数据,即一个请求报头
{
if (line.size() > MAX_LINE) // 一个请求报头的长度太长,超过8KB,显然错误
{
_httpReceiveStatus = RECV_HTTP_ERROR; // 设置Http请求的接收状态为“接收错误”
_responseStatus = 431; // 设置响应状态码为431,即Request Header Fields Too Large,表示客户端发送的请求头字段过长,服务器无法或不愿处理该请求
return false;
}
else // 获取到的一个请求报头长度没有问题
{
if(line=="\n"||line=="\r\n")//已经解析到空行了,所有的请求报头都已经解析完毕,跳出循环
{
break;
}
bool ret = ParseHttpRequestHeaders(line); // 解析Http请求报头
if (ret == false)
{
return false;
}
// else
// {
// _httpReceiveStatus=RECV_HTTP_CONTENT;//设置http请求接收状态为“正在接收请求正文”(说明请求报头已经接收完毕并且解析完毕了)
// return true;
// }
}
}
}
_httpReceiveStatus=RECV_HTTP_CONTENT;//设置http请求接收状态为“正在接收请求正文”(说明请求报头已经接收完毕并且解析完毕了)
return true;
}
//解析Http请求报头
bool ParseHttpRequestHeaders(std::string& requestHeader)
{
//注意:此时传入进来的requestHeader是 key: value\r\n 格式
//去除末尾的\r\n
if(requestHeader.back()=='\n')
{
requestHeader.pop_back();
}
if(requestHeader.back()=='\r')
{
requestHeader.pop_back();
}
std::vector<std::string> array;
Util::SplitString(requestHeader,": ",&array);
if(array.size()<=1)
{
_httpReceiveStatus=RECV_HTTP_ERROR;//设置Http请求的接收状态为“接收错误”
_responseStatus=400;//设置响应状态码为400,即Bad Request,表示服务器无法理解客户端发送的请求
return false;
}
else
{
std::string key=array[0];
std::string value=array[1];
_httpRequest.InsertRequestHeader(key,value);
return true;
}
}
//接收Http请求正文
bool ReceiveHttpRequestContent(Buffer* buffer)
{
if(_httpReceiveStatus!=RECV_HTTP_CONTENT)
{
return false;
}
//1.判断是否存在正文(通过获取正文长度来判断)
size_t contentLength=_httpRequest.GetContentLength();//正文长度
if(contentLength==0)//没有正文
{
_httpReceiveStatus=RECV_HTTP_OVER;//设置Http请求的接收状态为“接收完毕”
return true;
}
//2.从接收缓冲区中获取正文
size_t leftSize=contentLength-_httpRequest._content.size();//还需要从缓冲区中获取的正文数据长度
if(buffer->GetReadableSize()>=leftSize)//接收缓冲区中的数据长度超过请求正文长度
{
_httpRequest._content.append(buffer->GetReadAddress(),leftSize);
buffer->MoveRead(leftSize);
_httpReceiveStatus=RECV_HTTP_OVER;//设置Http请求的接收状态为“接收完毕”
return true;
}
else//接收缓冲区中的数据长度不足请求正文长度,获取全部数据,剩余数据等待新数据到来再获取
{
_httpRequest._content.append(buffer->GetReadAddress(),buffer->GetReadableSize());
buffer->MoveRead(buffer->GetReadableSize());
//此处返回时就不能设置Http请求的接收状态为“接收完毕”
return true;
}
}
};
5.HttpServer模块
HttpServer模块即Http服务器,封装TcpServer对象和Http协议。使用浏览器访问服务器地址后,即浏览器向服务器建立连接并发送Http请求,服务器处理Http请求并返回Http响应。
Http服务器处理流程:
- 1.通过Socket接口接收数据,存放到接收缓冲区中
- 2.调用ProcessBusiness业务处理函数处理数据
- 3.接收和解析Http请求,得到HttpRequest对象
- 4.根据Http请求到请求路由中执行对应的处理函数
- 5.处理完毕后得到HttpResponse对象和Http响应字符串,返回给客户端Http响应
//HttpServer.hpp
//HttpServer模块:利用TcpServer搭建的Http服务器
//概述:管理请求路由表和TcpServer对象
//Http服务器处理流程:
//1.通过Socket接口接收数据,存放到接收缓冲区中
//2.调用ProcessBusiness业务处理函数处理数据()
//3.接收和解析Http请求,得到HttpRequest对象
//4.根据Http请求到请求路由中执行对应的处理函数
//5.处理完毕后得到HttpResponse对象
//功能:
#pragma once
#include <string>
#include <unordered_map>
#include <functional>
#include <sstream>
#include <regex>
#include <functional>
#include "HttpRequest.hpp"
#include "HttpResponse.hpp"
#include "HttpContext.hpp"
#include "TcpServer.hpp"
#define DEFAULT_TIMEOUT 30//默认的超时时间
using Handler=std::function<void(const HttpRequest&,HttpResponse*)>;//请求路由表中的处理函数
class HttpServer
{
private:
//注意:路由表存储的类型是 正则表达式->功能函数 (注意:unordered_map不支持存储regex)
std::vector<std::pair<std::regex,Handler>> _getRoute;//Get方法的请求路由表
std::vector<std::pair<std::regex,Handler>> _postRoute;//Post方法的请求路由表
std::vector<std::pair<std::regex,Handler>> _putRoute;//Put方法的请求路由表
std::vector<std::pair<std::regex,Handler>> _deleteRoute;//Delete方法的请求路由表
std::string _webRootDirectory;//Web根目录
TcpServer _tcpServer;//高性Tcp服务器
public:
//构造函数
HttpServer(int port, int timeout=DEFAULT_TIMEOUT)
:_tcpServer(port)
{
_tcpServer.EnableReleaseInactiveConnection(timeout);//Http服务器默认开启超时连接销毁功能
_tcpServer.SetConnectedCallback(std::bind(&HttpServer::Connected,this,std::placeholders::_1));
_tcpServer.SetProcessBusinessCallback(std::bind(&HttpServer::ProcessBusiness,this,std::placeholders::_1,std::placeholders::_2));
}
//设置Web根目录
void SetWebDirectory(const std::string& webRootDirectory)
{
bool ret=Util::IsDirectory(webRootDirectory);
assert(ret);
_webRootDirectory=webRootDirectory;
}
//设置循环线程数量
void SetThreadCount(int count)
{
_tcpServer.SetLoopThreadCount(count);
}
//添加Get方法的功能函数
void AddGetHandler(const std::string& re, const Handler& handler)
{
_getRoute.push_back(std::make_pair(std::regex(re),handler));
}
//添加Post方法的功能函数
void AddPostHandler(const std::string& re, const Handler& handler)
{
_postRoute.push_back(std::make_pair(std::regex(re),handler));
}
//添加Put方法的功能函数
void AddPutHandler(const std::string& re, const Handler& handler)
{
_putRoute.push_back(std::make_pair(std::regex(re),handler));
}
//添加Delete方法的功能函数
void AddDeleteHandler(const std::string& re, const Handler& handler)
{
_deleteRoute.push_back(std::make_pair(std::regex(re),handler));
}
//启动Http服务器
void Start()
{
_tcpServer.StartTcpServer();
}
private:
//连接成功后的回调函数
void Connected(const SPtrConnection& connection)
{
//功能:设置连接的Http请求上下文
connection->SetProtocolContext(HttpContext());
LOG(INFO,"连接"<<connection->GetConnectionID()<<",连接成功");
}
//业务处理函数(设置给TcpServer)
void ProcessBusiness(const SPtrConnection& connection, Buffer* buffer)
{
//功能:是获取Http请求上下文,并对Http请求进行解析,构建HttpRequest对象
// 再通过请求路由调用对应的处理函数,得到HttpResponse对象,再根据HttpResponse对象给客户端返回Http响应,最后根据长短连接决定是否关闭连接
while (buffer->GetReadableSize() > 0)
{
// 1.获取Http请求上下文
HttpContext *context = connection->GetProtocolContext()->GetPVal<HttpContext>(); // 获取Http请求上下文
// 2.根据Http请求上下文对接收缓冲区中的数据(Http请求)进行接收并解析,解析完毕得到HttpRequest对象
context->ReceiveAndParseHttpRequest(buffer);
HttpResponse response(context->GetResponseStatus());// Http响应
HttpRequest &request = context->GetHttpRequest();// Http请求
if (context->GetResponseStatus() >= 400) // Http请求的解析出错
{
// 向Http响应中添加错误显示页面,并关闭连接
// 这里,失败了,就清空缓冲区
AddErrorPage(request, &response); // 向Http响应中添加错误显示页面(包括响应状态码和描述)
SendHttpResponse(connection,request,response);// 返回给客户端Http响应
//一旦解析出错了,就要清空接收缓冲区并且重置上下文
context->ResetContext();
buffer->Clear();
connection->AboutToCloseConnection(); // 关闭连接
return;
}
if (context->GetHttpRecvStatus() != RECV_HTTP_OVER) // 当前连接的Http请求暂时还未接收完整,先退出,等后续完整的Http请求到达接收缓冲区再继续处理
{
return;
}
//执行到此处,说明一次Http请求已经全部接收并解析完毕了(注意是一次Http请求,接收缓冲区中可能会存在多个Http请求,或者不完整的Http请求)
// 3.路由(根据HttpRequest对象中的请求方法和资源路径确定具体的处理函数)
Route(request, &response);
// 4.给客户端返回Http响应
SendHttpResponse(connection,request,response);
// 5.重置Http请求上下文
context->ResetContext();
// 6.根据长短连接决定是否关闭连接
if (response.IsShortConnection())
{
connection->AboutToCloseConnection();
}
}
}
//发送Http响应
void SendHttpResponse(SPtrConnection connection,HttpRequest& request,HttpResponse& response)
{
//1.填充HttpResponse的响应报头
if(request.IsShortConnection())//填充长短连接报头
{
response.InsertResponseHeader("Connection","close");
}
else
{
response.InsertResponseHeader("Connection","keep-alive");
}
if(!response._content.empty() && !response.HasResponseHeader("Content-Length"))//填充正文长度报头
{
response.InsertResponseHeader("Content-Length",std::to_string(response._content.size()));
}
if(!response._content.empty() && !response.HasResponseHeader("Content-Type"))//填充正文类型
{
response.InsertResponseHeader("Content-Type","application/octet-stream");
}
if(response._isRedirect)//填充重定向报头
{
response.InsertResponseHeader("Location",response._redirectURL);
}
//2.组织Http响应数据(将HttpResponse对象中的内容组织成一个字符串,即Http响应)
std::stringstream responseStr;
//2.1组织状态行
responseStr<<request._protocolVersion<<" "<<response._status<<" "<<Util::GetStatusDescription(response._status)<<"\r\n";
//2.2组织响应报头
for(auto& header:response._responseHeaders)
{
responseStr<<header.first<<": "<<header.second<<"\r\n";
}
//2.3组织空行
responseStr<<"\r\n";
//2.4组织正文
responseStr<<response._content;
//3.发送Http响应数据(字符串)
connection->Send(responseStr.str().c_str(),responseStr.str().size());
}
//向HttpResponse中添加错误显示页面(设置错误显示的Html到响应正文中)
void AddErrorPage(const HttpRequest& request,HttpResponse* response)
{
// 1. 设计一个错误展示页面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(response->_status);
body += " ";
body += Util::GetStatusDescription(response->_status);
body += "</h1>";
body += "</body>";
body += "</html>";
// 2. 将错误显示页面,当作响应正文,添加到Http响应中
response->SetContent(body, "text/html");
}
//路由(根据HttpRequest对象中的请求方法和资源路径确定具体的处理函数)
void Route(HttpRequest& request,HttpResponse* response)
{
//思想:先判断是静态资源请求还是功能性请求,静态资源请求交给HandleStaticResourceRequest函数处理,功能性请求交给HandleFunctionalRequest处理
//判断是否为静态资源请求还是功能性请求
if(IsStaticResourceRequest(request))//是静态资源请求
{
HandleStaticResourceRequest(request, response);
}
else//是功能性请求
{
if(request._requestMethod=="GET"||request._requestMethod=="HEAD")//Get方法
{
HandleFunctionalRequest(request,response,_getRoute);
}
else if(request._requestMethod=="POST")//Post方法
{
HandleFunctionalRequest(request,response,_postRoute);
}
else if(request._requestMethod=="PUT")//Put方法
{
HandleFunctionalRequest(request,response,_putRoute);
}
else if(request._requestMethod=="DELETE")//Delete方法
{
HandleFunctionalRequest(request,response,_deleteRoute);
}
else
{
//当前Http服务器仅支持四种Http方法,其他方法不支持,设置响应状态码为405
//表示客户端发送的请求使用了服务器不允许的 HTTP 方法
response->_status=405;
}
}
}
//判断是否为静态资源请求
bool IsStaticResourceRequest(const HttpRequest& request)
{
bool result=false;
do
{
//1.必须存在Web根目录
if(_webRootDirectory.empty())
{
break;
}
//2.请求方法必须是GET或者HEAD
if(request._requestMethod!="GET"&&request._requestMethod!="HEAD")
{
break;
}
//3.资源路径必须是合法的
if(Util::IsValidResourcePath(request._resourcePath)==false)
{
break;
}
//4.请求的资源必须存在(具体判断资源是否存在,在IsRegularFile函数中有判断)
std::string resourcePath=_webRootDirectory+request._resourcePath;//资源路径
if(resourcePath.back()=='/')//如果资源路径是根目录,或者xxx/xxx/的形式,直接加上index.html,让其访问主页或者对应目录下的主页
{
resourcePath+="index.html";
}
//5.请求的资源必须是普通文件
if(Util::IsRegularFile(resourcePath)==false)
{
break;
}
result=true;
}while(false);
return result;
}
//处理静态资源请求
void HandleStaticResourceRequest(const HttpRequest& request, HttpResponse* response)
{
//思想:读取静态资源的数据,转换为字符串放到HttpResponse的正文中,并设置mime(即Content-Type)
//构建完整的资源路径
std::string resourcePath=_webRootDirectory+request._resourcePath;
if (resourcePath.back() == '/') // 如果资源路径是根目录,或者xxx/xxx/的形式,直接加上index.html,让其访问主页或者对应目录下的主页
{
resourcePath += "index.html";
}
//从请求的资源文件中读取数据存放到Http响应正文中
bool ret=Util::ReadFile(resourcePath,&response->_content);
if(ret==false)
{
return;
}
else
{
std::string mime=Util::GetMime(resourcePath);
response->InsertResponseHeader("Content-Type",mime);
return;
}
}
//处理功能性请求
void HandleFunctionalRequest(HttpRequest& request, HttpResponse* response, std::vector<std::pair<std::regex,Handler>> route)
{
//思想:根据传入的HTTP方法路由表(正则表达式->功能函数),匹配并执行对应的功能函数
for(auto& handler:route)
{
const std::regex& re=handler.first;//正则表达式匹配规则
const Handler& func=handler.second;//功能函数
bool ret=std::regex_match(request._resourcePath,request._matches,re);//匹配资源路径
if(ret)
{
func(request,response);//匹配成功,执行功能函数
return;
}
}
response->_status=404;//设置响应状态码为404,表示“请求的资源未找到”
}
};
十五、HTTP服务器测试
1.长连接测试
//长连接测试1:创建一个客户端持续给服务器发送长连接请求,直到超过超时时间看看是否正常
//预期结果:即使超过超时时间,连接也一直存在,不会超时关闭
#include "../HttpServer.hpp"
#include "assert.h"
int main()
{
Socket clientSocket;
clientSocket.CreateClient(8888, "127.0.0.1");
std::string req = "GET /PrintRequest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(clientSocket.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(clientSocket.Receive(buf, 1023));
LOG(DEBUG,buf);
sleep(3);
}
clientSocket.Close();
return 0;
}
2.超时连接测试
//超时连接测试2:创建一个客户端间隔时间给服务器发送数据,间隔的时间大于超时连接销毁规定的时间,看看能否正常关闭超时连接(超时连接时间为30秒,间隔时间为30秒)
//预期结果:连接超时后关闭,客户端接收Http响应失败
#include "../HttpServer.hpp"
#include "assert.h"
int main()
{
Socket clientSocket;
clientSocket.CreateClient(8888, "127.0.0.1");
std::string req = "GET /PrintRequest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(clientSocket.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(clientSocket.Receive(buf, 1023));
LOG(DEBUG,buf);
sleep(40);
}
clientSocket.Close();
return 0;
}
3.不完整请求测试
//不完整请求测试3:请求正文的数据长度在请求报头中设置的值大于实际长度
//两种测试方案:
//方案1:只发送一次Http请求,请求正文的数据长度在请求报头中设置的值大于实际长度
//预期结果1:服务器在业务处理中发现请求不完整,等待下一次数据到来,但是一直没有新数据,直到超时销毁
//方案2:发送多次Http请求,请求正文的数据长度在请求报头中设置的值都大于实际长度
//预期结果2:服务器在业务处理中发现请求不完整,等待下一次数据到来,当第二次请求数据到来时,根据协议上下文会直接将第二次请求当成第一次请求的正文来接收
//以此类推去接收之后的所有请求,直到第一次请求正文接收完毕,长度满足,再接着去正常接收后面的请求
//(不过此时大概率会解析错误,因为说不定一个完整的Http请求已经有一部分被当成正文接收了,剩余的是残余的Http请求)
#include "../HttpServer.hpp"
#include "assert.h"
int main()
{
Socket clientSocket;
clientSocket.CreateClient(8888, "127.0.0.1");
std::string req = "GET /PrintRequest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\n1234567890";
while(1) {
assert(clientSocket.Send(req.c_str(), req.size()) != -1);
assert(clientSocket.Send(req.c_str(), req.size()) != -1);
assert(clientSocket.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(clientSocket.Receive(buf, 1023));
LOG(DEBUG,"---------------");
LOG(DEBUG,buf);//输出Http响应
LOG(DEBUG,"---------------");
sleep(3);
}
clientSocket.Close();
return 0;
}
4.业务处理超时测试
//处理也出超时测试4:假设一个Reactor中有5个连接,开启超时连接销毁时间是30秒,如果第一个连接业务处理的时间过长,超过超时时间,
//则后面的4个连接都会超时
//预期结果:后面的连接虽然超时了,但是不会立即销毁关闭连接,因为所有的关闭连接操作都不是立即执行的,而是被压入到任务队列中,
//在执行任务队列前,必须先进行事件处理,即业务处理,并且在事件处理时(HandleEvent函数中)会刷新活跃度,
//业务处理完毕后也不会超时关闭,除非下一次没有数据了才会超时关闭
//超时连接测试2:创建一个客户端间隔时间给服务器发送数据,间隔的时间大于超时连接销毁规定的时间,看看能否正常关闭超时连接(超时连接时间为30秒,间隔时间为30秒)
//预期结果:连接超时后关闭,客户端接收Http响应失败
#include "../HttpServer.hpp"
#include "assert.h"
int main()
{
//模拟10个连接
for(int i=0;i<20;i++)
{
pid_t pid=fork();
if(pid<0)
{
LOG(DEBUG,"Fork error");
}
else if(pid==0)
{
Socket clientSocket;
clientSocket.CreateClient(8888, "127.0.0.1");
std::string req = "GET /TimeoutTest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while (1)
{
assert(clientSocket.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(clientSocket.Receive(buf, 1023));
LOG(DEBUG, buf);
}
clientSocket.Close();
exit(0);
}
}
while(1)
{
sleep(1);
}
return 0;
}
5.多条请求测试
//多条请求测试:在一次数据中有多条Http请求
//预期结果:服务器能够正确处理每一条Http请求
#include "../HttpServer.hpp"
#include "assert.h"
int main()
{
Socket clientSocket;
clientSocket.CreateClient(8888, "127.0.0.1");
std::string req = "GET /PrintRequest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req+="GET /PrintRequest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req+="GET /PrintRequest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req+="GET /PrintRequest HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(clientSocket.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(clientSocket.Receive(buf, 1023));
LOG(DEBUG,buf);
sleep(3);
}
clientSocket.Close();
return 0;
}
5.上传大文件测试
//上传大文件测试:上传一个500MB的文件
//预期结果:大文件成功上传到Web根目录中的指定路径下
#include "../HttpServer.hpp"
#include "assert.h"
int main()
{
Socket clientSocket;
clientSocket.CreateClient(8888, "127.0.0.1");
std::string req = "PUT /BigFile.txt HTTP/1.1\r\nConnection: keep-alive\r\n";
std::string content;
Util::ReadFile("./BigFile.txt",&content);
req+="Content-Length: "+std::to_string(content.size())+"\r\n\r\n";
assert(clientSocket.Send(req.c_str(), req.size()) != -1); // 发送请求行和请求报头
assert(clientSocket.Send(content.c_str(), content.size()) != -1); // 发送请求正文
char buf[1024] = {0};
assert(clientSocket.Receive(buf, 1023));
LOG(DEBUG, buf);
clientSocket.Close();
return 0;
}
6.性能压力测试(并发量测试)
- 并发量:同一时刻能够处理的客户端的请求数量
- QPS:每秒能够处理的请求数量
测试环境:2核2G云服务器
测试原理:使用WebBench工具,创建多个子进程模拟并发访问的用户,每个子进程代表一个客户端,同时向服务器发送Http请求
(1)测试并发量1000
./webbench -c 1000 -t 60 http://127.0.0.1:8888/
并发量:1000 测试时间:60秒
Speed=61381 pages/min,换算为QPS约为61381➗60≈1023
3982355 bytes/sec,即服务器每秒传输的数据量约为 3.98MB,反映服务器的网络传输效率(受请求内容大小影响)
Requests: 61381 succeed, 0 failed,说明测试期间共处理 61381 个请求,全部成功,无失败请求,说明服务器在 1000 并发压力下稳定性良好,未出现超时、崩溃等问题