Linux网络 多路复用epoll && Reactor 反应堆模式

发布于:2025-03-20 ⋅ 阅读:(10) ⋅ 点赞:(0)

epoll

epoll 是 Linux 内核为处理大批量文件描述符而设计的 I/O 事件通知机制,它提供了一种高效的方式,能够同时监控多个文件描述符的事件(如读、写等),并且回调那些就绪的文件描述符。相比于传统的 select 和 poll,epoll 在处理大量并发连接时具有更高的效率,特别适用于高并发服务器的开发。

相关系统调用

epoll_create

功能:创建一个 epoll 实例,返回一个文件描述符。

int epoll_create(int size);

参数:size 参数已弃用,只需要大于 0 即可。

返回值

  • 成功:返回 epoll 文件描述符。

  • 失败:返回 -1,并设置 errno

epoll_ctl

功能:向 epoll 实例注册、修改或删除文件描述符。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数

  • epfd:epoll 实例的文件描述符。

  • op:操作类型:

    • EPOLL_CTL_ADD:注册新的文件描述符。

    • EPOLL_CTL_MOD:修改已注册的文件描述符。

    • EPOLL_CTL_DEL:删除文件描述符。

  • fd:需要操作的文件描述符。

  • event:指向 struct epoll_event 的指针,描述需要监控的事件。

返回值

  • 成功:返回 0

  • 失败:返回 -1,并设置 errno

epoll_wait

功能:等待注册的文件描述符上的事件发生。

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数

  • epfd:epoll 实例的文件描述符。

  • events:用于存储就绪事件的数组。

  • maxevents:events 数组的最大长度。

  • timeout:超时时间(毫秒),-1 表示阻塞等待,0 表示立即返回。

  • 返回值

    • 成功:返回就绪的文件描述符数量。

    • 失败:返回 -1,并设置 errno

struct epoll_event 

struct epoll_event 结构如下 :
struct epoll_event {
    uint32_t     events;  // 需要监控的事件类型
    epoll_data_t data;    // 用户数据
};

events 字段

  • EPOLLIN:文件描述符可读。

  • EPOLLOUT:文件描述符可写。

  • EPOLLERR:文件描述符发生错误。

  • EPOLLHUP:文件描述符挂起(对端关闭连接)。

  • EPOLLET:设置为边缘触发模式(默认是水平触发)。

  • EPOLLONESHOT:事件只通知一次,之后需要重新注册,即监听完该 socket 一次后,如果还要继续监听该 socket 的话,需要再次使用 epoll_ctl 把这个 socket 加入到 EPOLL 队列中。

data 字段是一个联合体,可以存储用户自定义数据,通常用于保存文件描述符:

typedef union epoll_data {
    void    *ptr;
    int      fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

工作原理

epoll 的工作原理基于事件驱动,当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关。

struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};

它内部维护了一个红黑树,用于高效地存储和检索文件描述符及其对应的事件。同时,它还维护了一个就绪事件队列,用于存放已经被操作系统标记为就绪的事件。当调用 epoll_wait 时,进程会休眠并等待文件描述符上的事件发生。一旦有事件发生,内核会将该事件从红黑树中移动到就绪事件队列,并唤醒等待的进程。

每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件 . 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 lgn ,其中 n 为树的高度 ).
而所有添加到 epoll 中的事件都会与设备 ( 网卡 ) 驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法. 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双链表中.
在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的 eventpoll 对象
struct epoll_event event; //期待发生的事件类型
}
当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可 . 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。

epoll 的工作模式

水平触发(LT,Level-Triggered)

  • 特点

    • 默认模式。

    • 只要文件描述符处于就绪状态,epoll_wait 就会不断通知,支持阻塞读写和非阻塞读写,直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回

  • 优点:编程简单,适合大多数场景。

  • 缺点:可能重复通知,效率较低。

边缘触发(ET,Edge-Triggered)

  • 特点

    • 需要在使用 epoll_ctl 的时候对结构体 struct epoll_event 中的 events 属性显式设置 EPOLLET 标志。

    • 只在文件描述符状态发生变化时通知一次。

  • 优点:减少重复通知,性能更高。

  • 缺点:编程复杂,需要一次性处理完所有数据。

对比 LT ET

