Muduo网络库实现 [七] - Connection模块

发布于:2025-04-04 ⋅ 阅读:(20) ⋅ 点赞:(0)

目录

设计思路

类的设计

模块实现

连接前的准备阶段

连接通信阶段 

疑惑点


设计思路

Connection模块是对通信连接也就是通信套接字的整体的管理模块,对连接的所有操作都是通过这个模块提供的接口来完成的。

具体要进行哪些方面的管理呢?

对于管理通信套接字来说,我们此时已经连接上了客户端,当客户端发来消息的时候,我们无法保证发来的消息是完整的,所以此时需要有个缓冲区,通过send把数据从网络写入到输出缓冲区中,然后启动写事件监听,当内核告诉我们"你可以把数据写进来了",我们就把数据发送出去

缓冲区管理

  • 维护两个Buffer对象:输入缓冲区和输出缓冲区
  • 提供Send接口:将数据写入输出缓冲区,启动写事件监听
  • 当写事件触发时,通过Channel回调处理实际数据发送

其次,还也需要对每个通信套接字本身做管理,能够完成套接字的种种操作,其实就是内部需要包含一个Socket对象,未来关闭套接字时能够直接调用Socket的接口

 套接字管理

  • 内部包含Socket对象
  • 封装套接字的基本操作
  • 提供关闭套接字的接口,在需要时直接调用Socket对象的接口

我们上面说了,当数据发送到用户的输出缓冲区之后,需要启动写事件监听,而这个监听就需要Channel模块来实现了,所以我们要设计个事件监听管理进行把各种事件都封装进来

事件监听管理

  • 内部包含Channel对象
  • 提供设置事件回调函数的接口
  • 提供设置需要监控事件的接口
  • 关联事件触发与回调处理的机制

还需要保存协议解析的上下文信息,也就是一个Any对象,未来要提供接口用于获取内部的上下文信息的指针。

协议上下文管理

  • 保存协议解析的上下文信息(Any类型)
  • 提供获取上下文信息指针的接口
  • 支持协议切换和状态保存

当我们的连接建立成功之后,该如何处理,连接关闭之后,该如何处理,以及对于任意事件的产生,又该如何处理呢?所以我们肯定也需要对应的回调函数

 回调函数管理

  • 存储用户通过TcpServer设置的回调函数
  • 在这些回调基础上增加Connection自身的处理逻辑
  • 将最终的回调函数设置到Channel模块,用于事件触发后的处理

这种设计思路体现了模块化和层次化的网络编程架构:

Connection作为中间层,向下管理Socket和Channel。向上为TcpServer提供接口,间接服务于用户,通过缓冲区和事件机制实现异步I/O处理,通过上下文和回调函数实现灵活的协议处理

这时候我们就需要考虑一个很重要的问题: Connection 对象如何管理? 

首先,所有的Connection 对象肯定是要交给TcpServer 模块进行整体的管理的,这个我们还是很好理解,因为未来我们的Acceptor获取到新连接之后,需要创建Connection对象,而创建Connection对象的接口就只能是TcpServer模块提供给他的,也就是由TcpServer来创建,创建之后,他也需要对所有的Connection进行管理。 

那么怎么管理呢?new一个Connection对象然后使用指针进行管理?那么会不会有这样一种情况:

想象你在一个网络聊天室里,有这样一个过程:

  1. 小明的客户端与服务器建立了连接
  2. 他发送了一条消息
  3. 在消息发送的过程中,网络突然断开
  4. 这时服务器正准备处理这条消息

在这个场景中,连接已经关闭,但是后续的操作还在尝试使用这个已经无效的连接。

连接有多种操作,如果其中一个操作把连接关闭释放了,后续的操作再调用Conenction的接口不就是野指针访问了吗?那么程序会触发硬件异常而崩溃。这是我们不希望看到的。即便 TcpServer中可能还保存着这个对象的指针,但是指针指向的资源却可能已经释放了。

所以我们可以用到智能指针share_ptr 来对Connction 进行管理。使用只能指针有哪些优势呢?我们接着看

资源管理机制

  • 使用 share_ptr 智能指针管理 Connection 对象
  • 通过引用计数的方式控制对象的生命周期

