手写网络muduo库核心代码7--TcpConnection详解

发布于:2025-09-07 ⋅ 阅读:(18) ⋅ 点赞:(0)

 TcpConnection类

TcpConnection 是 Muduo 网络库中“连接”的抽象,它封装了 TCP 连接的生命周期、I/O 操作、缓冲区管理和事件回调,是网络通信的核心枢纽。它就像一个“TCP 连接的代理”,你不再直接操作 fd,而是通过 TcpConnection 来:

  • 接收数据
  • 发送数据
  • 关闭连接
  • 设置回调

✅ 1. 连接生命周期管理

  • 构造:在 accept 后创建,表示一个新连接;
  • 析构:连接关闭时自动清理资源;
  • 状态机:维护连接状态(kConnecting → kConnected → kDisconnected);

✅ 2. I/O 事件处理(读/写)

  • 绑定到一个 Channel(封装 fd 和 epoll 事件);
  • 当 epoll 通知可读时,调用 handleRead();
  • 当可写时,调用 handleWrite();
  • 内部使用 Buffer 高效收发数据;

✅ 3. 数据收发接口

  • 提供高层 send 接口,内部自动写入 outputBuffer_;
  • 支持 Buffer、string、原始指针;
  • 自动处理 EAGAIN,未发送完的数据留在 outputBuffer_;

✅ 4. 回调机制(事件驱动)

  • TcpConnection 是事件的分发中心,它支持多种回调:
    • ConnectionCallback    连接建立/关闭时调用(如日志记录)
    • MessageCallback    收到数据时调用(核心业务逻辑)
    • WriteCompleteCallback    数据发送完成后调用
    • HighWaterMarkCallback    发送缓冲区过大时通知
    • CloseCallback    连接真正关闭前调用(资源清理)

✅ 5. 缓冲区管理

  • 内置两个 Buffer:
    • inputBuffer_:接收数据;
    • outputBuffer_:发送数据;
  • 使用 readFd/writeFd 高效 I/O;
  • 支持 high water mark(高水位线),防止客户端发太快导致服务器出问题;

✅ 6. 连接关闭管理

  • 支持优雅关闭(shutdown()):关闭写端,等待读完数据再关闭;
  • 支持强制关闭(forceClose());
  • 支持延迟关闭(在事件循环中安全关闭);

头文件

#pragma once
#include"noncopyable.h"
#include"InetAddress.h"
#include"Callbacks.h"
#include"Timestamp.h"
#include"Buffer.h"
#include<memory>
#include<atomic>
#include<string>
class Channel;
class Eventloop;
class Socket;
/* 
TcpServer通过Acceptor有一个新用户链接,通过accept函数拿到connfd,TcpConnection设置回调给Channel通过它再注册
到poller上,接着有事件之后就会调用回调函数
*/
class TcpConnection:noncopyable,public std::enable_shared_from_this<TcpConnection>
{
    public:
        TcpConnection(Eventloop *loop,
                        const std::string &nameArg,
                        int sockfd,
                        const InetAddress &localAddr,
                        const InetAddress &peerAddr);
        ~TcpConnection();
        Eventloop* getloop() const {return loop_;}
        const std::string& name() const {return name_;}
        const InetAddress& localAddress() const {return localAddr_;}
        const InetAddress& peerAddress() const {return peerAddr_;}
        bool connected() const {return state_==kConnected;}
        void send(const std::string &buf);
        void shutdown();//关闭当前连接
        void setConnectionCallback(const ConnectionCallback& cb){connectionCallback_=cb;}
        void setMessageCallback(const MessageCallback &cb){messageCallback_=cb;}
        void setWriteCompleteCallback(const WriteCompleteCallback& cb){writeCompleteCallback_=cb;}
        void setHighWaterMarkCallback(const HighWaterMarkCallback& cb,size_t highWaterMark)
        {highWaterMarkCallback_=cb;highWaterMark_=highWaterMark;}
        void setCloseback(const CloseCallback& cb){closeCallback_=cb;}