LT epoll 的默认行为, 使用 ET 能够减少 epoll 触发的次数, 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。相当于一个文件描述符就绪之后, 不会反复被提示就绪,   看起来就比 E T 更高效一些, 是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的。 另一方面, ET 的代码复杂程度更高了 .

理解 ET 模式和非阻塞文件描述符

使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞 . 这个不是接口上的要求 , 而是 " 工程实践" 上的要求 . 假设这样的场景: 服务器接收到一个 10k 的请求 , 会向客户端返回一个应答数据 . 如果客户端收不到应答, 不会发送第二个 10k 请求 .
如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话 (read 不能保证一次就把所有的数据都读出来, 参考 man 手册的说明 , 可能被信号打断 ), 剩下的 9k 数据就会待在缓冲区中
此时由于 epoll ET 模式 , 并不会认为文件描述符读就绪 . epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中 . 直到下一次客户端再给服务器写数据 . epoll_wait 才能返回,但是问题来了.
  • 服务器只读到 1k 个数据, 10k 读完才会给客户端返回响应数据.
  • 客户端要读到服务器的响应, 才会发送下一个请求
  • 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.

所以 , 为了解决上述问题 ( 阻塞 read 不一定能一下把完整的请求读完 ), 于是就可以使用非阻塞轮询的方式来读缓冲区, 保证一定能把完整的请求都读出来,所以需要使用 fcntl 函数把文件描述符设置为非阻塞模式 . 而如果是 LT 没这个问题 . 只要缓冲区中的数据没读完 , 就能够让 epoll_wait 返回文件描述符读就绪。
void SetNoBlock(int sockfd)
{
	int flag = fcntl(sockfd, F_GETFL);
	if (flag < 0)
	{
		perror("fcntl");
		return;
	}
	fcntl(sockfd, F_SETFL, flag | O_NONBLOCK);
}

epoll 的优势

  • 高效性:使用红黑树管理文件描述符,事件通知采用回调机制,避免了 select 和 poll 的线性扫描问题。

  • 可扩展性:适用于监控大量文件描述符的场景,性能不会随着文件描述符数量的增加而显著下降。

  • 支持边缘触发:边缘触发模式可以减少不必要的通知,提高性能。

epoll 与 select / poll 的对比

特性 select poll epoll
文件描述符数量限制 1024(默认) 无限制 无限制
效率 低(线性扫描) 低(线性扫描) 高(回调机制)
触发模式 水平触发 水平触发 支持水平触发和边缘触发
适用场景 小规模并发 小规模并发

高并发、大规模连接

Reactor 反应堆模式

Reactor 模式 是一种事件处理设计模式,用于处理多个并发输入事件。它通过事件驱动的方式,将事件分发给相应的处理程序,从而实现对并发事件的高效处理。Reactor 模式广泛应用于网络编程、服务器框架等领域,例如 Java 的 NIO、Netty 框架,以及 C++ 的 Boost.Asio 等。

Reactor 模式的核心思想

Reactor 模式的核心思想是:

  • 事件驱动:通过事件循环(Event Loop)监听多个事件源(如 Socket、文件描述符等),该事件驱动器可以采用 select,poll,epoll 等。

  • 事件分发:当事件发生时,Reactor 将事件分发给对应的事件处理器(Event Handler)。

  • 非阻塞:Reactor 模式通常与非阻塞 I/O 结合使用,避免线程阻塞,所以需要将 Socket、文件描述符等通过 fcntl 函数设置为非阻塞状态。

Reactor 模式的组成

Reactor 模式通常由以下几个组件组成:

(1) Loop(反应器/事件循环)

  • 负责监听事件源(如 Socket、文件描述符等)。

  • 当事件发生时,将事件分发给对应的事件处理器。

(2) Dispatcher(事件多路分发器)

  • 使用系统调用(如 select,poll,epoll )监听多个事件源。

  • 当事件发生时,通知 Event Handler 进行处理。

(3) Event Handler(事件处理器)

  • 定义处理事件的接口。

  • 每个事件源对应一个事件处理器。

案例使用--基于Reactor的计算器

Reactor.hpp