安全访问原理

  • 当一个函数接收 Connection 的 share_ptr  作为参数时
  • 该函数栈帧会增加 Connection 对象的引用计数
  • 只要函数还在执行,Connection 对象就不会被真正释放

生命周期保证

  • 即使其他操作尝试释放连接
  • 只会减少引用计数,不会立即销毁对象
  • 确保正在使用的 Connection 资源不会被意外销毁

多重保护机制

  • TcpServer 管理的基础 share_ptr  可能被释放
  • 但只要还有其他 share_ptr  持有对象
  • Connection 的实际资源不会被销毁

防止野指针的关键

  • 引用计数为零时才真正释放对象
  • 避免悬空指针和非法内存访问
  • 提供了一种线程安全的资源管理方式

基于使用sharet_ptr来管理Connection的思想,我们未来设置接口的时候,就需要传递Connection 的智能指针对象或者引用了,而不是直接传递原始指针。

Connection类该如何设计?

每一个连接都需要有一个在服务器内部的唯一标识,也就是id,为什么不直接使用 fd ?因为我们对fd 的管理也不是直接使用原始的fd 的,而是使用Socket来管理,我们需要将其与系统的IO的部分进行解耦。 同时,未来这个连接的id也是她所对应的定时任务的id。

其次,它需要一个Channel对象、一个Socket对象,两个Buffer对象以及一个Any对象

然后,他需要保存四个回调函数,这四个回调函数是由用户来进行设置的,分别是连接建立时执行的回调函数,新数据到来时执行的回调方法,连接关闭时执行的回调方法以及任意事件到来执行的回调方法。

由于Connection涉及到超时的管理,那么我们还需要一个值来表示是否启动了超时管理。

以及每一个Cconnection对象需要和一个EventLoop对象绑定,所以他还需要一个EventLoop的指针。

这是几个简单的成员,我们还需要一个成员就是连接所处的状态。

 这个状态并不是站在内核层面的状态,而是站在应用层角度的状态。

状态有以下几种:

  • 连接建立中: 此时我们已经从底层将连接的文件描述符获取上来,并为其创建了Connection对象,但是Connection内部的各项设置还未完成
  • 连接建立完成:Connection对象的各项设置已经完成,可以进行通信了
  • 连接待关闭状态:对端或者本端需要关闭这个连接,但是在实际关闭连接之前我们还需要把缓冲区的数据全部处理完
  • 连接关闭状态,处于这个状态的时候,就可以把套接字关闭,以及资源释放了

那么这个状态我们可以使用枚举来定义。

enum CONN_STATU
{
    CLOSED,    //关闭状态,不再进行任何操作,释放资源
    CONNECTING,  //连接带处理,还需要进行各项设置才能开始通信
    CONNECTED,   //连接建立,可以通信
    CLOSING  //连接带关闭,尽快处理缓冲区剩余数据
};

类的设计

// 前向声明
class Buffer;
class Channel;
class Socket;
class EventLoop;

// 连接状态枚举
typedef enum
{
    CLOSED,     // 关闭状态,不再进行任何操作,释放资源
    CONNECTING, // 连接待处理,还需要进行各项设置才能开始通信
    CONNECTED,  // 连接建立,可以通信
    CLOSING     // 连接待关闭,尽快处理缓冲区剩余数据
} ConnStatus;

// Connection 共享指针类型
using PtrConnection = shared_ptr<Connection>;

class Connection : public enable_shared_from_this<Connection>
{
private:
    // 成员变量
    uint64_t _connid;           // 连接的唯一id,也是定时任务的timeid
    Buffer _in_buffer;          // 用户态的输入缓冲区
    Buffer _out_buffer;         // 用户态的输出缓冲区
    Socket _socket;             // 套接字的管理
    Channel _channel;           // 连接的事件管理
    EventLoop *_loop;           // 连接所关联的loop,函数有RunInLoop进行线程安全操作
    bool _inactive_release;     // 是否开启超时连接销毁,默认为false
    Any _context;               // 请求接收处理的上下文
    ConnStatus _status;         // 连接状态
    int _sockfd;                // 套接字文件描述符

