【项目】仿muduo库one thread one loop式并发服务器(上)

发布于:2025-06-19 ⋅ 阅读:(13) ⋅ 点赞:(0)

  📚 博主的专栏

🐧 Linux   |   🖥️ C++   |   📊 数据结构  | 💡C++ 算法 | 🅒 C 语言  | 🌐 计算机网络 |🗃️ mysql

项目文章:仿muduo库one thread one loop式并发服务器前置知识准备

思维导图与项目代码已上传至我的gitee

本文摘要:

本文详细介绍了网络编程中关键模块的设计与实现,包括Buffer缓冲区模块、Socket套接字模块、Channel事件管理模块、Poller描述符监控模块、EventLoop事件循环模块和TimerWheel定时器模块。Buffer模块采用vector<char>作为底层容器,避免string的'\0'截断问题,提供数据存储和管理功能;Socket模块封装了TCP套接字操作;Channel和Poller模块实现对描述符的事件监控管理;EventLoop模块采用one thread one loop模型,通过任务队列和eventfd实现线程安全的事件处理;TimerWheel模块提供定时任务功能。各模块通过回调机制协同工作,最终实现了一个高性能的网络服务器框架,并通过TCP客户端/服务器测试验证了功能完整性。代码已上传至Gitee仓库。由于文章字数过多,将新更新一篇文章讲解其余模块(Connection、Accepter、LoopThread、LoopThreadPool、TcpServer、EchoServer等模块.....)

项目代码会随文章内容同步更新,您也可以通过主页动态直接查看本文各模块对应的讲解代码(区间在6.13-6.16这几天)

本文介绍的部分项目结构如下图所示,有助于读者快速了解整体框架。

一、Buffer模块(缓冲区模块)

提供的功能:存储数据、取出数据、数据管理(自动扩展存储空间)

1.1 Buffer缓冲区设计思想:

内存空间管理:

采用vector<char>作为底层容器,原因如下:

  1. 连续内存特性:保证数据存储的连续性,提高访问效率
  2. 自动扩容机制:当存储空间不足时自动扩展
  3. 灵活大小调整:可根据实际需求精确控制缓冲区大小

不使用string的原因:

字符串终止问题:string以'\0'作为结束标志,在处理网络数据时可能:

  1. 网络数据中常包含二进制数据(如文件传输)
  2. 二进制数据可能包含多个'\0'字符使用string会导致数据截断,造成信息丢失

性能考虑:vector<char>更适合频繁的数据写入和读取操作

内存管理:vector提供更直接的内存访问和控制

要素与操作

要素

1.默认的空间大小

2.当前的读取数据位置----read

3.当前的写入数据位置----write

操作:

1.写入数据:

当前写入位置指向哪里,就从哪里开始写入 如果后续剩余空闲空间不够了

考虑整体缓冲区空闲空间是否足够(因为读位置也会向后偏移,前边有可能会有空闲空间)

  • 足够:将数据移动到起始位置即可
  • 不够:扩容,从当前写位置开始扩容足够大小 数据一旦写入成功,当前写位置,就要向后偏移

当下一次所要写入的数据大小超过总的剩余空间=read前+write后(注意,read指针之前的空间也包含在内,也就是读过的区域)时就扩容,否则就移动已写未读的内容至开始,read指针移动至开始,再继续写入。数据一旦写入成功,写位置向后偏移。

如图所示:

将内容写入: 

现在还有内容要写入,因为缓冲区已经没有空余位置,就对vector进行扩容:

2.读取数据

当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读

可读数据大小:当前写入位置,减去当前读取位置

1.2 Buffer缓冲区类功能接口设计

#define BUFFER_DEFAULT_SIZE 1024

class Buffer {
private:
    std::vector<char> _buffer;
    /*是一个相对偏移量,而不是绝对地址*/
    uint64_t _read_idx;   // 相对读偏移量
    uint64_t _writer_idx; // 相对写偏移量

public:
    // 构造函数
    Buffer() : _read_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}

    // 获取当前写位置地址:_Buffer的空间起始地址,加上写偏移量
    void* WritePosition();
    // 获取当前读位置地址
    void* ReadPosition();
    // 获取前沿空闲空间大小
    uint64_t HeadIdleSize();
    // 获取后沿空闲空间大小
    uint64_t TailIdleSize();
    // 获取可读数据大小
    uint64_t ReadableSize();
    // 将读偏移向后移动指定长度
    void MoveReadOffset(uint64_t len);
    // 将写偏移向后移动指定长度
    void MoveWriteOffset(uint64_t len);
    // 确保可写空间足够(移动+扩容)
    void EnsureWriteSpace(uint64_t len);
    // 写入数据
    void Write(void* data, uint64_t len);
    // 读取数据
    void Read(void* buf, uint64_t len);
    // 清理功能:读写偏移归0
    void Clear();
};

1.3 Buffer缓冲区类接口实现

#include <iostream>
#include <vector>
#include <cassert>
#include <cstring>
#include <string>

#define BUFFER_DEFAULT_SIZE 1024

class Buffer {
private:
    std::vector<char> _buffer;
    /*是一个相对偏移量,而不是绝对地址*/
    uint64_t _read_idx;   // 相对读偏移量
    uint64_t _writer_idx; // 相对写偏移量

public:
    // 构造函数
    Buffer() : _read_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
    char* Begin() {
        // &*_buffer.begin(): 迭代器buffer.begin()指向起始元素,*buffer.begin()
        // &*_buffer.begin()--->得到第0个元素的空间地址
        return &*_buffer.begin();
    }
    // 获取当前写位置地址:_Buffer的空间起始地址,加上写偏移量
    char* WritePosition() {
        // 将获取_buffer起始地址的过程用begin()封装起来
        return Begin() + _writer_idx;
    }
    // 获取当前读位置地址
    char* ReadPosition() { return Begin() + _read_idx; }
    // 获取前沿空闲空间大小:读偏移
    uint64_t HeadIdleSize() { return _read_idx; }
    // 获取后沿空闲空间大小:总体空间大小-写偏移
    uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
    // 获取可写空间大小
    uint64_t WritableSize() { return HeadIdleSize() + TailIdleSize(); }
    // 获取可读数据大小:写偏移-读偏移
    uint64_t ReadableSize() { return _writer_idx - _read_idx; }
    // 将读偏移向后移动指定长度
    void MoveReadOffset(uint64_t len) {
        // 向后移动的大小,必须小于可读数据大小
        assert(len <= ReadableSize());
        _read_idx += len;
    }
    // 将写偏移向后移动指定长度
    void MoveWriteOffset(uint64_t len) {
        // 向后移动的大小,必须小于当前后边的空闲空间大小
        assert(len <= TailIdleSize());

        _writer_idx += len;
    }
    // 确保可写空间足够(移动+扩容)
    void EnsureWriteSpace(uint64_t len) {
        if (TailIdleSize() >= len) {
            return;
        }
        // 末尾空闲空间不够:则判断加上前沿空闲空间大小是否足够,足够就挪动数据
        if (HeadIdleSize() + TailIdleSize() >= len) {
            // 移动数据至起始位置先保存好当前数据大小
            uint64_t rsz = ReadableSize(); // 先保存好当前数据大小
                                           // 将可读数据拷贝到起始位置
            // 将可读数据拷贝到起始位置,注意类型转换
            std::copy(ReadPosition(), ReadPosition() + rsz, Begin());
            _read_idx = 0;
            _writer_idx = rsz;
        } else {
            // 总体空间不够,需要扩容,不移动数据,直接给写偏移之后扩容足够空间:
            _buffer.resize(_writer_idx + len);
        }
    }
    // 写入数据
    void Write(const void* data, uint64_t len) {
        // 1.确保有足够的空间,2.拷贝数据进去
        EnsureWriteSpace(len);
        const char* d = (const char*)data;

        std::copy((char*)data, (char*)(d + len), WritePosition());
    }

