多路转接之epoll 【接口】【细节问题】【LT与ET模式】【Reactor】

发布于:2025-08-14 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

一.接口

1.1epoll_creaet 

1.2epoll_ctl

1.3epoll_wait

 二.细节问题

2.1 工作原理

2.2 epoll的demo

2.3 epoll的优点

三. LT 与 ET模式

理解ET

四. reactor


一.接口

1.1epoll_creaet 

注意返回值是一个文件描述符 

 创建一个epoll模型

1.2epoll_ctl

返回值: 

 

第一个参数是epoll_create的返回值

第二个参数表示动作,用三个宏来表示.

第三个参数是需要监听的 fd.

第四个参数是告诉内核需要监听什么事.

第二个参数的取值:

• EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;

• EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件;

• EPOLL_CTL_DEL:从 epfd 中删除一个 fd;  

第四个参数:

• EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);

• EPOLLOUT : 表示对应的文件描述符可以写;

• EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外 数据到来);

• EPOLLERR : 表示对应的文件描述符发生错误;

• EPOLLHUP : 表示对应的文件描述符被挂断;

• EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平 触发(Level Triggered)来说的.

• EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继 续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里

1.3epoll_wait

• epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核 只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存).

• maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建 epoll_create()时的 size.

• 参数 timeout 是超时时间 (毫秒,0 会立即返回,-1 是永久阻塞).

• 如果函数调用成功,返回对应 I/O 上已准备好的文件描述符数目,如返回 0 表 示已超时, 返回小于 0 表示函数失败. 

 二.细节问题

epoll模型实际上就是三种东西,红黑树,就绪队列,回调机制。epoll_ctl实际上就是维护红黑树的,用户告诉内核,要求内核帮我去关心哪些fd。epoll_wait就是内核告诉用户,哪一个fd上面的某些事情已经就绪了。

2.1 工作原理

细节:epoll_ctl的作用:向红黑树中插入节点,向底层回调注册回调方法。

当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构 体,这个结构体中有两个成员与 epoll 的使用方式密切相关.

