WebServer实现:muduo库的主丛Reactor架构

发布于:2025-06-21 ⋅ 阅读:(13) ⋅ 点赞:(0)

前言

  作为服务器,核心自然是高效的处理来自client的多个连接啦,那问题在于,如何高效的处理client的连接呢?这里就介绍两种架构:单Reactor架构和主丛Reactor架构。

单Reactor架构

  单Reactor架构的核心为,由一个主线程监听包括ServerFd在内的所有fd,当某个fd 可读/可写的时候,就把它交给线程池中的某个线程去处理,核心流程大概是:
单Reactor模型

while(true) {
	epoll_wait();
	/*遍历所有监听到的event*/
	for(所有活跃的events的fd) {
		if(fd == listenfd) dealListen();	/*如果是listen fd可读 ,则accept建立新的连接*/
		if(fd == readfd) dealRead(); /*如果clientfd可读,就交给线程池处理可读*/
		if(fd == writefd) dealWrite(); /*如果可写 交给线程池处理可写*/
	}
}
/*伪代码*/
dealListen()
{
	do{
		int retFd = accept();
		if(retFd < 0)		/*没有待处理的fd了*/
			break;
		/*其它和retFd相关的处理*/
	}while(true);
}

  这看起来是个很合理的实现,但是在某些场景下是会遇到性能瓶颈的,试想这样一个场景:如果同时突然间出现了大量的client请求链接,那我们的epoll_wait监听到的应该是listenFd可读,调用dealListen()来处理listen事件,dealListen可能是调用多次accept函数直到返回值小于0,才表明没有新的连接了,这个过程很费时间,也就意味着:哪怕有些client已经建立连接了,并且触发可读/可写了,主线程都无法及时将事件交给线程池处理,而这就是它的性能瓶颈
  这就引入了我们的主丛Reactor架构,它的处理逻辑是:主线程的epoll只负责监听,监听到了就把这个fd交给其他的epoll去处理(监听它的可读可写),这样,在面对突发IO连接的时候,也能及时响应。
主丛Reactor
  其中主Reactor的核心是Acceptor中处理listenfd的可读/写操作,然后把监听到的clientfd分发给其他的sub_reactor,当client_fd可读可写的时候,由对应的sub_reactor的epoll监听到进行读写。
  有了思路,剩下的事情就是怎么做,我们来看看muduo库的实现。

Channel类—>对fd的封装

  对于每个fd,不管是clientfd还是listenfd也好,监听到可读/可写后,都应该调用自身对应的可读/写回调函数进行处理;同时对于每个fd,他也有自己对应的epoll(在这里用EventLoop对epoll和thread进一步的封装)

class Channel : noncopyable
{
public:
    /*只是起了个别名的作用*/
    using EventCallback = std::function<void()>; // muduo仍使用typedef
    using ReadEventCallback = std::function<void(Timestamp)>;// read和时间挂钩了
    Channel(EventLoop *loop, int fd);
    ~Channel();
    // fd得到Poller通知以后 处理事件 handleEvent在EventLoop::loop()中调用
    void handleEvent(Timestamp receiveTime);

    // 设置回调函数对象
    // 用move比直接赋值好
    // 直接赋值有两次拷贝操作(实参到形参 形参到类内对象) 但是move简化了第二次
    // 使用move的前提:移动的资源在堆上,且支持这样的操作
    void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
    void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
    void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
    void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }



    // 防止当channel被手动remove掉 channel还在执行回调操作
    void tie(const std::shared_ptr<void> &);

    int fd() const { return fd_; }
    int events() const { return events_; }
    void set_revents(int revt) { revents_ = revt; }

    // 设置fd相应的事件状态 相当于epoll_ctl add delete
    void enableReading() { events_ |= kReadEvent; update(); }
    void disableReading() { events_ &= ~kReadEvent; update(); }
    void enableWriting() { events_ |= kWriteEvent; update(); }
    void disableWriting() { events_ &= ~kWriteEvent; update(); }
    void disableAll() { events_ = kNoneEvent; update(); }


    /*只是可读*/
    void setReading() { events_ = kReadEvent; update(); }
    /*只是可写*/
    void setWriting() { events_ = kWriteEvent; update(); }
    
    // 返回fd当前的事件状态
    bool isNoneEvent() const { return events_ == kNoneEvent; }
    bool isWriting() const { return events_ & kWriteEvent; }
    bool isReading() const { return events_ & kReadEvent; }

    int index() { return index_; }
    void set_index(int idx) { index_ = idx; }

    // one loop per thread
    EventLoop *ownerLoop() { return loop_; }
    void remove();

