Reactor模式

发布于:2025-09-11 ⋅ 阅读:(20) ⋅ 点赞:(0)

目录

Reactor模式简介

Reactor的实现

epoll ET服务器实现(Reactor模式)

Accepter函数

Sender回调

Recver回调

Excepter回调

套接字相关

运行epoll ET服务器

与对端主机约定的协议

接入线程池


Reactor模式简介

Reactor模式叫反应器模式也可以叫做分发者模式或通知者模式,大多数IO相关组件如redis也在使用该模式。 reactor模式原理是I/O 多路复用监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。

Reactor的实现

Reactor 模式主要由 Reactor(分发) 和处理资源池这两个核心部分组成,它俩负责的事情如下:
Reactor 负责监听和分发事件,事件类型包含连接事件、读写事件;
处理资源池负责处理事件,如 recv -> 业务逻辑 -> send;
剩下的 3 个方案都是比较经典的,且都有应用在实际的项目中:
单 Reactor 单进程 / 线程;
单 Reactor 多线程 / 进程;
多 Reactor 多进程 / 线程;

单 Reactor 单进程 / 线程

可以看到进程里有 Reactor、Acceptor、Handler 这三个对象:
Reactor 的作用是监听和分发事件;
Acceptor 的作用是获取连接;
Handler 的作用是处理业务;

单 Reactor 单进程 / 线程的实现

epoll ET服务器实现(Reactor模式)

如果在此之前没有了解过Reactor模式,相信在看了Reactor模式的工作流程后一定是一头雾水,下面我们实现一个Reactor模式下的epoll ET服务器,来感受一下Reactor模式。

在epoll ET服务器中,我们需要处理如下几种事件:

  • 读事件:如果是监听套接字的读事件就绪则调用accept函数获取底层的连接,如果是其他套接字的读事件就绪则调用recv函数读取客户端发来的数据。
  • 写事件:写事件就绪则将待发送的数据写入到发送缓冲区当中。
  • 异常事件:当某个套接字的异常事件就绪时我们不做过多处理,直接关闭该套接字。

当epoll ET服务器监测到某一事件就绪后,就会将该事件交给对应的服务处理程序进行处理。

Reactor模式的五个角色

在这个epoll ET服务器中,Reactor模式中的五个角色对应如下:

  • 句柄:文件描述符。
  • 同步事件分离器:I/O多路复用epoll。
  • 事件处理器:包括读回调、写回调和异常回调。
  • 具体事件处理器:读回调、写回调和异常回调的具体实现。
  • 事件分发器:Reactor类当中的Dispatcher函数。

Dispatcher函数要做的就是调用epoll_wait函数等待事件发生,当有事件发生后就将就绪的事件派发给对应的服务处理程序即可。

公共代码

class Connection;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
const static int g_buffer_size = 128;

using func_t = std::function<void(std::weak_ptr<Connection>)>;
using except_func = std::function<void(std::weak_ptr<Connection>)>;

Connection类

  • 在Reactor的工作流程中说到,在注册事件处理器时需要将其与Handle关联,本质上就是需要将读回调、写回调和异常回调与某个文件描述符关联起来。
  • 这样做的目的就是为了当某个文件描述符上的事件就绪时可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。

所以我们可以设计一个EventItem类,该类当中的成员就包括一个文件描述符,以及该文件描述符对应的各种回调函数,此外还有一些其他成员,后面实现的时候再做详细论述。

class Connection
{
public:
    Connection(int sock) : _sock(sock)
    {
    }
    void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
    int SockFd() { return _sock; }
    void AppendInBuffer(const std::string &info)
    {
        _inbuffer += info;
    }
    void AppendOutBuffer(const std::string &info)
    {
        _outbuffer += info;
    }
    std::string &Inbuffer() // for debug
    {
        return _inbuffer;
    }
    std::string &OutBuffer()
    {
        return _outbuffer;
    }
    void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr)
    {
        _tcp_server_ptr = tcp_server_ptr;
    }
    ~Connection()
    {
    }

private:
    int _sock;
    std::string _inbuffer; // string 二进制流,vector
    std::string _outbuffer;

public:
    func_t _recv_cb;
    func_t _send_cb;
    except_func _except_cb;

    // 添加一个回指指针
    std::weak_ptr<TcpServer> _tcp_server_ptr; // std::weak_ptr<> // bug??
    // std::shared_ptr<TcpServer> _tcp_server_ptr; // std::weak_ptr<>

    std::string _ip;
    uint16_t _port;
};

