文章目录
1 关于TCP Sender
核心职责:
- 跟踪接收方窗口的ackno、window_size
- 尽可能地填充窗口,直到窗口已满或发送完毕
- 管理未确认的段、超时未确认则重传(ARQ机制)
- 处理SYN、FIN标志,确保流的正确开始和结束
1.1 关键机制
重传超时(RTO)与定时器
TCPSender 的 owner 会周期性地调用 TCPSender 的 tick 方法,表示时间的流逝。TCPSender 负责查看其未完成的 TCPSenderMessages 集合,并确定最早发送的 segment 是否在没有确认的情况下未完成太长时间(即,没有确认其所有序列号)。如果是这样,则需要重新传输 (再次发送) 。
2 实现TCP Sender
2.1 void push( const TransmitFunction& transmit );
要求 TCPSender 从出站字节流填充窗口:它从流中读取并发送尽可能多的 TCPSenderMessages,只要窗口中有要读取的新字节和可用空间。它通过调用提供的 transmit() 函数来发送它们。
您需要确保您发送的每个 TCPSenderMessage 都完全适合接收者的窗口。使每条消息尽可能大,但不要大于 TCPConfig::MAX PAYLOAD SIZE 给出的值。
const TransmitFunction& transmit 函数型参数?
跳转到定义:
/* Type of the `transmit` function that the push and tick methods can use to send messages */
using TransmitFunction = std::function<void( const TCPSenderMessage& )>;
作用是,当发送器生成一个待发送的段(如 SYN 段、数据段、FIN 段)时,通过调用 transmit(segment) 将段传递给框架,segment的类型是TCPSenderMessage
从哪里读取字节?
当然是从TCPSender类里自己的字节流input_里读取,此时,TCPSender作为发送方
input_里的数据是哪来的?
构造TCPSender类时放入的,构造TCPSender时,同时还会给定Initial_RTO和ISN
实现
void TCPSender::push(const TransmitFunction& transmit)
{
// 首先检查Writer是否存在错误并设置错误状态,有错误的话停止push,并返回空的message
if (writer().has_error()) {
_has_error = true;
cerr << "DEBUG: writer has error, setting _has_error = true" << endl;
}
if (_has_error) {
cerr << "DEBUG: _has_error is true in push(), sending RST message" << endl;
TCPSenderMessage rst_msg = make_empty_message();
transmit(rst_msg);
return;
}
// 如果没有错误,正常处理...
// 如果可接收的窗口大小为0且没有要重传的消息,则设置窗口大小为1
uint64_t effective_window = (received_msg.window_size == 0 && outstanding_bytes == 0) ? 1 : received_msg.window_size;
//如果当前的窗口大小可以容纳待重传的消息,则处理数据
while (outstanding_bytes < effective_window) {
TCPSenderMessage msg;
//发送SYN消息
if (isSent_ISN == false) {
msg.SYN = true;
msg.seqno = isn_;
isSent_ISN = true; // 立即设置标志
} else {
msg.seqno = Wrap32::wrap(abs_seqno, isn_);
}
// 计算可用窗口大小(考虑已发送但未确认的字节)
size_t remaining_window = effective_window - outstanding_bytes;
// 如果是SYN消息,需要减去一个字节,因为SYN占用一个序列号
if (msg.SYN) {
remaining_window = remaining_window > 0 ? remaining_window - 1 : 0;
}
// 计算可以发送的数据大小
size_t payload_size = min(remaining_window, TCPConfig::MAX_PAYLOAD_SIZE);
payload_size = min(payload_size, writer().reader().bytes_buffered());
// 读取数据
read(writer().reader(), payload_size, msg.payload);
// 修改FIN逻辑:只有当发送完所有数据后,且确保FIN的一个字节也能放入窗口时才添加FIN
if (writer().is_closed() && !isSent_FIN &&
writer().reader().bytes_buffered() == 0 &&
outstanding_bytes + msg.sequence_length() < effective_window) {
isSent_FIN = true;
msg.FIN = true;
}
if (!msg.sequence_length()) break;
outstanding_collections.push_back(msg);
outstanding_bytes += msg.sequence_length(); // 确保正确计算序列号占用
abs_seqno += msg.sequence_length();
// 立即发送创建的消息
transmit(msg);
// 如果有未确认的数据,启动计时器
if (outstanding_bytes > 0 && !is_start_timer) {
is_start_timer = true;
cur_RTO_ms = initial_RTO_ms_;
}
}
}
2.2 void receive( const TCPReceiverMessage& msg );
从接收器接收一条消息,传达窗口的新左边缘 (= ackno) 和右边缘 (= ackno + 窗口大小)。TCPSender 应查看其未完成的 segment 集合,并删除任何现已完全确认的 segment(ackno 大于 segment 中的所有序列号)。
左边缘:告知发送方 “已确认的数据边界”,释放已处理的段。
右边缘:告知发送方 “可接收的数据边界”,限制发送速率和数据量。
窗口大小是用来限制每次发送的段的大小,还是所有要发送的数据的大小?
后者。因为之前window_size设置的是TCPReceiver的字节流可以接受的最大字节数量。
那段的大小是由谁限制来着?
《自顶向下》上说是MSS(最大报文长度)。
实现
void TCPSender::receive(const TCPReceiverMessage& msg)
{
// 检查收到的RST标志
if (msg.RST) {
_has_error = true;
// 还需要设置底层writer的错误状态
const_cast<Writer&>(writer()).set_error();
return;
}
if (_has_error) {
return; // 如果有错误,不执行任何操作
}
received_msg = msg;
primitive_window_size = msg.window_size;
if (msg.ackno.has_value() == true) {
uint64_t ackno_unwrapped = static_cast<uint64_t>(msg.ackno.value().unwrap(isn_, abs_seqno));
if (ackno_unwrapped > abs_seqno) return;
while (outstanding_bytes != 0 &&
static_cast<uint64_t>(outstanding_collections.front().seqno.unwrap(isn_, abs_seqno)) +
outstanding_collections.front().sequence_length() <= ackno_unwrapped) {
outstanding_bytes -= outstanding_collections.front().sequence_length();
outstanding_collections.pop_front();
consecutive_retransmissions_nums = 0;
cur_RTO_ms = initial_RTO_ms_;
if (outstanding_bytes == 0) is_start_timer = false;
else is_start_timer = true;
}
}
}
2.3 void tick( uint64 t ms since last tick, const TransmitFunction& transmit );
Time has passed — 自上次调用此方法以来的一定毫秒数。发送方可能需要重新传输未完成的段;它可以调用 transmit() 函数来执行此作。(提醒:请不要尝试在代码中使用实际的 “clock” 或 “gettimeofday” 函数;对时间传递的唯一引用来自 ms since last tick 参数。
实现
关键点:
- 重传判断
- 指数退避
void TCPSender::tick(uint64_t ms_since_last_tick, const TransmitFunction& transmit)
{
if (_has_error) {
return; // 如果有错误,不执行任何操作
}
// 只有当有未确认的数据且计时器启动时才减少时间
if (is_start_timer && outstanding_bytes > 0) {
if (cur_RTO_ms <= ms_since_last_tick) {
// 超时,重传第一个未确认的段
transmit(outstanding_collections.front());
consecutive_retransmissions_nums++;
if (primitive_window_size) cur_RTO_ms = (1UL << consecutive_retransmissions_nums) * initial_RTO_ms_;
else cur_RTO_ms = initial_RTO_ms_;
} else {
cur_RTO_ms -= ms_since_last_tick;
}
}
}
谁来调用tick()?
发送数据时,push中发现有需要重传的消息,就会启动定时器
if (outstanding_bytes > 0 && !is_start_timer) {
is_start_timer = true;
cur_RTO_ms = initial_RTO_ms_;
}
只是启动,不是调用,真正的调用tick是在测试框架的上层模块
TCPSenderMessage make empty message() const;
TCPSender 应生成并发送一条序列号设置正确的零长度消息。如果 Peer 节点想要发送 TCPReceiverMessage(例如,因为它需要确认来自 Peer 节点的发送者的某些内容)并且需要生成 TCPSenderMessage 来配合它,这将非常有用。
实现
注意RST的设置
TCPSenderMessage TCPSender::make_empty_message() const
{
TCPSenderMessage msg;
msg.seqno = Wrap32::wrap(abs_seqno, isn_);
// 检查是否有错误,无论是来自内部标志还是Writer
bool has_error = _has_error || writer().has_error();
cerr << "DEBUG: make_empty_message called, _has_error = " << (_has_error ? "true" : "false")
<< ", writer().has_error() = " << (writer().has_error() ? "true" : "false") << endl;
if (has_error) {
msg.RST = true;
}
return msg;
}
几个疑问
TCP是单工通信还是双工通信?
全双工,“一对流量控制的字节流”。
Sender和Recevier各自独立存在一个字节流,支持双向数据传输。
数据流是如何传输的
留个坑