#pragma once
#include <iostream>
#include "Epoller.hpp"
#include <memory>
#include <unordered_map>
#include "Connection.hpp"
using namespace std;
using connection_t = shared_ptr<Connection>;
class Reactor
{
    const static int event_num = 64;
public:
    Reactor()
        : _isrunning(false), _epoller(make_unique<Epoller>())
    {
        _epoller->Init();
    }
    void InsertConnection(connection_t conn)
    {
        if (!IsConnectionExists(conn->GetSockfd()))
        {
            // 加入到unordered_map中管理起来
            _connections[conn->GetSockfd()] = conn;
            // 加入到内核epoll中
            _epoller->Add(conn->GetSockfd(), conn->GetEvents());
            // 设置关联关系
            conn->SetOwner(this);
            cout << "add connection success: " << conn->GetSockfd() << endl;
        }
    }
    void EnableReadWrite(int sockfd, bool readable, bool writeable)
    {
        if (IsConnectionExists(sockfd))
        {
            uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
            // 修改用户层connection事件
            _connections[sockfd]->SetEvents(events);
            // 写透到epoll内核中
            _epoller->Update(sockfd, events);
        }
    }
    void DelConnection(int sockfd)
    {
        if (IsConnectionExists(sockfd))
        {
            // 从内核epoll中删除
            _epoller->Delete(sockfd);
            // 在用户层connection中关闭sockfd文件描述符
            _connections[sockfd]->Close();
            // 在unordered_map中删除对应连接
            _connections.erase(sockfd);
        }
    }
    void Stop()
    {
        _isrunning = false;
    }
    //事件循环
    void Loop()
    {
        _isrunning = true;
        int timeout = 1000;
        while (_isrunning)
        {
            int n = _epoller->Wait(_revents, event_num, timeout);
            cout<<"wait..."<<endl;
            Dispatcher(n);
        }
        _isrunning = false;
    }
    //事件派发
    void Dispatcher(int n)
    {
        for (int i = 0; i < n; i++)
        {
            cout<<"Dispatcher..."<<endl;
            int sockfd = _revents[i].data.fd;
            uint32_t event = _revents[i].events;
            if ((event & EPOLLERR) || (event & EPOLLHUP))
                event = (EPOLLIN | EPOLLOUT); // 将异常事件转换为读写事件
            if ((event & EPOLLIN) && IsConnectionExists(sockfd))
            {
                _connections[sockfd]->Recver();
            }
            if ((event & EPOLLOUT) && IsConnectionExists(sockfd))
            {
                _connections[sockfd]->Sender();
            }
        }
    }

private:
    bool IsConnectionExists(int sockfd)
    {
        return _connections.find(sockfd) != _connections.end();
    }

private:
    unique_ptr<Epoller> _epoller;
    unordered_map<int, connection_t> _connections; // fd:connection 管理所有连接
    bool _isrunning;
    struct epoll_event _revents[event_num];
};

Epoller.hpp

对于事件监听,我们选择使用 epoll,并将其封装为 Epoller,在 Reactor 层使用作为监听器。

#pragma once
#include <iostream>
#include <sys/epoll.h>
#include "Common.hpp"
using namespace std;
class Epoller
{
public:
    Epoller()
        : _epfd(-1)
    {
    }
    void Init()
    {
        _epfd = epoll_create(256);
        if (_epfd < 0)
        {
            cout << "epoll_create error" << endl;
            exit(EPOLL_CREATE_ERROR);
        }
        cout << "epoll_create success,epfd:" << _epfd << endl;
    }
    int Wait(struct epoll_event evs[], int num, int timeout)
    {
        int n = epoll_wait(_epfd, evs, num, timeout);
        if (n < 0)
        {
            cout << "epoll_wait error" << endl;
        }
        return n;
    }
    void Ctrl(int sockfd, uint32_t events, int flag)
    {
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = sockfd;
        int n = epoll_ctl(_epfd, flag, sockfd, &ev);
        if (n < 0)
        {
            cout << "epoll_ctl error" << endl;
        }
    }
    void Add(int sockfd, uint32_t events)
    {
        Ctrl(sockfd, events, EPOLL_CTL_ADD);
    }
    void Update(int sockfd, uint32_t events)
    {
        Ctrl(sockfd, events, EPOLL_CTL_MOD);
    }
    void Delete(int sockfd)
    {
        int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
        if (n < 0)
        {
            cout << "epoll_ctl error" << endl;
        }
    }

private:
    int _epfd;
};

Connection.hpp

对于每个事件源,我们将其封装为 Connection.hpp,其包含了 socket,客户端地址信息,缓冲区,监听的事件等成员,之后在 Reactor 层统一使用 Connection 进行管理。

