C++ -仿mudou库one thread one loop式并发服务器实现代码实现
上一个项目中我实现了日志系统,因此本项目的日志模块我便使用了该系统。
日志系统博客
日志代码仓库
不过在本项目中使用日志系统模块似乎有点大了,这里有个比较简便的日志宏的实现:
#define INF 0
#define DBG 1
#define ERR 2
// 默认日志级别
#define DEFAULT_LOG_LEVEL DBG
// 通用日志宏
#define LOG(level, format, ...) { \
if (level >= DEFAULT_LOG_LEVEL) { \
time_t t = time(NULL); \
struct tm *m = localtime(&t); \
char ts[32] = {0}; \
strftime(ts, 31, "%H:%M:%S", m); \
fprintf(stdout, "[%p %s %s:%d] " format "\n", \
(void*)pthread_self(), ts, __FILE__, __LINE__, ##__VA_ARGS__); \
} \
}
// 不同级别日志宏
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__);
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__);
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__);
一、SERVER模块
缓存区 Buffer
类
Buffer 类用于实现用户态缓冲区,提供数据存储、读取和管理等功能。
#define BUFF_DEFAULT_SIZE 1024
class Buffer
{
private:
std::vector<char> _buffer; // 缓冲区
uint64_t _read_idx; // 读偏移
uint64_t _write_idx; // 写偏移
public:
char *Begin() { return &*_buffer.begin(); }
// 获取读写位置起始地址
char *WritePosition() { return Begin() + _write_idx; }
char *ReadPosition() { return Begin() + _read_idx; }
// 获取当前缓冲区中 前后缓冲区位置大小
uint64_t TailFreeSpace() { return _buffer.size() - _write_idx; }
uint64_t HeadFreeSpace() { return _read_idx; }
// 获取可读数据大小
uint64_t ReadAbleSize() { return _write_idx - _read_idx; }
// 读写idx 向后移动
void MoveReadOffset(uint64_t len)
{
if (len == 0)
return;
assert(len <= ReadAbleSize());
_read_idx += len;
}
void MoveWriteOffset(uint64_t len)
{
if (len == 0)
return;
assert(len <= TailFreeSpace());
_write_idx += len;
}
void EnsureWriteSpace(uint64_t len)
{
if (len <= TailFreeSpace())
return;
/*如果len 是小于等于缓冲区中所有空闲位置大小,那么将数据往前移动*/
if (len <= TailFreeSpace() + HeadFreeSpace())
{
uint64_t res = ReadAbleSize();
std::copy(ReadPosition(), ReadPosition() + res, Begin());
_read_idx = 0;
_write_idx = res;
}
else // 缓冲区的空间不足,进行扩容
{
Log::DEBUG("RESIZE %ld", _write_idx + len);
_buffer.resize(_write_idx + len);
}
}
public:
Buffer() : _buffer(BUFF_DEFAULT_SIZE), _read_idx(0), _write_idx(0) {}
// write接口
void Write(const char *data, uint64_t len)
{
if (len == 0)
return;
EnsureWriteSpace(len);
std::copy(data, data + len, WritePosition());
}
void WriteAndPush(const char *data, uint64_t len)
{
if (len == 0)
return;
Write(data, len);
MoveWriteOffset(len);
}
void WriteString(const std::string &data)
{
Write(data.c_str(), data.size());
}
void WriteStringAndPush(const std::string &data)
{
WriteString(data);
MoveWriteOffset(data.size());
}
void WriteBuffer(Buffer &buf)
{
Write(buf.ReadPosition(), buf.ReadAbleSize());
}
void WriteBufferAndPush(Buffer &buf)
{
WriteBuffer(buf);
MoveWriteOffset(buf.ReadAbleSize());
}
// read接口
void Read(char *buf, uint64_t len)
{
if (len == 0)
return;
assert(len <= ReadAbleSize());
std::copy(ReadPosition(), ReadPosition() + len, buf);
}
void ReadAndPop(char *buf, uint64_t len)
{
Read(buf, len);
MoveReadOffset(len);
}
std::string ReadAsString(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
std::string ReadAsStringAndPop(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
// 这里我们就不需要再写一个ReadBuffer了 因为使用WriteBuffer可以实现同样的功能
std::string GetLine()
{
char *pos = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());
if (pos == NULL)
{
return "";
}
// +1是为了把换行字符也取出来。读取出\n
return ReadAsString(pos - ReadPosition() + 1);
}
std::string GetLineAndPop()
{
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
void Clear()
{
_write_idx = 0;
_read_idx = 0;
}
};
套接字Socket
类
封装套接字操作。
class Socket
{
private:
int _sockfd;
public:
Socket() : _sockfd(-1) {}
Socket(int fd) : _sockfd(fd) {}
~Socket() { Close(); }
int GetFd() { return _sockfd; }
// 创建套接字
bool CreateSock()
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
Log::ERROR("%s", "Create socket false");
return false;
}
return true;
}
// fd绑定ip port
bool Bind(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
int n = bind(_sockfd, (const sockaddr *)&addr, sizeof(addr));
if (n < 0)
{
Log::ERROR("%s", "SOCKET bind false");
return false;
}
return true;
}
// 监听
bool Listen(int backlog = 15)
{
int ret = listen(_sockfd, backlog);
if (ret < 0)
{
Log::ERROR("%s", "SOCKET listen false");
return false;
}
return true;
}
// 客户端连接服务器
bool Connect(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
int n = connect(_sockfd, (const sockaddr *)&addr, sizeof(addr));
if (n < 0)
{
Log::ERROR("%s", "SOCKET Connect false");
return false;
}
return true;
}
// 监听 接受新连接
int Accept()
{
// 这里我不需要知道ip 和port 所以我设置了 null
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0)
{
Log::ERROR("%s", "SOCKET accept false");
return -1;
}
Log::DEBUG("%s", "get a link");
return newfd;
}
// flag == MSG_DONTWAIT 表示当前接收为非阻塞。
ssize_t Recv(void *buff, size_t len, int nonblock_flag = 0)
{
ssize_t n = recv(_sockfd, buff, len, nonblock_flag);
if (n <= 0)
{
// EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误
// EINTR 表示当前socket的阻塞等待,被信号打断了,
if (errno == EAGAIN || errno == EINTR)
return 0; // 表示这次接收没有接收到数据
Log::ERROR("%s", "SOCKET recv false");
return -1;
}
return n;
}
ssize_t NonBlockRecv(void *buff, size_t len)
{
return Recv(buff, len, MSG_DONTWAIT);
}
ssize_t Send(const void *buff, size_t len, int nonblock_flag = 0)
{
ssize_t n = send(_sockfd, buff, len, nonblock_flag);
if (n <= 0)
{
// EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误
// EINTR 表示当前socket的阻塞等待,被信号打断了,
if (errno == EAGAIN || errno == EINTR)
return 0; // 表示这次接收没有接收到数据
Log::ERROR("%s", "SOCKET send false");
return -1;
}
return n;
}
ssize_t NonBlockSend(const void *buff, size_t len)
{
return Send(buff, len, MSG_DONTWAIT);
}
// 关闭套接字
void Close()
{
if (_sockfd != -1)
{
close(_sockfd);
_sockfd = -1;
}
}
// 设置非堵塞
void SetNonBlock()
{
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
// 设置套接字选项---开启地址端口重用
void ReuseAddress()
{
// int setsockopt(int fd, int leve, int optname, void *val, int vallen)
int val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));
val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));
}
// 创建一个服务端
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool nonblock_flag = false, int backlog = 15)
{
// 1. 创建套接字,2. 设置非阻塞,3. 启动地址重用 ,4.绑定地址 ,5. 开始监听
if (CreateSock() == false)
return false;
if (nonblock_flag == true)
SetNonBlock();
ReuseAddress();
if (Bind(ip, port) == false)
return false;
if (Listen(backlog) == false)
return false;
Log::INFO("%s", "create server success");
return true;
}
bool CreateClient(uint16_t port, const std::string &ip)
{
// 1. 创建套接字。bind(对于客户端可以忽略) 2. 连接服务器
if (CreateSock() == false)
return false;
if (Connect(ip, port) == false)
return false;
Log::INFO("%s", "create client success");
return true;
}
};
事件管理Channel
类
管理描述符的 I/O 事件(读、写、错误等)。
与 Poller 配合,触发事件时回调相应处理函数。
class Poller;
class EventLoop;
// Channel 实现单个 描述符与事件的管理
class Channel
{
using EventCallback = std::function<void()>;
private:
int _fd;
EventLoop *_loop;
uint32_t _events; // 监控的事件
uint32_t _revents; // 已就绪的事件
EventCallback _read_callback;
EventCallback _write_callback;
EventCallback _error_callback;
EventCallback _close_callback; // 连接断开触发的回调函数
EventCallback _event_callback; // 事件触发后调整活跃度
public:
Channel(EventLoop *loop, int fd) : _loop(loop), _fd(fd), _events(0), _revents(0) {}
void SetREvents(uint32_t events) { _revents = events; } // 设置实际就绪的事件
void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
int GetFd() { return _fd; }
uint32_t Events() { return _events; } // 获取需要监控的事件
// 判断当前是否监控了可读事件 或者可写事件
bool ReadAble() { return _events & EPOLLIN; }
bool WriteAble() { return _events & EPOLLOUT; }
// 启动读写事件监控
void EnableRead()
{
_events |= EPOLLIN;
Update();
}
void EnableWrite()
{
_events |= EPOLLOUT;
Update();
}
// 关闭读写事件监控
void DisableRead()
{
_events &= ~EPOLLIN;
Update();
}
void DisableWrite()
{
_events &= ~EPOLLOUT;
Update();
}
// 关闭所有事件监控
void DisableAll() { _events = 0; }
// 直接移除在epoll的监控, 因为需要调用poller的接口,所以类外定义
void Remove();
void Update();
void HandleEvent()
{
// 读事件就绪 | 对端断开 |
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
if (_read_callback)
_read_callback();
}
// EPOLLOUT < EPOLLERR < EPOLLHUP 如果出错了,就别处理写;如果挂掉了,就别处理写/错。
if (_revents & EPOLLHUP)
{
if (_close_callback)
_close_callback();
return;
}
else if (_revents & EPOLLERR)
{
if (_error_callback)
_error_callback();
return;
}
else if (_revents & EPOLLOUT)
{
if (_write_callback)
_write_callback();
}
if (_event_callback)
_event_callback(); // 调整事件的活跃度
}
};
描述符事件监控Poller
类
#define MAX_EPOLLEVENTS 100
class Poller
{
private:
int _epfd; // epoll句柄
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels; // fd和事件管理的映射
private:
// 对epoll中的红黑树事件进行管理
void Updata(Channel *channel, int op)
{
int fd = channel->GetFd();
struct epoll_event ev;
ev.data.fd = fd;
ev.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &ev);
if (ret < 0)
{
Log::ERROR("%s", "epoll_ctl false");
}
return;
}
// 判断channel 是否已经添加到了事件管理中
bool HasChannel(Channel *channel)
{
int fd = channel->GetFd();
auto it = _channels.find(fd);
if (it == _channels.end())
return false;
return true;
}
public:
Poller()
{
_epfd = epoll_create(MAX_EPOLLEVENTS);
if (_epfd < 0)
{
Log::ERROR("epoll_create false");
abort(); // 退出程序
}
}
// 修改事件
void UpdateEvent(Channel *channel)
{
// 判断channel 是否存在 事件管理中
bool ret = HasChannel(channel);
if (ret == false)
{
_channels.insert(std::make_pair(channel->GetFd(), channel));
Updata(channel, EPOLL_CTL_ADD);
return;
}
Updata(channel, EPOLL_CTL_MOD);
return;
}
void RemoveEvent(Channel *channel)
{
auto it = _channels.find(channel->GetFd());
if (it == _channels.end())
return;
// 在hash中删除
_channels.erase(it);
// 在epoll红黑树中删除
Updata(channel, EPOLL_CTL_DEL);
}
// 开始监控,返回活跃连接
void Poll(std::vector<Channel *> *active)
{
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // 永久堵塞
if (nfds < 0)
{
// EINTR 表示当前socket的阻塞等待,被信号打断了,
if (errno == EINTR) // 信号打断
return;
Log::ERROR("EPOLL WAIT ERROR:%s", strerror(errno));
return;
}
for (int i = 0; i < nfds; i++)
{
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
it->second->SetREvents(_evs[i].events); // 设置就绪事件
active->push_back(it->second); // 输出型参数
}
return;
}
};
定时任务管理TimerWheel
类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
uint64_t _id; // 定时器任务ID
uint32_t _timeout; // 定时任务超时事件
bool _canceled; // 取消定时任务,false-表示没有被取消, true-表示被取消
TaskFunc _task_cb; // 定时器对象要执行的定时任务
ReleaseFunc _relea_cb; // 用于删除TimerWheel中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb) : _id(id), _timeout(timeout), _canceled(false), _task_cb(cb) {}
~TimerTask()
{
if (_canceled == false)
_task_cb();
_relea_cb();
}
void Cancel() { _canceled = true; }
void SetRelease(const ReleaseFunc &cb) { _relea_cb = cb; }
uint32_t GetTimeout() { return _timeout; }
};
class TimerWheel
{
using WeakTask = std::weak_ptr<TimerTask>;
using ShareTask = std::shared_ptr<TimerTask>;
private:
int _tick; // 当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务
int _capacity; // 表盘最大数量---其实就是最大延迟时间
std::unordered_map<uint64_t, WeakTask> _timers; //_timers 的作用只是 快速查找任务(通过 id 找对应的任务)。它本身并不想延长任务的生命周期。
std::vector<std::vector<ShareTask>> _wheel;
// timefd
EventLoop *_loop;
int _timerfd; // 定时器描述符--可读事件回调就是读取计数器,执行定时任务
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
static int CreateTimerfd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0)
{
Log::ERROR("timerfd create false");
abort();
}
struct itimerspec itime;
itime.it_value.tv_sec = 1; // 第一次超时时间为1s后
itime.it_value.tv_nsec = 0;
itime.it_interval.tv_sec = 1; // 第一次超时后,每次超时的间隔时
itime.it_interval.tv_nsec = 0;
timerfd_settime(timerfd, 0, &itime, NULL);
return timerfd;
}
// 跑动时间轮
void RunWheel()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉
}
int ReadTimefd()
{
uint64_t times;
// 有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次
// read读取到的数据times就是从上一次read之后超时的次数
int ret = read(_timerfd, ×, 8);
if (ret < 0)
{
Log::ERROR("read Timerfd false");
abort();
}
return times;
}
// 通过timerfd read来跑动时间轮
void OnTime()
{
int times = ReadTimefd();
for (int i = 0; i < times; i++)
{
RunWheel();
}
}
void TimerAddInLoop(uint64_t id, uint32_t timeout, const TaskFunc &cb)
{
ShareTask ptr(new TimerTask(id, timeout, cb));
ptr->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
// 添加到timer 中
_timers[id] = WeakTask(ptr);
int pos = (_tick + timeout) % _capacity;
_wheel[pos].push_back(ptr);
}
// 刷新/延迟定时任务
void TimerRefreshInLoop(uint64_t id)
{
// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 定时任务没有找到
}
ShareTask ptr = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr
// 由于是shared_ptr,所以我们只需要将一个新的shared_ptr插入到_wheel中即可达到延迟的目的
int timeout = ptr->GetTimeout();
int pos = (_tick + timeout) %_capacity;
_wheel[pos].push_back(ptr);
}
// 将定时任务的任务取消
void TaskCancelInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return;
}
ShareTask ptr = it->second.lock();
if (ptr)
ptr->Cancel();
}
public:
TimerWheel(EventLoop *loop) : _tick(0), _capacity(60), _wheel(_capacity), _loop(loop),
_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd))
{
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
_timer_channel->EnableRead();
}
void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb);
void TimerRefresh(uint64_t id);
void TimerCancel(uint64_t id);
/*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行*/
bool HasTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return false;
}
return true;
}
};
Reactor-EventLoop线程池类
using Functor = std::function<void()>;
class EventLoop
{
private:
std::mutex _mutex; // 给任务池上锁
std::vector<Functor> _tasks; // 任务池
std::thread::id _thread_id; // 每一个Evenetloop都由一个线程池来管理
int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞
std::unique_ptr<Channel> _event_channel; // eventfd的Channel
Poller _poller; // 当前Loop下所有描述符的事件监控
TimerWheel _timer_wheel; // 定时器模块
public:
// 执行任务池中的任务
void RunAllTask()
{
std::vector<Functor> tmp;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(tmp);
}
for (auto &f : tmp)
{
f();
}
return;
}
// 创造一个 eventfd , readEvent , WeakUpEventfd
static int CreateEvent()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
Log::ERROR("create eventfd false");
abort(); // 关闭程序
}
return efd;
}
void ReadEventfd()
{
uint64_t ret = 0;
int n = read(_event_fd, &ret, sizeof(ret));
if (n < 0)
{
// EINTR -- 被信号打断; EAGAIN -- 表示无数据可读
if (errno == EINTR || errno == EAGAIN)
{
return;
}
Log::ERROR("read eventfd false");
abort();
}
return;
}
void WakeUpEventfd()
{
uint64_t ret = 1;
int n = write(_event_fd, &ret, sizeof(ret));
if (n < 0)
{
// EINTR -- 被信号打断; EAGAIN -- 表示无数据可读
if (errno == EINTR || errno == EAGAIN)
{
return;
}
Log::ERROR("write eventfd false");
abort();
}
return;
}
public:
EventLoop() : _thread_id(std::this_thread::get_id()),
_event_fd(CreateEvent()),
_event_channel(new Channel(this, _event_fd)),
_timer_wheel(this)
{
// 给 evetfd设置可读回调
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
// 启动读事件监控
_event_channel->EnableRead();
}
// 事件监控->就绪事件处理->执行任务池中任务
void Start()
{
while (1)
{
std::vector<Channel *> actives;
_poller.Poll(&actives);
for (auto &channel : actives)
{
channel->HandleEvent();
}
RunAllTask();
}
}
// 判断当前线程是否是EventLoop对应的线程
bool IsInloop() { return _thread_id == std::this_thread::get_id(); }
void AssertInloop() { assert(_thread_id == std::this_thread::get_id()); }
// 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
void RunInLoop(const Functor &cb)
{
// 如果是eventloop线程内部直接执行
if (IsInloop() == true)
{
cb();
return;
}
QueueInLoop(cb);
}
// 将操作压入任务池
void QueueInLoop(const Functor &cb)
{
// 涉及任务池的操作都要加锁
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 压入任务池的操作,代表任务池中存在数据,而start需要处理就绪事件
// 如果没有就绪事件则导致epol_wait会堵塞
// 所以我们需要通过给eventfd写入一个数据,eventfd就会触发可读事件
WakeUpEventfd();
}
// 添加/修改描述符的事件监控
void UpdateEvent(Channel *channel) { _poller.UpdateEvent(channel); }
// 移除描述符的监控
void RemoveEvent(Channel *channel) { _poller.RemoveEvent(channel); }
// 定时器模块接口
void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb) { _timer_wheel.TimerAdd(id, timeout, cb); }
void TimerRefresh(uint64_t id) { _timer_wheel.TimerRefresh(id); }
void TimerCancel(uint64_t id) { _timer_wheel.TimerCancel(id); }
bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};
void Channel::Remove() { _loop->RemoveEvent(this); }
void Channel::Update() { _loop->UpdateEvent(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb)
{
_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, timeout, cb));
}
// 刷新/延迟定时任务
void TimerWheel::TimerRefresh(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::TaskCancelInLoop, this, id));
}
class LoopThread
{
private:
/*互斥锁和条件变量用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
EventLoop *_loop; // 由线程实例化一个对象
std::thread _thread; // Loop所在的线程
private:
void ThreadEntry()
{ /*这里创造一个对象而不是new,是为了保持_loop的生命周期与线程的一致*/
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all();
}
_loop->Start(); // 循坏
}
public:
LoopThread() : _loop(nullptr), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
EventLoop *GetLoop()
{
EventLoop *loop = nullptr;
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [&]()
{ return _loop != nullptr; });
loop = _loop;
}
return loop;
}
};
class LoopThreadPool
{
private:
int _thread_count; // 从属线程数量
int _loops_idx; //_loops下标
EventLoop *_base_loop; // 主线程
std::vector<LoopThread *> _threads; // 从属线程
std::vector<EventLoop *> _loops; // 从属线程的eventloop
public:
LoopThreadPool(EventLoop *base_loop, int thread_count = 0) : _thread_count(thread_count), _loops_idx(0), _base_loop(base_loop) {}
void SetThreadCount(int count) { _thread_count = count; }
void CreateThread()
{
if (_thread_count > 0)
{
_threads.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++)
{
_threads[i] = new LoopThread();
_loops[i] = _threads[i]->GetLoop();
}
}
}
EventLoop *NextLoop()
{
if (_thread_count == 0)
{
return _base_loop;
}
EventLoop* loop = _loops[_loops_idx];
_loops_idx = (_loops_idx + 1) % _thread_count;
return loop;
}
};
通用类型Any类
class Any
{
private:
class holder
{
public:
virtual ~holder() {}
virtual const std::type_info &type() = 0;
virtual holder *clone() = 0;
};
template <class T>
class placeholder : public holder
{
public:
T _val;
public:
placeholder(const T &val) : _val(val) {}
// 获取对象保存的数据类型
const std::type_info &type() override { return typeid(T); }
holder *clone() override { return new placeholder(_val); }
};
private:
holder *_content;
public:
Any() : _content(nullptr) {}
template <class T>
Any(const T &val) : _content(new placeholder<T>(val)) {}
Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
~Any() { delete _content; }
Any &swap(Any &other)
{
std::swap(_content, other._content);
return *this;
}
template <class T>
Any &operator=(const T &val)
{
// 构造一个临时对象
Any(val).swap(*this);
return *this;
}
Any &operator=(const Any &other)
{
// 构造一个临时对象
Any(other).swap(*this);
return *this;
}
template <class T>
T &GetContent()
{
assert(_content != nullptr);
assert(typeid(T) == _content->type());
auto p = dynamic_cast<placeholder<T> *>(_content);
assert(p != nullptr);
return p->_val;
}
};
通信连接管理Connection
类
class Connection;
typedef enum
{
DISCONNECTED, // 连接关闭状态
CONNECTING, // 连接建立成功-待处理状态
CONNECTED, // 连接建立完成,各种设置已完成,可以通信的状态
DISCONNECTING // 待关闭状态
} ConnStatus;
using PtrConnection = std::shared_ptr<Connection>;
// Connection 实现 单个套接字连接管理
class Connection : public std::enable_shared_from_this<Connection>
{
using ConnectedCallback = std::function<void(const PtrConnection &)>;
// 消息处理
using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection &)>;
using AnyEventCallback = std::function<void(const PtrConnection &)>;
private:
EventLoop *_loop; // 连接锁管理的loop
uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找
// uint64_t _timer_id; //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器ID
int _sockfd; // 连接关联的文件描述符
Socket _socket; // 套接字操作管理
Channel _channel; // 连接的事件管理
Buffer _in_buffer; // 输入缓冲区---存放从socket中读取到的数据
Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据
ConnStatus _statu; // 连接状态
bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为false
Any _context; // 请求的接收处理上下文
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
ClosedCallback _server_closed_callback;
private:
void HandleRead() // 描述符可读事件
{
char buffer[65536];
// 1.接受socket套接字中的数据
ssize_t ret = _socket.NonBlockRecv(buffer, 65535);
if (ret < 0)
{ // recv出错
ShutdownInLoop();
return;
}
else if (ret == 0)
return;
// 将读到的数据放入缓冲区中
_in_buffer.WriteAndPush(buffer, ret);
if (_in_buffer.ReadAbleSize() > 0)
{ // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象
if (_message_callback)
_message_callback(shared_from_this(), &_in_buffer);
}
}
void HandleWrite() // 描述符可写事件
{
ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
if (ret < 0)
{ // send出错,即使发送失败、连接马上要释放,这部分已经读到的数据 还没被应用层处理。如果直接释放连接,这些数据就丢了。
// 所以在释放前调用 _message_callback,可以让业务层先处理掉已经收到的数据,避免数据丢失。
if (_in_buffer.ReadAbleSize() > 0)
{ // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象
if (_message_callback)
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
else if (ret == 0)
return;
_out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动
if (_out_buffer.ReadAbleSize() == 0)
{
// 如果输出缓冲区没有可读数据,则关闭可写事件的监控
_channel.DisableWrite();
if (_statu == DISCONNECTING)
Release();
}
return;
}
void HandleClose() // 描述符触发挂断事件
{
if (_in_buffer.ReadAbleSize() > 0)
{ // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象
if (_message_callback)
_message_callback(shared_from_this(), &_in_buffer);
}
// 输出缓冲 _out_buffer 里的数据是 准备发给对端的。
// 但既然挂断了,对端已经不再接收,所以发也没用,只会出错。
Release();
}
// 描述符触发出错事件
void HandleError() { HandleClose(); }
// 描述符触发任意事件
void HandleEvent()
{
// 刷新活跃度
if (_enable_inactive_release == true)
{
_loop->TimerRefresh(_conn_id);
}
if (_event_callback)
_event_callback(shared_from_this());
}
// 非活跃的销毁任务
void EnableInactiveReleaseInLoop(int sec)
{
_enable_inactive_release = true;
if (_loop->HasTimer(_conn_id))
{
_loop->TimerRefresh(_conn_id);
return;
}
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
void CancelInactiveReleaseInLoop()
{
_enable_inactive_release = false;
if (_loop->HasTimer(_conn_id))
{
_loop->TimerCancel(_conn_id);
}
}
void EstablishedInLoop()
{
assert(_statu == CONNECTING); // 连接中
_statu = CONNECTED;
_channel.EnableRead();
if (_connected_callback)
_connected_callback(shared_from_this());
//_statu = CONNECTED;
}
void SendInLoop(Buffer &buf)
{
if (_statu == DISCONNECTED)
return;
_out_buffer.WriteBufferAndPush(buf);
// 启动可写事件监控
if (_channel.WriteAble() == false)
{
_channel.EnableWrite();
}
}
void ReleaseInLoop() // 实际的释放接口
{
_statu = DISCONNECTED;
// 将channel 从loop的管理中移除
_channel.Remove();
_socket.Close();
// loop中的定时任务器可能存在任务,我们要销毁非活跃任务
if (_loop->HasTimer(_conn_id))
CancelInactiveReleaseInLoop();
// 组件使用者先关闭设置的回调函数
if (_closed_callback)
_closed_callback(shared_from_this());
// 移除服务器内部管理的连接信息
if (_server_closed_callback)
_server_closed_callback(shared_from_this());
}
void ShutdownInLoop() // shutdown判断发送缓冲区中是否有数据,然后调用ReleaseInLoop
{
_statu = DISCONNECTING;
if (_in_buffer.ReadAbleSize() > 0)
{
if (_message_callback)
_message_callback(shared_from_this(), &_in_buffer);
}
if (_out_buffer.ReadAbleSize() > 0)
{
if (_channel.WriteAble() == false)
_channel.EnableWrite();
}
if (_out_buffer.ReadAbleSize() == 0)
{
Release();
}
}
void UpgradeInLoop(const Any &context, const ConnectedCallback &conn,
const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event)
{
_context = context;
_connected_callback = conn;
_message_callback = msg;
_closed_callback = closed;
_event_callback = event;
}
public:
Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _loop(loop), _conn_id(conn_id), _sockfd(sockfd), _socket(_sockfd),
_channel(_loop, _sockfd), _statu(CONNECTING), _enable_inactive_release(false)
{
_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁就会产生问题
}
~Connection() { Log::DEBUG("realse Connection:%p", this); }
// 获取连接id
uint64_t GetConnid() { return _conn_id; }
// 获取sockfd
int GetSockfd() { return _sockfd; }
// 获取上下文
Any *GetContext() { return &_context; }
// 是否Connected状态
bool IsConnected() { return _statu == CONNECTED; }
/*这四个回调函数,是让服务器模块来设置的(即组件的使用者设置)*/
void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
void SetMessageedCallback(const MessageCallback &cb) { _message_callback = cb; }
void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }
// 设置上下文,连接处理后会调用
void SetContext(const Any &context) { _context = context; }
// 启动非活跃销毁
void EnableInactiveRelease(int sec)
{
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
}
// 关闭非活跃销毁
void CancelInactiveRelease()
{
_loop->RunInLoop(std::bind(&Connection::CancelInactiveRelease, this));
}
// 启动监控调用 _connected_callback
void Established()
{
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
}
// 发送数据到缓冲区
void Send(const char *data, size_t len)
{
Buffer buf;
buf.WriteAndPush(data, len);
// 这个操作是压入任务池,可能调用的时候临时变量已经释放,所以需要创造一个buff保存数据
_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
}
// 关闭连接 提供给组件使用者-在关闭前需要判断发送缓冲区中是否有数据
void Shutdown()
{
_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));
}
void Release()
{
_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));
}
// 切换协议,设置相关的回调函数
void Upgrade(const Any &context, const ConnectedCallback &conn,
const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event)
{
// 该接口必须在本线程中执行
// 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。
_loop->AssertInloop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
}
};
监听描述符管理Acceptor
类
class Acceptor
{
using AcceptCallback = std::function<void(int)>;
private:
Socket _lis_sock; // 监听套接字
EventLoop *_loop;
Channel _channel;
AcceptCallback _accept_callback;
private:
void HandleRead()
{
int newfd = _lis_sock.Accept();
if (newfd < 0)
return;
if (_accept_callback)
_accept_callback(newfd);
}
// 创造监听套接字
int CreateServer(int port)
{
bool ret = _lis_sock.CreateServer(port);
assert(ret == true);
return _lis_sock.GetFd();
}
public:
Acceptor(EventLoop *loop, int port) : _lis_sock(CreateServer(port)), _loop(loop), _channel(_loop, _lis_sock.GetFd())
{
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*/
/*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/
}
void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }
void SetLisEnableRead() { _channel.EnableRead(); }
};
服务器TcpServer
类
class TcpServer
{
using ConnectedCallback = std::function<void(const PtrConnection &)>;
// 消息处理
using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection &)>;
using AnyEventCallback = std::function<void(const PtrConnection &)>;
private:
int _port; // 端口号
uint64_t _next_id; // 自增长的连接id
int _timeout; // 非活跃连接时间
bool _enable_inactive_release; // 非活跃销毁启动标志F
EventLoop _base_loop; // 主线程的Eventloop对象,负责监听处理
Acceptor _acceptor; // 这是监听套接字
LoopThreadPool _thread_poll; // 从属线程池
std::unordered_map<uint64_t, PtrConnection> _conns; // 保存管理所有连接对应的shared_ptr对象
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
private:
void NewConnection(int newfd) // 为新连接构造一个Connection进行管理
{
_next_id++;
PtrConnection conn(new Connection(_thread_poll.NextLoop(), _next_id, newfd));
conn->SetConnectedCallback(_connected_callback);
conn->SetMessageedCallback(_message_callback);
conn->SetClosedCallback(_closed_callback);
conn->SetAnyEventCallback(_event_callback);
conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_enable_inactive_release == true)
conn->EnableInactiveRelease(10);
conn->Established();
_conns.insert(std::make_pair(_next_id, conn));
}
void RemoveConnectionInLoop(const PtrConnection &conn)
{
uint64_t id = conn->GetConnid();
auto it = _conns.find(id);
if (it != _conns.end())
{
_conns.erase(it);
}
}
void RemoveConnection(const PtrConnection &conn) // 删除_conns中保存的对象,这里删除才是真正的删除
{
_base_loop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
}
void TaskRunAfterInLoop(const Functor &task, int delay)
{
_next_id++;
_base_loop.TimerAdd(_next_id, delay, task);
}
public:
TcpServer(int port) : _port(port), _next_id(0), _enable_inactive_release(false),
_acceptor(&_base_loop, _port), _thread_poll(&_base_loop)
{
_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_acceptor.SetLisEnableRead();
}
void SetThreadCount(int count) { _thread_poll.SetThreadCount(count); }
void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
void SetMessageedCallback(const MessageCallback &cb) { _message_callback = cb; }
void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
void EnableInactiveRelease(int timeout)
{
_timeout = timeout;
_enable_inactive_release = true;
}
// 用于添加定时任务
void TaskRunAfter(const Functor &task, int delay)
{
_base_loop.RunInLoop(std::bind(&TcpServer::TaskRunAfterInLoop, this, task, delay));
}
void Start()
{
_thread_poll.CreateThread();
_base_loop.Start();
}
};
class NetWork
{
public:
NetWork()
{
Log::DEBUG("SIGPIPE INIT");
signal(SIGPIPE, SIG_IGN);
}
};
static NetWork nw;
二、HTTP协议模块
Util
实用工具类
std::unordered_map<int, std::string> _status_msg = {
{100, "Continue"},
{101, "Switching Protocol"},
{102, "Processing"},
{103, "Early Hints"},
{200, "OK"},
{201, "Created"},
{202, "Accepted"},
{203, "Non-Authoritative Information"},
{204, "No Content"},
{205, "Reset Content"},
{206, "Partial Content"},
{207, "Multi-Status"},
{208, "Already Reported"},
{226, "IM Used"},
{300, "Multiple Choice"},
{301, "Moved Permanently"},
{302, "Found"},
{303, "See Other"},
{304, "Not Modified"},
{305, "Use Proxy"},
{306, "unused"},
{307, "Temporary Redirect"},
{308, "Permanent Redirect"},
{400, "Bad Request"},
{401, "Unauthorized"},
{402, "Payment Required"},
{403, "Forbidden"},
{404, "Not Found"},
{405, "Method Not Allowed"},
{406, "Not Acceptable"},
{407, "Proxy Authentication Required"},
{408, "Request Timeout"},
{409, "Conflict"},
{410, "Gone"},
{411, "Length Required"},
{412, "Precondition Failed"},
{413, "Payload Too Large"},
{414, "URI Too Long"},
{415, "Unsupported Media Type"},
{416, "Range Not Satisfiable"},
{417, "Expectation Failed"},
{418, "I'm a teapot"},
{421, "Misdirected Request"},
{422, "Unprocessable Entity"},
{423, "Locked"},
{424, "Failed Dependency"},
{425, "Too Early"},
{426, "Upgrade Required"},
{428, "Precondition Required"},
{429, "Too Many Requests"},
{431, "Request Header Fields Too Large"},
{451, "Unavailable For Legal Reasons"},
{501, "Not Implemented"},
{502, "Bad Gateway"},
{503, "Service Unavailable"},
{504, "Gateway Timeout"},
{505, "HTTP Version Not Supported"},
{506, "Variant Also Negotiates"},
{507, "Insufficient Storage"},
{508, "Loop Detected"},
{510, "Not Extended"},
{511, "Network Authentication Required"}};
std::unordered_map<std::string, std::string> _mime_msg = {
{".aac", "audio/aac"},
{".abw", "application/x-abiword"},
{".arc", "application/x-freearc"},
{".avi", "video/x-msvideo"},
{".azw", "application/vnd.amazon.ebook"},
{".bin", "application/octet-stream"},
{".bmp", "image/bmp"},
{".bz", "application/x-bzip"},
{".bz2", "application/x-bzip2"},
{".csh", "application/x-csh"},
{".css", "text/css"},
{".csv", "text/csv"},
{".doc", "application/msword"},
{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
{".eot", "application/vnd.ms-fontobject"},
{".epub", "application/epub+zip"},
{".gif", "image/gif"},
{".htm", "text/html"},
{".html", "text/html"},
{".ico", "image/vnd.microsoft.icon"},
{".ics", "text/calendar"},
{".jar", "application/java-archive"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".js", "text/javascript"},
{".json", "application/json"},
{".jsonld", "application/ld+json"},
{".mid", "audio/midi"},
{".midi", "audio/x-midi"},
{".mjs", "text/javascript"},
{".mp3", "audio/mpeg"},
{".mpeg", "video/mpeg"},
{".mpkg", "application/vnd.apple.installer+xml"},
{".odp", "application/vnd.oasis.opendocument.presentation"},
{".ods", "application/vnd.oasis.opendocument.spreadsheet"},
{".odt", "application/vnd.oasis.opendocument.text"},
{".oga", "audio/ogg"},
{".ogv", "video/ogg"},
{".ogx", "application/ogg"},
{".otf", "font/otf"},
{".png", "image/png"},
{".pdf", "application/pdf"},
{".ppt", "application/vnd.ms-powerpoint"},
{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
{".rar", "application/x-rar-compressed"},
{".rtf", "application/rtf"},
{".sh", "application/x-sh"},
{".svg", "image/svg+xml"},
{".swf", "application/x-shockwave-flash"},
{".tar", "application/x-tar"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".ttf", "font/ttf"},
{".txt", "text/plain"},
{".vsd", "application/vnd.visio"},
{".wav", "audio/wav"},
{".weba", "audio/webm"},
{".webm", "video/webm"},
{".webp", "image/webp"},
{".woff", "font/woff"},
{".woff2", "font/woff2"},
{".xhtml", "application/xhtml+xml"},
{".xls", "application/vnd.ms-excel"},
{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
{".xml", "application/xml"},
{".xul", "application/vnd.mozilla.xul+xml"},
{".zip", "application/zip"},
{".3gp", "video/3gpp"},
{".3g2", "video/3gpp2"},
{".7z", "application/x-7z-compressed"}};
class Util
{
public:
// 字符串分割
static size_t Split(const std::string &str, const std::string &seq, std::vector<std::string> *arry)
{
int offset = 0;
while (offset < str.size())
{
// abc....bcd.aaa.
size_t pos = str.find(seq, offset);
if (pos == std::string::npos) // 如果没找到则表示要截取最后一个字符串
{
arry->push_back(str.substr(offset));
return arry->size();
}
// 找到则pos指向【.】的下标
if (pos == offset) // 代表存在多个【.】,截取的字符没有任何意义
{
offset = pos + seq.size();
continue;
}
arry->push_back(str.substr(offset, pos - offset));
offset = pos + seq.size();
}
return arry->size();
}
// 读取字符串内容
static bool ReadFile(const std::string &filename, std::string *buf)
{
std::ifstream ifs(filename, std::ios::binary);
if (ifs.is_open() == false)
{
Log::ERROR("open %s false", filename.c_str());
return false;
}
size_t fsize = 0;
ifs.seekg(0, ifs.end); // 跳转到文件末尾
fsize = ifs.tellg(); // 获取当前读写位置相对于起始位置的偏移量,从末尾偏移刚好就是文件大小
ifs.seekg(0, ifs.beg); // 跳转到文件起始位置
buf->resize(fsize);
ifs.read(&(*buf)[0], fsize);
if (ifs.good() == false)
{
Log::ERROR("read %s false", filename.c_str());
ifs.close();
return false;
}
ifs.close();
return true;
}
// 向文件写入数据
static bool WriteFile(const std::string &filename, const std::string &buf)
{
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); // 在打开文件时,把已有内容全部清空(truncate to zero length)
if (ofs.is_open() == false)
{
Log::ERROR("open %s false", filename.c_str());
return false;
}
ofs.write(buf.c_str(), buf.size());
if (ofs.good() == false)
{
Log::ERROR("read %s false", filename.c_str());
ofs.close();
return false;
}
ofs.close();
return true;
}
// URL 编码
static std::string UrlEncode(const std::string &url, bool convert_space_to_plus)
{
std::string res;
for (unsigned char c : url)
{
if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c))
{
res += c;
}
else if (c == ' ' && convert_space_to_plus)
{
res += '+';
}
else
{
char tmp[4] = {0};
snprintf(tmp, 4, "%%%02X", c);
res += tmp;
}
}
return res;
}
// %2B == 43
// 只支持 '0'-'9' 和 'A'-'F'
static int CtoInt(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
// 非法字符直接返回 -1(也可以选择抛异常或忽略)
return -1;
}
// URL 解码
static std::string UrlDecode(const std::string &url, bool convert_plus_to_space)
{
std::string res;
for (int i = 0; i < url.size(); i++)
{
if (url[i] == '%')
{
if (i + 2 < url.size())
{
int c1 = CtoInt(url[i + 1]);
int c2 = CtoInt(url[i + 2]);
if (c1 != -1 && c2 != -1)
{
res += static_cast<char>(c1 * 16 + c2);
i += 2;
continue;
}
}
// 如果不是有效的 %xx,直接保留 %
res += '%';
}
else if (url[i] == '+' && convert_plus_to_space)
{
res += ' ';
}
else
res += url[i];
}
return res;
}
// 响应状态码的描述信息获取
static std::string StatusDesc(int status)
{
auto it = _status_msg.find(status);
if (it != _status_msg.end())
{
return it->second;
}
return "Unknow status";
}
// 根据文件后缀名获取文件mine
static std::string GetMimeTypeFromExtension(const std::string &filename)
{
// 获取文件扩展名 a.txt
size_t pos = filename.find_last_of('.');
if (pos == std::string::npos)
{
return "application/octet-stream";
}
std::string ext = filename.substr(pos);
// 通过扩展名获取mime
auto it = _mime_msg.find(ext);
if (it != _mime_msg.end())
{
return it->second;
}
return "application/octet-stream";
}
// 判断一个文件是否是目录
static bool IsDirectory(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISDIR(st.st_mode);
}
// 判断一个文件是否是普通文件
static bool IsRegularFile(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISREG(st.st_mode);
}
// http请求路径是否有效
static bool ValidPath(const std::string &path)
{
std::vector<std::string> sub;
Split(path, "/", &sub);
int level = 0;
for (auto &s : sub)
{
if (s == "..")
{
level--;
if (level < 0)
return false;
continue;
}
if (s != ".")
level++;
}
return true;
}
};
HttpRequest
请求类
class HttpRequest
{
public:
std::string _method; // 请求方法
std::string _path; // 资源路径
std::string _version; // 协议版本
std::string _body; // 请求正文
std::smatch _matches; // 资源路径的正则提取数据
std::unordered_map<std::string, std::string> _headers; // http头部字段
std::unordered_map<std::string, std::string> _params; // 查询字符串
public:
HttpRequest() : _version("HTTP/1.1") {}
void ReSet()
{
_method.clear();
_path.clear();
_version = "HTTP/1.1";
_body.clear();
std::smatch matches;
_matches.swap(matches);
_headers.clear();
_params.clear();
}
// 插入头部字段
void SetHeader(const std::string &key, const std::string &val)
{
_headers.insert(std::make_pair(key, val));
}
// 判断是否存在指定的头部字段
bool HasHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 获取指定头部字段的值
std::string GetHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return "";
}
return it->second;
}
// 插入查询字符串
void SetParam(const std::string &key, const std::string &val)
{
_params.insert(std::make_pair(key, val));
}
// 判断是否有指定的查询字符串
bool HasParam(const std::string &key)
{
auto it = _params.find(key);
if (it == _params.end())
{
return false;
}
return true;
}
// 获取指定的查询字符串
std::string GetParam(const std::string &key) const
{
auto it = _params.find(key);
if (it == _params.end())
{
return "";
}
return it->second;
}
// 获取正文长度
size_t ContentLength() const
{
if (HasHeader("Content-Length") == false)
{
return 0;
}
std::string content = GetHeader("Content-Length");
return std::stol(content);
}
// 判断是否是短连接
bool Close() const
{
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;
}
//Log::DEBUG("长连接");
return true;
}
};
HttpResponse
响应类
class HttpResponse
{
public:
int _status; // 状态码
bool _redirect_flag; // 重定向标志
std::string _redirect_url; // 重定向路径
std::unordered_map<std::string, std::string> _headers; // http头部字段
std::string _body; // 回复正文
public:
HttpResponse() : _status(200), _redirect_flag(false) {}
HttpResponse(int status) : _status(status), _redirect_flag(false) {}
void ReSet()
{
_status = 200;
_redirect_flag = false;
_redirect_url.clear();
_headers.clear();
_body.clear();
}
// 插入头部字段
void SetHeader(const std::string &key, const std::string &val)
{
// std::cout << "插入的键值对是: " << key << ": " << val << std::endl;
_headers.insert(std::make_pair(key, val));
}
// 判断是否存在指定的头部字段
bool HasHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 获取指定头部字段的值
std::string GetHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return "";
}
return it->second;
}
void SetContent(const std::string &body, const std::string &type = "text/html")
{
_body = body;
SetHeader("Content-Type", type);
}
void SetRedirect(const std::string &url, int status = 302)
{
_status = status;
_redirect_flag = true;
_redirect_url = url;
}
// 判断是否是短连接
bool Close()
{
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;
}
return true;
}
};
HttpContext
上下文类
typedef enum
{
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
} HttpRecvStatus;
#define MAX_LINE 8192
class HttpContext
{
private:
int _resp_status; // 响应状态码
HttpRecvStatus _recv_status; // 当前接收及解析的阶段状态
HttpRequest _request; // 已经解析得到的请求信息
private:
bool ParseHttpLine(const std::string &line)
{
std::smatch matches;
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase); // icase忽略大小写
bool ret = std::regex_match(line, matches, e);
if (ret == false)
{
_recv_status = RECV_HTTP_ERROR;
_resp_status = 400; // BAD REQUEST
return false;
}
// 0 : GET /xxx/login?user=xiaoming&pass=123123 HTTP/1.1
// 1 : GET
// 2 : /xxx/login
// 3 : user=xiaoming&pass=123123
// 4 : HTTP/1.1
// 请求方法的获取
_request._method = matches[1];
/*由于忽略了大小写,这里我们得到的方法可能是小写,所以需要转换成大写*/
std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);
// 资源路径的获取,需要进行URL解码操作,但是不需要+转空格
_request._path = Util::UrlDecode(matches[2], false);
// 协议版本获取
_request._version = matches[4];
// 参数的字符串的获取
std::string query_string = Util::UrlDecode(matches[3], true);
std::vector<std::string> query_arry;
Util::Split(query_string, "&", &query_arry);
for (auto &str : query_arry)
{
size_t pos = str.find("="); // user=xiaomin
if (pos == std::string::npos)
{
_recv_status = RECV_HTTP_ERROR;
_resp_status = 400; // BAD REQUEST
return false;
}
std::string key = Util::UrlDecode(str.substr(0, pos), true);
std::string value = Util::UrlDecode(str.substr(pos + 1), true);
_request.SetParam(key, value);
}
return true;
}
bool RecvHttpLine(Buffer *buf)
{
if (_recv_status != RECV_HTTP_LINE)
return false;
std::string line = buf->GetLineAndPop();
if (line.size() == 0) // 1.读取到的数据没有一行
{
if (buf->ReadAbleSize() > MAX_LINE) // 缓冲区的数据大于 MAX_LINE
{
_recv_status = RECV_HTTP_ERROR;
_resp_status = 414; // URL TOO LONG
return false;
}
// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE) // 请求行的数据过大,存在异常
{
_recv_status = RECV_HTTP_ERROR;
_resp_status = 414; // URL TOO LONG
return false;
}
bool ret = ParseHttpLine(line);
if (ret == false)
{
Log::DEBUG("ParseHttpLine error");
return false;
}
//Log::DEBUG("_recv_status 改变为RECV_HTTP_HEAD");
// 处理完调整阶段状态
_recv_status = RECV_HTTP_HEAD;
return true;
}
// 头部处理
bool ParseHttpHead(std::string &line)
{
// key: val\r\n
if (line.back() == '\n')
line.pop_back(); // 末尾是换行则去掉换行字符
if (line.back() == '\r')
line.pop_back(); // 末尾是回车则去掉回车字符
size_t pos = line.find(": ");
if (pos == std::string::npos)
{
_recv_status = RECV_HTTP_ERROR;
_resp_status = 400; // BAD REQUEST
return false;
}
std::string key = line.substr(0, pos);
std::string value = line.substr(pos + 2);
_request.SetHeader(key, value);
//Log::DEBUG("key: %s, value:%s", key.c_str(), value.c_str());
return true;
}
bool RecvHttpHead(Buffer *buf)
{
int a = 0;
if (_recv_status != RECV_HTTP_HEAD)
{
Log::DEBUG("recv_status != RECV_HTTP_HEAD");
return false;
}
while (1)
{
std::string line = buf->GetLineAndPop();
//Log::DEBUG("头部的数据是: %s", line.c_str());
if (line.size() == 0) // 1.读取到的数据没有一行
{
if (buf->ReadAbleSize() > MAX_LINE) // 缓冲区的数据大于 MAX_LINE
{
_recv_status = RECV_HTTP_ERROR;
_resp_status = 414; // URL TOO LONG
return false;
}
// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE) // 请求行的数据过大,存在异常
{
_recv_status = RECV_HTTP_ERROR;
_resp_status = 414; // URL TOO LONG
return false;
}
// 在头部处理中最后一个是\r\n 或者\n
if (line == "\n" || line == "\r\n")
{
//Log::DEBUG("解析完毕");
break;
}
bool ret = ParseHttpHead(line);
if (ret == false)
{
return false;
}
}
_recv_status = RECV_HTTP_BODY;
return true;
}
bool RecvHttpBody(Buffer *buf)
{
if (_recv_status != RECV_HTTP_BODY)
return false;
// 1.获取正文长度
size_t content_length = _request.ContentLength();
// std::cout << "获得的正文长度是: " << content_length << std::endl;
if (content_length == 0)
{
// 没有正文,则请求接收解析完毕
_recv_status = RECV_HTTP_OVER;
return true;
}
// 2. 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据
size_t real_len = content_length - _request._body.size();
// 3.1判断当前缓冲区中的数据是否>= real_len
if (buf->ReadAbleSize() >= real_len)
{
_request._body.append(buf->ReadPosition(), real_len);
buf->MoveReadOffset(real_len);
_recv_status = RECV_HTTP_OVER;
return true;
}
// 3.2缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来
_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
// 这里不能改变状态,因为数据接收并没有完成
return true;
}
public:
HttpContext() : _resp_status(200), _recv_status(RECV_HTTP_LINE) {}
void ReSet()
{
_resp_status = 200;
_recv_status = RECV_HTTP_LINE;
_request.ReSet();
}
int GetRespStatus() { return _resp_status; }
HttpRecvStatus GerRecvStatus() { return _recv_status; }
HttpRequest &GetRequest() { return _request; }
void ParseHttpRequest(Buffer *buf) // 接收并解析http请求
{
// 不同的状态,做不同的事情,但是这里不要break, 因为处理完请求行后,应该立即处理头部,而不是退出等新数据
//_recv_status标志位可以控制处理Recvhttp的阶段!!!
switch (_recv_status)
{
case RECV_HTTP_LINE:
RecvHttpLine(buf);
case RECV_HTTP_HEAD:
//Log::DEBUG("开始解析头部");
RecvHttpHead(buf);
case RECV_HTTP_BODY:
RecvHttpBody(buf);
}
return;
}
};
HttpServer
类
class HttpServer
{
using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
using Handlers = std::vector<std::pair<std::regex, Handler>>;
private:
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std::string _basedir; // 静态资源根目录
TcpServer _server;
private:
// 错误页面设置
void ErrorHandler(const HttpRequest &req, HttpResponse *rsp)
{
//Log::DEBUG("错误页面");
// 1. 组织一个错误展示页面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(rsp->_status);
body += " ";
body += Util::StatusDesc(rsp->_status);
body += "</h1>";
body += "</body>";
body += "</html>";
// 2. 将页面数据,当作响应正文,放入rsp中
rsp->SetContent(body, "text/html");
}
// 将HttpResponse的要素按照http协议格式进行组织并且发送出去
void WriteResponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp)
{
// 完善头部
if (req.Close() == true)
{
rsp.SetHeader("Connection", "close");
}
else
{
rsp.SetHeader("Connection", "keep-alive");
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false)
{
rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false)
{
rsp.SetHeader("Content-Type", "application/octet-stream"); // 默认二进制
}
if (rsp._redirect_flag == true)
{
rsp.SetHeader("Location", rsp._redirect_url);
}
// 按照http格式进行组织
std::stringstream rsp_str;
rsp_str << req._version << " " << std::to_string(rsp._status) << " " << Util::StatusDesc(rsp._status) << "\r\n";
for (auto &head : rsp._headers)
{
rsp_str << head.first << ": " << head.second << "\r\n";
}
rsp_str << "\r\n";
rsp_str << rsp._body;
// 3.发送数据
conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
}
// 静态资源的请求处理
bool IsFileHandler(const HttpRequest &req)
{
// 静态根目录必须设置
if (_basedir.empty() == true)
{
return false;
}
// 请求方法 必须是GER 或者 HEAD
if (req._method != "GET" && req._method != "HEAD")
{
return false;
}
// 请求路径必须合法
if (Util::ValidPath(req._path) == false)
{
return false;
}
// 请求资源必须存在,且是普通文件
/*这里存在一个特殊情况, 当访问的是 "/" 的时候 我们要返回一个默认页面*/
// 为了避免直接修改请求的资源路径,因此定义一个临时对象,如果是功能性请求我们修改路径会造成错误
std::string req_path = _basedir + req._path;
if (req_path.back() == '/')
{
req_path += "index.html";
}
if (Util::IsRegularFile(req_path) == false)
{
return false;
}
return true;
}
void FileHandler(const HttpRequest &req, HttpResponse *rsp)
{
std::string req_path = _basedir + req._path;
if (req._path.back() == '/')
{
req_path += "index.html";
}
// 读取文件内容
bool ret = Util::ReadFile(req_path, &rsp->_body);
if (ret == false)
{
return;
}
// 设置Content-Type
std::string mime = Util::GetMimeTypeFromExtension(req_path);
rsp->SetHeader("Content-Type", mime);
}
// 功能性请求的分类处理
void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers)
{
// 在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404
// 思想:路由表存储的时键值对 -- 正则表达式 & 处理函数
// 使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理
for (auto &handler : handlers)
{
const std::regex &re = handler.first;
const Handler &functor = handler.second;
bool ret = std::regex_match(req._path, req._matches, re);
if (ret == false)
{
continue;
}
functor(req, rsp); // 存在匹配的处理函数,则执行
return;
}
// 没有匹配的函数
rsp->_status = 404;
}
// 通过判断方法,来反馈一个对应的内容
void Route(HttpRequest &req, HttpResponse *rsp)
{
// 加上重定向逻辑
if (req._path == "/oldpage")
{
rsp->SetRedirect("/newpage"); // 默认302
return;
}
if (req._path == "/newpage")
{
std::string body = "<html><body><h1>Welcome to the new page!</h1></body></html>";
rsp->SetContent(body, "text/html");
return;
}
// 判断是否是静态资源
if (IsFileHandler(req) == true)
{
FileHandler(req, rsp);
return;
}
if (req._method == "GET" || req._method == "HEAD")
{
return Dispatcher(req, rsp, _get_route);
}
else if (req._method == "POST")
{
return Dispatcher(req, rsp, _post_route);
}
else if (req._method == "PUT")
{
return Dispatcher(req, rsp, _put_route);
}
else if (req._method == "DELETE")
{
return Dispatcher(req, rsp, _delete_route);
}
rsp->_status = 405; // Method Not Allowed
return;
}
// 设置上下文
void OnConnected(const PtrConnection &conn)
{
conn->SetContext(HttpContext());
Log::DEBUG("NEW CONNECTION %p", conn.get());
}
// 缓冲区数据解析+处理
void OnMessage(const PtrConnection &conn, Buffer *buffer)
{
while (buffer->ReadAbleSize() > 0)
{
// 1. 获取上下文
HttpContext& context = conn->GetContext()->GetContent<HttpContext>(); // 这里是引用
// 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象
// 1. 如果缓冲区的数据解析出错,就直接回复出错响应
// 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理
context.ParseHttpRequest(buffer);
HttpRequest &req = context.GetRequest();
HttpResponse rsp(context.GetRespStatus());
if (context.GetRespStatus() >= 400)
{
// 进行错误响应,关闭连接
ErrorHandler(req, &rsp); // 填充一个错误显示页面数据到rsp的_body中
WriteResponse(conn, req, rsp); // 将错误信息发送给用户
context.ReSet();
buffer->MoveReadOffset(buffer->ReadAbleSize()); // 清空缓冲区的数据
conn->Shutdown();
return;
}
if (context.GerRecvStatus() != RECV_HTTP_OVER)
{ // 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理
return;
}
// 3. 请求路由 + 业务处理,将处理后的数据放入到rsp._body中去
Route(req, &rsp);
// 4. 对HttpResponse进行组织发送
WriteResponse(conn, req, rsp);
// 5. 重置上下文
context.ReSet();
// 6. 根据长短连接判断是否关闭连接或者继续处理
if (rsp.Close() == true)
conn->Shutdown();
}
}
public:
HttpServer(int port, int timeout = 10) : _server(port)
{
_server.EnableInactiveRelease(timeout);
_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageedCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void SetBaseDir(const std::string &path) { _basedir = path; }
// Get等函数的作用是加入到映射中:pattern实际上是正则表达式
void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); }
void Post(const std::string &pattern, const Handler &handler) { _post_route.push_back(std::make_pair(std::regex(pattern), handler)); }
void Put(const std::string &pattern, const Handler &handler) { _put_route.push_back(std::make_pair(std::regex(pattern), handler)); }
void Delete(const std::string &pattern, const Handler &handler) { _delete_route.push_back(std::make_pair(std::regex(pattern), handler)); }
void SetThreadCount(int count) { _server.SetThreadCount(count); }
void Start() { _server.Start(); }
};
基于HttpServer搭建HTTP服务器
#include "http.hpp"
std::string RequestToStr(const HttpRequest &req)
{
std::stringstream ss;
ss << req._method << " " << req._path << " " << req._version << "\r\n";
for (auto &it : req._params)
{
ss << it.first << ": " << it.second << "\r\n";
}
for (auto &it : req._headers)
{
ss << it.first << ": " << it.second << "\r\n";
}
ss << "\r\n";
ss << req._body;
return ss.str();
}
void Hello(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestToStr(req), "text/plain");
//sleep(15); //测试client4
}
void Login(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestToStr(req), "text/plain");
}
void PutFile(const HttpRequest &req, HttpResponse *rsp)
{
std::string pathname = "./wwwroot" + req._path;
Util::WriteFile(pathname, req._body);
}
void DelFile(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestToStr(req), "text/plain");
}
int main()
{
Log::setDefaultLoggerLevel(Log::LogLevel::value::ERROR);
HttpServer server(8080);
server.SetThreadCount(2);
server.SetBaseDir("./wwwroot");
//这里的 Hello 是一个函数指针,它在传参时会转换成一个 右值(临时对象)
server.Get("/hello", Hello);
server.Post("/login", Login);
server.Put("/1234.txt", PutFile);
server.Delete("/1234.txt", DelFile);
server.Start();
return 0;
}
三、功能测试
使用浏览器进行基本功能测试
长连接连续请求测试
创建一个客户端持续给服务器发送数据,直到超过超时时间看看是否正常(当前默
认设置超时时间为10s)。
预期结果:连接不会释放,持续发送消息
#include "../server.hpp"
int main()
{
Socket cli;
cli.CreateClient(8080, "127.0.0.1");
std::string s = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1)
{
cli.Send(s.c_str(), s.size());
char buff[1024] = {0};
cli.Recv(buff, 1023);
Log::DEBUG("[%s]", buff);
sleep(3);
}
return 0;
}
输出:
[xrw@iZ7xv0vjzfc2mzsasssfooZ test]$ ./client
[16:57:43][140259617371968][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[16:57:43][140259617371968][INFO][default][../server.hpp:353] create client success
[16:57:43][140259617371968][DEBUG][default][client1.cc:15] [HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plain
GET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive
]
[16:58:01][140259617371968][DEBUG][default][client1.cc:15] [HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plain
GET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive
]
可以看到16:57:43的时候连接建立,并接收请求,直到10s后连接依然存在。
超时连接测试1
创建一个客户端,给服务器发送一次数据后,不动了,查看服务器是否会正常的超时关闭连接
#include "../server.hpp"
int main()
{
Socket cli;
cli.CreateClient(8080, "127.0.0.1");
std::string s = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1)
{
cli.Send(s.c_str(), s.size());
char buff[1024] = {0};
cli.Recv(buff, 1023);
Log::DEBUG("[%s]", buff);
sleep(20);
}
return 0;
}
结果:
[17:03:06][140131753428736][DEBUG][default][http.hpp:970] NEW CONNECTION 0x146d5a0
[17:03:16][140131770832704][DEBUG][default][../server.hpp:1234] realse Connection:0x146d5a0
可以看到10s后连接关闭
超时连接释放测试2
连接服务器,并告诉服务器要发送 100 字节的正文数据,但实际发送的数据不足 100 字节,然后观察服务器的处理情况。
预期结果:
服务器第一次接收请求时由于数据不完整,可能会将后续请求的数据误认为是本次请求的正文。在处理剩余数据时可能出现错误,并最终关闭连接。
#include "../server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nxxxxxxx";
while(1) {
// assert(cli_sock.Send(req.c_str(), req.size()) != -1);
// assert(cli_sock.Send(req.c_str(), req.size()) != -1);
// assert(cli_sock.Send(req.c_str(), req.size()) != -1);
cli_sock.Send(req.c_str(), req.size());
cli_sock.Send(req.c_str(), req.size());
cli_sock.Send(req.c_str(), req.size());
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023)); //堵塞等待数据
Log::DEBUG("[%s]", buf);
sleep(3);
}
cli_sock.Close();
return 0;
}
结果:
[17:10:46][140088962811712][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[17:10:46][140088962811712][INFO][default][../server.hpp:353] create client success
[17:10:46][140088962811712][DEBUG][default][client3.cc:24] [HTTP/1.1 200 OK
Content-Length: 168
Connection: keep-alive
Content-Type: text/plain
GET /hello HTTP/1.1
Content-Length: 100
Connection: keep-alive
xxxxxxxGET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 100
xxxxxxxGET /hello HTTP/1.HTTP/1.1 400 Bad Request
Content-Length: 129
Connection: close
Content-Type: text/html
<html><head><meta http-equiv='Content-Type' content='text/html;charset=utf-8'></head><body><h1>400 Bad Request</h1></body></html>]
[17:10:49][140088962811712][ERROR][default][../server.hpp:290] SOCKET send false
超时连接释放测试3
当服务器接收请求的数据时,如果业务处理耗时过长,超过了设置的超时销毁时间(即服务器性能达到瓶颈),需要观察服务端的处理情况。
预期结果:
一次业务处理耗时过长可能导致其他连接被连累超时,从而可能会释放其他连接。假设有描述符 1,2,3,4,5 就绪,而处理描述符 1 时耗时 30 秒:
如果接下来的 2,3,4,5 都是通信连接描述符,并且事件也都就绪,那么不会有问题。因为当 1 处理完后,接下来的描述符会依次处理并刷新活跃度。
如果接下来的 2 是定时器事件描述符,定时器触发超时任务,会释放掉 3,4,5 描述符对应的连接。此时,如果在处理 3,4,5 的事件时直接操作已释放的连接,会导致程序崩溃(内存访问错误)。
总结:
在任何事件处理过程中,不应直接释放连接。正确做法是将释放操作压入任务池,等所有连接事件处理完后,再统一执行任务池中的释放操作。
将搭建服务器的处理hello函数中做出小调整:
void Hello(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestToStr(req), "text/plain");
sleep(15); //测试client4
}
/* 业务处理超时,查看服务器的处理情况
当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)
1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放
假设现在 12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度
1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度
2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉
这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)
因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中,
等到事件处理完了执行任务池中的任务的时候,再去释放
*/
#include "../server.hpp"
//在将realese 压入到任务池中执行,而不是RunInLoop中, 因为当触发事件监控时,若此时销毁了某个连接,而后续又触发了该连接的事件处理,那么将会产生内存访问的错误
int main()
{
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < 10; i++) {
pid_t pid = fork();
if (pid < 0) {
Log::DEBUG("FORK ERROR");
return -1;
}else if (pid == 0) {
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
Log::DEBUG("[%s]", buf);
}
cli_sock.Close();
exit(0);
}
}
while(1) sleep(1);
return 0;
}
输出结果:
[17:16:13][140144714336064][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[17:16:13][140144714336064][INFO][default][../server.hpp:342] create server success
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d505a0
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d526a0
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d53e60
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d555f0
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d56d90
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d53250
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d54ac0
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d56190
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d57990
[17:16:46][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d58510
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d53250
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d54ac0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d56190
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d57990
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d505a0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d58510
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d53e60
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d555f0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d526a0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d56d90
数据中多条请求处理测试
/*一次性给服务器发送多条数据,然后查看服务器的处理结果*/
/*每一条请求都应该得到正常处理*/
#include "../server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023)); // 堵塞等待数据
Log::DEBUG("[%s]", buf);
sleep(20);
cli_sock.Close();
return 0;
}
输出结果:
[17:21:35][140427968341824][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[17:21:35][140427968341824][INFO][default][../server.hpp:353] create client success
[17:21:35][140427968341824][DEBUG][default][client5.cc:16] [HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plain
GET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive
HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plain
GET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive
HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plain
GET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive
]
PUT大文件上传测试
/*大文件传输测试,给服务器上传一个大文件,服务器将文件保存下来,观察处理结果*/
/*
上传的文件,和服务器保存的文件一致
*/
#include "../http/http.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n";
std::string body;
Util::ReadFile("./hello.txt",&body);
req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";
std::cout << "[" << body.size() <<"]" << std::endl;;
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
assert(cli_sock.Send(body.c_str(), body.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023)); // 堵塞等待数据
Log::DEBUG("[%s]", buf);
sleep(20);
cli_sock.Close();
return 0;
}
输出结果:
xrw@iZ7xv0vjzfc2mzsasssfooZ test]$ md5sum hello.txt
0e4f8f9fb47564d51bd5d186f77e94e8 hello.txt
[xrw@iZ7xv0vjzfc2mzsasssfooZ wwwroot]$ md5sum 1234.txt
0e4f8f9fb47564d51bd5d186f77e94e8 1234.txt
四、性能测试
服务器性能测试 —— 使用 Webbench
Webbench 是知名的网站压力测试工具,由 Lionbridge 公司开发(http://www.lionbridge.com)。
测试目标
Webbench 的标准测试主要展示服务器的两项内容:
- 每秒响应请求数(Requests per Second, QPS)
- 每秒传输数据量(Throughput)
测试原理
Webbench 的工作原理:
- 创建指定数量的进程。
- 每个进程不断创建套接字向服务器发送请求。
- 每个进程将结果通过管道返回给主进程进行汇总和统计。
重点衡量标准
- 吞吐量(Throughput)
- QPS(Queries Per Second)
测试环境
- 服务器环境:2 核 2G 云服务器,CentOS Linux release 7.6.1810 (Core),服务器程序采用 1 主 2 从 Reactor 模式。
- Webbench 客户端环境:同一服务器。
注意:因为客户端和服务器在同一主机上,会存在 CPU 资源争抢,所以测试结果仅作示例说明,主要目的是演示如何进行性能压力测试,而非得到准确的生产数据。
[xrw@iZ7xv0vjzfc2mzsasssfooZ WebBench-master]$ ./webbench -c 1000 -t 60 http://127.0.0.1:8080/hello
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Request:
GET /hello HTTP/1.0
User-Agent: WebBench 1.5
Host: 127.0.0.1
Runing info: 1000 clients, running 60 sec.
Speed=135803 pages/min, 339510 bytes/sec.
Requests: 135803 susceed, 0 failed.
测试其实意义不大,因为测试客户端和服务器都在同一台机器上,传输速度较快,但同时抢占 CPU 也会影响处理。最好的方式是在两台不同的机器上进行测试。目前受限于设备环境配置,尚未进行更多并发量的测试。