    // 回调函数类型定义
    using ConnectedCallback = function<void(const PtrConnection &)>;
    using MessageCallback = function<void(const PtrConnection &, Buffer *)>;
    using CloseCallback = function<void(const PtrConnection &)>;
    using EventCallback = function<void(const PtrConnection &)>;
    
    // 回调函数成员
    ConnectedCallback _connected_callback;  // 连接建立后的回调函数
    MessageCallback _message_callback;      // 消息处理的回调函数,也就是用来处理接收到的数据,并不是要处理发送出去的数据
    CloseCallback _close_callback;          // 关闭连接的回调函数
    EventCallback _event_callback;          // 任意事件的回调函数
    CloseCallback _svrclose_callback;       // 服务器关闭回调函数

private:
    // 私有成员函数 - Channel 事件处理器
    void HanderRead();      // 处理读事件
    void HanderWrite();     // 处理写事件
    void HandlerError();    // 处理错误事件
    void HandlerClose();    // 处理关闭事件
    void HandlerEvent();    // 处理通用事件
    
    // 私有成员函数 - 事件循环中的操作
    void EstablishInLoop();                 // 在事件循环中建立连接
    void SendInLoop(Buffer &buf);           // 在事件循环中发送数据
    void ShutdownInLoop();                  // 在事件循环中关闭连接
    void EnableInactiveReleaseInLoop(int seconds);  // 在事件循环中启用超时释放
    void CancelInactiveReleaseInLoop();     // 在事件循环中取消超时释放
    void ReleaseInLoop();                   // 在事件循环中释放连接
    void UpgradeInLoop(const Any &context,  // 在事件循环中升级连接配置
                       const ConnectedCallback &connected_callback,
                       const MessageCallback &message_callback,
                       const CloseCallback &close_callback,
                       const EventCallback &event_callback);

public:
    // 构造函数和析构函数
    Connection(EventLoop *loop, uint64_t connid, int sockfd);
    ~Connection();
    
    // 获取器方法
    int Socked();       // 获取套接字描述符
    int Id();           // 获取连接ID
    bool Connected();   // 检查是否已连接
    
    // 上下文管理
    void SetContext(const Any &context);    // 设置上下文
    Any *GetContext();                      // 获取上下文
    
    // 设置回调函数
    void SetConnectedCallback(const ConnectedCallback &cb);   // 设置连接建立回调
    void SetMessageCallback(const MessageCallback &cb);       // 设置消息处理回调
    void SetCloseCallback(const CloseCallback &cb);           // 设置关闭连接回调
    void SetSvrCloseCallback(const CloseCallback &cb);        // 设置服务器关闭回调
    void SetEventCallback(const EventCallback &cb);           // 设置事件回调
    
    // 公共接口
    void Establish();                       // 建立连接
    void Send(const char *data, size_t len);// 发送数据
    void ShutDown();                        // 关闭连接
    void EnableInactiveRelease(int sec);    // 启用超时释放
    void CancelInactiveRelease();           // 取消超时释放
    void Release();                         // 释放连接
    void Upgrade(const Any &context,        // 切换协议
                 const ConnectedCallback &connected_callback,
                 const MessageCallback &message_callback,
                 const CloseCallback &close_callback,
                 const EventCallback &event_callback);
};

模块实现

连接前的准备阶段

首先是我们的构造函数和析构函数的实现,传入我们的连接id,套接字,然后把超时释放先设置为false,等后面需要再由用户去设置成true,将状态设置成连接待处理的状态,因为我们并不是构造函数之后就把所有的东西都初始化了,比如说是否启动读事件和连接后的回调函数我们就需要初始化之后进行实现的。

public:
    Connection(EventLoop *loop, uint64_t connid, int sockfd)
        : _connid(connid)
        , _sockfd(sockfd)
        , _inactive_release(false)
        , _loop(loop)
        , _status(CONNECTING)
        , _socket(_sockfd)
        , _channel(loop, _sockfd)
    {
        _channel.SetReadCallback(bind(&Connection::HanderRead,this));
        _channel.SetWriteCallback(bind(&Connection::HanderWrite,this));
        _channel.SetEventCallback(bind(&Connection::HanderEvent,this));
        _channel.SetErrorCallback(bind(&Connection::HanderError,this));
        _channel.SetCloseCallback(bind(&Connection::HanderClose,this));
    }
    ~Connection()
    {
        DBG_LOG("release connection:%p", this);
    }