    void WriteAndPush(const void* data, uint64_t len) {
        Write(data, len);
        MoveWriteOffset(len);
    }

    void WriteString(const std::string& data) {
        return Write(data.c_str(), data.size());
    }
    void WriteStringAndPush(const std::string& data) {
        WriteString(data);
        // std::cout << WritePosition() << std::endl;//测试
        MoveWriteOffset(data.size());
        // std::cout << ReadableSize() << std::endl;//测试
    }
    void WriteBuffer(Buffer& data) {
        return Write(data.ReadPosition(), data.ReadableSize());
    }
    void WriteBufferAndPush(Buffer& data) {
        WriteBuffer(data);
        MoveWriteOffset(data.ReadableSize());
    }
    // 读取数据
    void Read(void* buf, uint64_t len) {
        assert(len <= ReadableSize());
        // std::cout << "------" << std::endl;// 测试
        // std::cout << ReadPosition() << std::endl; // 测试
        std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);
        // std::cout << buf << std::endl; // 测试
    }

    // 读并弹出
    void ReadAndPop(void* buf, uint64_t len) {
        Read(buf, len);
        MoveReadOffset(len);
    }

    std::string ReadAsString(uint64_t len) {
        assert(len <= ReadableSize());
        std::string str;
        str.resize(len);
        Read(&str[0], len);
        // std::cout << "******" << std::endl; // 测试
        // std::cout << str << std::endl;      // 测试
        return str;
    }

    std::string ReadAsStringAndPop(uint64_t len) {
        assert(len <= ReadableSize());
        std::string str = ReadAsString(len);
        MoveReadOffset(len);
        // std::cout << "++++++" << std::endl; // 测试
        // std::cout << str << std::endl;      // 测试
        return str;
    }

    // 寻找换行字符
    char* FindCRLF() {
        char* res = (char*)memchr(ReadPosition(), '\n', ReadableSize());
        return res;
    }

    // 获取一行数据
    //  通常获取一行数据,这种情况针对的是
    std::string GetLine() {
        char* pos = FindCRLF();
        if (pos == nullptr) {
            return "";
        }
        //+1是为了把换行字符也取出
        return ReadAsString(pos - ReadPosition() + 1);
    }
    std::string GetLineAndPop() {
        std::string str = GetLine();
        MoveReadOffset(str.size());
        return str;
    }
    // 清理功能:读写偏移归0
    void Clear() {
        _read_idx = 0;
        _writer_idx = 0;
    }
};

1.4 Buffer缓冲区类编译与测试

#include "server.hpp"

// 测试1
// int main()
// {
//     Buffer buf;

//     // 功能测试
//     std::string str = "hello";
//     buf.WriteStringAndPush(str);

//     Buffer buf1;
//     buf1.WriteBufferAndPush(buf); // 拷贝构造函数也可以

//     std::string tmp;
//     // tmp = buf.ReadAsStringAndPop(buf.ReadableSize());
//     tmp = buf1.ReadAsStringAndPop(buf1.ReadableSize());

//     std::cout << tmp << std::endl; //hello
//     std::cout << buf.ReadableSize() << std::endl;//5
//     std::cout << buf1.ReadableSize() << std::endl;//0

//     return 0;
// }

// 测试扩容和获取行
int main() {
    Buffer buf;
    for (int i = 0; i < 300; i++) {
        std::string str = "hello" + std::to_string(i) + '\n';
        buf.WriteStringAndPush(str);
    }
    // 测试获取行

    while (buf.ReadableSize() > 0) {
        std::string line = buf.GetLineAndPop();
        std::cout << line << std::endl;
    }
    // 测试扩容
    // std::string tmp;
    // tmp = buf.ReadAsStringAndPop(buf.ReadableSize());
    // std::cout << tmp << std::endl;

    return 0;
}

二、日志宏的实现

日志编写可以看我的这篇博客,有详细的解说

2.1 我们这里以上面的测试为例,添加宏:

#define LOG(format, ...)                                                       \
    fprintf(stdout, "[%s:%d] " format, __FILE__, __LINE__, __VA_ARGS__)

输出:

2.2 没有不定参数:##__VA_ARGS__