        //建立连接和销毁连接
        void connectEstablished();
        void connectDestroyed();
        


    private:
    enum StateE{kDisconnected ,kConnecting,kConnected,kDisConnecting};//连接状态
    void setState(StateE state){state_=state;}
    void handleRead(Timestamp receiveTime);
    void handleWrite();
    void handleClose();
    void handleError();
    
    

    void sendInLoop(const void* message,size_t len);//确保所有操作在同一个线程中串行执行
    void shutdownInLoop();
    Eventloop *loop_;//在多线程下这里绝对不是baseloop,因为TcpConnection都是在subloop中管理的
    const std::string name_;
    std::atomic_int state_;
    bool reading_;

    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;

    const InetAddress localAddr_;
    const InetAddress peerAddr_;

    ConnectionCallback connectionCallback_;//有新连接时候的回调
    MessageCallback messageCallback_;//有读写消息时候的回调
    WriteCompleteCallback writeCompleteCallback_;//消息发送完之后的回调
    HighWaterMarkCallback highWaterMarkCallback_;
    CloseCallback closeCallback_;
    
    size_t highWaterMark_;//发送缓冲区的高水位线,如64MB

    Buffer inputBuffer_;//接受数据的缓冲区
    Buffer outputBuffer_;//发送数据的缓冲区
};

1、智能指针

enable_shared_from_this<TcpConnection>

TcpConnection继承这个类是为了在自己的类中提供一些方法,返回当前对象的一个shared_ptr强智能指针,做参数传递使用。对这里掌握不好的可以先看这一篇文章C++智能指针

源文件

#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "Eventloop.h"
#include "Eventloop.h"
#include <functional>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>

static Eventloop *CheckLoopNotNull(Eventloop *loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d main loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}
TcpConnection::TcpConnection(Eventloop *loop,
                             const std::string &nameArg,
                             int sockfd,
                             const InetAddress &localAddr,
                             const InetAddress &peerAddr)
    : loop_(CheckLoopNotNull(loop)), name_(nameArg), state_(kConnecting), reading_(true), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), peerAddr_(peerAddr), highWaterMark_(64 * 1024 * 1024) // 64M
{
    // 下面给channel设置相应的回调,poller给channel通知感兴趣的事情发生后,channel会回调相应的操作函数
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite, this));
    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose, this));
    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError, this));
    LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n", name_.c_str(), sockfd);
    socket_->setKeepAlive(true);
}
TcpConnection::~TcpConnection()
{
    LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d \n",
             name_.c_str(), channel_->fd(), (int)state_);
}

void TcpConnection::send(const std::string &buf)
{
    if (state_ == kConnected)
    {
        if (loop_->isInLoopThread()) //断当前线程是否是 subloop 线程
        {
            sendInLoop(buf.c_str(), buf.size()); //直接调用 sendInLoo
        }
        else
        {
            //如果不是,通过 runInLoop 把任务派发到 subloop 线程
            loop_->runInLoop(std::bind(
                &TcpConnection::sendInLoop,
                this,
                buf.c_str(), buf.size()));
        }
    }
}