那么我们现在就需要实现五个回调函数,

首先读事件回调,他的思路很简单,只需要从套接字中读取数据,然后放在输入缓冲区,再来调用用户传进来的新数据回调函数就行了。

    void HanderRead()
    {
        // 创建一个64KB的临时缓冲区用于接收socket数据
        char buf[65536];

        // 使用非阻塞方式从socket读取数据
        // RecvNonBlock返回实际读取的字节数,或者在出错时返回负值
        ssize_t ret = _socket.RecvNonBlock(buf, 65535);

        // 如果返回值小于0,表示发生错误(如连接已关闭)
        if (ret < 0)
        {
            // 关闭连接并退出
            return ShutDownInLoop();
        }

        // 将读取的数据写入连接的输入缓冲区
        _in_buffer.WriteAndPush(buf, ret);

        // 如果输入缓冲区中有可读数据
        if (_in_buffer.ReadAbleSize() > 0)
        {
            // 调用用户设置的消息回调函数处理数据
            // shared_from_this()返回指向当前对象的智能指针,保证在回调期间对象不会被销毁
            // &_in_buffer传递输入缓冲区的指针,让回调函数可以读取和解析数据
            return _message_callback(shared_from_this(), &_in_buffer);
        }
    }

 这里我们可能会疑惑,为什么调用完_message_cb 之后不添加写事件监控,其实是因为,未来用户只能调用我们Connection提供的Send接口来发送数据,但是Send接口我们也懂,只会将数据写入到输出缓冲区中,我们在Send函数的实现中,只有实际写入了再来启动写事件的监控更加合理

shared_from_this是干嘛的?

这里有一个细节,就是我们前面声明类的时候,Connection 类继承了 std::enable_shared_from_this这个类,这个继承关系是为了我们能够使用 shared_from_this这个功能。

这个用法叫做自我引用。

为什么我们需要引进一个shared_from_this这样的接口呢? 

我们说了使用shared_ptr对所有的Connection对象进行管理,这样能够防止在操作的过程中资源被释放。 但是,我们在给 _message_cb 这样的回调函数传参的时候,如何保证传给他的shared_ptr对象是和管理Conenction 的shared_ptr的对象共享计数呢?

因为如果我们直接使用 shared_ptr<Connection> p (this) ,这样创建一个只能指针对象传参的时候,他的计数是独立的,并不会和TcpServer中管理Conenction的shared_ptr共享计数,那么我们就需要一个办法能够创建出一个和Conenction 的管理的shared_ptr对象共享技术的智能指针进行传参,而shared_from_this就可以解决这样的问题。

  • std::enable_shared_from_this<T> 内部维护了一个 std::weak_ptr<T>。
  • 当第一个 std::shared_ptr<T> 开始管理该对象时,这个 weak_ptr 被初始化。
  • 之后,当 shared_from_this() 被调用时,它将基于这个已经存在的 weak_ptr 返回一个新的 std::shared_ptr<T>,这个新的 shared_ptr 与原有的 shared_ptr 共享对对象的所有权。

那么使用这个接口,我们就能保证在这些回调函数在执行的时候,即使其他的地方调用了_svr_close_cb把TcpServer模块中的基础计数的智能指针释放了,这份资源也还存在,至少在我们这次函数栈帧内还存在,不会出现野指针的问题。


第二个函数就是HandlerWrite ,首先这是设置给channel的回调函数,也就是说只有当写事件触发时才会调用,那么我们直接调用write接口是不会被阻塞的。 当然我们需要判断Write的返回值,判断是否出错,如果写入数据出错了,那么我们就没必要再继续处理数据了,即使处理了也不可能再发出去,那么这时候我们就需要调用实际关闭连接的接口。

同时我们还要考虑一种情况,就是,此时其实是读出错之后,调用ShutDown而监听调用的写事件,那么这时候写完这一次数据之后就需要关闭连接。 其实也就是判断连接是否为待关闭状态。