private:

    void update();
    void handleEventWithGuard(Timestamp receiveTime);

    static const int kNoneEvent;
    static const int kReadEvent;
    static const int kWriteEvent;

    EventLoop *loop_; // 事件循环
    const int fd_;    // fd,Poller监听的对象
    int events_;      // 注册fd感兴趣的事件(只是一个表示,通过update调用到epoller的ctl重新设置flag)
    int revents_;     // Poller返回的具体发生的事件(poller在wait到之后通过set_revents返回值)
    int index_;

    std::weak_ptr<void> tie_;       /*观察对象是否存在*/
    bool tied_;

    // 因为channel通道里可获知fd最终发生的具体的事件events,所以它负责调用具体事件的回调操作
    ReadEventCallback readCallback_;    /*具体可读函数得看具体实现是咋样的*/
    EventCallback writeCallback_;
    EventCallback closeCallback_;
    EventCallback errorCallback_;
};
/*核心处理函数,当可读/写/ERROR发生的时候 调用对应的回调*/
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    //LOG_INFO<<"channel handleEvent revents:"<<revents_;
   // LOG_INFO("channel handleEvent revents:[%d]", revents_);
    // 关闭
    if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) // 当TcpConnection对应Channel 通过shutdown 关闭写端 epoll触发EPOLLHUP
    {
        if (closeCallback_)
        {
            closeCallback_();
        }
    }
    // 错误
    if (revents_ & EPOLLERR)
    {
        if (errorCallback_)
        {
            errorCallback_();
        }
    }
    // 读
    if (revents_ & (EPOLLIN | EPOLLPRI))
    {
        if (readCallback_)
        {
            readCallback_(receiveTime);
        }
    }
    // 写
    if (revents_ & EPOLLOUT)
    {
        if (writeCallback_)
        {
            writeCallback_();
        }
    }
}
 /*update只是对epoll_ctl的进一步封装*/
 /*只是可读*/
 void setReading() { events_ = kReadEvent; update(); }
  /*只是可写*/
 void setWriting() { events_ = kWriteEvent; update(); }

Epoller—>对Epoll的封装

  对于常用API进行了抽象和封装,epoll本身也归属于某个EventLoop。

EventLoop----“one loop per thread的体现”

  对于主Reactor也好,子Reactor也好,它们每个都是对应着某个线程,这也就是我们为什么抽象EventLoop这样一个类,那么思考EventLoop得有什么呢?

  1. 得有个线程处理函数func一直在执行
  2. fuc要做两件事:(1) 处理epoll_wait;(2) 处理和其它线程/资源交互的信息(比如主线程怎么把clientfd分发给其他线程)
      清楚了这两件事,那如何做呢?我们来看看muduo库的做法

线程间交互: eventfd + 回调函数机制

  如果想要和其它线程/资源交互,能想到的一点就是利用信号量/互斥量等操作,但是这样就有了另一个问题:我们的线程处理函数是要同时执行epoll_wait的,等不到的时候可是阻塞当前线程的,所以对于线程间的交互,处理的会不及时

while(true)
{
	epoll_wait(超时时间);
	for(){处理所有的事件};
	lock();	/*最坏情况得等到超时时间到了才能处理*/
	for(){处理线程间的交互事件}
	unlock();
}

  这里就引入了一个eventfd。我们给每一个EventLoop的Epoll,除了要监视clientfd之外,还需要额外监视一个fd----eventfd,关于它如果想多了解可以自行搜索,粗略的讲,eventfd里面有个计数器,每次write()写就会增加(此时会触发epoll可读),而每次读read()就会清零计数器。
  那以此来看其它线程可以怎么通过eventfd来和当前线程交互呢?—就是queueInLoop函数

