epoll
epoll 是 Linux 内核为处理大批量文件描述符而设计的 I/O 事件通知机制,它提供了一种高效的方式,能够同时监控多个文件描述符的事件(如读、写等),并且回调那些就绪的文件描述符。相比于传统的 select 和 poll,epoll 在处理大量并发连接时具有更高的效率,特别适用于高并发服务器的开发。
相关系统调用
epoll_create
功能:创建一个 epoll 实例,返回一个文件描述符。
int epoll_create(int size);
参数:size 参数已弃用,只需要大于 0 即可。
返回值:
成功:返回
epoll
文件描述符。失败:返回
-1
,并设置errno
。
epoll_ctl
功能:向 epoll 实例注册、修改或删除文件描述符。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
epfd:epoll 实例的文件描述符。
op:操作类型:
EPOLL_CTL_ADD:注册新的文件描述符。
EPOLL_CTL_MOD:修改已注册的文件描述符。
EPOLL_CTL_DEL:删除文件描述符。
fd:需要操作的文件描述符。
event:指向 struct epoll_event 的指针,描述需要监控的事件。
返回值:
成功:返回
0
。失败:返回
-1
,并设置errno
。
epoll_wait
功能:等待注册的文件描述符上的事件发生。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数:
epfd:epoll 实例的文件描述符。
events:用于存储就绪事件的数组。
maxevents:events 数组的最大长度。
timeout:超时时间(毫秒),
-1
表示阻塞等待,0
表示立即返回。返回值:
成功:返回就绪的文件描述符数量。
失败:返回
-1
,并设置errno
。
struct epoll_event
struct epoll_event {
uint32_t events; // 需要监控的事件类型
epoll_data_t data; // 用户数据
};
events 字段:
EPOLLIN:文件描述符可读。
EPOLLOUT:文件描述符可写。
EPOLLERR:文件描述符发生错误。
EPOLLHUP:文件描述符挂起(对端关闭连接)。
EPOLLET:设置为边缘触发模式(默认是水平触发)。
EPOLLONESHOT:事件只通知一次,之后需要重新注册,即监听完该 socket 一次后,如果还要继续监听该 socket 的话,需要再次使用 epoll_ctl 把这个 socket 加入到 EPOLL 队列中。
data 字段是一个联合体,可以存储用户自定义数据,通常用于保存文件描述符:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
工作原理
epoll 的工作原理基于事件驱动,当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关。
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
它内部维护了一个红黑树,用于高效地存储和检索文件描述符及其对应的事件。同时,它还维护了一个就绪事件队列,用于存放已经被操作系统标记为就绪的事件。当调用 epoll_wait 时,进程会休眠并等待文件描述符上的事件发生。一旦有事件发生,内核会将该事件从红黑树中移动到就绪事件队列,并唤醒等待的进程。
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的 eventpoll 对象
struct epoll_event event; //期待发生的事件类型
}

epoll
的工作模式
水平触发(LT,Level-Triggered)
特点:
默认模式。
只要文件描述符处于就绪状态,epoll_wait 就会不断通知,支持阻塞读写和非阻塞读写,直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回。
优点:编程简单,适合大多数场景。
缺点:可能重复通知,效率较低。
边缘触发(ET,Edge-Triggered)
特点:
需要在使用 epoll_ctl 的时候对结构体 struct epoll_event 中的 events 属性显式设置 EPOLLET 标志。
只在文件描述符状态发生变化时通知一次。
优点:减少重复通知,性能更高。
缺点:编程复杂,需要一次性处理完所有数据。
对比 LT 和 ET
理解 ET 模式和非阻塞文件描述符