流程图如下 

    void HanderWrite()
    {
        ssize_t ret = _socket.SendNonBlock(_out_buffer.ReadAblesize());
        if (ret < 0)
        {
            // 如果发送错误,就看看是不是接收缓冲区还有数据
            if (_in_buffer.ReadAbleSize() > 0)
            {
                _message_callback(shared_from_this(), &_in_buffer);
            }
            // 然后再真正关闭
            return Release();
        }
        _out_buffer.MoveRindex(ret);

        if (_out_buffer.ReadAbleSize() == 0)
        {
            _channel.DisableWrite();

            if (_status == DISCONNECTING)
            {
                return Release();
            }
            return
        }
    }

然后就是任意事件回调,任意事件回调我们只需要判断是否启动了超时连接,如果启动了,那么就需要刷新定时任务。 同时我们也需要执行以下用户的任意回调函数,除此之外就没其他的操作了。

    void HandlerEvent()
    {
        if (_inactive_release == true)
        {
            _loop->RefreshTimerTask(_connid);
        }

        if (_event_callback)
        {
            _event_callback(shared_from_this());
        }
    }

 然后就是挂断事件回调,挂断事件也很简单,因为可能在挂断事件触发的时候,也触发了读事件,那么我们可以先处理以下数据,然后直接调用Release关闭连接,处不处理都行,反正也无法返回了,但是最好处理以下,因为可能是一些功能性请求。

    void HandlerClose()
    {
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _message_callback(shared_from_this(), &_in_buffer);
        }
        return Release();
    }

最后就是错误事件回调,错误事件触发的时候,我们的处理方法和挂断事件是一样的。

    void HandlerClose()
    {
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _message_callback(shared_from_this(), &_in_buffer);
        }
        return Release();
    }

为什么我们实现这些接口不需要包装一层 _loop->RunInLoop() 呢? 还是那句话,因为这些IO事件的回调一定是在对应的EventLoop线程执行的?

事件循环机制的工作原理

  • 在这种基于Reactor模式的网络库中,每个EventLoop都运行在自己的线程上
  • Channel对象注册到EventLoop时,会将这些回调函数注册到事件循环的事件分发器上
  • 当IO事件发生时,是EventLoop线程本身在调用这些回调函数,而不是其他线程

一线一库的设计理念

  • 每个Connection对象都绑定到特定的EventLoop
  • 每个EventLoop都运行在自己的线程上
  • 事件的检测和分发都在同一个线程中完成

所以没有线程安全问题,但是其他的操作就可能有了,比如我们的Release以及ShutDown等操作,这些对连接的操作未来都可能是在其他线程中执行的,可能是在别的模块中被调用的,所以需要包装一层。

  • 这些方法可能被其他线程调用
    • 例如,用户可能在主线程或其他工作线程中调用Connection::Send()
    • 为了保证线程安全,需要将这些操作转移到Connection所属的EventLoop线程中执行
  • 避免同步问题
    • 使用RunInLoop机制可以避免对共享资源的直接访问,防止竞态条件
    • 所有的操作都会在同一个线程内按顺序执行,无需担心锁的问题

接着我们设置一些获取器方法,用于获取 _sockfd 和 _connid和连接状态


    int Socked()
    {
        return _sockfd;
    }
    int Id()
    {
        return _connid;
    }
    bool Connected()
    {
        return (_status == CONNECTED);
    }

 再来实现几个简单的接口,也就是用户设置回调函数的方法:

    //用户设置的回调函数
    void SetConnectedCallback()
    {
        _connected_callback = cb;
    }
    void SetMessageCallback()
    {
        _message_callback = cb;
    }
    void SetCloseCallback()
    {
        _close_callback = cb;
    }
    void SetSvrCloseCallback()
    {
        _svrclose_callback = cb;
    }
    void SetEventCallback()
    {
        _event_callback = cb;
    }

目前我们大概就能完成连接待处理状态的操作了。然后设置Connection的属性并让其开始通信。

就如同上面的这几个设置回调函数的接口,我们都是在创建出来一个对象之后,正式通信之前设置的,除此之外,我们还可以设置启动和取消非活跃连接销毁机制。