#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "InetAddr.hpp"
#include "Reactor.hpp"
using namespace std;
class Reactor;
class Connection
{
public:
    Connection()
        : _sockfd(-1), _events(0)
    {
    }
    void SetSockfd(int sockfd)
    {
        _sockfd = sockfd;
    }
    void SetPeerInfo(const InetAddr &peeraddr)
    {
        _peeraddr = peeraddr;
    }
    void SetEvents(uint32_t events)
    {
        _events = events;
    }
    void SetOwner(Reactor *owner)
    {
        _owner = owner;
    }
    int GetSockfd()
    {
        return _sockfd;
    }
    InetAddr GetPeerInfo()
    {
        return _peeraddr;
    }
    uint32_t GetEvents()
    {
        return _events;
    }
    Reactor *GetOwner()
    {
        return _owner;
    }
    void AppendToInbuffer(const string &in)
    {
        _inbuffer += in;
    }
    void AppendToOutbuffer(const string &out)
    {
        _outbuffer += out;
    }
    void DiscardOutString(int n)
    {
        _outbuffer.erase(0, n);
    }
    bool IsOutbufferEmpty()
    {
        return _outbuffer.empty();
    }
    string &Outbuffer()
    {
        return _outbuffer;
    }
    string &Inbuffer()
    {
        return _inbuffer;
    }
    void Close()
    {
        if (_sockfd > 0)
            close(_sockfd);
    }
    // 回调方法
    virtual void Sender() = 0;
    virtual void Recver() = 0;
    virtual void Excepter() = 0;

private:
    int _sockfd;
    string _inbuffer;
    string _outbuffer;
    InetAddr _peeraddr; // 对应的客户端
    Reactor *_owner;    // 从属于的Reactor
    uint32_t _events;   // 关心的事件
};

Listener.hpp

Listener 基于 Connection 对 listensock 进行了封装作为应用层,这样当客户端使用 connect 函数进行TCP连接的时候,就会在事件监听层 Epoller 触发 listensockfd 的 EPOLLIN 读事件,这样就可以立马调用 listensock 的 accept 函数来接受连接。不过因为使用 EPOLLET 工作模式,所以要记得将 listensockfd 设置为非阻塞模式。

#pragma once
#include "Epoller.hpp"
#include "Connection.hpp"
#include "Socket.hpp"
#include "IOService.hpp"
#include "Calculator.hpp"
using namespace std;
class Listener : public Connection
{
public:
    Listener(int port)
        : _listensock(make_unique<TcpSocket>()), _port(port)
    {
        _listensock->BuildTcpSocketMethod(_port);
        SetSockfd(_listensock->Fd());
        // 设置文件描述符为非阻塞
        SetNonBlock(_listensock->Fd());
        SetEvents(EPOLLIN | EPOLLET);
    }
    ~Listener()
    {
        _listensock->Close();
    }
    virtual void Sender() override
    {
    }
    virtual void Recver() override
    {
        // 由于来的可能是多个连接,不能一次读完
        while (true)
        {
            InetAddr peer;
            int aerrno = 0;
            SockPtr sockptr = _listensock->Accepter(&peer, &aerrno);
            if (sockptr)
            {
                int sockfd = sockptr->Fd();
                // 连接成功,需要将sockfd添加到epoll中
                // 就需要创建connection,并将其添加到epollserver
                auto conn = make_shared<IOService>(sockfd);
                conn->RegisterOnMessage(HandleRequest);
                GetOwner()->InsertConnection(conn);
            }
            else
            {
                // 返回值为-1,连接出现异常
                if (aerrno == EAGAIN || aerrno == EWOULDBLOCK)
                {
                    // 非阻塞状态下无连接请求,连接完成直接退出
                    cout << "accept all connection...done" << endl;
                    break;
                }
                else if (aerrno == EINTR)
                {
                    // 连接中断
                    cout << "accept intr by signal,continue" << endl;
                    continue;
                }
                else
                {
                    cout << "accept error...ignore" << endl;
                    break;
                }
            }
        }
    }
    virtual void Excepter() override
    {
    }

private:
    unique_ptr<Socket> _listensock;
    int _port;
};

IOService.hpp

IOService 基于 Connection 进行了封装作为应用层,其主要进行 IO 处理,其内部设置了一个回调函数对接收到的数据进行处理。