Epoller类

向Dispatcher当中增加,修改和删除事件的功能我们可以封装在Epoller函数中

让他继承nocopy,禁止他进行拷贝

#pragma once

#include "nocopy.hpp"
#include "Log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>

class Epoller : public nocopy
{
    static const int size = 128;

public:
    Epoller()
    {
        _epfd = epoll_create(size);
        if (_epfd == -1)
        {
            lg(Error, "epoll_create error: %s", strerror(errno));
        }
        else
        {
            lg(Info, "epoll_create success: %d", _epfd);
        }
    }
    int EpollerWait(struct epoll_event revents[], int num, int timeout)
    {
        int n = epoll_wait(_epfd, revents, num, timeout);
        return n;
    }
    int EpllerUpdate(int oper, int sock, uint32_t event)
    {
        int n = 0;
        if (oper == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epfd, oper, sock, nullptr);
            if (n != 0)
            {
                lg(Error, "epoll_ctl delete error! sockfd: %d", sock);
            }
        }
        else
        {
            // EPOLL_CTL_MOD || EPOLL_CTL_ADD
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock; // 目前,方便我们后期得知,是哪一个fd就绪了!

            n = epoll_ctl(_epfd, oper, sock, &ev);
            if (n != 0)
            {
                lg(Error, "epoll_ctl error!");
            }
        }
        return n;
    }
    ~Epoller()
    {
        if (_epfd >= 0)
            close(_epfd);
    }

private:
    int _epfd;
    int _timeout{3000};
};
  • 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for循环内部进行事件处理。
  • 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不会进入到for循环内部进行事件处理。
  • 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数对事件进行处理。

nocopy类:

#pragma once

class nocopy
{
public:
    nocopy(){}
    nocopy(const nocopy &) = delete;
    const nocopy&operator=(const nocopy &) = delete;
};

