手写Muduo网络库核心代码2--Poller、EPollPoller详细讲解

发布于:2025-09-03 ⋅ 阅读:(16) ⋅ 点赞:(0)

Poller抽象层代码

Muduo 网络库中的 Poller 抽象层是其事件驱动模型的核心组件之一,负责统一封装不同 I/O 复用机制(如 epoll、poll),实现事件监听与分发。

Poller 抽象层的作用

  • 统一 I/O 复用接口
    • Poller 作为抽象基类,定义了 poll()、updateChannel()、removeChannel() 等纯虚函数,强制派生类(如 EPollPoller)实现这些接口。这使得上层模块(如 EventLoop)无需关心底层具体使用 epoll 还是 poll,只需通过基类指针操作,实现了 “多路分发器” 的功能 
  • 解耦事件循环与 I/O 机制
    • EventLoop 依赖 Poller 监听文件描述符(fd)事件,但通过抽象层隔离了具体 I/O 复用机制的实现细节。例如,EventLoop::loop() 调用 Poller::poll() 等待事件,而无需知晓底层是 epoll_wait 还是 poll 调用 


头文件

#pragma once
#include"noncopyable.h"
#include"Timestamp.h"
#include<vector>
#include<unordered_map>
class Channel;
class Eventloop;

class Poller:noncopyable
{
    public:
        using ChannelList=std::vector<Channel*>;

        Poller(Eventloop *loop);
        virtual ~Poller()=default;
        //给所有I/O复用保留统一的接口
        virtual Timestamp poll(int timeoutMs,ChannelList *activeChannels)=0;
        virtual void updateChannel(Channel *channel)=0;
        virtual void removeChannel(Channel *channnel)=0; 

        //判断参数channel是否在当前Poller当中
        bool hasChannel(Channel *channel) const;
        //事件循环可以通过当前接口获取默认的I/O复用的具体实现
        static Poller* newDefaultPoller(Eventloop *loop);
    protected:
        //这里map的key就是sockfd value就是sockfd所属的通道类型channel
        using ChannelMap=std::unordered_map<int,Channel*>;
        ChannelMap Channels_;
    private:
        Eventloop *ownerLoop_;//定义poller所属的事件循环EventLoop
};

1、存放有事件发生的 Channel

using ChannelList = std::vector<Channel*>;

poll 函数会把有事件发生的 Channel 指针填充到这个列表中。

Poller能监听到哪些channel发生事件了,获取活跃事件,然后遍历处理,然后上报给Eventloop,然后通知channel处理相应的事件

2、构造函数

Poller(EventLoop *loop);

初始化必须知道所属 EventLoop,
这是在建立 Poller→EventLoop 的反向指针
由 EventLoop 在初始化时创建(EventLoop 持有 Poller 实例)

3、事件轮循

virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;

定义了一个跨平台 I/O 多路复用的抽象接口,派生类必须重写用它来实现 epoll、poll 等系统调用,上层EventLoop 通过它等待事件并分发处理,返回事件发生的精确时间戳

4、更新/添加通道监控

virtual void updateChannel(Channel *channel) = 0;

当 Channel 的事件关注集变化时(如新增写监听)

调用底层 epoll_ctl/poll 更新监听状态

5、移除通道监控

virtual void removeChannel(Channel *channel) = 0; 

当 Channel 销毁时(如连接关闭)

  • 清理 Poller 内部资源
  • 由 Channel::remove() 调用
  • 清除 channels_ 映射关系
  • 调用底层 epoll_ctl/poll 移除监听

6、判断参数channel是否在当前Poller当中

 bool hasChannel(Channel *channel) const;

防止重复注册同一个 Channel 到 Poller 中,避免重复添加导致的崩溃或未定义行为。

7、默认I/O的复用的具体实现

通过该接口,可以获取到想要的具体的IO复用实例,是Epoll、Poll、还是select

static Poller* newDefaultPoller(EventLoop *loop);

这里又学到一招,这个接口的实现要放在一个单独的源文件(DefaultPoller.cc)中,因为这里需要 new 出来一个EpollPoller实例,如果放在(Poller.cc)中实现,就会出现基类包含子类头文件的情况。(Poller是基类,EPollPoller是子类)。

8、维护 fd → Channel 的映射

using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap Channels_;

这里map的key就是sockfd value就是sockfd所属的通道类型channel

快速通过 fd 查找 Channel,存储所有被管理的 Channel,避免重复管理同一个 fd.

9、poller所属的事件循环EventLoop

EventLoop *ownerLoop_;

反向指针 - 指向所属 EventLoop

源文件Poller.cc

#include"Poller.h"
#include"Channel.h"

Poller::Poller(Eventloop *loop)
    :ownerLoop_(loop){}