#pragma once
#include "Connection.hpp"
#include <functional>
#include "Common.hpp"
#include "Epoller.hpp"
using func_t = function<string(string &)>;
class IOService : public Connection
{
    static const int size = 1024;

public:
    IOService(int sockfd)
    {
        // 设置文件描述符为非阻塞
        SetNonBlock(sockfd);
        SetSockfd(sockfd);
        SetEvents(EPOLLIN | EPOLLIN | EPOLLET);
    }
    virtual void Sender() override
    {
        while (true)
        {
            int n = send(GetSockfd(), Outbuffer().c_str(), Outbuffer().size(), 0);
            if (n > 0)
            {
                // 发送成功,移除发送缓冲区
                DiscardOutString(n);
            }
            else if (n == 0)
            {
                // 缓冲区已经无数据
                break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // 对方的接收缓冲区已经满了
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    Excepter();
                    return;
                }
            }
        }
        // 一种:outbuffer empty
        // 一种:发送缓冲区被写满了 && outbuffer没有empty,写条件不满足,使能sockfd在epoll中的事件
        if(!IsOutbufferEmpty())
        {
            // 修改对sockfd的事件关心!-- 开启对写事件关心
            // 按需设置!
            GetOwner()->EnableReadWrite(GetSockfd(), true, true);
        }
        else{
            GetOwner()->EnableReadWrite(GetSockfd(), true, false);
        }
    }
    virtual void Excepter() override
    {
        // IO出现异常,关闭连接
        cout << "客户端连接出现异常情况,关闭连接..." << endl;
        GetOwner()->DelConnection(GetSockfd());
    }
    virtual void Recver() override
    {
        while (true)
        {
            char buffer[size];
            int n = recv(GetSockfd(), buffer, sizeof(buffer) - 1, 0);
            if (n > 0)
            {
                // 读取成功
                buffer[n] = 0;
                AppendToInbuffer(buffer);
            }
            else if (n == 0)
            {
                // 对端关闭
                Excepter();
                return;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // 非阻塞模式下无数据接收
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    Excepter();
                    break;
                }
            }
        }
        /
        std::cout << "Recver Intbuffer: \n"
        << Inbuffer() << std::endl;
        // 把数据读取完毕,但不知道是否读到完整报文,所以需要协议
        // 处理接收缓冲区的数据
        string result;
        if (_onmessage)
        {
            result = _onmessage(Inbuffer());
        }
        // 添加应答消息
        AppendToOutbuffer(result);
        if (!IsOutbufferEmpty())
        {
            Sender();
            //GetOwner()->EnableReadWrite(GetSockfd(), true, true);
        }
    }
    void RegisterOnMessage(func_t onmessage)
    {
        _onmessage = onmessage;
    }

private:
    func_t _onmessage;
};

Protocol.hpp

由于我们在进行 IO 时并不能确保接收到的是一个完整的报文,所以需要定制协议,再将该协议应用到进行 IO 处理的回调函数。

#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
using namespace std;

const string Sep = "\r\n";
// 给信息添加报头
//{json} -> length\r\n{json}\r\n
bool Encode(string &message)
{
    if (message.size() == 0)
        return false;
    string package = to_string(message.size()) + Sep + message + Sep;
    message = package;
    return true;
}
// 解析协议,提取信息
bool Decode(string &package, string *message)
{
    auto pos = package.find(Sep);
    if (pos == string::npos)
        return false;
    string message_length_str = package.substr(0, pos);
    int message_length = stoi(message_length_str);
    int full_length = message_length_str.size() + 2 * Sep.size() + message_length;
    if (package.size() < full_length)
        return false;
    *message = package.substr(pos + Sep.size(), message_length);
    package.erase(0, full_length);
    return true;
}
class Request
{
public:
    Request()
    {
    }
    Request(int x, int y, char op)
        : _x(x), _y(y), _op(op)
    {
    }
    // 使用jsoncpp序列化
    void Serialize(string &out_str)
    {
        Json::Value root;
        root["x"] = _x;
        root["y"] = _y;
        root["op"] = _op;
        out_str = root.toStyledString();
    }
    // 反序列化
    bool Deserialize(string &in_str)
    {
        Json::Value root;
        Json::Reader reader;
        bool parsingSuccessful = reader.parse(in_str, root);
        if (!parsingSuccessful)
        {
            cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages();
            return false;
        }
        _x = root["x"].asInt();
        _y = root["y"].asInt();
        _op = root["op"].asInt();
        return true;
    }
    void Print()
    {
        cout << "x: " << _x << endl;
        cout << "y: " << _y << endl;
        cout << "op: " << _op << endl;
    }
    int X() const
    {
        return _x;
    }
    int Y() const
    {
        return _y;
    }
    char Op() const
    {
        return _op;
    }

private:
    int _x, _y;
    char _op;
};

