Muduo网络库流程分析

发布于:2025-05-26 ⋅ 阅读:(146) ⋅ 点赞:(0)

目录

Tcp模块的流程 

测试代码

TcpServer初始化阶段详解

 TcpServer构造函数执行流程

EventLoop创建详细步骤:

Channel与Poller的关联

设置Acceptor回调函数

启动监听

启动服务器完整流程

连接建立流程详解

新连接到达流程

连接初始化详解

连接就绪详解

数据收发流程详解

数据接收流程

 数据发送流程

关于Http模块的流程

HTTP请求的接收流程

HTTP响应的发送流程

模块初始化都干了什么事情

Poller对象的创建

EventLoop对象的创建

 Channel对象的创建

Socket对象的创建

Acceptor对象的创建

TcpServer对象的创建

​编辑LoopThreadPool对象的创建

LoopThread对象的创建

Timer_wheel对象的创建

Connection对象的创建

1. 连接触发监听套接字的可读事件

2. Poller检测到事件

3. EventLoop处理就绪事件

4. Acceptor的HandleRead方法被调用

5. TcpServer::NewConnection方法被调用

6. Connection对象创建

7. Connection::Established方法执行

8. Connection::EstablishedInLoop方法执行

9. 启动读事件监控

10. 更新Poller中的事件监控

11. 调用连接建立回调函数

12. 连接就绪,等待数据交互


Tcp模块的流程 

EventLoop模块

构造函数 

class EventLoop {
    private:
        using Functor = std::function<void()>;
        std::thread::id _thread_id;//线程ID
        int _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞
        std::unique_ptr<Channel> _event_channel;
        Poller _poller;//进行所有描述符的事件监控
        std::vector<Functor> _tasks;//任务池
        std::mutex _mutex;//实现任务池操作的线程安全
        TimerWheel _timer_wheel;//定时器模块
    public:
        EventLoop():_thread_id(std::this_thread::get_id()), 
                    _event_fd(CreateEventFd()), 
                    _event_channel(new Channel(this, _event_fd)),
                    _timer_wheel(this) {
            //给eventfd添加可读事件回调函数,读取eventfd事件通知次数
            _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
            //启动eventfd的读事件监控
            _event_channel->EnableRead();
        }
};

 Start接口

        void Start() {
            while(1) {
                //1. 事件监控, 
                std::vector<Channel *> actives;
                _poller.Poll(&actives);
                //2. 事件处理。 
                for (auto &channel : actives) {
                    channel->HandleEvent();
                }
                //3. 执行任务
                RunAllTask();
            }
        }

 TcpServer模块

class TcpServer {    
    private:
        uint64_t _next_id;      //这是一个自动增长的连接ID,
        int _port;
        int _timeout;           //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接
        bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志
        EventLoop _baseloop;    //这是主线程的EventLoop对象,负责监听事件的处理
        Acceptor _acceptor;    //这是监听套接字的管理对象
        LoopThreadPool _pool;   //这是从属EventLoop线程池
        std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象

        using ConnectedCallback = std::function<void(const PtrConnection&)>;
        using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;
        using ClosedCallback = std::function<void(const PtrConnection&)>;
        using AnyEventCallback = std::function<void(const PtrConnection&)>;
        using Functor = std::function<void()>;
        ConnectedCallback _connected_callback;
        MessageCallback _message_callback;
        ClosedCallback _closed_callback;
        AnyEventCallback _event_callback;    
    public:
        TcpServer(int port):
            _port(port), 
            _next_id(0), 
            _enable_inactive_release(false), 
            _acceptor(&_baseloop, port),
            _pool(&_baseloop) {
            _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
            _acceptor.Listen();//将监听套接字挂到baseloop上
        }
        void SetThreadCount(int count) { return _pool.SetThreadCount(count); }
        void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }
        void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }
        void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }
        void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }
        void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }
        //用于添加一个定时任务
        void RunAfter(const Functor &task, int delay) {
            _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
        }
        void Start() { _pool.Create();  _baseloop.Start(); }
};

Poller模块

class Poller {
    private:
        int _epfd;
        struct epoll_event _evs[MAX_EPOLLEVENTS];
        std::unordered_map<int, Channel *> _channels;
    public:
        Poller() {
            _epfd = epoll_create(MAX_EPOLLEVENTS);
            if (_epfd < 0) {
                ERR_LOG("EPOLL CREATE FAILED!!");
                abort();//退出程序
            }
        }
};

 Acceptor模块

class Acceptor {
    private:
        Socket _socket;//用于创建监听套接字
        EventLoop *_loop; //用于对监听套接字进行事件监控
        Channel _channel; //用于对监听套接字进行事件管理