bool Poller::hasChannel(Channel *channel) const
{
    auto it=Channels_.find(channel->fd());
    return it!=Channels_.end() && it->second==channel;
}

channel->fd():获取 Channel 底层的文件描述符
Channels_.find():在 unordered_map<int, Channel*> 中 O(1) 复杂度查找

源文件DefaultPoller.cc

//细节性设计,为了不让基类包含子类的头文件
#include"Poller.h"
#include"EPollPoller.h"
#include<stdlib.h>
Poller* Poller::newDefaultPoller(Eventloop *loop)
{
    if(::getenv("MUDUO_USE_POLL"))//获取环境变量,如果有则生成poll实例,(本代码没有实现)
    {
        return nullptr;//生成poll的实例
    }
    else
    {
        return new EPollPoller(loop);//生成epoll的实例
    }
}

EPollPoller

接下来这个模块用来封装Linux 的 epoll I/O 多路复用机制

头文件

#pragma once
#include"Poller.h"
#include<vector>
#include<sys/epoll.h>
#include"Timestamp.h"
 class EPollPoller:public Poller
 {
 public:
    EPollPoller(Eventloop* loop);
    ~EPollPoller() override;

    //重写基类Poller的方法
    Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;
    void updateChannel(Channel *channel) override;
    void removeChannel(Channel *channel) override;
private:
    static const int kInitEventListSize=16;//给定的vector的一个初始长度默认是16,可以面试证明自己看过源码,为啥是静态

    //填写活跃的连接
    void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;
    //更新channel通道
    void update(int operation ,Channel* channel);
    using EventList=std::vector<epoll_event>;

    int epollfd_;
    EventList events_;//这个将来是作为epoll_wait的第二个参数
 };
 
 

成员变量

static const int kInitEventListSize=16;//Muduo库默认给定的vector的一个初始长度默认是16
using EventList=std::vector<epoll_event>;    
int epollfd_;
EventList events_;//这个将来是作为epoll_wait的第二个参数

定义一个数组,用于存储epoll_wait返回的就绪事件表,使用vector动态扩展事件缓冲区(当就绪事件数 == 缓冲区大小时自动扩容)

成员函数

1、重写Poller的方法
Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;
void updateChannel(Channel *channel) override;
void removeChannel(Channel *channel) override;

这是在Poller类中声明的纯虚函数,继承Poller之后对这些函数必须进行重写,

  • poll()函数:等待并获取当前有事件发生的 Channel 列表。调用 epoll_wait() 等待事件。将“就绪的事件”转换为 Channel* 指针,填充到 activeChannels 中。
  • updateChannel():将一个 Channel 注册或修改到 epoll 中(对应 epoll_ctl(ADD/MOD))。当Channel 的事件发生变化(如从不监听可读 → 监听可读),需要通知 EPollPoller。根据 Channel 的状态决定是 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD。
  • removeChannel():从 epoll 中移除一个 Channel(对应 epoll_ctl(DEL)),从channels_映射表中删除,并清理内部状态。当 Channel 对应的资源(如 socket)关闭时,必须从 epoll 中注销。防止后续 epoll_wait 返回无效的 Channel。
2、填写活跃的连接
 void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;
  • 遍历就绪事件
  • 设置Channel的revents_
  • 填充到activeChannels列表
3、更新channel通道
void update(int operation ,Channel *channel);

执行具体的epoll_ctl操作

源文件

#include"EPollPoller.h"
#include"Logger.h"
#include<unistd.h>
#include<string.h>
#include"Channel.h"
//#include<errno.h>
//表示channel是否添加到poller,与channel的成员index_相关,它的初始值就是-1,表示还没有添加到里面
//用来区分,如果再次添加channel时候发现是kAdded表示已经添加到里面了,那这次操作就是更新channel中的事件
const int kNew=-1;//一个channel还没有添加到poller里边
const int kAdded=1;//一个channel已经添加到poller里面吗
const int kDeleted=2;//删除