#define LOG(format, ...)                                                       \
    fprintf(stdout, "[%s:%d] " format, __FILE__, __LINE__, ##__VA_ARGS__)

输出:

2.3 添加时间:#include<ctime>

#define LOG(format, ...)                                                                    \
    do                                                                                      \
    {                                                                                       \
        time_t t = time(nullptr);                                                           \
        struct tm *ltm = localtime(&t);                                                     \
        char tmp[32] = {0};                                                                 \
        strftime(tmp, 31, "%H:%M:%S", ltm);                                                 \
        fprintf(stdout, "[%s:%s:%d] " format "\n", tmp, __FILE__, __LINE__, ##__VA_ARGS__); \
    } while (0)

效果:

[15:52:09:main.cc:53] hi, pupu

2.4 添加等级

以下是一个简单的日志打印宏,可以控制日志等级

#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG // 控制: 调整打印等级至INF的等级才能打印

#define LOG(LEVEL, format, ...)                                                \
    do {                                                                       \
        if (LEVEL < LOG_LEVEL)                                                 \
            break;                                                             \
        time_t t = time(nullptr);                                              \
        struct tm* ltm = localtime(&t);                                        \
        char tmp[32] = {0};                                                    \
        strftime(tmp, 31, "%H:%M:%S", ltm);                                    \
        fprintf(stdout, "[%s:%s:%d] " format "\n", tmp, __FILE__, __LINE__,           \
         ##__VA_ARGS__);                                                       \
    } while (0)

#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)

测试:因为我设置的打印等级需要INF以上,所以无输出

        INF_LOG("hi, pupu");

三、Socket套接字模块 

Socket套接字的详细解说可以看我的这几篇文章:

UDP套接字编程 TCP套接字编程 封装好的Socket.hpp,构建网络版计算器

功能:对套接字的操作进行封装

意义:程序中对于套接字的各项操作更加简便

3.1 Socket套接字类功能设计

#define Max_LISTEN 1024
class Socket {
private:
    int _sockfd;

public:
    // 套接字基础操作

    Socket();
    Socket(int fd);
    ~Socket();
    // 创建套接字
    bool Create();
    // 绑定地址信息
    bool Bind(const std::string& ip, uint16_t port);
    // 开始监听
    bool Listen(int backlog);
    // 向服务器发起连接
    bool Connect(const std::string& ip, uint16_t port);
    // 获取新连接
    int Accept(); // 返回文件描述符
    // 接收数据
    ssize_t
    Recv(void* buf, size_t len,
         int flag = 0); // ssize_t:有符号的长整型, flag = 0:默认阻塞操作
    // 发送数据
    ssize_t Send(void* buf, size_t len, int flag = 0);
    // 关闭套接字
    void Close();

    // 套接字其余重要操作
    // 创建一个服务端(监听)连接
    bool CreateServer(uint16_t port,
                      const std::string& ip =
                          "0.0.0.0"); // 主机一般会绑定当前主机上所有的网卡
    // 创建一个客户端连接
    bool CreateClient(uint16_t port, const std::string& ip);
    // 设置套接字选项--开启地址端口重用
    void ReuseAddress();
    // 设置套接字阻塞属性--设置为非阻塞(因为默认是阻塞)
    void NonBlock();
};

3.2 Socket套接字类接口实现

#include <sys/socket.h>
#include <sys/types.h>

#include <arpa/inet.h> //字节序转换
#include <fcntl.h>
#include <netinet/in.h>
#include <unistd.h>

#define Max_LISTEN 1024
class Socket {
private:
    int _sockfd;

public:
    Socket() : _sockfd(-1) {}
    Socket(int fd) : _sockfd(fd) {}
    ~Socket() { Close(); }
    int Fd() { return _sockfd; }
    // 创建套接字
    bool Create() {
        // int socket(int domain ,int type, int protocol);
        _sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd < 0) {
            ERR_LOG("CREATE SOCKET FAILED...");
            return false;
        }
        return true;
    }
    // 绑定地址信息
    bool Bind(const std::string& ip, uint16_t port) {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        // int bind(int sockfd, struct sockaddr *addr, socklen_t len);
        int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
        if (ret < 0) {
            ERR_LOG("BIND ADDRESS FAILED...");
            return false;
        }
        return true;
    }
    // 开始监听
    bool Listen(int backlog) {
        // int listen(int socket, int backlog); //
        // backlog:设置以下同一时间最大并发连接数
        int ret = listen(_sockfd, backlog);
        if (ret < 0) {
            ERR_LOG("SOCKET LISTEN FAILED...");
            return false;
        }
        return true;
    }
    // 向服务器发起连接
    bool Connect(const std::string& ip, uint16_t port) {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        // int connect(int sockfd, struct sockaddr *addr, socklen_t len);
        int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
        if (ret < 0) {
            ERR_LOG("CONNECT ADDRESS FAILED...");
            return false;
        }
        return true;
    }
    // 获取新连接: 返回文件描述符
    int Accept() {
        // int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
        int newfd = accept(_sockfd, nullptr, nullptr);
        if (newfd < 0) {
            ERR_LOG("SOCKET ACCEPT FAILED...");
            return -1;
        }
        return newfd;
    }
    // 接收数据
    // ssize_t:有符号的长整型, flag = 0:默认阻塞操作
    ssize_t Recv(void* buf, size_t len, int flag = 0) {
        // ssize_t recv(int sockfd, void *buf, size_t len, int flags);
        ssize_t ret = recv(_sockfd, buf, len, flag);
        if (ret < 0) {
            // EAGAIN:
            // 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误
            // EINTR: 当前socket的阻塞等待,被信号打断了,
            if (errno == EAGAIN || errno == EINTR) {
                ERR_LOG("SOCKET RECV NULL...");

                return 0; // 表示这次没有接收到数据
            }
            ERR_LOG("SOCKET RECV FAILED...");
            return -1;
        }
        // INF_LOG("%ld", ret);
        return ret; // 实际接收到的数据长度
    }

    // 非阻塞接收数据
    ssize_t NonBlockRecv(void* buf, size_t len) {
        return Recv(buf, len,
                    MSG_DONTWAIT); // MSG_DONTWAIT: 表示当前接收为非阻塞
    }

    // 发送数据
    ssize_t Send(const void* buf, size_t len, int flag = 0) {
        // ssize_t send(int sockfd, const void *buf, size_t len, int flags);
        ssize_t ret = send(_sockfd, buf, len, flag);
        if (ret < 0) {
            if (errno == EAGAIN || errno == EINTR) {
                ERR_LOG("SOCKET RECV NULL...");

                return 0; // 表示这次没有接收到数据
            }
            ERR_LOG("SOCKET SEND FAILED...");
            return -1;
        }
        // ret:
        // 实际发送的数据长度,有可能buf中数据并没有发送完,外部可以根据实际发送长度来决定如何继续处理
        // std::cout << ret << std::endl;
        return ret;
    }
    // 非阻塞发送数据
    ssize_t NonBlockSend(void* buf, size_t len) {
        return Send(buf, len,
                    MSG_DONTWAIT); // MSG_DONTWAIT: 表示当前发送为非阻塞
    }

    // 关闭套接字
    void Close() {
        if (_sockfd != -1) {
            close(_sockfd);
            _sockfd = -1;
        }
    }
    // 套接字其余重要操作
    //  创建一个服务端(监听)连接, 主机一般会绑定当前主机上所有的网卡
    bool CreateServer(uint16_t port, const std::string& ip = "0.0.0.0",
                      bool block_flag = false) {
        // 1.创建套接字 2.绑定地址 3.开始监听 4.设置非阻塞 5.启动地址重用
        if (Create() == false)
            return false;
        if (Bind(ip, port) == false)
            return false;
        if (Listen(1024) == false)
            return false;
        if (block_flag)
            NonBlock();
        ReuseAddress();
        return true;
    }
    // 创建一个客户端连接
    bool CreateClient(uint16_t port, const std::string& ip) {
        // 1.创建套接字 2.直接连接服务器 3.开始监听 4.设置非阻塞 5.启动地址重用
        if (Create() == false) {
            DBG_LOG("SOCKET CREATECLIENT FAILED...");
            return false;
        }

        if (Connect(ip, port) == false)
            return false;
        INF_LOG("SOCKET CREATECLIENT SUCCESS...");

        return true;
    }
    // 设置套接字选项--开启地址端口重用
    void ReuseAddress() {
        //  int setsockopt(int sockfd, int level, int optname,
        //               const void *optval, socklen_t optlen);

        int val = 1;
        // 地址重用
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int));
        val = 1;
        // 端口重用
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(int));
    }
    // 设置套接字阻塞属性--设置为非阻塞(因为默认是阻塞)
    void NonBlock() {
        // int fcntl(int fd, int cmd, ... /* arg */ );
        int flag = fcntl(_sockfd, F_GETFL, 0);
        fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    }
};