void TcpConnection::sendInLoop(const void *data, size_t len)
{
    ssize_t nwrote = 0;
    size_t remianing = len; // 剩余长度
    bool faultError = false;

    // 之前调用过connection的shutdown,就不能再发送了
    if (state_ == kDisconnected)
    {
        LOG_ERROR("disconnected ,give up writing!");
        return;
    }
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = ::write(channel_->fd(), data, len);
        if (nwrote >= 0)
        {
            remianing = len - nwrote;
            if (remianing == 0 && writeCompleteCallback_)
            {
                // 既然在这里数据全部发送完成,说明不用再给channel设置epollout事件了
                loop_->queueInLoop(
                    std::bind(writeCompleteCallback_, shared_from_this()));
            }
        }
        else//nwrote<0表示出错,这里包含了remianing>0的情况
        {
            nwrote = 0;
            if (errno != EWOULDBLOCK)
            {
                LOG_ERROR("TcpConnection:;sendInLoop");
                if (errno == EPIPE || errno == ECONNRESET)
                {
                    faultError = true;
                }
            }
        }
    }

    if(!faultError&&remianing>0)
    {
        //目前发送缓冲区中剩余的待发送的数据的长度
        size_t oldLen=outputBuffer_.readableBytes();
        if(oldLen+remianing>=highWaterMark_
            &&oldLen<highWaterMark_
            &&highWaterMarkCallback_)
            {
                loop_->queueInLoop(
                        std::bind(highWaterMarkCallback_,shared_from_this(),oldLen+remianing)
                );
            }
            outputBuffer_.append((char*)data+nwrote,remianing);
            if(!channel_->isWriting())
            {
                channel_->enableReading();//这里一定要注册channel的写事件,否则poller不会给channel通知epollout
            }
    }
}