EPollPoller::EPollPoller(Eventloop *loop)
    :Poller(loop)
    ,epollfd_(::epoll_create1(EPOLL_CLOEXEC))//EPOLL_CLOEXEC:exec 时自动关闭文件描述符,安全防护
    ,events_(kInitEventListSize)
{
    if(epollfd_<0)
    {
        LOG_FATAL("epoll_create error:%d \n",errno);
    }
}
EPollPoller::~EPollPoller()
{
    ::close(epollfd_);
}
//返回具体发生事件的时间点,主要调用的是epoll_wait
Timestamp EPollPoller::poll(int timeoutMs,ChannelList *activeChannels)
{
    //poll这里短时间是接受大量的高并发请求的,如果在这里使用LOG_INFO则每次调用都会影响它的性能
    //实际上这里应该用LOG_DEBUG,只要我们不定义debug,他就不用调用进而不会影响它的性能
//记录当前系统中“有多少个连接正在被监听”。
    LOG_INFO("func=%s => fd total count:%lu \n",__FUNCTION__,Channels_.size());

    //第二个参数是存放发生事件的数组地址,但是为了方便扩容,
    //我们用的是vector.events_.size()返回的是size_t类型,但是这里参数要求int,所以使用转换
    int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);

    int saveErrno=errno;
    Timestamp now(Timestamp::now());
    if(numEvents>0)
    {
        LOG_INFO("%d events happend \n",numEvents);
        fillActiveChannels(numEvents,activeChannels);
        //扩容
        if(numEvents==events_.size())
        {
            events_.resize(events_.size()*2);
        }
    }
    else if (numEvents==0)
    {
        LOG_INFO("%s timeout! \n",__FUNCTION__);
    }
    else//外部中断,还要接着执行业务逻辑
    {
        if(saveErrno != EINTR)
        {
            errno=saveErrno;//不太懂
            LOG_ERROR("EPollPoller::poll() err!");
        }
    }
    return now;
}
void EPollPoller::updateChannel(Channel *channel) 
{
    const int index=channel->index();
    LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);
    if(index==kNew || index==kDeleted)
    {
        if(index==kNew)
        {
            int fd=channel->fd();
            Channels_[fd]=channel;
        }
        
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD,channel);
    }
    else//此时channel已经在poller上注册过了
    {
        int fd=channel->fd();
        if(channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL,channel);
        }
        else
        {
            update(EPOLL_CTL_MOD,channel);
        }
    }
}
//从poller中删除channel
void EPollPoller::removeChannel(Channel *channel) 
{
    
    int fd=channel->fd();
    Channels_.erase(fd);
    LOG_INFO("func=%s => fd=%d\n",__FUNCTION__,fd);
    int index=channel->index();
    if(index==kAdded)
    {
        update(EPOLL_CTL_DEL,channel);
    }
    channel->set_index(kNew);
}

//填写活跃的连接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{
    for(int i=0;i< numEvents;++i)
    {
        Channel* channel=static_cast<Channel*>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);//Eventloop就拿到了poller给它返回的所有发生事件的channel列表了
    }
}
//更新channel通道,做的是事件的增删改
void EPollPoller::update(int operation ,Channel* channel)
{
    epoll_event event;
    memset(&event,0,sizeof(event));
    int fd=channel->fd();

    event.events=channel->events();
    event.data.fd=fd;
    event.data.ptr=channel;

    if(::epoll_ctl(epollfd_,operation,fd,&event)<0)
    {
        if(operation==EPOLL_CTL_DEL)
        {
            LOG_ERROR("epoll_ctl del error:%d\n",errno);
        }
        else
        {
            LOG_FATAL("epoll_ctl add/mod error%d \n",errno);
        }
    }
}
1、Channel 状态常量定义
const int kNew = -1;     // Channel 未添加到 Poller
const int kAdded = 1;    // Channel 已添加到 Poller
const int kDeleted = 2;  // Channel 已从 Poller 移除
//kDeleted 状态允许 Channel 暂时移除但保留在映射表中,避免频繁添加/删除的开销

定义 Channel 在 Poller 中的生命周期状态。确保操作的正确顺序(不能重复添加已添加的 Channel)。避免不必要的系统调用(如对已删除 Channel 再次删除)。

2、poll方法重写

等待 I/O 事件发生,并将“活跃的 Channel”填充到 activeChannels 中

Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {
    LOG_INFO("func=%s => fd total count:%lu \n", __FUNCTION__, Channels_.size());
    
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), 
                                static_cast<int>(events_.size()), timeoutMs);
    int saveErrno = errno; // 保存系统错误码
    Timestamp now(Timestamp::now());
    
    if(numEvents > 0) {
        LOG_INFO("%d events happend \n", numEvents);
        fillActiveChannels(numEvents, activeChannels);
        // 动态扩容
        if(numEvents == events_.size()) {
            events_.resize(events_.size() * 2);
        }
    }
    else if (numEvents == 0) {
        LOG_INFO("%s timeout! \n", __FUNCTION__);
    }
    else { // 错误处理
        if(saveErrno != EINTR) { // 非中断错误
            errno = saveErrno;
            LOG_ERROR("EPollPoller::poll() err!");
        }
    }
    return now;
}

2.1 

int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);

这里的第二个参数:

  1. events_.begin() → 返回一个 iterator,指向第一个元素。
  2. *events_.begin() → 解引用 iterator,得到第一个 epoll_event 对象。
  3. &*events_.begin() → 取第一个对象的地址,即 struct epoll_event*。

第三个参数

  1. 我们用的是vector.events_.size()返回的是size_t类型,但是这里参数要求int,所以使用转换

2.2

int saveErrno=errno;