3.3 Socket套接字类编译测试

tcp_srv.cc

#include "../source/server.hpp"

int main() {
    Socket lst_sock;
    lst_sock.CreateServer(8500);
    while (1) {
        int newfd = lst_sock.Accept();
        if (newfd < 0) {
            continue;
        }
        Socket cli_sock(newfd);
        char buffer[1024] = {0};
        int ret = cli_sock.Recv(buffer, 1023);
        if (ret < 0) {
            cli_sock.Close();
            continue;
        }
        cli_sock.Send(buffer, ret);
        cli_sock.Close();
    }

    lst_sock.Close();

    return 0;
}

tcp_cli.cc

#include "../source/server.hpp"

int main() {
    Socket cli_sock;
    cli_sock.CreateClient(8500, "127.0.0.1");
    std::string str = "hi pupu";
    cli_sock.Send((void*)str.c_str(), str.size());
    char buf[1024] = {0};
    cli_sock.Recv(buf, 1023);
    DBG_LOG("%s", buf);
    return 0;
}

四、Channel模块

目的:对描述符的监控事件管理

功能:对于一个描述符进行监控事件管理

意义:对于描述符的监控事件在用户态更容易维护,以及触发事件后的操作流程更加的清晰

4.1 Channel事件管理类功能设计

a、对监控事件的管理

是否正在监测描述符的读状态

是否正在监测描述符的写状态

开始监测描述符的读事件

开始监测描述符的写事件

停止监测读事件

停止监测写事件

停止所有事件监测

b、对事件触发后的处理

1.需要处理的事件:可读、可写、挂断、错误、任意

2.事件处理的回调函数

将使用epoll进行事件监控,epoll的详解:

多路转接详解     epoll的机制与实现原理 epoll详细讲解、ET模式、基于Reactor设计模式实现高并发网络服务器

成员

EPOLLIN 可读
EPOLLOUT 可写
EPOLLRDHUP 连接断开
EPOLLPRI 优先数据
EPOLLERR 出错了
EPOLLHUP 挂断
而以上的事件都是一个数值 uint32_t 进行保存

要进行事件管理,就需要有一个uint32_t 类型的成员保存当前需要监控的事件

事件处理这里,因为有五种事件需要处理,就需要五个回调函数

class Channel {
private:
    int _fd;
    uint32_t _events;  // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件
    using EventCallBack = std::function<void()>;
    EventCallBack _read_callback;     // 可读事件被触发的回调函数
    EventCallBack _write_callback;    // 可写事件被触发的回调函数
    EventCallBack _error_callback;    // 错误事件被触发的回调函数
    EventCallBack _close_callback;    // 连接断开事件被触发的回调函数
    EventCallBack _anyevent_callback; // 任意事件被触发的回调函数
public:
    Channel();
    // 进行事件监控之后,fd就绪了的事件,
    // EventLoop就会通过这个接口把实际就绪的事件设置进来,根据revents判断什么事件就绪
    void SetREvent(uint32_t events){}
    // 设置事件回调,你传函数,我回调
    void SetReadCallBack(const EventCallBack& cb);
    void SetWriteCallBack(const EventCallBack& cb);
    void SetErrorCallBack(const EventCallBack& cb);
    void SetCloseCallBack(const EventCallBack& cb);
    void SetAnyEventCallBack(const EventCallBack& cb);
    // 当前是否监控可读
    bool Readable();
    // 当前是否监控可写
    bool Writable();
    // 启动读事件监控 -->挂到EventLoop上
    void EnableRead();
    // 启动写事件监控
    void EnableWrite();
    // 解除可读事件监控
    void DisableRead();
    // 解除可写事件监控
    void DisableWrite();
    // 解除所有事件监控
    void DisableAll();
    // 移除监控:从epoll红黑树上直接移除,从EventLoop上移除
    void Remove();
    // 事件处理:一旦连接触发了事件,就调用这个函数,EventLoop模块只管监控触发何种事件,调用Channel处理函数
    // 如何处理由Channel决定
    void HandleEvent();
};

4.2 Channel事件管理类接口实现

#include <functional>
#include <sys/epoll.h>
class Channel {
private:
    int _fd;
    uint32_t _events;  // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件
    using EventCallBack = std::function<void()>;
    EventCallBack _read_callback;     // 可读事件被触发的回调函数
    EventCallBack _write_callback;    // 可写事件被触发的回调函数
    EventCallBack _error_callback;    // 错误事件被触发的回调函数
    EventCallBack _close_callback;    // 连接断开事件被触发的回调函数
    EventCallBack _anyevent_callback; // 任意事件被触发的回调函数
public:
    Channel(int fd) : _fd(fd), _events(0), _revents(0) {}
    int Fd() { return _fd; }
    uint32_t Events() { return _events; } // 获取想要监控的事件
    