- 服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.
- 客户端要读到服务器的响应, 才会发送下一个请求
- 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.
void SetNoBlock(int sockfd)
{
int flag = fcntl(sockfd, F_GETFL);
if (flag < 0)
{
perror("fcntl");
return;
}
fcntl(sockfd, F_SETFL, flag | O_NONBLOCK);
}
epoll
的优势
高效性:使用红黑树管理文件描述符,事件通知采用回调机制,避免了 select 和 poll 的线性扫描问题。
可扩展性:适用于监控大量文件描述符的场景,性能不会随着文件描述符数量的增加而显著下降。
支持边缘触发:边缘触发模式可以减少不必要的通知,提高性能。
epoll
与 select
/ poll
的对比
特性 | select |
poll |
epoll |
---|---|---|---|
文件描述符数量限制 | 1024(默认) | 无限制 | 无限制 |
效率 | 低(线性扫描) | 低(线性扫描) | 高(回调机制) |
触发模式 | 水平触发 | 水平触发 | 支持水平触发和边缘触发 |
适用场景 | 小规模并发 | 小规模并发 | 高并发、大规模连接 |
Reactor 反应堆模式
Reactor 模式 是一种事件处理设计模式,用于处理多个并发输入事件。它通过事件驱动的方式,将事件分发给相应的处理程序,从而实现对并发事件的高效处理。Reactor 模式广泛应用于网络编程、服务器框架等领域,例如 Java 的 NIO、Netty 框架,以及 C++ 的 Boost.Asio 等。
Reactor 模式的核心思想
Reactor 模式的核心思想是:
事件驱动:通过事件循环(Event Loop)监听多个事件源(如 Socket、文件描述符等),该事件驱动器可以采用 select,poll,epoll 等。
事件分发:当事件发生时,Reactor 将事件分发给对应的事件处理器(Event Handler)。
非阻塞:Reactor 模式通常与非阻塞 I/O 结合使用,避免线程阻塞,所以需要将 Socket、文件描述符等通过 fcntl 函数设置为非阻塞状态。
Reactor 模式的组成
Reactor 模式通常由以下几个组件组成:
(1) Loop(反应器/事件循环)
负责监听事件源(如 Socket、文件描述符等)。
当事件发生时,将事件分发给对应的事件处理器。
(2) Dispatcher(事件多路分发器)
使用系统调用(如 select,poll,epoll )监听多个事件源。
当事件发生时,通知 Event Handler 进行处理。
(3) Event Handler(事件处理器)
定义处理事件的接口。
每个事件源对应一个事件处理器。
案例使用--基于Reactor的计算器
Reactor.hpp
#pragma once
#include <iostream>
#include "Epoller.hpp"
#include <memory>
#include <unordered_map>
#include "Connection.hpp"
using namespace std;
using connection_t = shared_ptr<Connection>;
class Reactor
{
const static int event_num = 64;
public:
Reactor()
: _isrunning(false), _epoller(make_unique<Epoller>())
{
_epoller->Init();
}
void InsertConnection(connection_t conn)
{
if (!IsConnectionExists(conn->GetSockfd()))
{
// 加入到unordered_map中管理起来
_connections[conn->GetSockfd()] = conn;
// 加入到内核epoll中
_epoller->Add(conn->GetSockfd(), conn->GetEvents());
// 设置关联关系
conn->SetOwner(this);
cout << "add connection success: " << conn->GetSockfd() << endl;
}
}
void EnableReadWrite(int sockfd, bool readable, bool writeable)
{
if (IsConnectionExists(sockfd))
{
uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
// 修改用户层connection事件
_connections[sockfd]->SetEvents(events);
// 写透到epoll内核中
_epoller->Update(sockfd, events);
}
}
void DelConnection(int sockfd)
{
if (IsConnectionExists(sockfd))
{
// 从内核epoll中删除
_epoller->Delete(sockfd);
// 在用户层connection中关闭sockfd文件描述符
_connections[sockfd]->Close();
// 在unordered_map中删除对应连接
_connections.erase(sockfd);
}
}
void Stop()
{
_isrunning = false;
}
//事件循环
void Loop()
{
_isrunning = true;
int timeout = 1000;
while (_isrunning)
{
int n = _epoller->Wait(_revents, event_num, timeout);
cout<<"wait..."<<endl;
Dispatcher(n);
}
_isrunning = false;
}
//事件派发
void Dispatcher(int n)
{
for (int i = 0; i < n; i++)
{
cout<<"Dispatcher..."<<endl;
int sockfd = _revents[i].data.fd;
uint32_t event = _revents[i].events;
if ((event & EPOLLERR) || (event & EPOLLHUP))
event = (EPOLLIN | EPOLLOUT); // 将异常事件转换为读写事件
if ((event & EPOLLIN) && IsConnectionExists(sockfd))
{
_connections[sockfd]->Recver();
}
if ((event & EPOLLOUT) && IsConnectionExists(sockfd))
{
_connections[sockfd]->Sender();
}
}
}
private:
bool IsConnectionExists(int sockfd)
{
return _connections.find(sockfd) != _connections.end();
}
private:
unique_ptr<Epoller> _epoller;
unordered_map<int, connection_t> _connections; // fd:connection 管理所有连接
bool _isrunning;
struct epoll_event _revents[event_num];
};
Epoller.hpp
对于事件监听,我们选择使用 epoll,并将其封装为 Epoller,在 Reactor 层使用作为监听器。
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include "Common.hpp"
using namespace std;
class Epoller
{
public:
Epoller()
: _epfd(-1)
{
}
void Init()
{
_epfd = epoll_create(256);
if (_epfd < 0)
{
cout << "epoll_create error" << endl;
exit(EPOLL_CREATE_ERROR);
}
cout << "epoll_create success,epfd:" << _epfd << endl;
}
int Wait(struct epoll_event evs[], int num, int timeout)
{
int n = epoll_wait(_epfd, evs, num, timeout);
if (n < 0)
{
cout << "epoll_wait error" << endl;
}
return n;
}
void Ctrl(int sockfd, uint32_t events, int flag)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = sockfd;
int n = epoll_ctl(_epfd, flag, sockfd, &ev);
if (n < 0)
{
cout << "epoll_ctl error" << endl;
}
}
void Add(int sockfd, uint32_t events)
{
Ctrl(sockfd, events, EPOLL_CTL_ADD);
}
void Update(int sockfd, uint32_t events)
{
Ctrl(sockfd, events, EPOLL_CTL_MOD);
}
void Delete(int sockfd)
{
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
if (n < 0)
{
cout << "epoll_ctl error" << endl;
}
}
private:
int _epfd;
};
Connection.hpp
对于每个事件源,我们将其封装为 Connection.hpp,其包含了 socket,客户端地址信息,缓冲区,监听的事件等成员,之后在 Reactor 层统一使用 Connection 进行管理。
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "InetAddr.hpp"
#include "Reactor.hpp"
using namespace std;
class Reactor;
class Connection
{
public:
Connection()
: _sockfd(-1), _events(0)
{
}
void SetSockfd(int sockfd)
{
_sockfd = sockfd;
}
void SetPeerInfo(const InetAddr &peeraddr)
{
_peeraddr = peeraddr;
}
void SetEvents(uint32_t events)
{
_events = events;
}
void SetOwner(Reactor *owner)
{
_owner = owner;
}
int GetSockfd()
{
return _sockfd;
}
InetAddr GetPeerInfo()
{
return _peeraddr;
}
uint32_t GetEvents()
{
return _events;
}
Reactor *GetOwner()
{
return _owner;
}
void AppendToInbuffer(const string &in)
{
_inbuffer += in;
}
void AppendToOutbuffer(const string &out)
{
_outbuffer += out;
}
void DiscardOutString(int n)
{
_outbuffer.erase(0, n);
}
bool IsOutbufferEmpty()
{
return _outbuffer.empty();
}
string &Outbuffer()
{
return _outbuffer;
}
string &Inbuffer()
{
return _inbuffer;
}
void Close()
{
if (_sockfd > 0)
close(_sockfd);
}
// 回调方法
virtual void Sender() = 0;
virtual void Recver() = 0;
virtual void Excepter() = 0;
private:
int _sockfd;
string _inbuffer;
string _outbuffer;
InetAddr _peeraddr; // 对应的客户端
Reactor *_owner; // 从属于的Reactor
uint32_t _events; // 关心的事件
};
Listener.hpp
Listener 基于 Connection 对 listensock 进行了封装作为应用层,这样当客户端使用 connect 函数进行TCP连接的时候,就会在事件监听层 Epoller 触发 listensockfd 的 EPOLLIN 读事件,这样就可以立马调用 listensock 的 accept 函数来接受连接。不过因为使用 EPOLLET 工作模式,所以要记得将 listensockfd 设置为非阻塞模式。
#pragma once
#include "Epoller.hpp"
#include "Connection.hpp"
#include "Socket.hpp"
#include "IOService.hpp"
#include "Calculator.hpp"
using namespace std;
class Listener : public Connection
{
public:
Listener(int port)
: _listensock(make_unique<TcpSocket>()), _port(port)
{
_listensock->BuildTcpSocketMethod(_port);
SetSockfd(_listensock->Fd());
// 设置文件描述符为非阻塞
SetNonBlock(_listensock->Fd());
SetEvents(EPOLLIN | EPOLLET);
}
~Listener()
{
_listensock->Close();
}
virtual void Sender() override
{
}
virtual void Recver() override
{
// 由于来的可能是多个连接,不能一次读完
while (true)
{
InetAddr peer;
int aerrno = 0;
SockPtr sockptr = _listensock->Accepter(&peer, &aerrno);
if (sockptr)
{
int sockfd = sockptr->Fd();
// 连接成功,需要将sockfd添加到epoll中
// 就需要创建connection,并将其添加到epollserver
auto conn = make_shared<IOService>(sockfd);
conn->RegisterOnMessage(HandleRequest);
GetOwner()->InsertConnection(conn);
}
else
{
// 返回值为-1,连接出现异常
if (aerrno == EAGAIN || aerrno == EWOULDBLOCK)
{
// 非阻塞状态下无连接请求,连接完成直接退出
cout << "accept all connection...done" << endl;
break;
}
else if (aerrno == EINTR)
{
// 连接中断
cout << "accept intr by signal,continue" << endl;
continue;
}
else
{
cout << "accept error...ignore" << endl;
break;
}
}
}
}
virtual void Excepter() override
{
}
private:
unique_ptr<Socket> _listensock;
int _port;
};
IOService.hpp
IOService 基于 Connection 进行了封装作为应用层,其主要进行 IO 处理,其内部设置了一个回调函数对接收到的数据进行处理。
#pragma once
#include "Connection.hpp"
#include <functional>
#include "Common.hpp"
#include "Epoller.hpp"
using func_t = function<string(string &)>;
class IOService : public Connection
{
static const int size = 1024;
public:
IOService(int sockfd)
{
// 设置文件描述符为非阻塞
SetNonBlock(sockfd);
SetSockfd(sockfd);
SetEvents(EPOLLIN | EPOLLIN | EPOLLET);
}
virtual void Sender() override
{
while (true)
{
int n = send(GetSockfd(), Outbuffer().c_str(), Outbuffer().size(), 0);
if (n > 0)
{
// 发送成功,移除发送缓冲区
DiscardOutString(n);
}
else if (n == 0)
{
// 缓冲区已经无数据
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 对方的接收缓冲区已经满了
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
Excepter();
return;
}
}
}
// 一种:outbuffer empty
// 一种:发送缓冲区被写满了 && outbuffer没有empty,写条件不满足,使能sockfd在epoll中的事件
if(!IsOutbufferEmpty())
{
// 修改对sockfd的事件关心!-- 开启对写事件关心
// 按需设置!
GetOwner()->EnableReadWrite(GetSockfd(), true, true);
}
else{
GetOwner()->EnableReadWrite(GetSockfd(), true, false);
}
}
virtual void Excepter() override
{
// IO出现异常,关闭连接
cout << "客户端连接出现异常情况,关闭连接..." << endl;
GetOwner()->DelConnection(GetSockfd());
}
virtual void Recver() override
{
while (true)
{
char buffer[size];
int n = recv(GetSockfd(), buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
// 读取成功
buffer[n] = 0;
AppendToInbuffer(buffer);
}
else if (n == 0)
{
// 对端关闭
Excepter();
return;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 非阻塞模式下无数据接收
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
Excepter();
break;
}
}
}
/
std::cout << "Recver Intbuffer: \n"
<< Inbuffer() << std::endl;
// 把数据读取完毕,但不知道是否读到完整报文,所以需要协议
// 处理接收缓冲区的数据
string result;
if (_onmessage)
{
result = _onmessage(Inbuffer());
}
// 添加应答消息
AppendToOutbuffer(result);
if (!IsOutbufferEmpty())
{
Sender();
//GetOwner()->EnableReadWrite(GetSockfd(), true, true);
}
}
void RegisterOnMessage(func_t onmessage)
{
_onmessage = onmessage;
}
private:
func_t _onmessage;
};
Protocol.hpp
由于我们在进行 IO 时并不能确保接收到的是一个完整的报文,所以需要定制协议,再将该协议应用到进行 IO 处理的回调函数。
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
using namespace std;
const string Sep = "\r\n";
// 给信息添加报头
//{json} -> length\r\n{json}\r\n
bool Encode(string &message)
{
if (message.size() == 0)
return false;
string package = to_string(message.size()) + Sep + message + Sep;
message = package;
return true;
}
// 解析协议,提取信息
bool Decode(string &package, string *message)
{
auto pos = package.find(Sep);
if (pos == string::npos)
return false;
string message_length_str = package.substr(0, pos);
int message_length = stoi(message_length_str);
int full_length = message_length_str.size() + 2 * Sep.size() + message_length;
if (package.size() < full_length)
return false;
*message = package.substr(pos + Sep.size(), message_length);
package.erase(0, full_length);
return true;
}
class Request
{
public:
Request()
{
}
Request(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
// 使用jsoncpp序列化
void Serialize(string &out_str)
{
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["op"] = _op;
out_str = root.toStyledString();
}
// 反序列化
bool Deserialize(string &in_str)
{
Json::Value root;
Json::Reader reader;
bool parsingSuccessful = reader.parse(in_str, root);
if (!parsingSuccessful)
{
cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages();
return false;
}
_x = root["x"].asInt();
_y = root["y"].asInt();
_op = root["op"].asInt();
return true;
}
void Print()
{
cout << "x: " << _x << endl;
cout << "y: " << _y << endl;
cout << "op: " << _op << endl;
}
int X() const
{
return _x;
}
int Y() const
{
return _y;
}
char Op() const
{
return _op;
}
private:
int _x, _y;
char _op;
};
class Response
{
public:
Response()
{
}
Response(int result, int code)
: _result(result), _code(code)
{
}
void Serialize(string &out_str)
{
Json::Value root;
root["result"] = _result;
root["code"] = _code;
out_str = root.toStyledString();
}
bool Deserialize(string &in_str)
{
Json::Value root;
Json::Reader reader;
bool parsingsuccessful = reader.parse(in_str, root);
if (!parsingsuccessful)
{
cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages() << endl;
return false;
}
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
}
void SetResult(int res)
{
_result = res;
}
void SetCode(int c)
{
_code = c;
}
int Result()
{
return _result;
}
int Code()
{
return _code;
}
private:
int _result = 0;
int _code = 0;
};
Calculator.hpp
业务处理层,对于 IO 接收到的数据进行处理,并需要应用定制好的协议
#pragma once
#include "Protocol.hpp"
class Calculator
{
public:
Calculator()
{
}
Response Execute(const Request &req)
{
Response res;
switch (req.Op())
{
case '+':
res.SetResult(req.X() + req.Y());
break;
case '-':
res.SetResult(req.X() - req.Y());
break;
case '*':
res.SetResult(req.X() * req.Y());
break;
case '/':
if (req.Y() == 0)
res.SetCode(1);
else
res.SetResult(req.X() / req.Y());
break;
case '%':
if (req.Y() == 0)
res.SetCode(1);
else
res.SetResult(req.X() % req.Y());
break;
default:
break;
}
return res;
}
} cal;
string HandleRequest(string &inbuffer)
{
string request_str;
string result_str;
while (Decode(inbuffer, &request_str))
{
string response_str;
// 拿到了一个完整的报文
if (request_str.empty())
break;
Request req;
if (!req.Deserialize(request_str))
break;
// 处理业务
Response resp = cal.Execute(req);
// 序列化
resp.Serialize(response_str);
// 添加长度说明---协议
Encode(response_str);
result_str += response_str;
}
return result_str;
}
Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "InetAddr.hpp"
#include "Common.hpp"
using namespace std;
class Socket; // 前置声明
using SockPtr = shared_ptr<Socket>;
#define glistensockfd -1
#define gbacklog 8
// 基类,规定创建socket方法
class Socket
{
public:
// 创建listensockfd
Socket() = default;
virtual ~Socket() = default;
virtual void SocketOrDie() = 0;
virtual void SetSockOpt() = 0;
virtual bool BindOrDie(int port) = 0;
virtual bool ListenOrDie() = 0;
virtual SockPtr Accepter(InetAddr *client, int *aerrno) = 0;
virtual void Close() = 0;
virtual int Recv(string *out_str) = 0;
virtual int Send(const string &in_str) = 0;
virtual int Fd() = 0;
// 创建TcpSocket的固定方法
void BuildTcpSocketMethod(int port)
{
SocketOrDie();
SetSockOpt();
BindOrDie(port);
ListenOrDie();
}
// 创建UdpSocket的固定方法
void BuildUdpSocketMethod(int port)
{
SocketOrDie();
SetSockOpt();
BindOrDie(port);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket() : _listensockfd(glistensockfd)
{
}
TcpSocket(int listensockfd)
: _listensockfd(listensockfd)
{
}
virtual ~TcpSocket()
{
}
// 创建listensockfd
virtual void SocketOrDie() override
{
_listensockfd = socket(AF_INET, SOCK_STREAM, 0);
SetNonBlock(_listensockfd);//将listensockfd设置为非阻塞
if (_listensockfd < 0)
{
cout << "socket error" << endl;
exit(SOCKET_ERROR);
}
cout << "socket create success" << endl;
}
virtual void SetSockOpt() override
{
int opt = 1;
// 保证服务器异常断开后可以立即重启,不会有bind问题
setsockopt(_listensockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
virtual bool BindOrDie(int port) override
{
if (_listensockfd == glistensockfd)
return false;
// 填充网络信息
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
int n = bind(_listensockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0)
{
cout << "bind error" << endl;
exit(BIND_ERROR);
}
cout << "bind success" << endl;
return true;
}
virtual bool ListenOrDie() override
{
if (_listensockfd == glistensockfd)
return false;
int n = listen(_listensockfd, gbacklog);
if (n < 0)
{
cout << "listen error" << endl;
exit(LISTEN_ERROR);
}
cout << "listen success" << endl;
return true;
}
// 1.文件描述符 2.client info
virtual SockPtr Accepter(InetAddr *client, int *aerrno) override
{
if (client == nullptr)
return nullptr;
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sockfd = accept(_listensockfd, (struct sockaddr *)&peer, &len);
if (sockfd < 0)
{
cout << "Socket accep error" << endl;
return nullptr;
}
*aerrno = errno;
client->SetAddr(peer);
return make_shared<TcpSocket>(sockfd);
}
virtual void Close() override
{
if (_listensockfd == glistensockfd)
return;
close(_listensockfd);
}
virtual int Recv(string *out_str) override
{
char buffer[4096 * 2];
int sz = recv(_listensockfd, buffer, sizeof(buffer) - 1, 0);
if (sz > 0)
{
buffer[sz] = 0;
*out_str = buffer;
}
return sz;
}
virtual int Send(const string &in_str) override
{
int sz = send(_listensockfd, in_str.c_str(), in_str.size(), 0);
return sz;
}
virtual int Fd() override
{
return _listensockfd;
}
private:
int _listensockfd;
};
class UdpSocket : public Socket
{
public:
UdpSocket(int sockfd = glistensockfd)
: _sockfd(sockfd)
{
}
// 创建listensockfd
virtual void SocketOrDie() override
{
_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0)
{
cout << "socket error" << endl;
exit(SOCKET_ERROR);
}
cout << "socket create success" << endl;
}
virtual void SetSockOpt() override
{
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
virtual bool BindOrDie(int port) override
{
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0)
{
cout << "bind error" << endl;
exit(BIND_ERROR);
}
cout << "bind success" << endl;
return true;
}
virtual void Close() override
{
if (_sockfd == glistensockfd)
return;
close(_sockfd);
}
virtual int Recv(string *out_str)
{
char buffer[4096];
socklen_t len = sizeof(_peer);
int sz = recvfrom(_sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&_peer, &len);
if (sz > 0)
{
buffer[sz] = 0;
*out_str = buffer;
}
return sz;
}
virtual int Send(const string &in_str) override
{
int sz = sendto(_sockfd, in_str.c_str(), in_str.size(), 0, (struct sockaddr *)&_peer, sizeof(_peer));
return sz;
}
virtual int Fd() override
{
return _sockfd;
}
private:
int _sockfd;
struct sockaddr_in _peer;
};
InetAddr.hpp
#pragma once
#include <string>
#include <iostream>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
using namespace std;
class InetAddr
{
public:
InetAddr()
{
}
InetAddr(int port, string ip = "")
: _port(port), _ip(ip)
{
bzero(&_sockaddr, sizeof(_sockaddr));
_sockaddr.sin_family = AF_INET;
_sockaddr.sin_port = htons(_port);
if (_ip.empty())
_sockaddr.sin_addr.s_addr = INADDR_ANY;
else
_sockaddr.sin_addr.s_addr = inet_addr(_ip.c_str());
}
InetAddr(const struct sockaddr_in &sockaddr)
{
_port = ntohs(sockaddr.sin_port);
char buf[64];
_ip = inet_ntop(AF_INET, &sockaddr.sin_addr, buf, sizeof(buf));
}
bool operator==(const InetAddr &other)
{
return _ip == other._ip;
}
InetAddr operator=(const InetAddr &other)
{
_ip = other._ip;
_port = other._port;
_sockaddr = other._sockaddr;
return *this;
}
struct sockaddr *getSockaddr()
{
return (struct sockaddr *)&_sockaddr;
}
int getSockaddrLen()
{
return sizeof(_sockaddr);
}
const string &getIp()
{
return _ip;
}
int getPort()
{
return _port;
}
void SetAddr(const struct sockaddr_in &client)
{
_sockaddr = client;
_port = ntohs(client.sin_port);
char buf[64];
_ip = inet_ntop(AF_INET, &client.sin_addr, buf, sizeof(buf));
}
private:
string _ip;
int _port;
struct sockaddr_in _sockaddr;
};
Common.hpp
#pragma once
#include <fcntl.h>
void SetNonBlock(int fd)
{
int flag = fcntl(fd, F_GETFL);
if (flag < 0)
{
std::cout << "SetNonBlock error" << std::endl;
return;
}
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
ACCEPT_ERROR,
CONNECT_ERROR,
EPOLL_CREATE_ERROR
};
Server.cc
#include "Reactor.hpp"
#include "Listener.hpp"
#include <string>
#include <memory>
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cout << "Usage: " << argv[0] << " port" << std::endl;
return 1;
}
int port = stoi(argv[1]);
Reactor reactor;
auto conn = make_shared<Listener>(port);
reactor.InsertConnection(conn);
reactor.Loop();
return 0;
}
Client.cc
#include <iostream>
#include <string>
#include <unistd.h>
#include "Socket.hpp"
#include <cstring>
#include "Common.hpp"
#include "Protocol.hpp"
#include "Reactor.hpp"
//./client server_ip server_port
int main(int argc, char *argv[])
{
if (argc != 3)
{
cout << "Usage:./client server_ip server_port" << endl;
return 0;
}
string server_ip = argv[1];
int server_port = stoi(argv[2]);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
cout << "socket create error" << endl;
exit(SOCKET_ERROR);
}
// 填写网络信息
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
server_addr.sin_addr.s_addr = inet_addr(server_ip.c_str());
// client 无需显示bind,connect连接时自动bind
int n = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (n < 0)
{
cout << "connect error" << endl;
exit(CONNECT_ERROR);
}
cout << "connect success,sockfd:" << sockfd << endl;
string message;
while (true)
{
int x, y;
char op;
cout << "input x: ";
cin >> x;
cout << "input y: ";
cin >> y;
cout << "input op: ";
cin >> op;
Request req(x, y, op);
req.Serialize(message); // 序列化
Encode(message); // 添加协议
req.Print();
n = send(sockfd, message.c_str(), message.size(), 0);
cout<<"send success"<<endl;
if (n > 0)
{
char buffer[1024];
int m = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (m > 0)
{
cout<<"recv success"<<endl;
buffer[m] = 0;
string package = buffer;
string content;
Decode(package, &content); // 去报头提取内容
Response res; // 反序列化
res.Deserialize(content);
cout << res.Result() << "[" << res.Code() << "]" << endl;
}
else
break;
}
else
break;
}
close(sockfd);
return 0;
}
makefile
all: server client
server:Server.cc
g++ -o $@ $^ -std=c++17 -ljsoncpp
client:Client.cc
g++ -o $@ $^ -std=c++17 -ljsoncpp
.PHONY:clean
clean:
rm -f server client