void TcpConnection::handleRead(Timestamp receiveTime)
{
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        // 已建立连接的用户,有可读事件发生了,调用用户传入的回调函数onMessage
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0) // 断开了
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        LOG_ERROR("TcpConnection::handleRead");
        handleError();
    }
}
void TcpConnection::handleWrite()
{
    if (channel_->isWriting())
    {
        int savedErrno = 0;
        ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);
        if (n > 0)
        {
            outputBuffer_.retrieve(n);
            if (outputBuffer_.readableBytes() == 0) // 表示发送完成
            {
                channel_->disableWriting(); // 变成不可写
                if (writeCompleteCallback_)
                {
                    // 唤醒loop_对应的thread线程,执行回调
                    loop_->queueInLoop(
                        std::bind(writeCompleteCallback_, shared_from_this()));
                }
                if (state_ == kDisConnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleWrite");
        }
    }
    else
    {
        LOG_ERROR("TcpConnection fd=%d is down,no more writing \n", channel_->fd());
    }
}
void TcpConnection::handleClose()
{
    LOG_INFO("TcpConnection::handleClose fd=%d,state-%d \n", channel_->fd(), (int)state_);
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr connPtr(shared_from_this());
    connectionCallback_(connPtr); // 执行连接关闭的回调
    closeCallback_(connPtr);      // 关闭连接的回调
}
void TcpConnection::handleError()
{
    int optval;
    socklen_t optlen = sizeof optval;
    int err = 0;
    if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
    {
        err = errno;
    }
    else
    {
        err = optval;
    }
    LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}

void TcpConnection::connectEstablished()
{
    setState(kConnected);
    //tie防止底层的channel还在运行而上层的connection已经被remove掉了,用来监视
    channel_->tie(shared_from_this());
    channel_->enableReading();//向poller注册channel的epollin事件

    //新连接建立,执行回调
    connectionCallback_(shared_from_this());

}
void TcpConnection::connectDestroyed()
{
    if(state_==kConnected)
    {
        setState(kDisconnected);
        channel_->disableAll();//把channel所有感兴趣的事件都从poller中del掉
        connectionCallback_(shared_from_this());
    }
    channel_->remove();//把channel从poller中删除掉
}
void TcpConnection::shutdown()
{
    if(state_==kConnected)
    {
        setState(kDisconnected);
        loop_->runInLoop(
            std::bind(&TcpConnection::shutdownInLoop,this)
        );
    }
}
void TcpConnection::shutdownInLoop()
{
    if(!channel_->isWriting())//如果没有注册过写事件,说明已经把发送缓冲区的数据发送完了,考虑到这是用户调用的,可能还没发送完成
    {
        socket_->shutdownWrite();//关闭写端
    }
}

1、构造函数前置检查防止loop为空

static Eventloop *CheckLoopNotNull(Eventloop *loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d main loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

static:仅在本文件使用,避免命名冲突

2、核心函数sendInLoop()函数

作用:尝试把数据发出去,如果一次发不完,就把剩下的数据存到缓冲区,并监听可写事件,等 socket 能继续写时再发。

分析:

if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)

这里的意思是如果当前的fd没有设置关系写事件(没有在等待写事件),并且 outputBuffer_ 是空的,即用户缓冲区中没有积压的数据。那说明:这条连接现在是“空闲”的,我可以试试直接 write。

nwrote = ::write(sockfd, data, len);

这行代码是把我们的应用层数据,从用户空间拷贝到操作系统内核的 socket 发送缓冲区中。

  • 如果拷贝成功,内核会负责后续的 TCP 发送(分段、重传、确认等)
  • 如果缓冲区满了,write() 就会失败(返回 -1,errno = EAGAIN/EWOULDBLOCK)

这里是为了避免不必要的缓冲区拷贝;能直接发就直接发。

remianing = len - nwrote;
        if (remianing == 0 && writeCompleteCallback_)

这里是看待写入的数据还有多少,如果待写入的数据为0,表明前边调用write()已经将想要写的数据全都拷贝到操作系统内核的 socket 发送缓冲区中了,且设置了数据发送完成事件的回调函数之后,只需要将回调函数加入到subloop 的待处理事件数组中就好了,不用设置channel对写事件的关心了,系统会在合适的时候将发送缓冲区的数据发送。

这是“乐观写(optimistic write)”策略:能不进缓冲区就不进,能不注册事件就不注册。

接着对于其他情况或出错做处理

nwrote = 0;
        if (errno != EWOULDBLOCK)
        {
            LOG_ERROR("TcpConnection::sendInLoop");
            if (errno == EPIPE || errno == ECONNRESET)
            {
                faultError = true;
            }

发生错误时说明写入失败,重新将 nwrote置为0,接着判断错误

  • EWOULDBLOCK:非阻塞 socket 写满,正常;
  • 其他错误:如 EPIPE(管道破裂)、ECONNRESET(连接重置),标记 faultError;
  if(!faultError&&remianing>0)
    {
        //目前发送缓冲区中剩余的待发送的数据的长度
        size_t oldLen=outputBuffer_.readableBytes();
        if(oldLen+remianing>=highWaterMark_
            &&oldLen<highWaterMark_
            &&highWaterMarkCallback_)
            {
                loop_->queueInLoop(
                        std::bind(highWaterMarkCallback_,shared_from_this(),oldLen+remianing)
                );
            }
            outputBuffer_.append((char*)data+nwrote,remianing);
            if(!channel_->isWriting())
            {
                channel_->enableReading();//这里一定要注册channel的写事件,否则poller不会给channel通知epollout
            }
    }

走到这一步说明,正在等待 EPOLLOUT,说明 outputBuffer_ 有数据正在发或者用户层缓冲区已经有数据积压

或者说在上面的代码中,在写入socket缓冲区中时,写入一部分后缓冲区满了,还剩一部分没有写入的数据。

接着

  • 如果 outputBuffer_  中之前存在的数据与这次剩余的数据之和大于等于“高水位线”,即加完新数据后,总长度 ≥ 高水位线这次追加后,缓冲区会“超载”,这是触发回调的前提
  • 如果 加之前,总长度 < 高水位线,表明之前没超,现在才超。
  • 用户是否注册了高水位回调函数,防止空指针调用

这三个条件合起来判断,这次是否是首次超过高水位线,需要通知上层进行流控,即

告诉上层:“你发的数据客户端接收太慢,缓冲区积压很多了!

等到上层调用完 highWaterMarkCallback_ 的回调函数时,一般会做如下处理(我们代码中未实现)

  • 暂停从 socket 读数据(比如暂停 TcpConnection::enableReading())
  • 等待 outputBuffer_ 被逐渐发完
  • 当 outputBuffer_.readableBytes() 降到某个“低水位线”(low water mark)时
  • 触发 lowWaterMarkCallback,通知上层“可以继续读了”

接着将我们待发送的数据放在outputBuffer_缓冲区中

最后一定要注册写事件,不然poller不会给channel通知epollout。

3、handleRead()

这个函数在 TcpConnection 收到 可读事件(EPOLLIN) 时被调用,它负责从 socket 读取数据并放在用户缓冲区inputBuffer_中,并交给上层业务处理。

void TcpConnection::handleRead(Timestamp receiveTime)
{
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);//从 socket fd 读取数据,尽可能多地读入 inputBuffer_
    if (n > 0)
    {
        // 已建立连接的用户,有可读事件发生了,调用用户传入的回调函数onMessage
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0) // 断开了,对端正常关闭连接(发送了 FIN 包)
    {
        handleClose();
    }
    else//处理真正的系统错误(如 ECONNRESET)
    {
        errno = savedErrno;
        LOG_ERROR("TcpConnection::handleRead");
        handleError();
    }
}

4、handleWrite()

在 EPOLLOUT 事件触发时被调用,负责从 outputBuffer_ 取数据,写入 socket 发送缓冲区。

void TcpConnection::handleWrite()
{
    if (channel_->isWriting())
    {
        int savedErrno = 0;
        ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);
        if (n > 0)
        {
            outputBuffer_.retrieve(n);
            if (outputBuffer_.readableBytes() == 0) // 表示发送完成
            {
                channel_->disableWriting(); // 变成不可写
                if (writeCompleteCallback_)
                {
                    // 唤醒loop_对应的thread线程,执行回调
                    loop_->queueInLoop(
                        std::bind(writeCompleteCallback_, shared_from_this()));
                }
                if (state_ == kDisConnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleWrite");
        }
    }
    else
    {
        LOG_ERROR("TcpConnection fd=%d is down,no more writing \n", channel_->fd());
    }
}

5、handleClose()

在连接被对端关闭(read() == 0)或发生错误时被调用,它标志着一个 TCP 连接的正式结束

void TcpConnection::handleClose()
{
    LOG_INFO("TcpConnection::handleClose fd=%d,state-%d \n", channel_->fd(), (int)state_);
    setState(kDisconnected);
    channel_->disableAll();//清楚所有感兴趣的事件

    TcpConnectionPtr connPtr(shared_from_this());//创建一个 shared_ptr
    connectionCallback_(connPtr); // 执行连接关闭的回调
    closeCallback_(connPtr);      // 关闭连接的回调,通知用户“连接已关闭”,可以释放资源
}

清除所有的感兴趣的事件,调用 epoll_ctl(fd, EPOLL_CTL_DEL, ...) 或 MOD 去掉所有事件,防止后续事件被触发(比如连接已关,但还有 EPOLLOUT 事件排队)。

创建一个 shared_ptr,是为了延长 TcpConnection 的生命周期,因为回调函数可能在执行过程中导致 TcpConnection 被销毁(比如用户在回调中释放资源)

这是 C++ 异步编程中的经典模式:确保回调期间对象存活

接着

  • connectionCallback_:通知状态变化(“连接断了”)
  • closeCallback_:执行清理工作(“请清理资源”)

6、handleError()

在 read() 或 write() 返回错误时被调用,这个接口的作用是:获取 socket 的真实错误码(SO_ERROR)

void TcpConnection::handleError()
{
    int optval;
    socklen_t optlen = sizeof optval;
    int err = 0;
    if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)//调用 getsockopt() 获取 socket 的待定错误(pending error)
    {
        err = errno;
    }
    else
    {
        err = optval;
    }
    LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}

感谢阅读!


网站公告

今日签到

点亮在社区的每一天
去签到