TcpServer类

  • 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
  • 当事件就绪后需要根据就绪的文件描述符来找到其对应的各种回调函数,由于我们会将每个文件描述符及其对应的各种回调都封装到一个EventItem结构当中,所以实际我们就是需要根据文件描述符找到其对应的EventItem结构。
  • 我们可以使用C++ STL当中的unordered_map,来建立各个文件描述符与其对应的Connection结构之间的映射,这个unordered_map可以作为TcpServer类的一个成员变量,当需要找某个文件描述符的Connection结构时就可以通过该成员变量找到。
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{
    static const int num = 64;

public:
    TcpServer(uint16_t port, func_t OnMessage)
        : _port(port),
          _OnMessage(OnMessage),
          _quit(true),
          _epoller_ptr(new Epoller()),
          _listensock_ptr(new Sock())
    {
    }

     void Init()
    {
        _listensock_ptr->Socket();
        SetNonBlockOrDie(_listensock_ptr->Fd());
        _listensock_ptr->Bind(_port);
        _listensock_ptr->Listen();
        lg(Info, "create listen socket success: %d", _listensock_ptr->Fd());
        //往链接队列中放入链接,是不是一种写事件呢!
        AddConnection(_listensock_ptr->Fd(),
                      EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
       }

    void PrintConnection()
    {
        std::cout << "_connections fd list: ";
        for (auto &connection : _connections)
        {
            std::cout << connection.second->SockFd() << ", ";
            std::cout << "inbuffer: " << connection.second->Inbuffer().c_str();
        }
        std::cout << std::endl;
    }
    ~TcpServer()
    {
    }

private:
    std::shared_ptr<Epoller> _epoller_ptr; // 内核
    std::shared_ptr<Sock> _listensock_ptr; // 监听socket, 可以把他移除到外部
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event revs[num];
    uint16_t _port;
    bool _quit;
    // 让上层处理信息
    func_t _OnMessage;
};

注意:enable_shared_from_this<T>是一个工具模板类:使用shared_from_this()接口可以返回当前对象的this对应的shared_ptr(就是this版本的智能指针)

Dispatcher函数(事件分派器)

Reactor类当中的Dispatcher函数就是之前所说的初始分发器,这里我们更形象的将其称之为事件分派器。

  • 事件分派器要做的就是调用epoll_wait函数等待事件发生。
  • 当某个文件描述符上的事件发生后,先通过unordered_map找到该文件描述符对应的Connection结构,然后调用Connection结构当中对应的回调函数对该事件进行处理即可。

代码如下:

//事件分发器
    void Dispatcher(int timeout)
    {   
        //n返回事件就绪的个数
        int n = _epoller_ptr->EpollerWait(revs, num, timeout);
        for (int i = 0; i < n; i++)
        {
            //就绪的事件
            uint32_t events = revs[i].events;
            //事件对应的文件描述符
            int sock = revs[i].data.fd;
            // 统一把事件异常转换成为读写问题
            // if (events & EPOLLERR)
            //     events |= (EPOLLIN | EPOLLOUT);
            // if (events & EPOLLHUP)
            //     events |= (EPOLLIN | EPOLLOUT);

            // 只需要处理EPOLLIN EPOLLOUT
            // 判断事件是读事件还是写事件,以及对应的描述符是否存在
            //存在则去调用对应的回调函数
            if ((events & EPOLLIN) && IsConnectionSafe(sock))
            {
                if (_connections[sock]->_recv_cb)
                    _connections[sock]->_recv_cb(_connections[sock]);
            }
            if ((events & EPOLLOUT) && IsConnectionSafe(sock))
            {
                if (_connections[sock]->_send_cb)
                    _connections[sock]->_send_cb(_connections[sock]);
            }
        }
    }

回调函数

下面我们就可以实现一些回调函数,这里主要实现四个回调函数。

Accepter:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
Recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
Sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
Excepter:当异常事件就绪时可以调用该函数将对应的文件描述符进行关闭。

AddConnection函数

创建新链接,维护文件描述符和Connection的关系

void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, except_func except_cb,
                       const std::string &ip = "0.0.0.0", uint16_t port = 0)
    {
        // 1. 给sock也建立一个connection对象,将listensock添加到Connection中,同时,listensock和Connecion放入_connections
        // std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this));

        std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock);
        // std::shared_ptr<Connection> new_connection(new Connection(sock));
        new_connection->SetWeakPtr(shared_from_this()); // shared_from_this(): 返回当前对象的shared_ptr
        new_connection->SetHandler(recv_cb, send_cb, except_cb); //三个函数都需要设置,因为就绪的事件可能是其中一个
        new_connection->_ip = ip;
        new_connection->_port = port;
        // // 2. 添加到unordered_map
        _connections.insert(std::make_pair(sock, new_connection));
        // // 3. 我们添加对应的事件,除了要加到内核中,fd, event
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sock, event);

        // lg(Debug, "add a new connection success, sockfd is : %d", sock);
    }
Accepter函数
// 链接管理器
    void Accepter(std::weak_ptr<Connection> conn)
    {
        auto connection = conn.lock();
        while (true)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len);
            if (sock > 0)
            {
                uint16_t peerport = ntohs(peer.sin_port);
                char ipbuf[128];
                inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));
                lg(Debug, "get a new client, get info-> [%s:%d], sockfd : %d", ipbuf, peerport, sock);

                SetNonBlockOrDie(sock);
                // listensock只需要设置_recv_cb, 而其他sock,读,写,异常
                AddConnection(sock, EVENT_IN,
                              std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                              std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                              std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
                              ipbuf, peerport); // TODO
            }
            else
            {
                if (errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                    break;
            }
        }
    }

tips:

  • 新接收的客户端套接字(sock)执行非阻塞化处理SetNonBlockOrDie):确保后续对该套接字的读写操作不会阻塞服务器主线程(因为是单线程服务器,就算是多线程服务器,多个sock阻塞也会出问题)

设置文件描述符为非阻塞

#include <unistd.h>
#include <fcntl.h>

void SetNonBlockOrDie(int sock)
{
    int fl = fcntl(sock, F_GETFL);
    if (fl < 0)
        exit(NON_BLOCK_ERR);
    fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}

监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数会以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。

  • 如果错误码为EAGAIN或EWOULDBLOCK,说明本次出错返回是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次accepter调用成功。
  • 如果错误码为EINTR,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。
  • 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次accepter调用失败。

accept、recv和send等IO系统调用为什么会被信号中断?

IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前被信号中断了,也就是说IO系统调用在陷入内核,但并没有返回用户态的时候内核跑去处理其他信号了。

  • 在内核态返回用户态之前会检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,那么内核就会对该信号进行处理。
  • 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上是一个特例,因为IO过程分为“等”和“拷贝”两个步骤,而一般“等”的过程比较漫长,而在这个过程中我们的执行流其实是处于闲置的状态的,因此在“等”的过程中如果有信号产生,内核就会立即进行信号的处理。
Sender回调

写事件是按需打开的

  • epoll/select/poll,因为写事件(发送缓冲区是否有空间,经常是ok的),经常是就绪的
  • 如果我们设置对EPOLLOUT关心,EPOLLOUT几乎每次都有就绪
  • 这样就会导致epollserver经常返回,浪费CPU资源
  • 对于读,设置常关心。对于写,按需设置

具体操作:直接写入,如果写入完成,就结束。如果写入完成,但是数据没有写完,outbuffer里面还要内容,我们就需要设置对写事件进行关心,如果写完了,就去掉对写事件的关心

void Sender(std::weak_ptr<Connection> conn) //
    {
        if(conn.expired()) return;
        auto connection = conn.lock();
        auto &outbuffer = connection->OutBuffer();
        while(true)
        {
            ssize_t n = send(connection->SockFd(), outbuffer.c_str(), outbuffer.size(), 0);
            if(n > 0)
            {
                outbuffer.erase(0, n);
                if(outbuffer.empty()) break;
            }
            else if(n == 0)
            {
                return;
            }
            else
            {
                if(errno == EWOULDBLOCK) break;
                else if(errno == EINTR) continue;
                else{
                    lg(Warning, "sockfd: %d, client info %s:%d send error...", connection->SockFd(), connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        //是否需要继续关心,如果outbuffer里面还有数据,则继续关心,若是没有数据,则取消关心
        if(!outbuffer.empty())
        {
            // 开启对写事件的关心
            EnableEvent(connection->SockFd(), true, true);
        }
        else
        {
            // 关闭对写事件的关心
            EnableEvent(connection->SockFd(), true, false);
        }
    }

EnableEvent

    void EnableEvent(int sock, bool readable, bool writeable)
    {
        uint32_t events = 0;
        events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_MOD, sock, events);
    }
Recver回调
 // 事件管理器
    // 应不应该关心数据的格式???不应该!!服务器只要IO数据就可以,有没有读完,报文的格式细节,你不用管。
    void Recver(std::weak_ptr<Connection> conn) //处理读事件
    {
        if(conn.expired()) return; //判断weak_ptr是否过期,是否已经被销毁
        auto connection = conn.lock();  //返回一个shared_ptr的对象
        // std::cout << "haha, got you!!!!, sockfd: " << connection->SockFd() << std::endl;
        int sock = connection->SockFd();
        while (true)
        {
            char buffer[g_buffer_size];
            memset(buffer, 0, sizeof(buffer));
            ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
            if (n > 0)
            {
                connection->AppendInBuffer(buffer);
            }
            else if (n == 0)
            {
                lg(Info, "sockfd: %d, client info %s:%d quit...", sock, connection->_ip.c_str(), connection->_port);
                connection->_except_cb(connection);
                return;
            }
            else
            {
                if (errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                {
                    lg(Warning, "sockfd: %d, client info %s:%d recv error...", sock, connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }
        // 数据有了,但是不一定全,1. 检测 2. 如果有完整报文,就处理
        _OnMessage(connection); // 你读到的sock所有的数据connection
    }
Excepter回调

Excepter回调用于处理异常事件。

    void Excepter(std::weak_ptr<Connection> connection)
    {
        if(connection.expired()) return;
        auto conn = connection.lock();

        int fd = conn->SockFd();
        lg(Warning, "Excepter hander sockfd: %d, client info %s:%d excepter handler",
           conn->SockFd(), conn->_ip.c_str(), conn->_port);
        // 1. 移除对特定fd的关心
        // EnableEvent(connection->SockFd(), false, false);
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
        // 2. 关闭异常的文件描述符
        lg(Debug, "close %d done...\n", fd);
        close(fd);
        // 3. 从unordered_map中移除
        lg(Debug, "remove %d from _connections...\n", fd);
        // TODO bug
        // auto iter = _connections.find(fd);
        // if(iter == _connections.end()) return;
        // _connections.erase(iter);
        // _connections[fd].reset();
        _connections.erase(fd);
    }

套接字相关

这里可以编写一个Socket类,对套接字相关的接口进行一定程度的封装,为了让外部能够直接调用Socket类当中封装的函数

代码如下:

enum
{
    SocketErr = 2,
    BindErr,
    ListenErr,
    NON_BLOCK_ERR
};

// TODO
const int backlog = 10;

class Sock
{
public:
    Sock()
    {
    }
    ~Sock()
    {
    }

public:
    void Socket()
    {
        sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd_ < 0)
        {
            lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
            exit(SocketErr);
        }

        //端口复用,不用进行time_wait等待了
        int opt = 1;
        setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    }
    void Bind(uint16_t port)
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;

        if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
            exit(BindErr);
        }
    }
    void Listen()
    {
        if (listen(sockfd_, backlog) < 0)
        {
            lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
            exit(ListenErr);
        }
    }

    int Accept(std::string *clientip, uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
        if(newfd < 0)
        {
            lg(Warning, "accept error, %s: %d", strerror(errno), errno);
            return -1;
        }
        char ipstr[64];
        inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
        *clientip = ipstr;
        *clientport = ntohs(peer.sin_port);

        return newfd;
    }
    
    bool Connect(const std::string &ip, const uint16_t &port)
    {
        struct sockaddr_in peer;
        memset(&peer, 0, sizeof(peer));
        peer.sin_family = AF_INET;
        peer.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));

        int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));
        if(n == -1) 
        {
            std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
            return false;
        }
        return true;
    }
    void Close()
    {
        close(sockfd_);
    }
    int Fd()
    {
        return sockfd_;
    }

private:
    int sockfd_;
};