        using AcceptCallback = std::function<void(int)>;
        AcceptCallback _accept_callback;
    public:
        /*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*/
        /*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/
        Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), 
            _channel(loop, _socket.Fd()) {
            _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
        }
        void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }
        void Listen() { _channel.EnableRead(); }
        void HandleRead() {
            int newfd = _socket.Accept();
            if (newfd < 0) {
                return ;
            }
            if (_accept_callback) _accept_callback(newfd);
        }
};

 Channel模块

构造函数 

class Channel {
    private:
        int _fd;
        EventLoop *_loop;
        uint32_t _events;  // 当前需要监控的事件
        uint32_t _revents; // 当前连接触发的事件
        using EventCallback = std::function<void()>;
        EventCallback _read_callback;   //可读事件被触发的回调函数
        EventCallback _write_callback;  //可写事件被触发的回调函数
        EventCallback _error_callback;  //错误事件被触发的回调函数
        EventCallback _close_callback;  //连接断开事件被触发的回调函数
        EventCallback _event_callback;  //任意事件被触发的回调函数
    public:
        Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}
};

LoopThreadPool模块 

构造函数 

class LoopThreadPool {
    private:
        int _thread_count;
        int _next_idx;
        EventLoop *_baseloop;
        std::vector<LoopThread*> _threads;
        std::vector<EventLoop *> _loops;
    public:
        LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}
};

Create接口 

        void Create() {
            if (_thread_count > 0) {
                _threads.resize(_thread_count);
                _loops.resize(_thread_count);
                for (int i = 0; i < _thread_count; i++) {
                    _threads[i] = new LoopThread();
                    _loops[i] = _threads[i]->GetLoop();
                }
            }
            return ;
        }

Connection模块

class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection> {
    private:
        uint64_t _conn_id;  // 连接的唯一ID,便于连接的管理和查找
        //uint64_t _timer_id;   //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器ID
        int _sockfd;        // 连接关联的文件描述符
        bool _enable_inactive_release;  // 连接是否启动非活跃销毁的判断标志,默认为false
        EventLoop *_loop;   // 连接所关联的一个EventLoop
        ConnStatu _statu;   // 连接状态
        Socket _socket;     // 套接字操作管理
        Channel _channel;   // 连接的事件管理
        Buffer _in_buffer;  // 输入缓冲区---存放从socket中读取到的数据
        Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据
        Any _context;       // 请求的接收处理上下文

        /*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*/
        /*换句话说,这几个回调都是组件使用者使用的*/
        using ConnectedCallback = std::function<void(const PtrConnection&)>;
        using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;
        using ClosedCallback = std::function<void(const PtrConnection&)>;
        using AnyEventCallback = std::function<void(const PtrConnection&)>;
        ConnectedCallback _connected_callback;
        MessageCallback _message_callback;
        ClosedCallback _closed_callback;
        AnyEventCallback _event_callback;
        /*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*/
        /*就应该从管理的地方移除掉自己的信息*/
        ClosedCallback _server_closed_callback;

    public:
        Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),
            _enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),
            _channel(loop, _sockfd) {
            _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
            _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
            _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
            _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
            _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        }
};

一个Muduo网络库的流程:

在main线程中首先创建一个TcpServer对象_server,

这个_server在构造的时候创建main_loop,从属线程池_pool,_acceptor,设置非活跃连接时间,非活跃连接关闭为false,并且把从属线程池也注册到main_loop上,把acceptor注册到main_loop上,然后给_acceptor绑定新连接到来的回调函数(连接ID递增,增加连接计数器,确保每个连接有唯一ID,创建连接对象:PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)),创建新的Connection智能指针对象,通过线程池获取下一个可用的EventLoop (_pool.NextLoop()),传入新的连接ID和套接字文件描述符,设置回调函数,启用非活跃连接超时检测,初始化连接:conn->Established() 将连接标记为已建立,可能包括:更改连接状态为CONNECTED,启用读事件监听,调用连接建立回调函数,保存连接:将新连接添加到连接管理容器中,使用连接ID作为键,这样服务器可以通过ID快速查找和管理连接),然后调用Listen接口用来监听新连接的到来

这个main_loop在构造的时候创建了_event_channel(使用智能指针管理),_poller,_event_fd,_thread_id(初始化为当前线程的ID,),_task,_mutex,_timer_wheel,然后给_event_channel绑定可读事件回调函数,并且启动eventfd的读事件监控

这个_poller在构造的时候创建了_epfd存储由epoll_create()返回的epoll文件描述符的整数,_evs是一个epoll_event结构体数组,用于存储事件,_channels是一个哈希表,将文件描述符(int)与Channel指针关联起来,使用epoll_create()创建一个epoll实例

这个_pool在构造的时候创建了 _thread_count;线程池中IO线程的数量, _next_idx用于轮询分配线程的下标(实现负载均衡),EventLoop *_baseloop指向主线程的EventLoop,std::vector<LoopThread*> _threads保存所有IO线程对象的指针,std::vector<EventLoop *> _loops保存所有IO线程对应的EventLoop指针

这个_acceptor在构造的时候创建了_socket用于创建监听套接字,EventLoop *_loop用于对监听套接字进行事件监控,Channel _channel用于对监听套接字进行事件管理 _socket(CreateServer(port))创建监听socket并绑定端口, _loop(loop)保存主事件循环指针,_channel(loop, _socket.Fd())创建Channel,管理监听socket的事件,并且 _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this))它使用 std::bind 将 Acceptor 类的 HandleRead 成员函数绑定为当通道上有数据可读时将被调用的回调函数。

这个socket在构造的时候会创一个文件描述符_sockfd,当调用CreateServer会创建套接字,绑定地址,开始监听,设置非阻塞,启动地址重用