我们要注意的是,这两个接口其实有可能在通信的过程中被调用,如果是在通信之前被调用,那么是没有线程安全问题的,但是如果是在连接已经开始通信之后再被调用,那么我们要保证线程安全,就需要封装一层函数放到 EventLoop 的RunInLoop 中运行,比如这些接口其实都是在TcpServer中调用的,我们无法确定具体在哪个线程进行执行。

    void EnableInactiveReleaseInLoop(int seconds)
    {
        _inactive_release = true;
        if (_loop->HasTimerTask(_connid)) // 判断是否有定时器
        {
            return _loop->RefreshTimerTask(_connid);
        }
        _loop->AddTimerTask(_connid, sec, bind(&Connection::Release, this));
    }

    void CancelInactiveReleaseInLoop(int seconds)
    {
        _inactive_release = false;

        _loop->CancelTimerTask(_connid);
    }
   


    void EnableInactiveRelease(int sec)
    {
        _loop->RunInLoop(bind(&Connection::EnableInactiveReleaseInLoop, this, seconds));
    }

    void CancelInactiveRelease()
    {
        _loop->RunInLoop(bind(&Connection::CancelInactiveReleaseInLoop, this));
    }

 当设置完这些之后,我们就可以启动连接,开始通信了,我们可以设置一个 Established () 接口以供外界或者说上层的TcpServer调用。 但是为了线程安全,我们需要将实际的操作封装成一个任务交给loop。

    
    void EstablishInLoop()
    {
        assert(_status == CONNECTING);
        _status = CONNECTED;
        _channel.EnableRead();
        if (_connected_callback)
        {
            _connected_callback(shared_from_this());
        }
    }

    void Establish() // 连接获取之后进行各种设置,设置事件回调,启动读监控,调用读书简回调等
    {
        _loop->RunInLoop(bind(&Connection::EstblishInLoop, this));
    }

同时,在准备阶段也需要设置上下文的接口,这个接口我们就不关心线程安全的问题了,因为一般来说我们的逻辑没问题的话是不会重复设置的。 

    void SetContext(const Any &context)
    {
        _context = context;
    }

连接通信阶段 

那么已经步入通信阶段之后,我们需要提供一个接口给用户用于发送数据,也就是向输出缓冲区写入数据,这个接口也不是线程安全的,所以也需要封装成任务交给 RunInLoop

    void SendInLoop(Buffer &buf) // 把数据写入到用户态的发送缓冲区中
    {
        if (_status == DISCONNECTED)
        {
            return;
        }
        _out_buffer.WriteBufferAndPush(buf);
        if (_channel.WriteAble() == false)
        {
            _channel.EnableWrite();
        }
    }


    void Send(const char *data, size_t len) // 发送并不是真的发送了,是要先把数据放到同一个线程下,然后再同一个线程中去发送
    {
        Buffer buf;
        buf.WriteAndPush(data, len);
        _loop->RunInLoop(bind(&Connection::SendInLoop, this, move(buf)));
    }

然后就是设置上下文的接口,同时要注意线程安全问题。 

设置上下文其实就需要重新设置上下文以及四个回调函数因为不同的协议,它的处理方式可能不同,比如说原先的连接回调函数的逻辑遇到事件直接就处理,但是你切换了某个协议之后,需要先进行验证,那此时旧的回调函数的逻辑就不符合了

    void UpgradeInLoop(const Any &context,
                       const ConnectionCallback &connected_callback,
                       const MessageCallback &message_callback,
                       const CloseCallback &close_callback,
                       const EventCallback &event_callback)
    {
        _context = context;
        _connected_callback = connected_callback;
        _message_callback = message_callback;
        _close_callback = close_callback;
        _event_callback = event_callback;
    }

具体执行的操作就是这样的,但是 Upgrade 这个接口有点特殊,它是线程不安全的,所以必须在EventLoop线程中执行,同时,这还不够,他必须被立马执行,不能放到任务队列中,因为如果如果放到任务队列中,那么如果此时有读事件到来,那么就会先进行事件处理,也就是会先使用老的处理函数进行处理,然后才更新回调方法,这是不对的,我们必须在调用Upgrade的时候立即将协议的回调函数和上下文进行替换。或许换一种说法:我们必须在EventLoop线程中调用Upgrade函数,如果在其他线程内调用,那么直接报错。 绝对不能在其他的线程中调用这个函数。