运行epoll ET服务器


Calculator calculator;

// for debug
void DefaultOnMessage(std::weak_ptr<Connection> conn)
{
    if(conn.expired()) return;
    auto connection_ptr = conn.lock();
    // 对报文进行处理,有bug
    std::cout << "上层得到了数据: " << connection_ptr->Inbuffer() << std::endl;
    std::string response_str = calculator.Handler(connection_ptr->Inbuffer()); // 我们的业务逻辑比较简单,没有特别耗时的操作
    if(response_str.empty()) return;
    lg(Debug, "%s", response_str.c_str());
    // response_str 发送出去
    connection_ptr->AppendOutBuffer(response_str);
    // 正确的理解发送?
    // connection_ptr->_send_cb(connection_ptr);
    
    auto tcpserver = connection_ptr->_tcp_server_ptr.lock();
    tcpserver->Sender(connection_ptr);
}

int main()
{   
    std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8888, DefaultOnMessage));
    epoll_svr->Init();
    epoll_svr->Loop();

    // Epoller ep;
    // // Epoller ep1 = ep;

    return 0;
}

Handler函数

调用函数Handler来处理对接收缓冲区中的数据

enum
{
    Div_Zero = 1,
    Mod_Zero,
    Other_Oper
};

// 上层业务
class Calculator
{
public:
    Calculator()
    {
    }
    Response CalculatorHelper(const Request &req)
    {
        Response resp(0, 0);
        switch (req.op)
        {
        case '+':
            resp.result = req.x + req.y;
            break;
        case '-':
            resp.result = req.x - req.y;
            break;
        case '*':
            resp.result = req.x * req.y;
            break;
        case '/':
        {
            if (req.y == 0)
                resp.code = Div_Zero;
            else
                resp.result = req.x / req.y;
        }
        break;
        case '%':
        {
            if (req.y == 0)
                resp.code = Mod_Zero;
            else
                resp.result = req.x % req.y;
        }
        break;
        default:
            resp.code = Other_Oper;
            break;
        }

        return resp;
    }
    // "len"\n"10 + 20"\n
    std::string Handler(std::string &package)
    {
        std::string content;
        bool r = Decode(package, &content); // "len"\n"10 + 20"\n
        if (!r)
            return "";
        // "10 + 20"
        Request req;
        r = req.Deserialize(content); // "10 + 20" ->x=10 op=+ y=20
        if (!r)
            return "";

        content = "";                          //
        Response resp = CalculatorHelper(req); // result=30 code=0;

        resp.Serialize(&content);  // "30 0"
        content = Encode(content); // "len"\n"30 0"

        return content;
    }
    ~Calculator()
    {
    }
};

与对端主机约定的协议

Protocol协议