class Response
{
public:
    Response()
    {
    }
    Response(int result, int code)
        : _result(result), _code(code)
    {
    }
    void Serialize(string &out_str)
    {
        Json::Value root;
        root["result"] = _result;
        root["code"] = _code;
        out_str = root.toStyledString();
    }
    bool Deserialize(string &in_str)
    {
        Json::Value root;
        Json::Reader reader;
        bool parsingsuccessful = reader.parse(in_str, root);
        if (!parsingsuccessful)
        {
            cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages() << endl;
            return false;
        }
        _result = root["result"].asInt();
        _code = root["code"].asInt();
        return true;
    }
    void SetResult(int res)
    {
        _result = res;
    }
    void SetCode(int c)
    {
        _code = c;
    }
    int Result()
    {
        return _result;
    }
    int Code()
    {
        return _code;
    }

private:
    int _result = 0;
    int _code = 0;
};

Calculator.hpp

业务处理层,对于 IO 接收到的数据进行处理,并需要应用定制好的协议

#pragma once
#include "Protocol.hpp"
class Calculator
{
public:
    Calculator()
    {
    }
    Response Execute(const Request &req)
    {
        Response res;
        switch (req.Op())
        {
        case '+':
            res.SetResult(req.X() + req.Y());
            break;
        case '-':
            res.SetResult(req.X() - req.Y());
            break;
        case '*':
            res.SetResult(req.X() * req.Y());
            break;
        case '/':
            if (req.Y() == 0)
                res.SetCode(1);
            else
                res.SetResult(req.X() / req.Y());
            break;
        case '%':
            if (req.Y() == 0)
                res.SetCode(1);
            else
                res.SetResult(req.X() % req.Y());
            break;
        default:
            break;
        }
        return res;
    }
} cal;
string HandleRequest(string &inbuffer)
{
    string request_str;
    string result_str;
    while (Decode(inbuffer, &request_str))
    {
        string response_str;
        // 拿到了一个完整的报文
        if (request_str.empty())
            break;
        Request req;
        if (!req.Deserialize(request_str))
            break;
        // 处理业务
        Response resp = cal.Execute(req);
        // 序列化
        resp.Serialize(response_str);
        // 添加长度说明---协议
        Encode(response_str);
        result_str += response_str;
    }
    return result_str;
}

Socket.hpp

#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "InetAddr.hpp"
#include "Common.hpp"
using namespace std;
class Socket; // 前置声明
using SockPtr = shared_ptr<Socket>;
#define glistensockfd -1
#define gbacklog 8
// 基类,规定创建socket方法

class Socket
{
public:
    // 创建listensockfd
    Socket() = default;
    virtual ~Socket() = default;
    virtual void SocketOrDie() = 0;
    virtual void SetSockOpt() = 0;
    virtual bool BindOrDie(int port) = 0;
    virtual bool ListenOrDie() = 0;
    virtual SockPtr Accepter(InetAddr *client, int *aerrno) = 0;
    virtual void Close() = 0;
    virtual int Recv(string *out_str) = 0;
    virtual int Send(const string &in_str) = 0;
    virtual int Fd() = 0;
    // 创建TcpSocket的固定方法
    void BuildTcpSocketMethod(int port)
    {
        SocketOrDie();
        SetSockOpt();
        BindOrDie(port);
        ListenOrDie();
    }
    // 创建UdpSocket的固定方法
    void BuildUdpSocketMethod(int port)
    {
        SocketOrDie();
        SetSockOpt();
        BindOrDie(port);
    }
};