这个_channel在构造的时候创建了个_fd,*_loop指针,需要监控的事件类型_events,已经触发的事件类型_revents,各种事件回调函数的设置,并且初始化_loop,将Channel与特定的事件循环(EventLoop)关联起来。以便这个loop能管理和调度该Channel上的事件。

这个conn在构造的时候,会把该conn注册到传入的loop中,同时也会创建一个channel,把这个channel注册到这个loop中,同时也需要把sockfd注册到channel上,因为Channel知道它要监听哪个文件描述符(sockfd),Channel知道它的事件应该由哪个事件循环(loop)处理,并且给这个channel绑定上读/写/错误/关闭的事件回调函数,然后会创建 _in_buffer用户态输入缓冲区---存放从socket中读取到的数据。_out_buffer用户态输出缓冲区---存放要发送给对端的数据。当客户端发送数据时,SubLoop检测到读事件,调用lcpconnection::handleRead进行数据的读取,操作系统内核接收数据,暂存在socket的接收缓冲区,然后再把数据被读入InputBuffer,调用用户注册的messageCallback,将数据传递给上层应用处理,上层应用根据自己的业务逻辑处理数据。然后服务器构建好了要把数据响应给上层应用调用TcpConnection::send()发送数据,数据被写入OutputBuffer,然后把OutputBuffer的数据尝试发送到内核缓冲区,如果不能完全发送,则:将剩余数据留在OutputBuffer中,向Poller注册该连接socket的写事件,当socket可写时,SubLoop检测到写事件,继续发送OutputBuffer中的数据,数据发送完毕后,取消写事件关注

测试代码

#include "m_server.h"

// 简单的回显服务器测试
class EchoServer {
private:
    TcpServer _server;

public:
    EchoServer(int port) : _server(port) {
        // 设置服务器回调函数
        _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));
        _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
        _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));
        
        // 启用非活跃连接超时释放(60秒)
        _server.EnableInactiveRelease(60);
        
        // 设置工作线程数量
        _server.SetThreadCount(4);
    }

    // 连接建立回调
    void OnConnected(const PtrConnection& conn) {
        INF_LOG("新连接建立: %d", conn->Id());
        // 发送欢迎消息
        std::string welcome = "欢迎连接到回显服务器!\n";
        conn->Send(welcome.c_str(), welcome.size());
    }

    // 消息处理回调
    void OnMessage(const PtrConnection& conn, Buffer* buf) {
        // 读取所有可读数据
        std::string msg = buf->ReadAsStringAndPop(buf->ReadAbleSize());
        INF_LOG("收到消息[%d]: %s", conn->Id(), msg.c_str());
        
        // 回显消息
        std::string echo_prefix = "回显: ";
        std::string response = echo_prefix + msg;
        conn->Send(response.c_str(), response.size());
    }

    // 连接关闭回调
    void OnClosed(const PtrConnection& conn) {
        INF_LOG("连接关闭: %d", conn->Id());
    }

    // 启动服务器
    void Start() {
        INF_LOG("回显服务器启动在端口: %d", 8080);
        _server.Start();
    }
};

int main(int argc, char* argv[]) {
    int port = 8080;
    
    if (argc > 1) {
        port = atoi(argv[1]);
    }
    
    INF_LOG("启动回显服务器,端口: %d", port);
    
    // 创建并启动服务器
    EchoServer server(port);
    server.Start();
    
    return 0;
}

TcpServer初始化阶段详解

 TcpServer构造函数执行流程

private:
        uint64_t _next_id;      //这是一个自动增长的连接ID,
        int _port;
        int _timeout;           //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接
        bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志
        EventLoop _baseloop;    //这是主线程的EventLoop对象,负责监听事件的处理
        Acceptor _acceptor;    //这是监听套接字的管理对象
        LoopThreadPool _pool;   //这是从属EventLoop线程池
        std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象

        using ConnectedCallback = std::function<void(const PtrConnection&)>;
        using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;
        using ClosedCallback = std::function<void(const PtrConnection&)>;
        using AnyEventCallback = std::function<void(const PtrConnection&)>;
        using Functor = std::function<void()>;
        ConnectedCallback _connected_callback;
        MessageCallback _message_callback;
        ClosedCallback _closed_callback;
        AnyEventCallback _event_callback;
TcpServer(int port):
    _port(port),
    _next_id(0),
    _enable_inactive_release(false),
    _acceptor(&_baseloop, port),
    _pool(&_baseloop) {
    _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
    _acceptor.Listen();//将监听套接字挂到baseloop上
}

详细步骤:

成员初始化:

  • _port: 存储服务器监听端口
  • _next_id: 连接ID计数器初始化为0
  • _enable_inactive_release: 非活跃连接释放标志设为false

当TcpServer构造函数中初始化_baseloop成员变量时,会创建EventLoop对象:

EventLoop::EventLoop():
    _thread_id(std::this_thread::get_id()), 
    _event_fd(CreateEventFd()), 
    _event_channel(new Channel(this, _event_fd)),
    _poller(),  // 创建Poller对象
    _timer_wheel(this) {
    
    // 设置eventfd的读事件回调
    _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
    
    // 启动eventfd的读事件监控
    _event_channel->EnableRead();
}