const std::string blank_space_sep = " ";
const std::string protocol_sep = "\n";

std::string Encode(std::string &content)
{
    std::string package = std::to_string(content.size());
    package += protocol_sep;
    package += content;
    package += protocol_sep;

    return package;
}

// "len"\n"x op y"\nXXXXXX
// "protocolnumber"\n"len"\n"x op y"\nXXXXXX
bool Decode(std::string &package, std::string *content)
{
    std::size_t pos = package.find(protocol_sep);
    if(pos == std::string::npos) return false;
    std::string len_str = package.substr(0, pos);
    std::size_t len = std::stoi(len_str);
    // package = len_str + content_str + 2
    std::size_t total_len = len_str.size() + len + 2;
    if(package.size() < total_len) return false;

    *content = package.substr(pos+1, len);
    // earse 移除报文 package.erase(0, total_len);
    package.erase(0, total_len);

    return true;
}


// json, protobuf
class Request
{
public:
    Request(int data1, int data2, char oper) : x(data1), y(data2), op(oper)
    {
    }
    Request()
    {}
public:
    bool Serialize(std::string *out)
    {
#ifdef MySelf
        // 构建报文的有效载荷
        // struct => string, "x op y"
        std::string s = std::to_string(x);
        s += blank_space_sep;
        s += op;
        s += blank_space_sep;
        s += std::to_string(y);
        *out = s;
        return true;
#else
        Json::Value root;
        root["x"] = x;
        root["y"] = y;
        root["op"] = op;
        // Json::FastWriter w;
        Json::StyledWriter w;
        *out = w.write(root);
        return true;
#endif
    }
    bool Deserialize(const std::string &in) // "x op y"
    {
#ifdef MySelf
        std::size_t left = in.find(blank_space_sep);
        if (left == std::string::npos)
            return false;
        std::string part_x = in.substr(0, left);

        std::size_t right = in.rfind(blank_space_sep);
        if (right == std::string::npos)
            return false;
        std::string part_y = in.substr(right + 1);

        if (left + 2 != right)
            return false;
        op = in[left + 1];
        x = std::stoi(part_x);
        y = std::stoi(part_y);
        return true;
#else
        Json::Value root;
        Json::Reader r;
        r.parse(in, root);

        x = root["x"].asInt();
        y = root["y"].asInt();
        op = root["op"].asInt();
        return true;
#endif
    }
    void DebugPrint()
    {
        std::cout << "新请求构建完成:  " << x << op << y << "=?" << std::endl;
    }
public:
    // x op y
    int x;
    int y;
    char op; // + - * / %
};

class Response
{
public:
    Response(int res, int c) : result(res), code(c)
    {
    }

    Response()
    {}
public:
    bool Serialize(std::string *out)
    {
#ifdef MySelf
        // "result code"
        // 构建报文的有效载荷
        std::string s = std::to_string(result);
        s += blank_space_sep;
        s += std::to_string(code);
        *out = s;
        return true;
#else
        Json::Value root;
        root["result"] = result;
        root["code"] = code;
        // Json::FastWriter w;
        Json::StyledWriter w;
        *out = w.write(root);
        return true;
#endif
    }
    bool Deserialize(const std::string &in) // "result code"
    {
#ifdef MySelf
        std::size_t pos = in.find(blank_space_sep);
        if (pos == std::string::npos)
            return false;
        std::string part_left = in.substr(0, pos);
        std::string part_right = in.substr(pos+1);

        result = std::stoi(part_left);
        code = std::stoi(part_right);

        return true;
#else
        Json::Value root;
        Json::Reader r;
        r.parse(in, root);

        result = root["result"].asInt();
        code = root["code"].asInt();
        return true;
#endif

    }
    void DebugPrint()
    {
        std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl;
    }
public:
    int result;
    int code; // 0,可信,否则!0具体是几,表明对应的错误原因
};

接入线程池

单进程epoll服务器存在的问题

因为当前的epoll服务器的业务处理逻辑比较简单,所以单进程的epoll服务器看起来没什么压力,但如果服务器的业务处理逻辑比较复杂,那么某些客户端发来的数据请求就可能长时间得不到响应,因为这时epoll服务器需要花费大量时间进行业务处理,而在这个过程中服务器无法为其他客户端提供服务。

解决思路