// 把cb放入队列中 唤醒loop所在的线程执行cb(由其他线程或者本线程调用)
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);/*存储了所有需要交互的函数*/
    }

    /**
     * || callingPendingFunctors的意思是 当前loop正在执行回调中 但是loop的pendingFunctors_中又加入了新的回调 需要通过wakeup写事件
     * 唤醒相应的需要执行上面回调操作的loop的线程 让loop()下一次poller_->poll()不再阻塞(阻塞的话会延迟前一次新加入的回调的执行),然后
     * 继续执行pendingFunctors_中的回调函数
     **/
    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup(); // 唤醒loop所在线程,本质就是对eventfd进行写的操作
    }
}
// 当epoll触发的时候,eventfd对应的Channel绑定的回调函数
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof(one));	/*不清零就会一直epoll可读*/
    if (n != sizeof(one))
    {
        //LOG_ERROR<<"EventLoop::handleRead() reads"<<n<<"bytes instead of 8";
        LOG_ERROR("EventLoop::handleRead() reads[%d]bytes instead of 8",n);
    }
}

  依次就能写出这个EventLoop的loop函数的逻辑啦,eventfd确实帮了大忙简化了很多逻辑

EventLoop的核心----loop()函数

void EventLoop::loop()
{
    looping_ = true;
    quit_ = false;

    //LOG_INFO<<"EventLoop start looping";
    LOG_INFO("EventLoop start looping");
    while (!quit_)
    {
        activeChannels_.clear();/*清空容器所有变量*/
        pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    
        for (Channel *channel : activeChannels_)
        {
            //LOG_INFO("active channel fd:[%d]",channel->fd());
            // Poller监听哪些channel发生了事件 然后上报给EventLoop 通知channel处理相应的事件
            // 包括cilentfd和eventfd两类(子Reactor)
            // 包括listenfd和eventfd两类(主Reactor)
            // 当然还有一个timefd,可以epoll + timefd处理一些超时事件
            // 感兴趣可以了解一下
            channel->handleEvent(pollRetureTime_);
        }
        /**
         * 执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:
         * accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理
         *
         * mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒
         **/
        doPendingFunctors();		/*调用vector保存的所有回调函数*/
    }
    //LOG_INFO<<"EventLoopstop looping";
    LOG_INFO("EventLoopstop looping")
    looping_ = false;
}

Acceptor:ListenFd的封装

  Acceptor里面的成员函数,就是主Reactor对应的线程的操作函数。我们来想Acceptor需要做什么:

  1. 得能开启listen
  2. 监听到可读事件后,需要调用对应的可读回调函数处理
  3. 在回调函数中,应该把clientfd分配给某个EventLoop

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop)
    , acceptSocket_(createNonblocking())
    , acceptChannel_(loop, acceptSocket_.fd())
    , listenning_(false)
{
    //LOG_INFO("server id:[%d]",acceptSocket_.fd());
    //LOG_INFO("Acceptor make success");
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(true);
    acceptSocket_.bindAddress(listenAddr);
    // TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop)
    // baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数
    acceptChannel_.setReadCallback(
        std::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{
    acceptChannel_.disableAll();    // 把从Poller中感兴趣的事件删除掉
    acceptChannel_.remove();        // 调用EventLoop->removeChannel => Poller->removeChannel 把Poller的ChannelMap对应的部分删除
}

void Acceptor::listen()
{
    listenning_ = true;
    acceptSocket_.listen();         // 开启listen
    acceptChannel_.enableReading(); // acceptChannel_注册至Poller,要不怎么监听呢
}


// listenfd有事件发生了,就是有新用户连接了
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    do
    {
        int connfd = acceptSocket_.accept(&peerAddr);
        if(connfd < 0)
            break;
        //LOG_INFO("listen fd:[%d] success",connfd);

        //fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFD, 0) | O_NONBLOCK);(不用 已经设计过了)
        if (connfd >= 0)
        {
            if (NewConnectionCallback_) /*Tcp Server中调用的*/
            {
            /*这里实现看需求了,所以封装成回调函数的形式更灵活*/
                NewConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop 唤醒并分发当前的新客户端的Channel
            }
            else
            {
                ::close(connfd);
            }
        }
        else
        {
            LOG_ERROR("accept Err");
            if (errno == EMFILE)
            {
                LOG_ERROR("sockfd reached limit");
            }
        }
    } while (true);
    
}

EventLoopThreadPool----对应subReactor的操作

#include "../MultiReactor/EventLoopThread.h"
#include "../MultiReactor/EventLoop.h"

EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,
                                 const std::string &name)
    : loop_(nullptr)
    , exiting_(false)
    , thread_(std::bind(&EventLoopThread::threadFunc, this), name)
    , mutex_()
    , cond_()
    , callback_(cb)
{
}
// 由谁调用呢
EventLoopThread::~EventLoopThread()
{
    exiting_ = true;
    if (loop_ != nullptr)
    {
        loop_->quit();
        thread_.join();//调用EventLoopThread析构的线程会阻塞 直到thread_对应的线程运行完毕
    }
}