这个读事件为什么会比他先进行 这个任务队列不是按照顺序进行执行的吗?先把更新协议放到任务队列中 然后读事件来了,再把读事件放入到队列中?

在EventLoop中,任务队列确实是按顺序执行的,但问题出在读事件的触发机制上:

当网络数据到达时,操作系统会通知 EventLoop 有读事件发生。这个通知是通过 epoll/poll 等 I/O 复用机制实现的,发生在 EventLoop 的事件循环中。而任务队列通常是在处理完所有已触发的 I/O 事件后才会被执行。所以执行顺序可能是这样的:

  • 调用 Upgrade 方法,将协议切换任务放入队列
  • 数据到达,触发读事件
  • EventLoop 先处理所有 I/O 事件(包括刚才触发的读事件)
  • 然后才执行任务队列中的任务(包括升级连接的任务)

那么我们可以在EventLoop中再提供一个接口

    void AssertInLoop()const{assert(_thread_id == std::this_thread::get_id());}

所以我们的Upgrade接口的实现就是这样的:

    void Upgrade(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
    {
       _loop->AssertInLoop();
        _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,context,con,msg,clo,evt));
    }

我们还需要提供一个接口用于获取上下文,这个接口可以不进行封装

    Any* GetContext() {return &_context;}

然后就是关闭连接的接口了,我们有两套接口,首先来实现ShutDown,也就是用户关闭连接的接口,这个接口也是需要注意线程安全问题,需要封装成任务。

在这个接口中,我们需要处理输入缓冲区和输出缓冲区的数据,然后再调用Release接口关闭连接。

    void ShutDownInLoop()
    {
        _con_statu = CLOSING;   //设置连接待关闭
        if(_in_buffer.ReadSize()) //有数据待处理
            if(_message_cb) _message_cb(shared_from_this(),&_in_buffer);
        //所有数据都处理完之后,处理待发送数据
        if(_out_buffer.ReadSize())  //如果有数据待发送
        {
            _channel.EnableWrite();    //启动写事件监听
            //触发写事件之后,在写事件回调中会调用Release进行连接释放
        }
        else Release(); //如果没有数据待发送就直接关闭
    }
 
    void ShutDown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutDownInLoop,this));
    } 

最后就是实际释放连接的Release接口了

首先,连接状态设置为 DISCONNECTED, 移除所有事件监控,关闭文件描述符,然后取消定时任务,以及调用用户的回调函数,最后释放掉TcpServer所管理的基础计数

    void ReleaseInLoop()
    {
        // 1.判断状态 2.移除所有事件监控 3.关闭套接字 4.移除定时器 5.调用关闭回调函数
        _status = DISCONNECTED;
        _channel.Remove();
        _socket.Close();
        if (_loop->HasTimerTask(_connid)) // 判断是否有定时器
        {
            CancelInactiveReleaseInLoop();
        }
        if (_close_callback)
        {
            _close_callback(share_from_this());
        }
    }


    void Release()
    {
        _loop->RunInLoop(bind(&Connection::ReleaseInLoop, this));
    }

    疑惑点

    在设计 Connection 类时,我们需要考虑如何唯一标识每个连接。对于为什么不直接使用文件描述符(fd)作为唯一标识?

    有几个重要原因:

    1. 文件描述符可能会被重用 - 当一个连接关闭后,操作系统可能会将这个 fd 重新分配给新的连接,这可能导致标识混淆
    2. 跨进程/跨服务器问题 - 如果服务器是分布式或多进程的,fd 在不同进程间可能不唯一
    3. 持久化问题 - 如果需要持久化连接信息,当服务器重启后 fd 会改变

     HanderRead的流程图

     

     ​​​​​​

    为啥connection模块还需要一些InLoop的接口 connection不就在EventLoop模块中吗 不是已经绑定了线程了吗?

    那不能直接把这个send放入私有接口里面吗?

     std::bind(&Connection::SendInLoop, this, in, len)不理解这个代码

     move的作用

    不是channel管理的是事件回调吗?怎么这里也要设置呢 不能调用channel模块的回调函数吗? 

    简而言之,Channel模块的回调函数负责的是底层Socket的事件管理,Connection模块的回调函数负责的是用户层的事件回调 

     


    网站公告

    今日签到

    点亮在社区的每一天
    去签到