可以在当前服务器的基础上接入线程池,将任务放到线程池的任务队列中,然后服务器就可以继续进行事件派发,而不需要将时间耗费到业务处理上面,而放到任务队列当中的任务,则由线程池当中的若干个线程进行处理。

接入线程池

线程池的代码如下:

// 线程池 懒汉模式
template <class T>
class SignaltionThreadPool
{
public:
    void Lock()
    {
        pthread_mutex_lock(&mtx);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&mtx);
    }

    void Wait()
    {
        pthread_cond_wait(&cond, &mtx);
    }

    void WakeUp()
    {
        pthread_cond_signal(&cond);
    }

    // 判断任务队列是否为空
    bool IsEmptyTask()
    {
        return TaskQueue.size() == 0;
    }

public:
    static void *ThreadTask(void *args)
    {
        pthread_detach(pthread_self());
        SignaltionThreadPool<T> *TP = static_cast<SignaltionThreadPool<T> *>(args);

        while (true)
        {
            usleep(500);

            std::string ThreadName = TP->GetThreadName(pthread_self());
            // std::cout << ThreadName << " " << std::endl;

            TP->Lock();
            while (TP->IsEmptyTask())
            {
                TP->Wait();
            }

            T t = TP->pop();
            TP->UnLock();
            t();

            // t.GetRet() 的返回类型是 void,但你试图把它当作一个 “可输出的值”
            // 传给 std::cout,而 C++ 不允许直接输出 void 类型(因为 void 表示 “无类型 / 无返回值”,没有实际数据可以输出)。
            // 不要将返回值为void的函数给operator<<进行输出,没有实际可以输出的类型
            //  std::cout << ThreadName << " " << t.GetRet() << std::endl; err
            std::cout << ThreadName << " ";
            t.GetRet();
        }
    }

      // 懒汉模式
    static ThreadPool<T> *SignaltionInstance()
    {
        if (_inst == nullptr) // 双重判断语句,减少一个线程持有锁,而其他线程一直申请锁的消耗
        {
            pthread_mutex_lock(&Imtx);
            if (_inst == nullptr)
            {
                _inst = new SignaltionThreadPool<T>;
                std::cout << "success create signal" << std::endl;
            }
            pthread_mutex_unlock(&Imtx);
        }
        return _inst;
    }

public:
    void ThreadStart()
    {
        int n = _threads.size();

        for (int i = 0; i < defaultnum; i++)
        {
            pthread_create(&_threads[i].tids, nullptr, ThreadTask, this);
            // pthread_create(&_threads[i].tids, nullptr, Test, nullptr);
            _threads[i].threadname = "thread-" + std::to_string(i);
        }
        // std::cout << "threadpool start" << std::endl;
    }

    std::string GetThreadName(pthread_t tid)
    {
        for (const auto &e : _threads)
        {
            if (tid == e.tids)
            {
                return e.threadname;
            }
        }

        return "none";
    }

    T pop()
    {
        // 在线程中已经加过锁了,所以访问pop时一定是单线程访问
        T t = TaskQueue.front();
        TaskQueue.pop();
        return t;
    }

    void push(const T &task)
    {
        Lock();
        TaskQueue.push(task);
        // 往队列中放了数据,就说明队列是有数据的,这时我们就可以唤醒线程
        WakeUp();
        UnLock();
    }

private:
    SignaltionThreadPool(int num = defaultnum)
        : _threads(num) // 初始化线程个数
    {
        // 初始化锁和条件变量
        pthread_mutex_init(&mtx, nullptr);
        pthread_cond_init(&cond, nullptr);
    }

    ~SignaltionThreadPool()
    {
        pthread_mutex_destroy(&mtx);
        pthread_cond_destroy(&cond);
    }

    // 单例模式
    SignaltionThreadPool(const SignaltionThreadPool<T> &) = delete;
    SignaltionThreadPool<T> &operator=(const SignaltionThreadPool<T> &) = delete;

private:
    // 存放所有进程的信息
    std::vector<ThreadInfo> _threads;
    // 阻塞队列
    std::queue<T> TaskQueue;

    pthread_mutex_t mtx;
    pthread_cond_t cond;

    static SignaltionThreadPool<T> *_inst;
    static pthread_mutex_t Imtx;
};

// 给模型类进行实例化需要加上模板 在变量名显示的使用类名加域作用限定符
template <class T>
SignaltionThreadPool<T> *SignaltionThreadPool<T>::_inst = nullptr;