    // 进行事件监控之后,fd就绪了的事件,
    // EventLoop就会通过这个接口把实际就绪的事件设置进来,根据revents判断什么事件就绪
    void SetREvent(uint32_t events) { _revents = events; }
    // 设置事件回调,你传函数,我回调
    void SetReadCallBack(const EventCallBack& cb) { _read_callback = cb; }
    void SetWriteCallBack(const EventCallBack& cb) { _write_callback = cb; }
    void SetErrorCallBack(const EventCallBack& cb) { _error_callback = cb; }
    void SetCloseCallBack(const EventCallBack& cb) { _close_callback = cb; }
    void SetAnyEventCallBack(const EventCallBack& cb) { _anyevent_callback = cb;}
    // 是否正在监控描述符的读事件
    bool Readable() { return _events & EPOLLIN; }
    // 是否正在监控描述符的写事件
    bool Writable() { return _events & EPOLLOUT; }
    // 启动读事件描述符的监控 -->挂到EventLoop上
    void EnableRead() {
        _events |= EPOLLIN; // 后边会添加到EventLoop的事件监控中
    }

    // 启动写事件描述符的监控
    void EnableWrite() {
        _events |= EPOLLOUT; // 后边会添加到EventLoop的事件监控中
    }
    // 解除可读事件监控
    void DisableRead() {
        _events &= ~EPOLLIN; // 后边会修改到EventLoop的事件监控中
    }
    // 解除可写事件监控
    void DisableWrite() {
        _events &= ~EPOLLOUT; // 后边会修改到EventLoop的事件监控中
    }
    // 解除所有事件监控
    void DisableAll() { _events = 0; }
    // 移除监控:从epoll红黑树上直接移除,从EventLoop上移除
    void Remove() {
        // 后边会调用EventLoop接口来移除监控
    }
    // 事件处理:一旦连接触发了事件,就调用这个函数,EventLoop模块只管监控触发何种事件,调用Channel处理函数
    // 如何处理由Channel决定
    void HandleEvent() {
        if ((_revents & EPOLLIN) ||
            (_revents & EPOLLRDHUP || _revents & (EPOLLPRI))) {
            if (_read_callback)
                _read_callback();
            // 无论什么事件,都要调用的回调函数
            if (_anyevent_callback)
                _anyevent_callback();
        }
        /*有可能会出现释放连接的操作,一次只处理一个*/
        if ((_revents & EPOLLOUT)) {
            if (_write_callback)
                _write_callback();
            if (_anyevent_callback)
                _anyevent_callback(); // 放到事件处理完毕之后调用,用于刷新活跃度
        } else if ((_revents & EPOLLERR)) {
            if (_anyevent_callback)
                _anyevent_callback(); // 出错会释放连接,放到事件处理之前调用任意事件回调
            if (_error_callback)
                _error_callback();
        } else if ((_revents & EPOLLHUP)) {
            if (_anyevent_callback)
                _anyevent_callback();
            if (_close_callback)
                _close_callback();
        }
    }
};

在下个模块进行联合poller测试


五、Poller模块

5.1 Poller描述符监控类功能设计

功能:对epoll进行封装,让对描述符进行事件监控的操作更加简单

意义:通过epoll实现对描述符的IO事件监控

  1. 添加/修改 描述符的事件监控(不存在则添加,存在则修改)
  2. 移除描述符的事件监控

封装思想:

必须拥有一个epoll的操作句柄、

  • 创建一个新的 epoll 实例,返回一个文件描述符(_epfd),后续所有操作均基于此描述符。        

拥有一个struct epoll_event 结构数组,监控时保存所有的活跃事件

使用hash表管理描述符fd与描述符对应的事件管理Channel对象:

 逻辑流程:                                                

  • 当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件如何处理)当描述符就绪了,返回就绪描述符对应的Channel

  • 对描述符进行监控,通过Channel才能知道描述符需要监控什么事件

5.2 Poller描述符监控类接口设计

#define MAX_EPOLLEVENTS 1024
class Poller {
private:
    int _epfd;
    struct epoll_event _evs[MAX_EPOLLEVENTS];
    std::unordered_map<int, Channel*> _channels;

private:
    // 1.判断要更新事件的描述符是否存在,被监控
    bool HasChannel(Channel* channel) {}
    // 2.针对epoll直接操作(添加、修改、移除)
    void Update(Channel* channel, int op) {}

public:
    Poller();
    // 1.添加或更新描述符所监控的事件
    void UpdateEvent(Channel* channel);
    // 2.移除描述符的监控
    void RemoveEvent(Channel* channel);
    // 3.开始监控,获取就绪Channel,返回活跃连接
    void Poll(std::vector<Channel> active);
};

5.3 Poller描述符监控类接口实现与编译

#include <unordered_map>
#define MAX_EPOLLEVENTS 1024
class Poller {
private:
    int _epfd;
    struct epoll_event _evs[MAX_EPOLLEVENTS];
    std::unordered_map<int, Channel*> _channels;

private:
    // 1.判断要更新事件的描述符是否存在
    bool HasChannel(Channel* channel) {
        auto it = _channels.find(channel->Fd());
        if (it == _channels.end())
            return false;
        return true;
    }
    // 2.针对epoll直接操作(添加、修改、移除)
    void Update(Channel* channel, int op) {
        // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Events();
        int ret = epoll_ctl(_epfd, op, fd, &ev);
        if (ret < 0) {
            ERR_LOG("EPOLLCTL FAILED...");
        }
        return;
    }

public:
    Poller() {
        _epfd = epoll_create(MAX_EPOLLEVENTS);
        if (_epfd < 0) {
            ERR_LOG("EPOLL CREATE FAILED...");
            abort(); // 退出程序
        }
    }
    // 1.添加或更新描述符所监控的事件
    void UpdateEvent(Channel* channel) {
        bool ret = HasChannel(channel);
        if (ret == false) {
            //不存在则添加
            _channels.insert(std::make_pair(channel->Fd(), channel));
            return Update(channel, EPOLL_CTL_ADD);
        }
        return Update(channel, EPOLL_CTL_MOD);
    }
    // 2.移除描述符的监控
    void RemoveEvent(Channel* channel) {
        auto it = _channels.find(channel->Fd());
        if (it != _channels.end())
            _channels.erase(it);
        Update(channel, EPOLL_CTL_DEL);
    }
    // 3.开始监控,获取就绪Channel,返回活跃连接
    void Poll(std::vector<Channel*>* active) {
        //    int epoll_wait(int epfd, struct epoll_event *events,
        //                   int maxevents, int timeout);
        // 阻塞监控,无序设置超时时间, time_out = -1:阻塞监控
        int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
        if (nfds < 0) {
            if (errno == EINTR) {
                return;
            }
            ERR_LOG("EPOLL WAIT ERROR: %s...\n", strerror(errno));
            abort(); // 退出程序
        }
        for (int i = 0; i < nfds; i++) {
            auto it = _channels.find(_evs[i].data.fd);
            assert(it != _channels.end());
            it->second->SetREvent(_evs[i].events); // 设计实际就绪事件
            active->push_back(it->second);
        }
    }
};