EventLoop创建详细步骤:

初始化线程ID: 

   _thread_id = std::this_thread::get_id()
  • 记录创建EventLoop的线程ID,用于后续线程安全检查 

创建eventfd:

   _event_fd = CreateEventFd()
   
   static int CreateEventFd() {
       int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
       if (efd < 0) {
           ERR_LOG("CREATE EVENTFD FAILED!!");
           abort();
       }
       return efd;
   }
  • 创建一个eventfd用于线程间通信,设置为非阻塞模式

创建event_channel:

   _event_channel = new Channel(this, _event_fd)
  • 创建一个Channel对象管理eventfd的事件

创建Poller对象:

   _poller()  // 默认构造函数
  • 创建Poller对象用于事件监控

创建TimerWheel对象

   _timer_wheel(this)
  •  创建定时器管理对象

设置eventfd回调:

   _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
  •  设置eventfd的读事件回调函数

启用eventfd读事件监控:

   _event_channel->EnableRead();
  • 启用eventfd的读事件监控,这会将eventfd添加到Poller中 

 Poller创建过程

当EventLoop构造函数中初始化_poller成员变量时,会创建Poller对象:

Poller::Poller() {
    _epfd = epoll_create(MAX_EPOLLEVENTS);
    if (_epfd < 0) {
        ERR_LOG("EPOLL CREATE FAILED!!");
        abort();
    }
}

Channel与Poller的关联