class TcpSocket : public Socket
{
public:
    TcpSocket() : _listensockfd(glistensockfd)
    {
    }
    TcpSocket(int listensockfd)
        : _listensockfd(listensockfd)
    {
    }
    virtual ~TcpSocket()
    {
    }
    // 创建listensockfd
    virtual void SocketOrDie() override
    {
        _listensockfd = socket(AF_INET, SOCK_STREAM, 0);
        SetNonBlock(_listensockfd);//将listensockfd设置为非阻塞
        if (_listensockfd < 0)
        {
            cout << "socket error" << endl;
            exit(SOCKET_ERROR);
        }
        cout << "socket create success" << endl;
    }
    virtual void SetSockOpt() override
    {
        int opt = 1;
        // 保证服务器异常断开后可以立即重启,不会有bind问题
        setsockopt(_listensockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    }
    virtual bool BindOrDie(int port) override
    {
        if (_listensockfd == glistensockfd)
            return false;
        // 填充网络信息
        struct sockaddr_in local;
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;
        int n = bind(_listensockfd, (struct sockaddr *)&local, sizeof(local));
        if (n < 0)
        {
            cout << "bind error" << endl;
            exit(BIND_ERROR);
        }
        cout << "bind success" << endl;
        return true;
    }
    virtual bool ListenOrDie() override
    {
        if (_listensockfd == glistensockfd)
            return false;
        int n = listen(_listensockfd, gbacklog);
        if (n < 0)
        {
            cout << "listen error" << endl;
            exit(LISTEN_ERROR);
        }
        cout << "listen success" << endl;
        return true;
    }
    // 1.文件描述符 2.client info
    virtual SockPtr Accepter(InetAddr *client, int *aerrno) override
    {
        if (client == nullptr)
            return nullptr;
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sockfd = accept(_listensockfd, (struct sockaddr *)&peer, &len);
        if (sockfd < 0)
        {
            cout << "Socket accep error" << endl;
            return nullptr;
        }
        *aerrno = errno;
        client->SetAddr(peer);
        return make_shared<TcpSocket>(sockfd);
    }
    virtual void Close() override
    {
        if (_listensockfd == glistensockfd)
            return;
        close(_listensockfd);
    }
    virtual int Recv(string *out_str) override
    {
        char buffer[4096 * 2];
        int sz = recv(_listensockfd, buffer, sizeof(buffer) - 1, 0);
        if (sz > 0)
        {
            buffer[sz] = 0;
            *out_str = buffer;
        }
        return sz;
    }
    virtual int Send(const string &in_str) override
    {
        int sz = send(_listensockfd, in_str.c_str(), in_str.size(), 0);
        return sz;
    }
    virtual int Fd() override
    {
        return _listensockfd;
    }

private:
    int _listensockfd;
};

class UdpSocket : public Socket
{
public:
    UdpSocket(int sockfd = glistensockfd)
        : _sockfd(sockfd)
    {
    }
    // 创建listensockfd
    virtual void SocketOrDie() override
    {
        _sockfd = socket(AF_INET, SOCK_DGRAM, 0);
        if (_sockfd < 0)
        {
            cout << "socket error" << endl;
            exit(SOCKET_ERROR);
        }
        cout << "socket create success" << endl;
    }
    virtual void SetSockOpt() override
    {
        int opt = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    }