5.4 Poller模块与Channel模块整合

将Poller模块整合至Channel,修改Channel

#include <functional>
#include <sys/epoll.h>
class Poller;
class Channel {
private:
    int _fd;
    Poller* _poller;
    uint32_t _events;  // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件
    using EventCallBack = std::function<void()>;
    EventCallBack _read_callback;     // 可读事件被触发的回调函数
    EventCallBack _write_callback;    // 可写事件被触发的回调函数
    EventCallBack _error_callback;    // 错误事件被触发的回调函数
    EventCallBack _close_callback;    // 连接断开事件被触发的回调函数
    EventCallBack _anyevent_callback; // 任意事件被触发的回调函数
public:
    Channel(Poller* poller, int fd)
        : _fd(fd), _events(0), _revents(0), _poller(poller) {}
    int Fd() { return _fd; }
    // 进行事件监控之后,fd就绪了的事件,
    // EventLoop就会通过这个接口把实际就绪的事件设置进来,根据revents判断什么事件就绪
    uint32_t Events() { return _events; } // 获取想要监控的事件

    void SetREvent(uint32_t events) { _revents = events; }
    // 设置事件回调,你传函数,我回调
    void SetReadCallBack(const EventCallBack& cb) { _read_callback = cb; }
    void SetWriteCallBack(const EventCallBack& cb) { _write_callback = cb; }
    void SetErrorCallBack(const EventCallBack& cb) { _error_callback = cb; }
    void SetCloseCallBack(const EventCallBack& cb) { _close_callback = cb; }
    void SetAnyEventCallBack(const EventCallBack& cb) {
        _anyevent_callback = cb;
    }
    // 当前是否监控可读
    bool Readable() { return _events & EPOLLIN; }
    // 当前是否监控可写
    bool Writable() { return _events & EPOLLOUT; }
    // 启动读事件监控 -->挂到EventLoop上
    void EnableRead() {
        _events |= EPOLLIN;
        Update();
    }

    // 启动写事件监控
    void EnableWrite() {
        _events |= EPOLLOUT;
        Update();
    }
    // 解除可读事件监控
    void DisableRead() {
        _events &= ~EPOLLIN;
        Update();
    }
    // 解除可写事件监控
    void DisableWrite() {
        _events &= ~EPOLLOUT;
        Update();
    }
    // 解除所有事件监控
    void DisableAll() {
        _events = 0;
        Update();
    }
    // 这里做声明,在Poller类之后实现
    // 移除监控:从epoll红黑树上直接移除,从EventLoop上移除
    void Remove();
    // 更新监控
    void Update();

    // 事件处理:一旦连接触发了事件,就调用这个函数,EventLoop模块只管监控触发何种事件,调用Channel处理函数
    // 如何处理由Channel决定
    void HandleEvent() {
        if ((_revents & EPOLLIN) ||
            (_revents & EPOLLRDHUP || _revents & (EPOLLPRI))) {
            if (_read_callback)
                _read_callback();
            // 无论什么事件,都要调用的回调函数
            if (_anyevent_callback)
                _anyevent_callback();
        }
        /*有可能会出现释放连接的操作,一次只处理一个*/
        if ((_revents & EPOLLOUT)) {
            if (_write_callback)
                _write_callback();
            if (_anyevent_callback)
                _anyevent_callback(); // 放到事件处理完毕之后调用,用于刷新活跃度
        } else if ((_revents & EPOLLERR)) {
            if (_anyevent_callback)
                _anyevent_callback(); // 出错会释放连接,放到事件处理之前调用任意事件回调
            if (_error_callback)
                _error_callback();
        } else if ((_revents & EPOLLHUP)) {
            if (_anyevent_callback)
                _anyevent_callback();
            if (_close_callback)
                _close_callback();
        }
    }
};

//放置Poller类之后的Update()和Remove()
void Channel::Remove() { return _poller->RemoveEvent(this); }
void Channel::Update() { return _poller->UpdateEvent(this); }

5.5 Poller模块与Channel模块联合调试--是不完整的

注意:Poller模块是EventLoop的一个子模块,EventLoop才是实际上进行事件管理的模块

tcp_svr.cc

#include "../source/server.hpp"

void HandleClose(Channel* channel) {
    std::cout << "close: " << channel->Fd() << std::endl;
    channel->Remove(); // 移除监控
    delete channel;
}

void HandleRead(Channel* channel) {
    int fd = channel->Fd();
    char buf[1024] = {0};
    int ret = recv(fd, buf, 1023, 0);
    if (ret <= 0) {
        return HandleClose(channel); // 关闭释放
    }
    std::cout << buf << std::endl;
    channel->EnableWrite(); // 启动可写事件监控
}
void HandleWrite(Channel* channel) {
    int fd = channel->Fd();
    const char* data = "天气还不错!";
    int ret = send(fd, data, strlen(data), 0);
    if (ret < 0) {
        return HandleClose(channel);
    }
    channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel* channel) { return HandleClose(channel); }
void HandleEvent(Channel* channel) { std::cout << "有了一个事件" << std::endl; }

void Acceptor(Poller* poller, Channel* lst_channel) {
    int fd = lst_channel->Fd();
    int newfd = accept(fd, nullptr, nullptr);
    if (newfd < 0) {
        return;
    }
    Channel* channel = new Channel(poller, newfd);
    channel->SetReadCallBack(
        std::bind(HandleRead, channel)); // 为通信套接字设置可读事件的回调函数
    channel->SetWriteCallBack(
        std::bind(HandleWrite, channel)); // 可写事件的回调函数
    channel->SetCloseCallBack(
        std::bind(HandleClose, channel)); // 关闭事件的回调函数
    channel->SetAnyEventCallBack(
        std::bind(HandleEvent, channel)); // 任何事件的回调函数
    channel->EnableRead();                //启动读事件监控
}

