`TcpConnection`类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。
成员变量
private:
EventLoop*loop_; //这里绝对不是Mainloop,因为tcpconnection都是在subloop里边管理的
const string name_;
atomic<int> state_;
bool reading;
unique_ptr<Socket> socket_;
unique_ptr<Channle> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
ConnectionCallback connectionCallback_; // 有新连接回调 void(const TcpConnectionPtr&)
MessageCallback messageCallback_; //有读写消息回调 void(constTcpConnectionPtr&,Buffer*,Timestamp)
WriteCompleteCallback writeCompleteCallback_ // 消息发送完成以后的回调 void(const TcpConnectionPtr&)
CloseCallback closeCallbacl_; //void(const TcpConnectionPtr&)
HighWaterMarkCallback highWaterMarkCallback_; //void(const TcpConnectionPtr&,size_t)
size_t highWaterMark_;
Buffer inputBuffer_; //接受缓冲区
Buffer outputBuffer_; //发送缓冲区
enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting};
- `loop_`该`Tcp`连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
- `highWaterMark_` 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
- `inputBuffer_ outputBuffer_` 输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为`Tcp`发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到`Tcp`发送缓冲区,而是暂存在这个`outputBuffer_`中,等TCP发送缓冲区有空间了,触发可写事件了,再把`outputBuffer_`中的数据拷贝到`Tcp`发送缓冲区中。
重要成员函数
void TcpConnection::send(const std::string &buf) //直接引用buffer
{
if(state_ == kConnected)
{
if(loop_->isInLoopThread())
{
//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。
sendInLoop(buf.c_str(),buf.size());
}
else
{
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop
, this
, buf.c_str()
, buf.size()
));
}
}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{
ssize_t nwrote = 0;
size_t remaining = len; //未发送的数据
bool faultError = false; //记录是否产生错误
//之前调用过connection的shutdown 不能在发送了
if(state_ == kDisconnected)
{
LOG_ERROR("disconnected,give up writing!");
return ;
}
//channel 第一次开始写数据,且缓冲区没有待发送数据
if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(),data,len);
if(nwrote >= 0)
{
remaining = len - nwrote;
if(remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this()));
}
}
else
{
nwrote = 0;
if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写
{
LOG_ERROR("TcpConnection::sendInLoop");
if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET
{
faultError = true;
}
}
}
}
if(!faultError && remaining > 0)
{
//目前发送缓冲区剩余的待发送数据的长度
size_t oldlen = outputBuffer_.readableBytes();
if(oldlen + remaining >= highWaterMark_
&& oldlen < highWaterMark_
&& highWaterMark_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining)
);
}
outputBuffer_.append((char*)data + nwrote,remaining);
if(!channel_->isWriting())
{
channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout
}
}
}
- 发送数据要发送的数据长度是`len`,如果在`loop_`在当前的线程里面,就调用`sendInLoop`,`sendInLoop`内部实际上是调用了系统的`write`,如果一次性发送完了,就设置`writeCompleteCallback_`,表明不要再给channel设置`epollout`事件了
- 如果没有写完,先计算一下`oldlen`目前发送缓冲区剩余的待发送数据的长度。满足:`if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)` 就会触发高水位回调
- 不满足以上的话,直接写入`outputBuffer_`
- 剩余的数据保存到缓冲区当中,要给给`channel`注册`epollout`事件(**切记,一定要注册channel的写事件,否则`poller`不会向channel通知`epollout`**),这样`poller`发现`tcp`发送缓冲区有空间,会通知相应的`socket-channel`调用相应的`writeCallback()`回调方法,也就是调用`TcpConnection::handleWrite`,把发送缓冲区中数据全部发送出去。
void TcpConnection::handleRead(TimeStamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
if(n > 0)
{
messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
}
else if(n==0) //客户端断开
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::hanleRead");
handleError();
}
}
负责处理`Tcp`连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中`inputBuffer_`,然后再调用`connectionCallback_`保存的连接建立后的处理函数。
void TcpConnection::handleWrite()
{
if(channel_->isWriting())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
if(n > 0)
{
outputBuffer_.retrieve(n); //处理了n个
if(outputBuffer_.readableBytes() == 0) //发送完成
{
channel_->disableWriting(); //不可写了
if(writeCompleteCallback_)
{
//唤醒loop对应的thread线程,执行回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
if(state_ == kDisconnecting)
{
shutdownInLoop();// 在当前loop中删除TcpConnection
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());
}
}
- 如果有可写,通过fd发送数据,直到发送完成
- 设置不可写,如果`writeCompleteCallback_`,唤醒`loop`对应的`thread`线程,执行回调
- 当前`tcp`正在断开连接,调用`shutdownInLoop`,在当前`loop`中删除`TcpConnection`
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr); //执行连接关闭的回调
closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}
处理逻辑就是将这个`TcpConnection`对象中的channel_从事件监听器中移除。然后调用`connectionCallback_`和`closeCallback_`保存的回调函数。`closeCallback_`在`TcpServer::newConnection()`为新连接新建`TcpConnection`时,已设为`TcpServer::removeConnection()`,而`removeConnection()`最终会调用`TcpConnection::connectDestroyed()`来销毁连接资源。
void TcpConnection::connectDestroyed()
{
if(state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉
connectionCallback_(shared_from_this());
}
channel_->remove();//把channel从poller中删除掉
}
//关闭连接
void TcpConnection::shutdown()
{
if(state_ == kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(
std::bind(&TcpConnection::shutdownInLoop,this)
);
}
}
void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成
{
socket_->shutdowmWrite(); // 关闭写端
}
}
为什么只关闭了写端?
Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。
TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。
用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。
等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。被动关闭(muduo 在 read() 返回 0 的时候会回调 connection callback,这样客户代码就知道对方断开连接了。)
Muduo 这种关闭连接的方式对对方也有要求,那就是对方 read() 到 0 字节之后会主动关闭连接(无论 shutdownWrite() 还是 close()),一般的网络程序都会这样,不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不不关,那么 muduo 的连接就一直半开着,消耗系统资源。
完整的流程是:我们发完了数据,于是 shutdownWrite,发送 TCP FIN 分节,对方会读到 0 字节,然后对方通常会关闭连接,这样 muduo 会读到 0 字节,然后 muduo 关闭连接。
另外,如果有必要,对方可以在 read() 返回 0 之后继续发送数据,这是直接利用了 half-close TCP 连接。muduo 会收到这些数据,通过 message callback 通知客户代码。
那么 muduo 什么时候真正 close socket 呢?在 TcpConnection 对象析构的时候。TcpConnection 持有一个 Socket 对象,Socket 是一个 RAII handler,它的析构函数会 close(sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从 /proc/pid/fd/ 就能找到没有关闭的文件描述符,便于查错。
muduo 在 read() 返回 0 的时候会回调 connection callback,然后把 TcpConnection 的引用计数减一,如果 TcpConnection 的引用计数降到零,它就会析构了。