    virtual bool BindOrDie(int port) override
    {
        struct sockaddr_in local;
        bzero(&local, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;
        int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
        if (n < 0)
        {
            cout << "bind error" << endl;
            exit(BIND_ERROR);
        }
        cout << "bind success" << endl;
        return true;
    }
    virtual void Close() override
    {
        if (_sockfd == glistensockfd)
            return;
        close(_sockfd);
    }
    virtual int Recv(string *out_str)
    {
        char buffer[4096];
        socklen_t len = sizeof(_peer);
        int sz = recvfrom(_sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&_peer, &len);
        if (sz > 0)
        {
            buffer[sz] = 0;
            *out_str = buffer;
        }
        return sz;
    }
    virtual int Send(const string &in_str) override
    {
        int sz = sendto(_sockfd, in_str.c_str(), in_str.size(), 0, (struct sockaddr *)&_peer, sizeof(_peer));
        return sz;
    }
    virtual int Fd() override
    {
        return _sockfd;
    }

private:
    int _sockfd;
    struct sockaddr_in _peer;
};

InetAddr.hpp

#pragma once
#include <string>
#include <iostream>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
using namespace std;
class InetAddr
{
public:
    InetAddr()
    {
        
    }
    InetAddr(int port, string ip = "")
        : _port(port), _ip(ip)
    {
        bzero(&_sockaddr, sizeof(_sockaddr));
        _sockaddr.sin_family = AF_INET;
        _sockaddr.sin_port = htons(_port);
        if (_ip.empty())
            _sockaddr.sin_addr.s_addr = INADDR_ANY;
        else
            _sockaddr.sin_addr.s_addr = inet_addr(_ip.c_str());
    }
    InetAddr(const struct sockaddr_in &sockaddr)
    {
        _port = ntohs(sockaddr.sin_port);
        char buf[64];
        _ip = inet_ntop(AF_INET, &sockaddr.sin_addr, buf, sizeof(buf));
    }
    bool operator==(const InetAddr &other)
    {
        return _ip == other._ip;
    }
    InetAddr operator=(const InetAddr &other)
    {
        _ip = other._ip;
        _port = other._port;
        _sockaddr = other._sockaddr;
        return *this;
    }
    struct sockaddr *getSockaddr()
    {
        return (struct sockaddr *)&_sockaddr;
    }
    int getSockaddrLen()
    {
        return sizeof(_sockaddr);
    }
    const string &getIp()
    {
        return _ip;
    }
    int getPort()
    {
        return _port;
    }
    void SetAddr(const struct sockaddr_in &client)
    {
        _sockaddr = client;
        _port = ntohs(client.sin_port);
        char buf[64];
        _ip = inet_ntop(AF_INET, &client.sin_addr, buf, sizeof(buf));
    }
    
private:
    string _ip;
    int _port;
    struct sockaddr_in _sockaddr;
};

Common.hpp

#pragma once
#include <fcntl.h>
void SetNonBlock(int fd)
{
    int flag = fcntl(fd, F_GETFL);
    if (flag < 0)
    {
        std::cout << "SetNonBlock error" << std::endl;
        return;
    }
    fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

enum
{
    SOCKET_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
    ACCEPT_ERROR,
    CONNECT_ERROR,
    EPOLL_CREATE_ERROR
};

Server.cc

#include "Reactor.hpp"
#include "Listener.hpp"
#include <string>
#include <memory>
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cout << "Usage: " << argv[0] << " port" << std::endl;
        return 1;
    }
    int port = stoi(argv[1]);
    Reactor reactor;
    auto conn = make_shared<Listener>(port);
    reactor.InsertConnection(conn);
    reactor.Loop();
    return 0;
}

Client.cc

#include <iostream>
#include <string>
#include <unistd.h>
#include "Socket.hpp"
#include <cstring>
#include "Common.hpp"
#include "Protocol.hpp"
#include "Reactor.hpp"
//./client server_ip server_port
int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        cout << "Usage:./client server_ip server_port" << endl;
        return 0;
    }
    string server_ip = argv[1];
    int server_port = stoi(argv[2]);
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        cout << "socket create error" << endl;
        exit(SOCKET_ERROR);
    }
    // 填写网络信息
    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(server_port);
    server_addr.sin_addr.s_addr = inet_addr(server_ip.c_str());
    // client 无需显示bind,connect连接时自动bind
    int n = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
    if (n < 0)
    {
        cout << "connect error" << endl;
        exit(CONNECT_ERROR);
    }
    cout << "connect success,sockfd:" << sockfd << endl;
    string message;
    while (true)
    {
        int x, y;
        char op;
        cout << "input x: ";
        cin >> x;
        cout << "input y: ";
        cin >> y;
        cout << "input op: ";
        cin >> op;
        Request req(x, y, op);
        req.Serialize(message); // 序列化
        Encode(message);        // 添加协议
        req.Print();
        n = send(sockfd, message.c_str(), message.size(), 0);
        cout<<"send success"<<endl;
        if (n > 0)
        {
            char buffer[1024];
            int m = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
            if (m > 0)
            {
                cout<<"recv success"<<endl;
                buffer[m] = 0;
                string package = buffer;
                string content;
                Decode(package, &content); // 去报头提取内容
                Response res;              // 反序列化
                res.Deserialize(content);
                cout << res.Result() << "[" << res.Code() << "]" << endl;
            }
            else
                break;
        }
        else
            break;
    }
    close(sockfd);
    return 0;
}

makefile

all: server client
server:Server.cc
	g++ -o $@ $^ -std=c++17 -ljsoncpp
client:Client.cc
	g++ -o $@ $^ -std=c++17 -ljsoncpp
.PHONY:clean
clean:
	rm -f server client