epoll_wait 返回 -1 时,错误原因在 errno 中。但后续代码可能调用其他函数(如 LOG_INFO),会覆盖 errno。所以必须立即保存,否则错误信息丢失。

这是 Linux 系统编程的 黄金法则:一旦系统调用失败,立刻保存 errno。

2.3

else//外部中断,还要接着执行业务逻辑
    {
        if(saveErrno != EINTR)
        {
            errno=saveErrno;//虽然日志不需要了,但有些设计会保留 errno 给上层处理。
            LOG_ERROR("EPollPoller::poll() err!");
        }
    }

EINTR:系统调用被信号中断(如 SIGCHLD、SIGTERM)。这是正常情况,不应视为错误,循环会继续。如果是其他错误(如 EBADF、ENOMEM)才是真正的异常。

3、updateChannel重写

状态驱动 —— 通过 channel->index() 的状态决定行为,即根据 Channel 的当前状态,决定是 epoll_ctl(ADD)、MOD 还是 DEL。
 

void EPollPoller::updateChannel(Channel *channel) 
{
    const int index=channel->index();//获取状态,是未注册到Poller、已注册还是已删除
    LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);
    if(index==kNew || index==kDeleted)
    {
        if(index==kNew)
        {
            int fd=channel->fd();
            Channels_[fd]=channel;
        }
        
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD,channel);
    }
    else//此时channel已经在poller上注册过了
    {
        int fd=channel->fd();
        if(channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL,channel);
        }
        else
        {
            update(EPOLL_CTL_MOD,channel);
        }
    }
}

如果是kNew(新连接,首次注册)或者是 kDeleted(之前被移除,现在重新启用:比如连接复用、延迟删除后恢复)。无论之前是否注册过,只要不是 kAdded,就当作“新来”的处理。

仅在 kNew 时加入 channels_ 映射表

channels_[fd] = channel:建立 fd → Channel 的映射,用于 epoll_wait 返回后查找 Channel。

而遵循 kDeleted 状态允许 Channel 暂时移除但保留在映射表中,避免频繁添加/删除的开销。

所以不需要将channel再次添加到Channels_中了。

对于已经注册在Poller上的channel,如果他没有感兴趣的事,就从 epoll 实例中删除。

3、update()

封装 epoll_ctl 系统调用,用于向 epoll 实例添加、修改或删除一个 fd 的事件监听。

void EPollPoller::update(int operation ,Channel* channel)
{
    epoll_event event;
    memset(&event,0,sizeof(event));
    int fd=channel->fd();

    event.events=channel->events();
    event.data.fd=fd;
    event.data.ptr=channel;//关键:存储 Channel* 用于事件分发

    if(::epoll_ctl(epollfd_,operation,fd,&event)<0)
    {
        if(operation==EPOLL_CTL_DEL)
        {
            LOG_ERROR("epoll_ctl del error:%d\n",errno);
        }
        else
        {
            LOG_FATAL("epoll_ctl add/mod error%d \n",errno);
        }
    }
}

epoll_event 是 epoll 的核心数据结构,用于描述一个 fd 的监听事件和用户数据。

event.data.ptr=channel::最关键字段绑定用户数据,后面用于 fillActiveChannels() 中直接取出 Channel*

4、removeChannel重写
void EPollPoller::removeChannel(Channel *channel) {
    int fd = channel->fd();
    Channels_.erase(fd); // 从映射表移除
    LOG_INFO("func=%s => fd=%d\n", __FUNCTION__, fd);
    
    int index = channel->index();
    if(index == kAdded) {
        update(EPOLL_CTL_DEL, channel); // 从epoll移除
    }
    channel->set_index(kNew); // 重置状态
}
5、填充活跃的连接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{
    for(int i=0;i< numEvents;++i)
    {
        Channel* channel=static_cast<Channel*>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);//Eventloop就拿到了poller给它返回的所有发生事件的channel列表了
    }
}

numEvents:来自 epoll_wait 的返回值,表示有多少个 fd 就绪。遍历 events_[i] 数组中的每一个就绪事件。

接着从 epoll_event.data.ptr 取出 Channel*,上面update()中设置过

event.data.ptr = channel;

epoll 本身不关心用户数据,只返回 fd 和 events。但我们需要知道“哪个 Channel 对应这个 fd”。
通过 data.ptr 直接绑定 Channel*,实现了 O(1) 的快速查找,避免了 channels_[fd] 的哈希查找。

利用了空间换时间。

events_ 是 EPollPoller 的成员变量:std::vector<epoll_event> events_;接着设置实际发生的事件。

接着将活跃的channel添加到活跃列表activeChannels中,activeChannels 是一个输出参数,类型为 std::vector<Channel*>*。这个列表会被返回给 EventLoop,后续遍历并调用 channel->handleEvent()。


感谢阅读!


网站公告

今日签到

点亮在社区的每一天
去签到