当调用Channel::EnableRead()等方法时,会将Channel添加到Poller中:

      Acceptor创建过程:

      • 传入&_baseloop和port构造Acceptor
      • Acceptor构造函数内部用CreateServer(port)创建监听套接字:
      Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), 
          _channel(loop, _socket.Fd()) {
          _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
      }
      • CreateServer 内部调用:
      int CreateServer(int port) {
          bool ret = _socket.CreateServer(port);
          assert(ret == true);
          return _socket.Fd();
      }
      • Socket::CreateServer 执行五个关键操作:
      bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {
          //1. 创建套接字
          if (Create() == false) return false;
          //2. 设置非阻塞(如果需要)
          if (block_flag) NonBlock();
          //3. 绑定地址
          if (Bind(ip, port) == false) return false;
          //4. 开始监听(默认最大连接队列1024)
          if (Listen() == false) return false;
          //5. 启动地址端口重用
          ReuseAddress();
          return true;
      }
      • 创建Channel对象管理监听套接字,设置读事件回调函数为 Acceptor::HandleRead

      Channel构造:

         Channel(EventLoop *loop, int fd): _fd(fd), _events(0), _revents(0), _loop(loop) {}
        • 将监听套接字的文件描述符与Channel关联
        • 初始化事件标志和关联的EventLoop
        • 此时还未设置任何监控事件(events = 0)

        设置回调函数

           _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
        • 将Acceptor::HandleRead方法绑定为Channel的读事件回调
        • 当监听套接字可读时(有新连接到达),会触发此回调

         HandleRead实现

           void HandleRead() {
               int newfd = _socket.Accept();
               if (newfd < 0) {
                   return;
               }
               if (_accept_callback) _accept_callback(newfd);
           }
        • 接受新连接,获取新连接的文件描述符
        • 调用预设的_accept_callback处理新连接(即TcpServer::NewConnection)

        线程池创建:

        • 构造 pool(&_baseloop),传入主事件循环指针
        • 此时线程池只是初始化,没有创建工作线程,等待后续 SetThreadCount 和 Create 调用

        设置Acceptor回调函数

        _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));

        详细解析:

        回调绑定机制:

        • 使用 std::bind 创建函数对象,绑定 TcpServer::NewConnection 成员函数
        • this 指针作为第一个参数,确保回调能访问到TcpServer对象
        • std::placeholders::1 占位符表示将来自Acceptor的新连接fd传给NewConnection

        NewConnection函数职责:

        void NewConnection(int fd) {
            _next_id++;
            PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
            conn->SetMessageCallback(_message_callback);
            conn->SetClosedCallback(_closed_callback);
            conn->SetConnectedCallback(_connected_callback);
            conn->SetAnyEventCallback(_event_callback);
            conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
            if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);
            conn->Established();
            _conns.insert(std::make_pair(_next_id, conn));
        }

        启动监听

        _acceptor.Listen();

        内部实现详解:

        Acceptor::Listen内部:

           void Listen() { _channel.EnableRead(); }
        • 调用Channel的EnableRead方法启动监听套接字的读事件监控

        Channel::EnableRead内部:

             void EnableRead() { _events |= EPOLLIN; Update(); }
        • 将EPOLLIN标志添加到_events中
        • 调用Update更新事件监控

        Channel::Update内部: 

             void Update() { return _loop->UpdateEvent(this); }
        • 调用EventLoop的UpdateEvent方法

        EventLoop::UpdateEvent内部:

        void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
        • 调用Poller的UpdateEvent方法

        Poller::UpdateEvent内部:

        void UpdateEvent(Channel *channel) {
            bool ret = HasChannel(channel);
            if (ret == false) {
                //不存在则添加
                _channels.insert(std::make_pair(channel->Fd(), channel));
                return Update(channel, EPOLL_CTL_ADD);
            }
            return Update(channel, EPOLL_CTL_MOD);
        }

         

        • 检查Channel是否已在监控集合中
        • 如果不存在, 添加到_channels映射并调用epoll_ctl添加监控
        • 如果存在, 调用epoll_ctl修改监控事件

        Poller::Update内部:

        void Update(Channel *channel, int op) {
            int fd = channel->Fd();
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = channel->Events();
            int ret = epoll_ctl(_epfd, op, fd, &ev);
            if (ret < 0) {
                ERR_LOG("EPOLLCTL FAILED!");
            }
            return;
        }
        • 准备epoll_event结构
        • 调用epoll_ctl将监听套接字添加到epoll实例
        • 此时监听套接字的可读事件(有新连接)开始被监控

        启动服务器完整流程

        void Start() {
            _pool.Create();  // 创建线程池中的工作线程
            _baseloop.Start(); // 启动主事件循环
        }

         线程池创建:

        void Create() {
            if (_thread_count > 0) {
                _threads.resize(_thread_count);
                _loops.resize(_thread_count);
                for (int i = 0; i < _thread_count; i++) {
                    _threads[i] = new LoopThread();
                    _loops[i] = _threads[i]->GetLoop();
                }
            }
            return ;
        }
        • 线程池创建指定数量的LoopThread
        • 调用GetLoop获取并保存每个线程的EventLoop指针
        • GetLoop会等待线程创建并初始化EventLoop
        class LoopThread {
        private:
            std::mutex _mutex;          // 互斥锁
            std::condition_variable _cond;   // 条件变量
            EventLoop *_loop;       // EventLoop指针变量,这个对象需要在线程内实例化
            std::thread _thread;    // EventLoop对应的线程
        
        private:
            void ThreadEntry() {
                EventLoop loop;
                {
                    std::unique_lock<std::mutex> lock(_mutex);//加锁
                    _loop = &loop;
                    _cond.notify_all();
                }
                loop.Start();
            }
        
        public:
            LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
            
            EventLoop *GetLoop() {
                EventLoop *loop = NULL;
                {
                    std::unique_lock<std::mutex> lock(_mutex);//加锁
                    _cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞
                    loop = _loop;
                }
                return loop;
            }
        };

         线程创建:

           LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
        • 构造函数初始化_loop为NULL
        • 创建线程,给这个线程绑定入口函数为ThreadEntry
        • 线程立即开始执行
           void ThreadEntry() {
               EventLoop loop;  // 在线程栈上创建EventLoop对象
               {
                   std::unique_lock<std::mutex> lock(_mutex);  // 加锁
                   _loop = &loop;  // 将指针指向线程栈上的EventLoop
                   _cond.notify_all();  // 通知等待的线程
               }
               loop.Start();  // 开始事件循环
           }
        • 在线程内部创建EventLoop对象
        • 通过互斥锁保护设置_loop指针
        • 通知可能等待的GetLoop()调用
        • 启动事件循环

         获取EventLoop指针

           EventLoop *GetLoop() {
               EventLoop *loop = NULL;
               {
                   std::unique_lock<std::mutex> lock(_mutex);  // 加锁
                   _cond.wait(lock, [&](){ return _loop != NULL; });  // loop为NULL就一直阻塞
                   loop = _loop;
               }
               return loop;
           }
        • 主线程调用此函数获取工作线程的EventLoop
        • 使用条件变量等待EventLoop创建完成
        • 返回EventLoop指针

        主事件循环启动:

        void Start() {
            while(1) {
                //1. 事件监控
                std::vector<Channel *> actives;
                _poller.Poll(&actives);
                //2. 事件处理
                for (auto &channel : actives) {
                    channel->HandleEvent();
                }
        • 进入无限循环,监控事件、处理就绪事件、执行任务队列
        • 此时服务器完全启动,等待客户端连接 

        连接建立流程详解

        新连接到达流程

        新连接到达,监听套接字可读事件触发,

        Acceptor::HandleRead被调用

        void HandleRead() {
            int newfd = _socket.Accept();
            if (newfd < 0) {
                return;
            }
            if (_accept_callback) _accept_callback(newfd);
        }
        • Poller检测到监听套接字可读(有新连接请求)
        • EventLoop的事件循环调用Channel::HandleEvent
        • Channel根据事件类型调用Acceptor::HandleRead
        • 调用Socket::Accept获取新连接的文件描述符
        • 触发_accept_callback即TcpServer::NewConnection

        Socket::Accept实现:

        int Accept() {
            // int accept(int sockfd, struct sockaddr *addr, socklen_t *len);
            int newfd = accept(_sockfd, NULL, NULL);
            if (newfd < 0) {
                ERR_LOG("SOCKET ACCEPT FAILED!");
                return -1;
            }
            return newfd;
        }
        • 调用系统调用accept接受新连接
        • 返回新连接的文件描述符

        连接初始化详解

        1. TcpServer::NewConnection实现:
        void NewConnection(int fd) {
            _next_id++;
            PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
            conn->SetMessageCallback(_message_callback);
            conn->SetClosedCallback(_closed_callback);
            conn->SetConnectedCallback(_connected_callback);
            conn->SetAnyEventCallback(_event_callback);
            conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
            if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);
            conn->Established();
            _conns.insert(std::make_pair(_next_id, conn));
        }
        1. 工作线程选择机制:
        EventLoop* NextLoop() {
            if (_thread_count == 0) {
                return _baseloop;
            }
            _next_idx = (_next_idx + 1) % _thread_count;
            return _loops[_next_idx];
        }
        • 如果没有工作线程,使用主线程的EventLoop
        • 否则采用简单的轮询算法选择一个工作线程
        • 确保连接均匀分布在各个线程中
        1. Connection构造过程:
        Connection(EventLoop *loop, uint64_t conn_id, int sockfd):
            _conn_id(conn_id), 
            _sockfd(sockfd),
            _enable_inactive_release(false), 
            _loop(loop), 
            _statu(CONNECTING), 
            _socket(_sockfd),
            _channel(loop, _sockfd) {
            _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
            _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
            _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
            _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
            _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        }
        • 初始化连接ID和状态
        • 创建Socket和Channel对象管理连接
        • 设置Channel的各种事件回调
        1. 非活跃连接释放机制:
        void EnableInactiveRelease(int sec) {
            _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
        }
        
        void EnableInactiveReleaseInLoop(int sec) {
            _enable_inactive_release = true;
            if (_loop->HasTimer(_conn_id)) {
                return _loop->TimerRefresh(_conn_id);
            }
            _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
        }
        • 将操作封装为任务,确保在正确的线程中执行
        • 添加定时器任务,超时后自动释放连接
        • 使用连接ID作为定时器ID

        连接就绪详解

        1. Connection::Established实现:
        void Established() {
            _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        • 使用RunInLoop确保在连接所属的EventLoop线程中执行
        1. Connection::EstablishedInLoop实现:
        void EstablishedInLoop() {
            assert(_statu == CONNECTING);
            _statu = CONNECTED;
            _channel.EnableRead();
            if (_connected_callback) _connected_callback(shared_from_this());
        }
        • 断言确保当前状态为CONNECTING
        • 将状态更新为CONNECTED
        • 调用Channel::EnableRead启动读事件监控:
          void EnableRead() { _events |= EPOLLIN; Update(); }
        • 最后调用用户设置的连接建立回调
        1. shared_from_this机制:
        _connected_callback(shared_from_this())
        • Connection继承自std::enable_shared_from_this
        • shared_from_this()返回管理当前对象的shared_ptr
        • 确保回调中使用的Connection对象生命周期安全
        1. 回调函数调用:
        • 回调函数是在连接分配到的工作线程中执行的
        • 这确保了每个连接的所有操作都在同一个线程中进行
        • 避免了多线程并发访问导致的竞态条件
        1. 连接保存到管理表:
        _conns.insert(std::make_pair(_next_id, conn));
        • 将连接保存到TcpServer的_conns哈希表中
        • 使用连接ID作为键,便于后续查找和管理

        数据收发流程详解

        数据接收流程

        1. 可读事件触发,Connection::HandleRead被调用

        当客户端发送数据到服务器时,连接对应的socket变为可读状态,触发事件处理:

        // Channel::HandleEvent内部逻辑(事件分发)
        void HandleEvent() {
            if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
                if (_read_callback) _read_callback();
            }
            // ...其他事件处理
        }
        • Poller检测到连接socket可读
        • EventLoop调用对应Channel的HandleEvent
        • Channel根据事件类型调用Connection::HandleRead

        Connection::HandleRead实现详解

        void HandleRead() {
            // 1. 接收socket的数据,放到缓冲区
            char buf[65536];
            ssize_t ret = _socket.NonBlockRecv(buf, 65535);
            if (ret < 0) {
                // 出错了,不能直接关闭连接
                return ShutdownInLoop();
            }
            // 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动
            _in_buffer.WriteAndPush(buf, ret);
            
            // 2. 调用message_callback进行业务处理
            if (_in_buffer.ReadAbleSize() > 0) {
                // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象
                return _message_callback(shared_from_this(), &_in_buffer);
            }
        }

         Socket::NonBlockRecv实现

        ssize_t NonBlockRecv(void *buf, size_t len) {
            return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
        }
        
        ssize_t Recv(void *buf, size_t len, int flag = 0) {
            // ssize_t recv(int sockfd, void *buf, size_t len, int flag);
            ssize_t ret = recv(_sockfd, buf, len, flag);
            if (ret <= 0) {
                // EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误
                // EINTR  表示当前socket的阻塞等待,被信号打断了
                if (errno == EAGAIN || errno == EINTR) {
                    return 0; // 表示这次接收没有接收到数据
                }
                ERR_LOG("SOCKET RECV FAILED!!");
                return -1;
            }
            return ret; // 实际接收的数据长度
        }

         Buffer::WriteAndPush实现

        void WriteAndPush(const void *data, uint64_t len) {
            Write(data, len);
            MoveWriteOffset(len);
        }
        
        void Write(const void *data, uint64_t len) {
            // 1. 保证有足够空间,2. 拷贝数据进去
            if (len == 0) return;
            EnsureWriteSpace(len);
            const char *d = (const char *)data;
            std::copy(d, d + len, WritePosition());
        }

         调用用户消息回调

        _message_callback(shared_from_this(), &_in_buffer);
        • 传递连接管理对象和输入缓冲区
        • 在回显服务器示例中,对应EchoServer::OnMessage
        • 用户在回调中可以读取缓冲区数据并进行处理

         数据发送流程

        用户调用Connection::Send发送数据

        void Send(const char *data, size_t len) {
            Buffer buf;
            buf.WriteAndPush(data, len);
            _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
        }
        • 创建临时Buffer对象存储数据
        • 使用std::move优化性能,避免拷贝
        • 通过RunInLoop确保在正确的线程中执行SendInLoop

        Connection::SendInLoop实现

        void SendInLoop(Buffer &buf) {
            if (_statu == DISCONNECTED) return;
            _out_buffer.WriteBufferAndPush(buf);
            if (_channel.WriteAble() == false) {
                _channel.EnableWrite();
            }
        }
        • 检查连接状态,断开则不发送
        • 将数据写入输出缓冲区
        • 如果未启动写事件监控,则启动

        Channel::EnableWrite实现

        void EnableWrite() { _events |= EPOLLOUT; Update(); }
        • 添加EPOLLOUT事件到监控
        • 通过Update更新事件监控

        连接可写事件触发,Connection::HandleWrite被调用

        void HandleWrite() {
            // _out_buffer中保存的数据就是要发送的数据
            ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
            if (ret < 0) {
                // 发送错误就该关闭连接了
                if (_in_buffer.ReadAbleSize() > 0) {
                    _message_callback(shared_from_this(), &_in_buffer);
                }
                return Release(); // 这时候就是实际的关闭释放操作了
            }
            _out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动
            if (_out_buffer.ReadAbleSize() == 0) {
                _channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控
                // 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放
                if (_statu == DISCONNECTING) {
                    return Release();
                }
            }
            return;
        }

         Socket::NonBlockSend实现

        ssize_t NonBlockSend(void *buf, size_t len) {
            if (len == 0) return 0;
            return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞
        }
        
        ssize_t Send(const void *buf, size_t len, int flag = 0) {
            // ssize_t send(int sockfd, void *data, size_t len, int flag);
            ssize_t ret = send(_sockfd, buf, len, flag);
            if (ret < 0) {
                if (errno == EAGAIN || errno == EINTR) {
                    return 0;
                }
                ERR_LOG("SOCKET SEND FAILED!!");
                return -1;
            }
            return ret; // 实际发送的数据长度
        }

        处理发送结果

        • 如果发送出错(ret < 0):
        • 处理剩余输入数据
        • 调用Release关闭连接
        • 如果发送成功:
        • 更新缓冲区读偏移(MoveReadOffset(ret))
        • 如果缓冲区为空,关闭写事件监控(DisableWrite())
        • 如果连接状态为DISCONNECTING且所有数据已发送,释放连接

         Channel::DisableWrite实现

        void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
        • 从事件标志中移除EPOLLOUT
        • 通过Update更新事件监控

        关于Http模块的流程

        HTTP请求的接收流程

        客户端发送HTTP请求

        • 数据通过TCP连接到达服务器。

        数据进入内核接收缓冲区

        • 操作系统内核接收数据,暂存在socket的接收缓冲区。

        muduo通过read()读取数据到in_buffer

        • muduo检测到socket可读事件,调用read()把数据读到in_buffer。

        HTTP解析器处理in_buffer

        • muduo的HTTP模块会不断检查in_buffer,尝试解析出完整的HTTP请求(包括请求行,请求头,请求体)。
        • 如果数据不完整(比如POST体还没收全),就继续等待数据到来。

        解析出完整请求后,调用用户回调

        • 一旦解析出完整的HTTP请求,muduo会调用你注册的HTTP请求处理回调(如onRequest),把HttpRequest对象传给你的业务代码。

        HTTP响应的发送流程

        你的业务代码生成HttpResponse

        • 你在回调里构造HttpResponse对象,设置响应内容、状态码、头部等。

        muduo把HttpResponse序列化到out_buffer

        • muduo会把HttpResponse对象序列化成HTTP协议格式的字符串,写入out_buffer。

        尝试write()到socket

        • muduo尝试把out_buffer的数据写入socket(内核发送缓冲区)。
        • 如果一次写不完,剩余数据继续留在out_buffer,等待下次socket可写时再发。

        客户端收到HTTP响应

        • 数据通过网络传输到客户端,客户端解析HTTP响应。

        首先是会创建个TCPserver对象_server,然后就是然后这个_server成员列表会创建个_baseloop对象,_acceptor对象,_pool对象,会先执行它们的默认构造函数,base_loop对象的成员列表中会创建poller对象,channel对象,timer_wheel对象

        模块初始化都干了什么事情

        Poller对象的创建

        EventLoop对象的创建

         Channel对象的创建

        Socket对象的创建

        Acceptor对象的创建

        TcpServer对象的创建

        LoopThreadPool对象的创建

        LoopThread对象的创建

        Timer_wheel对象的创建

        Connection对象的创建

        1. 连接触发监听套接字的可读事件

        客户端发起连接请求(connect),操作系统内核接收到SYN包,完成三次握手后,监听套接字变为可读状态(有新连接待接受)。

        2. Poller检测到事件

        在主事件循环(baseloop)中:

        - _poller.Poll(&actives)检测到监听套接字的可读事件

        • 将监听套接字对应的Channel添加到活跃Channel列表中
        3. EventLoop处理就绪事件

        在主事件循环的事件处理阶段:

        for (auto &channel : actives) {
            channel->HandleEvent();
        }
        • 调用监听套接字Channel的HandleEvent方法
        • Channel根据就绪事件类型调用相应的回调函数
        4. Acceptor的HandleRead方法被调用

        监听套接字的可读事件触发了Channel的读回调,即Acceptor::HandleRead:

        void Acceptor::HandleRead() {
            int newfd = _socket.Accept();  // 接受新连接
            if (newfd < 0) {
                return;
            }
            if (_accept_callback) _accept_callback(newfd);  // 调用接受连接回调
        }
        5. TcpServer::NewConnection方法被调用

        Acceptor的接受连接回调是TcpServer::NewConnection:

        void TcpServer::NewConnection(int fd) {
            _next_id++;  // 生成新的连接ID
            
            // 选择一个EventLoop来处理这个连接(负载均衡)
            EventLoop* loop = _pool.NextLoop();
            
            // 创建Connection对象管理这个连接
            PtrConnection conn(new Connection(loop, _next_id, fd));
            
            // 设置Connection的各种回调函数
            conn->SetMessageCallback(_message_callback);
            conn->SetClosedCallback(_closed_callback);
            conn->SetConnectedCallback(_connected_callback);
            conn->SetAnyEventCallback(_event_callback);
            conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
            
            // 如果启用了非活跃连接超时释放,设置超时时间
            if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);
            
            // 完成连接初始化
            conn->Established();
            
            // 将连接添加到管理表
            _conns.insert(std::make_pair(_next_id, conn));
        }
        6. Connection对象创建

        当执行new Connection(loop, _next_id, fd)时:

        • 初始化成员变量:连接ID、文件描述符、EventLoop指针等
        • 创建Socket对象包装文件描述符
        • 创建Channel对象关联到EventLoop和连接套接字
        • 设置Channel的各种事件回调函数
        • 连接状态设置为CONNECTING
        7. Connection::Established方法执行
        void Connection::Established() {
            _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        • 将EstablishedInLoop任务放入连接所属的EventLoop的任务队列
        • 这确保了连接的初始化在正确的线程中执行
        8. Connection::EstablishedInLoop方法执行

        在连接所属的EventLoop线程中:

        void Connection::EstablishedInLoop() {
            assert(_statu == CONNECTING);  // 确保当前状态是CONNECTING
            _statu = CONNECTED;  // 修改连接状态为CONNECTED
            
            // 启动读事件监控
            _channel.EnableRead();
            
            // 调用连接建立回调函数
            if (_connected_callback) _connected_callback(shared_from_this());
        }
        9. 启动读事件监控

        当执行_channel.EnableRead()时:

        void Channel::EnableRead() { 
            _events |= EPOLLIN; 
            Update(); 
        }
        
        void Channel::Update() {
            // 通过EventLoop更新事件监控
            _loop->UpdateEvent(this);
        }
        10. 更新Poller中的事件监控

        EventLoop::UpdateEvent调用Poller::UpdateEvent:

        void Poller::UpdateEvent(Channel *channel) {
            bool ret = HasChannel(channel);
            if (ret == false) {
                // 不存在则添加
                _channels.insert(std::make_pair(channel->Fd(), channel));
                return Update(channel, EPOLL_CTL_ADD);
            }
            return Update(channel, EPOLL_CTL_MOD);
        }
        
        void Update(Channel *channel, int op) {
            int fd = channel->Fd();
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = channel->Events();
            int ret = epoll_ctl(_epfd, op, fd, &ev);
            if (ret < 0) {
                ERR_LOG("EPOLLCTL FAILED!!");
            }
            return;
        }
        • 将连接套接字添加到epoll的监控列表中
        • 设置监控的事件类型(EPOLLIN表示可读事件)
        11. 调用连接建立回调函数

        执行用户设置的连接建立回调函数:

        if (_connected_callback) _connected_callback(shared_from_this());
        • 将当前Connection的shared_ptr传递给回调函数
        • 用户可以在回调函数中处理连接建立事件
        12. 连接就绪,等待数据交互

        至此,新连接的处理流程完成:

        • 连接已被接受并分配了唯一ID
        • 连接被分配给了一个EventLoop进行管理
        • 连接的读事件监控已启动
        • 连接状态已设置为CONNECTED
        • 连接已添加到TcpServer的连接管理表中
        • 用户的连接建立回调函数已被调用

        连接现在处于就绪状态,可以进行数据收发。当有数据到达时,会触发连接套接字的可读事件,从而调用Connection::HandleRead方法处理数据。


        网站公告


        今日签到

        点亮在社区的每一天
        去签到