template <class T>
pthread_mutex_t SignaltionThreadPool<T>::Imtx = PTHREAD_MUTEX_INITIALIZER;

//线程池
template <class T>
class ThreadPool
{
public:
    void Lock()
    {
        pthread_mutex_lock(&mtx);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&mtx);
    }

    void Wait()
    {
        pthread_cond_wait(&cond, &mtx);
    }

    void WakeUp()
    {
        pthread_cond_signal(&cond);
    }

    // 判断任务队列是否为空
    bool IsEmptyTask()
    {
        return TaskQueue.size() == 0;
    }

public:
    static void *ThreadTask(void *args)
    {
        pthread_detach(pthread_self());
        // ThreadPool<T> *TP = static_cast<ThreadPool<T> *>(args);

        while (true)
        {
            usleep(500);

            std::string ThreadName = _inst->GetThreadName(pthread_self());
            // std::cout << ThreadName << " " << std::endl;

            _inst->Lock();
            while (_inst->IsEmptyTask())
            {
                _inst->Wait();
            }

            T t = _inst->pop();
            _inst->UnLock();
            t();

            // t.GetRet() 的返回类型是 void,但你试图把它当作一个 “可输出的值”
            // 传给 std::cout,而 C++ 不允许直接输出 void 类型(因为 void 表示 “无类型 / 无返回值”,没有实际数据可以输出)。
            // 不要将返回值为void的函数给operator<<进行输出,没有实际可以输出的类型
            //  std::cout << ThreadName << " " << t.GetRet() << std::endl; err
            std::cout << ThreadName << " ";
            t.GetRet();
        }
    }

public:
    void ThreadStart()
    {
        int n = _threads.size();

        for (int i = 0; i < defaultnum; i++)
        {
            pthread_create(&_threads[i].tids, nullptr, ThreadTask, this);
            // pthread_create(&_threads[i].tids, nullptr, Test, nullptr);
            _threads[i].threadname = "thread-" + std::to_string(i);
        }
        // std::cout << "threadpool start" << std::endl;
    }

    std::string GetThreadName(pthread_t tid)
    {
        for (const auto &e : _threads)
        {
            if (tid == e.tids)
            {
                return e.threadname;
            }
        }

        return "none";
    }

    T pop()
    {
        // 在线程中已经加过锁了,所以访问pop时一定是单线程访问
        T t = TaskQueue.front();
        TaskQueue.pop();
        return t;
    }

    void push(const T &task)
    {
        Lock();
        TaskQueue.push(task);
        // 往队列中放了数据,就说明队列是有数据的,这时我们就可以唤醒线程
        WakeUp();
        UnLock();
    }

    ThreadPool(int num = defaultnum)
        : _threads(num) // 初始化线程个数
    {
        // 初始化锁和条件变量
        pthread_mutex_init(&mtx, nullptr);
        pthread_cond_init(&cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&mtx);
        pthread_cond_destroy(&cond);
    }

private:
    // 存放所有进程的信息
    std::vector<ThreadInfo> _threads;
    // 阻塞队列
    std::queue<T> TaskQueue;

    pthread_mutex_t mtx;
    pthread_cond_t cond;
};

设计任务类

enum {
    EXITCODE = -1,
    DIVERR,
    MODERR
};

class task
{
public:
    task(int _x, int _y, char _oper, int exitcode = EXITCODE)
        :x(_x)
        , y(_y)
        , oper(_oper)
    {}

    void run()
    {
        switch (oper)
        {
        case '+':
            result = x + y;
            break;
        case '-':
            result = x - y;
            break;
        case '*':
            result = x * y;
            break;
        case '/':
            if(x == 0 && y == 0)
            {
                exitcode = DIVERR;
            }
            result = x / y;
            break;
        case '%':
            if(x == 0 && y == 0)
            {
                exitcode = MODERR;
            }
            result = x % y;
            break;
        default:
            std::cout << "calculation error" << std::endl;
            break;
        }
    }

    void operator()()
    {
        run();
        // GetRet();
    }

    void GetRet()
    {
        std::cout << x << oper << y << "=" << result << " exitcode: " << exitcode << std::endl;
    }
private:
    int x;
    int y;
    char oper;

    int result;
    int exitcode;
};

网站公告

今日签到

点亮在社区的每一天
去签到