/*是让主Reactor调用的*/
EventLoop *EventLoopThread::startLoop()
{
    thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程

    EventLoop *loop = nullptr;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this](){return loop_ != nullptr;});
        loop = loop_;
    }
    return loop;
}

// 下面这个方法 是在单独的新线程里运行的
void EventLoopThread::threadFunc()
{
    // 给每个线程创建一个EventLoop
    EventLoop loop;
    if(callback_)
    {
        callback_(&loop);//如果设置了回调函数
    }// 新线程调用回调就能获得自己的EventLoop了
    // 所有的Loop是由谁创建的呢?
    {
        std::unique_lock<std::mutex> lock(mutex_);
        loop_ = &loop;
        cond_.notify_one();
    }
    loop.loop();    // 执行EventLoop的loop() 开启了底层的Poller的poll()
    std::unique_lock<std::mutex> lock(mutex_);
    loop_ = nullptr;
}



#include <memory>
#include "../MultiReactor/EventLoopThreadPool.h"
#include "../MultiReactor/EventLoopThread.h"
#include "../Log/log.h"


EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
    : baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(12), next_(0)
{
}
/*析构就是啥也不做?不过好像确实这个是整个程序的生命周期*/
EventLoopThreadPool::~EventLoopThreadPool()
{
    // Don't delete loop, it's stack variable
}void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
    started_ = true;

    for (int i = 0; i < numThreads_; ++i)
    {
        char buf[name_.size() + 32];
        snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
        /*池子确定了回调函数是什么*/
        /*这是一个创建成功的回调函数 新线程创建成功了通过callback就能获得自己的专属loop_*/
        EventLoopThread *t = new EventLoopThread(cb, buf);
        threads_.push_back(std::unique_ptr<EventLoopThread>(t));
        loops_.push_back(t->startLoop()); // 底层创建线程 绑定一个新的EventLoop 并返回该loop的地址
    }

    if (numThreads_ == 0 && cb) // 整个服务端只有一个线程运行baseLoop
    {
        cb(baseLoop_);
    }
}

// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{
    // 如果只设置一个线程 也就是只有一个mainReactor 无subReactor 
    // 那么轮询只有一个线程 getNextLoop()每次都返回当前的baseLoop_
    EventLoop *loop = baseLoop_;    

    // 通过轮询获取下一个处理事件的loop
    // 如果没设置多线程数量,则不会进去,相当于直接返回baseLoop
    if(!loops_.empty())             
    {
        loop = loops_[next_];
        ++next_;
        // 轮询
        if(next_ >= loops_.size())
        {
            next_ = 0;
        }
    }

    return loop;
}


std::vector<EventLoop *> EventLoopThreadPool::getAllLoops()
{
    if (loops_.empty())
    {
        return std::vector<EventLoop *>(1, baseLoop_);
    }
    else
    {
        return loops_;
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到