int main() {
    Poller poller;
    Socket lst_sock;
    lst_sock.CreateServer(8500);
    // 为监听套接字,创建一个Channel,进行事件的管理,以及事件的处理
    Channel channel(&poller, lst_sock.Fd());
    channel.SetReadCallBack(std::bind(
        Acceptor, &poller,
        &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控
    channel.EnableRead();
    while (1) {
        std::vector<Channel*> actives;
        poller.Poll(&actives);
        for (auto& a : actives) {
            a->HandleEvent();
        }
    }

    lst_sock.Close();

    return 0;
}

tcp_cli.cc

运行结果:一次是两个事件(读事件、写事件),客户端退出,出现段错误

使用gdb进行调试:

gdb ./server -----> run -----> ./client -----> 关闭客户端 -------> bt(查看函数调用栈)

我们发现到问题:触发可读和可写事件之后都有可能释放连接,因此需要先触发任意事件回调函数

现在我们根据几张图和解释来理解这个代码所描述的

过程:

  1. 监听Channel

    • lst_channel:监听套接字的Channel

    • SetReadCallBack(Acceptor):设置读事件回调为Acceptor函数

    • EnableRead:启用读事件监控

    • 添加到Poller的<fd, Channel>映射中

    • 最终由epoll监控 

  2. Acceptor回调

    • 当有新连接时触发

    • 通过accept()获取新连接

    • 创建新的连接Channel(con_channel)

  3. 连接Channel

    • 为每个新连接创建单独的Channel

    • 设置五种回调函数:

      • setreadcallback → HandleRead(读取数据)

      • setwritecallback → HandleWrite(写数据)

      • setclosecallback → HandleClose(关闭连接)

      • seterrorcallback → HandleError(错误处理)

      • seteventcallback → HandleEvent(任意事件)

  4. 数据流处理(底部)

  • HandleRead流程

    1. 当客户端发送数据时触发

    2. 读取数据并打印:hi pupu

    3. 调用EnableWrite()启动写监控

  • HandleWrite流程

    1. epoll检测到可写事件时触发

    2. 调用send()发送回复数据:“天气还不错!”

    3. 调用DisableWrite()关闭写监控

核心组件交互与工作流程

核心工作流程

初始化:创建监听Channel并设置Acceptor回调

事件监听:通过epoll监控所有Channel

连接建立:Acceptor接收新连接并创建通信Channel

数据处理

  • 读事件 → 读取数据 → 启用写监控
  • 写事件 → 发送响应 → 禁用写监控

资源管理:通过回调处理关闭和错误


六、EventLoop模块

Poller模块是EventLoop的一个子模块,EventLoop才是实际上进行事件管理的模块

6.1 EventLoop模块eventfd的认识

eventfd:一种事件通知机制,头文件:<sys/eventfd.h>

创建文件描述符用于事件通知

vs信号:

  • 信号是针对进程进行事件通知,事件具体是被哪一个线程所处理不一定。

eventfd本质在内核里边管理的就是一个计数器

  • 创建eventfd就会在内核中创建一个计数器(结构体)
  • 每当向eventfd中写入一个数值---用于表示事件的通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数:假设每次给eventfd中写入1,就表示通知了一次,连续写了三次之后,再read出来的数字就是3,读取之后计数清0。

用处:在EventLoop模块中实现线程间的事件通知机制

系统调用

#include <sys/eventfd.h>

int eventfd(unsigned int initval, int flags);

参数说明:

  • initval:初始计数器值
  • flags:控制标志,常用值包括:
    • EFD_CLOEXEC:执行exec时关闭文件描述符(禁止进程复制,父进程打开一个文件后创建子进程,子进程不会复制父进程打开的这个文件描述符:不知道为什么可以看)
    • EFD_NONBLOCK:设置为非阻塞模式
    • EFD_SEMAPHORE:设置为信号量模式

返回值:返回一个文件描述符用于操作

eventfd也是通过read / write / close进行操作的

注意点:read & write进行IO时,数据只能是一个8字节数据

使用方法:

#include <iostream>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>
int main() {
    // 创建eventfd
    int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    if (efd < 0) {
        perror("eventfd failed!!");
        return -1;
    }

    uint64_t val = 1;
    // 写入事件
    write(efd, &val, sizeof(val));
    write(efd, &val, sizeof(val));
    write(efd, &val, sizeof(val));
    // write(efd, &val, sizeof(val));
    uint64_t res = 0;
    // 读取事件
    read(efd, &res, sizeof(res));
    printf("%ld\n", res);
    close(efd);
    return 0;
}

eventfd需要配合多路复用和IO操作才能充分发挥其优势

6.2 EventLoop模块设计思想

功能

  • 进行事件监控管理的模块(封装了TimerQueue、Poller模块)

  • 这个模块实际上就是我们所说的one thread one loop中的loop,也就是我们所说的Reactor

  • 这个模块与线程是一一对应关联的

监测一个连接时,当连接就绪后需要处理对应事件。但若该描述符在多个线程中同时触发事件,就会产生线程安全问题。因此,我们需要将连接的监控、事件处理及其他相关操作都置于同一线程中执行。

这就相当于将Channel(对事件监控的管理,事件触发的处理)和Poller(封装epoll监控事件)关联到了一起。

意义

  • 对于服务器中的所有事件都是由EventLoop模块来完成

  • 每一个Connection连接,都会绑定一个EventLoop模块和线程,因为外界对于连接的所有操作,都是要放到同一个线程中进行的

思想

对于所有的连接进行事件监控

  • 连接触发事件后调用回调进行处理        

对于连接的所有操作,都要放到eventloop对应的线程中执行

解决方案:

在EventLoop模块中,添加一个任务队列,对连接的所有操作都进行一次封装,将对连接的操作都不直接执行,当做任务添加到任务队列当中。

EventLoop处理流程:

        1.在线程中对描述符进行事件监控

        2.有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中呢:将操作都封装到任务队列)

        3.所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行

最后仅在任务队列处加上锁保证线程安全

6.3 EventLoop模块类接口设计

1.事件监控

        使用Poller模块

        有事件就绪则进行事件处理

2.执行任务队列中的任务

        一个线程安全的任务队列

关键注意事项:当任务执行流程因等待文件描述符IO事件就绪而阻塞时,可能导致队列中的任务无法执行。为解决这个问题,需要引入eventfd事件通知机制,及时唤醒处于阻塞状态的事件监控。

当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作: 这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。

  1. 如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行

  2. 如果执行的操作不再线程中,才需要加入任务池,等到事件处理完了然后执行任务

接口设计

class EventLoop {
private:
    using Functor = std::function<void()>;

    std::thread::id _thread_id;
    int _event_fd;               // eventfd唤醒IO事件监控有可能导致的阻塞
    Poller _poller;              // 进行所有描述符的事件监控
    std::vector<Functor> _tasks; // 任务池
    std::mutex _mutex;

private:
    // 执行任务池中的所有任务
    void RunAllTask();

public:
    EventLoop();
    // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列
    void RunInLoop(const Functor& cb);
    // 将操作压入任务池
    void QueueInLoop(const Functor& cb);
    // 判断任务所在的线程是否是在当前的线程中不断处理的事件处理循环
    bool IsInLoop();
    // 修改/添加描述符的事件监控
    void UpdateEvent(Channel* channel);
    // 移除描述符的所有监控
    void RemoveEvent(Channel* channel);

    // 启动EventLoop模块:先进行任务监控,接下来处理就绪事件,事件处理完了,处理任务
    void Start();
};

6.4 EventLoop模块类接口实现、整合编译

注意:

与 `std::lock_guard` 相比,`std::unique_lock` 更加灵活。它允许延迟锁定和解锁操作,可以通过成员函数如 `lock()` 和 `unlock()` 来控制锁的获取和释放时机,出作用域解锁。而 `std::lock_guard` 是在构造时锁定互斥锁,在析构时解锁,不能对锁定和解锁的过程进行更细致的控制。

#include <bits/std_thread.h>
#include <mutex>
#include <stdint.h>
#include <sys/eventfd.h>
class EventLoop {
private:
    using Functor = std::function<void()>;

    std::thread::id _thread_id;
    int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞
    // Channel *_event_channel; //
    // 注意EventLoop析构,但是_event_channel并不会被释放,所以要在析构里delete
    // 所以使用unique_str将该指针管理起来,EventLoop释放的时候
    //  ,unique_ptr也会释放,连带所管理的_event_channel一并释放
    std::unique_ptr<Channel> _event_channel;
    Poller _poller;              // 进行所有描述符的事件监控
    std::vector<Functor> _tasks; // 任务池
    std::mutex _mutex;

private:
    // 执行任务池中的所有任务
    void RunAllTask() {
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> _lock(_mutex); // 出作用域解锁
            _tasks.swap(functor); // functor中保存所有的任务
        }
        for (auto& f : functor) {
            f();
        }
        return;
    }
    //
    static int CreateEventFd() {
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd < 0) {
            ERR_LOG("CREATE EVENTFD FAILED!!");
            abort();
        }
        return efd;
    }
    void ReadEventFd() {

        uint64_t res = 0;
        // 读取事件
        int ret = read(_event_fd, &res, sizeof(res));
        if (ret < 0) {
            // EINTR:被信号打断,EAGAIN:无数据可读
            if (errno == EINTR || errno == EAGAIN) {
                return;
            }
            ERR_LOG("READ EVENTFD FAILED");
        }
        return;
    }
    void WeakUpEvented() {
        // 给Eventfd写入数据
        uint64_t val = 1;
        int ret = write(_event_fd, &val, sizeof(val));
        if (ret < 0) {
            if (errno == EINTR) {
                return;
            }
            ERR_LOG("READ EVENTFD FAILED");
        }
        return;
    }