struct eventpoll{
    ....
    /*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件
    */
    struct rb_root rbr;
    /*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/
    struct list_head rdlist;
    ....
};

• 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方 法向 epoll 对象中添加进来的事件.

• 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高 效的识别出来(红黑树的插入时间效率是 lgn,其中 n 为树的高度).

• 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,也就是 说,当响应的事件发生时会调用这个回调方法.

• 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双 链表中.

• 在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体.

struct epitem{
    struct rb_node rbn;//红黑树节点
    struct list_head rdllink;//双向链表节点
    struct epoll_filefd ffd; //事件句柄信息
    struct eventpoll *ep; //指向其所属的 eventpoll 对象
    struct epoll_event event; //期待发生的事件类型
}

• 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可.(有事件就绪了,提前注册的回调机制会自动的把红黑树的节点添加到双链表中)

• 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用 户. 这个操作的时间复杂度是 O(1).

2.2 epoll的demo

#pragma once

#include <iostream>
#include <memory>
#include <unistd.h>
#include <cstring>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Common.hpp"

using namespace SocketModule;

class EpollServer
{
    const static int size = 64;
    const int defaultfd = -1;

public:
    EpollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isruning(false), _epfd(defaultfd)
    {
        // 1.创建listensocket
        _listensock->BuildTcpSocketMethod(port);
        // 2.创建epoll模型
        _epfd = epoll_create(256);
        if (_epfd < 0)
        {
            LOG(LogLevel::FATAL) << "epoll_create error...";
            exit(EPOLL_CREATE_ERR);
        }
        LOG(LogLevel::FATAL) << "epoll_create success , epfd: " << _epfd;
        // 3.将listensocket设置到内核中
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = _listensock->Fd();
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Fd(), &ev);
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "add listensocket failed";
            exit(EPOLL_CTL_ERR);
        }
    }
    void Start()
    {
        int timeout = -1;
        _isruning = true;
        while (true)
        {
            int n = epoll_wait(_epfd, revs, size, timeout);
            switch (n)
            {
            case 0:
                LOG(LogLevel::DEBUG) << "timeout...";
                break;
            case -1:
                LOG(LogLevel::ERROR) << "epoll error";
                break;
            default:
                Dispatcher(n);
                break;
            }
        }
    }
    void Dispatcher(int n)
    {
        for (int i = 0; i < n; i++)
        {
            if (revs[i].events & EPOLLIN)
            {
                if (revs[i].data.fd == _listensock->Fd())
                {
                    // 新链接到来
                    Accepter();
                }
                else
                {
                    Recver(i);
                }
            }
        }
    }
    void Accepter()
    {
        InetAddr client;
        int sockfd = _listensock->Accept(&client); // 这里一定不会阻塞,等和拷贝分离了
        if (sockfd >= 0)
        {
            // 获取新链接成功
            LOG(LogLevel::INFO) << "get a new link , sockfd: " << sockfd << "client is: " << client.StringAddr();
            struct epoll_event ev;
            ev.events = EPOLLIN;
            ev.data.fd = sockfd;
            int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);
            if (n < 0)
            {
                LOG(LogLevel::WARING) << "add socket failed";
            }
            else
            {
                LOG(LogLevel::INFO) << "add socket success";
            }
        }
    }
    void Recver(int pos)
    {
        // recv的时候肯定也不会阻塞
        char buffer[1024];
        ssize_t n = recv(revs[pos].data.fd, buffer, sizeof(buffer) - 1, 0); // 这样写是有bug的,tcp是面向字节流的
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client say& " << buffer << std::endl;
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "client quit";
            // 不让epoll关心这个fd了
            int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);
            if (n < 0)
            {
                LOG(LogLevel::FATAL) << "del socket failed";
                exit(EPOLL_CTL_ERR);
            }
            close(revs[pos].data.fd); // 先移除,在关闭
        }
        else
        {
            LOG(LogLevel::ERROR) << "recv error";
            // 不让epoll关心这个fd了
            int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);
            if (n < 0)
            {
                LOG(LogLevel::FATAL) << "del socket failed";
                exit(EPOLL_CTL_ERR);
            }
            close(revs[pos].data.fd); // 先移除,在关闭
        }
    }

    ~EpollServer()
    {
        _listensock->Close();
        if (_epfd > 0)
            close(_epfd);
    }

private:
    std::unique_ptr<Socket> _listensock;
    bool _isruning;

    int _epfd;
    struct epoll_event revs[size];
};

2.3 epoll的优点

• 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要 每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开

• 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到 内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)

• 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符 结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就 绪. 这个操作时间复杂度 O(1). 即使文件描述符数目很多, 效率也不会受到影响.

• 没有数量限制: 文件描述符数目无上限.

三. LT 与 ET模式

理解ET

使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工 程实践" 上的要求. 假设这样的场景: 服务器接收到一个 10k 的请求, 会向客户端返回一个应答数据. 如果客 户端收不到应答, 不会发送第二个 10k 请求.

如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话(read 不能保证一 次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的 9k 数据 就会待在缓冲区中.

此时由于 epoll 是 ET 模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返 回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回

但是问题来了.

• 服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.

• 客户端要读到服务器的响应, 才会发送下一个请求

• 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数 据.

所以, 为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完), 于是就可以使用 非阻塞轮训的方式来读缓冲区, 保证一定能把完整的请求都读出来.而如果是 LT 没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件 描述符读就绪.

四. reactor

Reactor.hpp

#pragma once

#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"

class Reactor
{
private:
    static const int revs_num = 128;
    bool IsConnectionExists(std::shared_ptr<Connection> &conn)
    {
        return IsConnectionExistsHelper(conn->GetSockFd());
    }
    bool IsConnectionExists(int sockfd)
    {
        return IsConnectionExistsHelper(sockfd);
    }
    bool IsConnectionExistsHelper(int sockfd)
    {
        auto iter = _connections.find(sockfd);
        if (iter == _connections.end())
        {
            return false;
        }
        return true;
    }
    bool IsConnectionEmpty()
    {
        return _connections.empty();
    }

public:
    Reactor() : _epoll_ptr(std::make_unique<Epoller>()), _isruning(false)
    {
    }
    void Start()
    {
        if (IsConnectionEmpty())
        {
            return;
        }
        _isruning = true;
        while (true)
        {
            PrintConnection();
            int n = _epoll_ptr->Wait(_revs, revs_num, -1);

            for (int i = 0; i < n; i++)
            {
                int sockfd = _revs[i].data.fd;
                uint32_t revents = _revs[i].events;
                //将所有的异常处理转换为IO错误
                if(revents & EPOLLERR)
                    revents |= (EPOLLIN | EPOLLOUT);//只要是出错了就打开读写端
                if(revents & EPOLLHUP)
                    revents |= (EPOLLIN | EPOLLOUT);

                if(revents & EPOLLIN)
                {
                    //不用区分异常了,因为统一处理
                    //不用区分是listensocket还是普通事件就绪
                    if(IsConnectionExists(sockfd))
                        _connections[sockfd]->Recver();
                }
                if(revents & EPOLLOUT)
                {
                    if(IsConnectionExists(sockfd))
                        _connections[sockfd]->Sender();
                }
            }
        }
        _isruning = false;
    }
    void AddNewConnection(std::shared_ptr<Connection> &conn)
    {
        if (IsConnectionExists(conn))
        {
            LOG(LogLevel::WARING) << "conn is exists: " << conn->GetSockFd();
            return;
        }
        // 1.把conn对应的fd和他关心的事件写到内核
        uint32_t events = conn->GetEvent();
        int sockfd = conn->GetSockFd();
        _epoll_ptr->Add(sockfd, events);
        // *.设置回指指针
        conn->SetOwner(this);
        // 2.把connection对象添加到connections内部
        _connections[sockfd] = conn;
    }
    void EnableReadWrite(int sockfd,bool enableread,bool enablewrite)
    {
        // 不要重复添加
        if (!IsConnectionExists(sockfd))
        {
            LOG(LogLevel::WARING) << "EnableReadWrite: conn is exists: " << sockfd;
            return;
        }
        // 修改当前的sockfd对应的connection关心的事件
        uint32_t events = (EPOLLET | (enableread ? EPOLLIN : 0) | (enablewrite ? EPOLLOUT : 0));
        _connections[sockfd]->SetEvent(events);
        //再去写透到内核中
        _epoll_ptr->Mod(sockfd,events);
    }
    void DelConnection(int sockfd)
    {
        _epoll_ptr->Del(sockfd);

        _connections.erase(sockfd);

        close(sockfd);
    }

    void PrintConnection()
    {
        std::cout << "当前正在管理的fd:" << std::endl;
        for(auto &conn : _connections)
        {
            std::cout << conn.second->GetSockFd() << " ";
        }
        std::cout << "\r\n";
    }
    ~Reactor()
    {
    }

private:
    // 1.epoll模型
    std::unique_ptr<Epoller> _epoll_ptr;
    // // 2.listensocket 单独封装管理
    // std::shared_ptr<Listener> _listener_ptr;
    // 3.每一个fd都需要一个单独的输入输出缓冲区,管理套接字
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;

    // 4.就绪的所有事件
    struct epoll_event _revs[revs_num];

    bool _isruning;
};

Listener.hpp

#pragma once

#include <iostream>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Channel.hpp"
#include "Connection.hpp"

using namespace SocketModule;

class Listener : public Connection
{
public:
    Listener(int port = 8080) : _port(port), _listensock(std::make_unique<TcpSocket>())
    {
        _listensock->BuildTcpSocketMethod(_port);
        SetEvent(EPOLLIN | EPOLLET);
        SetNonBlock(_listensock->Fd());
    }
    int GetSockFd()
    {
        return _listensock->Fd();
    }
    void Recver() override
    {
        InetAddr client;
        //虽然是新链接到来了,但是只有一个链接吗,
        //while,ET,设置fd为非阻塞
        while (true)
        {
            int sockfd = _listensock->Accept(&client);
            if(sockfd == ACCEPT_ERR)
            {
                break;
            }
            else if(sockfd == ACCEPT_CONTINUE)
            {
                continue;
            }
            else if(sockfd == ACCEPT_DONE)
            {
                break;
            }
            else
            {
                //是一个合法的fd,但是怎么去添加到_connections里?需要回调指针
                std::shared_ptr<Connection> conn = std::make_shared<Channel>(sockfd,client);
                conn->SetEvent(EPOLLIN | EPOLLET);
                if(_handler != nullptr)
                    conn->RegisterHandler(_handler);
                GetOwner()->AddNewConnection(conn);
            }
        }
    }
    // std::string& Inbuffer() override
    // {}
    // std::string& AppendOutBuffer(std::string& out) override
    // {}
    void Sender() override
    {}
    void Excepter() override
    {}
    ~Listener()
    {
    }

private:
    int _port;
    std::unique_ptr<Socket> _listensock;
};

Channel.hpp

#pragma once

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Common.hpp"
#include "Connection.hpp"
#include "InetAddr.hpp"


#define SIZE 1024


class Channel : public Connection
{
public:
    Channel(int sockfd,const InetAddr& client) : _sockfd(sockfd),_client_addr(client)
    {
        SetNonBlock(sockfd);
    }
    int GetSockFd()
    {
        return _sockfd;
    }
    //保证把本轮数据读完 (while循环)
    //即便是读完了,怎么知道数据由完整的报文,如果是多个报文呢?(协议)
    void Recver() override
    {
        char buffer[SIZE];
        while(true)
        {
            buffer[0] = 0;
            ssize_t n = recv(_sockfd,buffer,sizeof(buffer) - 1, 0);
            if(n > 0)
            {
                buffer[n] = 0;
                _inbuffer += buffer;
            }
            else if(n == 0)
            {
                Excepter();
                return;
            }
            else
            {
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    break;//本轮数据读完了
                }
                else if(errno == EINTR)
                {
                    continue;
                }
                else
                {
                    Excepter();
                    return;
                }
            }
        }
        LOG(LogLevel::DEBUG) << _inbuffer;
        if(!_inbuffer.empty())
        {
            // _handler(std::shared_ptr<Connection>(this));
            _outbuffer += _handler(_inbuffer);
        }

        if(!_outbuffer.empty())
        {
            Sender();
            // GetOwner()->EnableReadWrite(_sockfd,true,true);
        }
    }
    // std::string& Inbuffer() override
    // {
    //     return _inbuffer;
    // }
    // std::string& AppendOutBuffer(std::string& out) override
    // {
    //     _outbuffer += out;
    //     return _outbuffer;
    // }
    void Sender() override
    {
        while (true)
        {
            ssize_t n = send(_sockfd,_outbuffer.c_str(),_outbuffer.size(),0);//非阻塞发
            if(n > 0)
            {
                _outbuffer.erase(0,n);
                if(_outbuffer.empty())
                    break;
            }
            else if(n == 0)
            {
                break;
            }
            else
            {
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    break;
                }
                else if(errno == EINTR)
                {
                    continue;
                }
                else
                {
                    Excepter();
                    return;
                }
            }
        }
        // 1.数据发送完毕
        // 2.发送条件不具备
        if(!_outbuffer.empty())
        {
            // 开启对写事件的关心
            GetOwner()->EnableReadWrite(_sockfd,true,true);
        }
        else
        {
            GetOwner()->EnableReadWrite(_sockfd,true,false);
        }
        
    }
    void Excepter() override
    {
        GetOwner()->DelConnection(_sockfd);
    }
    ~Channel()
    {
    }

private:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;
    InetAddr _client_addr;
    // handler_t _handler;
};

Connection.hpp

#pragma once

#include <iostream>
#include <string>
#include "InetAddr.hpp"

class Reactor;
class Connection;

using handler_t = std::function<std::string (std::string &)>;

class Connection
{
public:
    Connection():_owner(nullptr),_events(0)
    {
    }
    virtual void Recver() = 0;
    virtual void Sender() = 0;
    virtual void Excepter() = 0;
    virtual int GetSockFd() = 0;
    // virtual std::string& Inbuffer() = 0;
    // virtual std::string& AppendOutBuffer(std::string& out) = 0;
    void RegisterHandler(handler_t handler)
    {
        _handler = handler;
    }
    void SetEvent(const uint32_t &events)
    {
        _events = events;
    }
    uint32_t GetEvent()
    {
        return _events;
    }
    void SetOwner(Reactor *owner)
    {
        _owner = owner;
    }
    Reactor *GetOwner()
    {
        return _owner;
    }

    ~Connection()
    {
    }

private:
    // 回指指针,用于listensocket添加普通套接字
    Reactor *_owner;
    // 关心事件
    uint32_t _events;
public:
    handler_t _handler;
};

Main.cc

#include <iostream>
#include "Log.hpp"
#include "Reactor.hpp"
#include "Listener.hpp"
#include "Protocol.hpp"
#include "NetCal.hpp"

static void Usage(std::string proc)
{
    std::cerr << "Usage: " << proc << " port" << std::endl;
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    Enable_Console_Log_Strtegy();
    int port = std::stoi(argv[1]);

    // 构建业务模块
    std::shared_ptr<Cal> cal = std::make_shared<Cal>();

    // 构建协议对象
    std::shared_ptr<Protocol> protocal = std::make_shared<Protocol>([&cal](Request& req) -> Response{
        return cal->Execute(req);
    });

    // 构建listener对象
    std::shared_ptr<Connection> conn = std::make_shared<Listener>(port);
    conn->RegisterHandler([&protocal](std::string& inbuffer)->std::string{
        std::string response_str;
        //可能不止一个报文
        while(true)
        {
            std::string package;
            if(!protocal->Decode(inbuffer,&package))
                break;
            response_str += protocal->Execute(package);
        }
        return response_str;

    });

    std::unique_ptr<Reactor> tsvr = std::make_unique<Reactor>();
    tsvr->AddNewConnection(conn);

    tsvr->Start();
    return 0;
}


网站公告

今日签到

点亮在社区的每一天
去签到