public:
    EventLoop()
        : _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()),
          _event_channel(new Channel(this, _event_fd)) // 注意_event_fd要先定义
    {
        // 给eventfd添加可读事件回调函数,读取eventfd事件通知次数
        _event_channel->SetReadCallBack(
            std::bind(&EventLoop::ReadEventFd, this));
        // 启动eventfd的读事件监控
        _event_channel->EnableRead();
    }
    // 启动EventLoop模块:1.先进行任务监控,2.接下来处理就绪事件,事件处理完了,3.执行任务池
    void Start() {
        // 1.事件监控
        std::vector<Channel*> actives;
        _poller.Poll(&actives);
        for (auto& channel : actives) { // 2.事件处理
            channel->HandleEvent();
        }
        // 3.事件处理完了,执行任务
        RunAllTask();
    }

    // 判断任务所在的线程是否是在当前的线程中不断处理的事件处理循环
    bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); }
    // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列
    void RunInLoop(const Functor& cb) {
        if (IsInLoop()) {
            return cb();
        }
        return QueueInLoop(cb);
    }
    // 将操作压入任务池
    void QueueInLoop(const Functor& cb) {
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(cb);
        }
        // 唤醒有可能因为没有事件就绪而导致的epoll阻塞
        // 其实就是给Eventfd写入一个数据,eventfd就会出发可读事件
        WeakUpEvented();
    }

    // 修改/添加描述符的事件监控
    void UpdateEvent(Channel* channel) { return _poller.UpdateEvent(channel); }
    // 移除描述符的所有监控
    void RemoveEvent(Channel* channel) { return _poller.RemoveEvent(channel); }
};
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }

注意:在Channel中,将Poller *poller---->替换为EventLoop *loop,Remove和Update中也需要更改对象_poller ---> _loop

6.5 EventLoop模块类整合测试

测试用例依旧使用前面的tcp_svr.cc和tcp_cli.cc,仅修改:将tcp_svr.cc中的Poller ---> EventLoop,poller ---> loop,以及循环内处理就绪事件改为:loop.Start()

七、TimerWheel定时器模块整合

我的上一篇博客有详细讲解了时间轮定时器原理以及实现,本模块对上篇博客中的代码进行调整,将timerfd.cpp + timerwheel.cpp整合成一个类至server.hpp中,代码也在我的gitee中。

timerfd:负责实现内核每隔一段时间,给进程触发一次定=超时事件

timerwheel:负责实现每次执行RunTimeTask,都可以执行一批到期的定时任务

要实现一个完整的秒级定时器,就需要将这两个功能整合至一起,并且与Channel联系起来

7.1 EventLoop与TimerWheel模块整合与编译

这一处的代码的调整可以看我的gitee

7.2 EventLoop与TimerWheel联合调试

tcp_svr.cpp修改处:

tcp_cli.cpp

我这里所验证的实验现象是,客户端向服务器发送5次数据(5s),就死循环不再发送消息,服务器就不再刷新活跃度,10s过后就关闭该连接

7.3 EventLoop模块联调中(简单服务器)的模块流程关系图

根据以下这幅图所标明的步骤结合我们的代码来看便于理解

结语:

       随着这篇博客接近尾声,我衷心希望我所分享的内容能为你带来一些启发和帮助。学习和理解的过程往往充满挑战,但正是这些挑战让我们不断成长和进步。我在准备这篇文章时,也深刻体会到了学习与分享的乐趣。    

         在此,我要特别感谢每一位阅读到这里的你。是你的关注和支持,给予了我持续写作和分享的动力。我深知,无论我在某个领域有多少见解,都离不开大家的鼓励与指正。因此,如果你在阅读过程中有任何疑问、建议或是发现了文章中的不足之处,都欢迎你慷慨赐教。

        你的每一条反馈都是我前进路上的宝贵财富。同时,我也非常期待能够得到你的点赞、收藏,关注这将是对我莫大的支持和鼓励。当然,我更期待的是能够持续为你带来有价值的内容。


网站公告

今日签到

点亮在社区的每一天
去签到