2025 cs144 Lab Checkpoint 3: TCP Receiver

发布于:2025-06-08 ⋅ 阅读:(17) ⋅ 点赞:(0)

1 关于TCP Sender

核心职责:

  • 跟踪接收方窗口的ackno、window_size
  • 尽可能地填充窗口,直到窗口已满或发送完毕
  • 管理未确认的段、超时未确认则重传(ARQ机制
  • 处理SYN、FIN标志,确保流的正确开始和结束

1.1 关键机制

重传超时(RTO)与定时器

TCPSender 的 owner 会周期性地调用 TCPSender 的 tick 方法,表示时间的流逝。TCPSender 负责查看其未完成的 TCPSenderMessages 集合,并确定最早发送的 segment 是否在没有确认的情况下未完成太长时间(即,没有确认其所有序列号)。如果是这样,则需要重新传输 (再次发送) 。

2 实现TCP Sender

2.1 void push( const TransmitFunction& transmit );

要求 TCPSender 从出站字节流填充窗口:它从流中读取并发送尽可能多的 TCPSenderMessages,只要窗口中有要读取的新字节和可用空间。它通过调用提供的 transmit() 函数来发送它们。
您需要确保您发送的每个 TCPSenderMessage 都完全适合接收者的窗口。使每条消息尽可能大,但不要大于 TCPConfig::MAX PAYLOAD SIZE 给出的值。

const TransmitFunction& transmit 函数型参数?

跳转到定义:

  /* Type of the `transmit` function that the push and tick methods can use to send messages */
  using TransmitFunction = std::function<void( const TCPSenderMessage& )>;

作用是,当发送器生成一个待发送的段(如 SYN 段、数据段、FIN 段)时,通过调用 transmit(segment) 将段传递给框架,segment的类型是TCPSenderMessage

从哪里读取字节?

当然是从TCPSender类里自己的字节流input_里读取,此时,TCPSender作为发送方

input_里的数据是哪来的?

构造TCPSender类时放入的,构造TCPSender时,同时还会给定Initial_RTO和ISN

实现

void TCPSender::push(const TransmitFunction& transmit)
{
  // 首先检查Writer是否存在错误并设置错误状态,有错误的话停止push,并返回空的message
  if (writer().has_error()) {
    _has_error = true;
    cerr << "DEBUG: writer has error, setting _has_error = true" << endl;
  }

  if (_has_error) {
    cerr << "DEBUG: _has_error is true in push(), sending RST message" << endl;
    TCPSenderMessage rst_msg = make_empty_message();
    transmit(rst_msg);
    return;
  }
  
  // 如果没有错误,正常处理...
  // 如果可接收的窗口大小为0且没有要重传的消息,则设置窗口大小为1
  uint64_t effective_window = (received_msg.window_size == 0 && outstanding_bytes == 0) ? 1 : received_msg.window_size;
  
  //如果当前的窗口大小可以容纳待重传的消息,则处理数据
  while (outstanding_bytes < effective_window) {
    TCPSenderMessage msg;
    //发送SYN消息
    if (isSent_ISN == false) {
      msg.SYN = true;
      msg.seqno = isn_;
      isSent_ISN = true;  // 立即设置标志
    } else {
      msg.seqno = Wrap32::wrap(abs_seqno, isn_);
    }
    
    // 计算可用窗口大小(考虑已发送但未确认的字节)
    size_t remaining_window = effective_window - outstanding_bytes;
    // 如果是SYN消息,需要减去一个字节,因为SYN占用一个序列号
    if (msg.SYN) {
      remaining_window = remaining_window > 0 ? remaining_window - 1 : 0;
    }
    
    // 计算可以发送的数据大小
    size_t payload_size = min(remaining_window, TCPConfig::MAX_PAYLOAD_SIZE);
    payload_size = min(payload_size, writer().reader().bytes_buffered());
    
    // 读取数据
    read(writer().reader(), payload_size, msg.payload);
    
    // 修改FIN逻辑:只有当发送完所有数据后,且确保FIN的一个字节也能放入窗口时才添加FIN
    if (writer().is_closed() && !isSent_FIN && 
        writer().reader().bytes_buffered() == 0 && 
        outstanding_bytes + msg.sequence_length()  < effective_window) {
      isSent_FIN = true;
      msg.FIN = true;
    }
    
    if (!msg.sequence_length()) break;

    outstanding_collections.push_back(msg);
    outstanding_bytes += msg.sequence_length();  // 确保正确计算序列号占用
    abs_seqno += msg.sequence_length();
    
    // 立即发送创建的消息
    transmit(msg);
    
    // 如果有未确认的数据,启动计时器
    if (outstanding_bytes > 0 && !is_start_timer) {
      is_start_timer = true;
      cur_RTO_ms = initial_RTO_ms_;
    }
  }
}

2.2 void receive( const TCPReceiverMessage& msg );

从接收器接收一条消息,传达窗口的新左边缘 (= ackno) 和右边缘 (= ackno + 窗口大小)。TCPSender 应查看其未完成的 segment 集合,并删除任何现已完全确认的 segment(ackno 大于 segment 中的所有序列号)。
左边缘:告知发送方 “已确认的数据边界”,释放已处理的段。
右边缘:告知发送方 “可接收的数据边界”,限制发送速率和数据量。

窗口大小是用来限制每次发送的段的大小,还是所有要发送的数据的大小?

后者。因为之前window_size设置的是TCPReceiver的字节流可以接受的最大字节数量。
那段的大小是由谁限制来着?
《自顶向下》上说是MSS(最大报文长度)。

实现

void TCPSender::receive(const TCPReceiverMessage& msg)
{
  // 检查收到的RST标志
  if (msg.RST) {
    _has_error = true;
    // 还需要设置底层writer的错误状态
    const_cast<Writer&>(writer()).set_error();
    return;
  }

  if (_has_error) {
    return;  // 如果有错误,不执行任何操作
  }
  
  received_msg = msg;
  primitive_window_size = msg.window_size;
  if (msg.ackno.has_value() == true) {
    uint64_t ackno_unwrapped = static_cast<uint64_t>(msg.ackno.value().unwrap(isn_, abs_seqno));
    if (ackno_unwrapped > abs_seqno) return;
    while (outstanding_bytes != 0 && 
           static_cast<uint64_t>(outstanding_collections.front().seqno.unwrap(isn_, abs_seqno)) + 
           outstanding_collections.front().sequence_length() <= ackno_unwrapped) {
      outstanding_bytes -= outstanding_collections.front().sequence_length();
      outstanding_collections.pop_front();
      consecutive_retransmissions_nums = 0;
      cur_RTO_ms = initial_RTO_ms_;
      if (outstanding_bytes == 0) is_start_timer = false;
      else is_start_timer = true;
    }
  }
}

2.3 void tick( uint64 t ms since last tick, const TransmitFunction& transmit );

Time has passed — 自上次调用此方法以来的一定毫秒数。发送方可能需要重新传输未完成的段;它可以调用 transmit() 函数来执行此作。(提醒:请不要尝试在代码中使用实际的 “clock” 或 “gettimeofday” 函数;对时间传递的唯一引用来自 ms since last tick 参数。

实现

关键点:

  • 重传判断
  • 指数退避
void TCPSender::tick(uint64_t ms_since_last_tick, const TransmitFunction& transmit)
{
  if (_has_error) {
    return;  // 如果有错误,不执行任何操作
  }
  
  // 只有当有未确认的数据且计时器启动时才减少时间
  if (is_start_timer && outstanding_bytes > 0) {
    if (cur_RTO_ms <= ms_since_last_tick) {
      // 超时,重传第一个未确认的段
      transmit(outstanding_collections.front());
      consecutive_retransmissions_nums++;
      if (primitive_window_size) cur_RTO_ms = (1UL << consecutive_retransmissions_nums) * initial_RTO_ms_;
      else cur_RTO_ms = initial_RTO_ms_;
      
    } else {
      cur_RTO_ms -= ms_since_last_tick;
    }
  }
}

谁来调用tick()?

发送数据时,push中发现有需要重传的消息,就会启动定时器

if (outstanding_bytes > 0 && !is_start_timer) {
  is_start_timer = true;
  cur_RTO_ms = initial_RTO_ms_;
}

只是启动,不是调用,真正的调用tick是在测试框架的上层模块

TCPSenderMessage make empty message() const;

TCPSender 应生成并发送一条序列号设置正确的零长度消息。如果 Peer 节点想要发送 TCPReceiverMessage(例如,因为它需要确认来自 Peer 节点的发送者的某些内容)并且需要生成 TCPSenderMessage 来配合它,这将非常有用。

实现

注意RST的设置

TCPSenderMessage TCPSender::make_empty_message() const
{
  TCPSenderMessage msg;
  msg.seqno = Wrap32::wrap(abs_seqno, isn_);
  
  // 检查是否有错误,无论是来自内部标志还是Writer
  bool has_error = _has_error || writer().has_error();
  cerr << "DEBUG: make_empty_message called, _has_error = " << (_has_error ? "true" : "false") 
       << ", writer().has_error() = " << (writer().has_error() ? "true" : "false") << endl;
  
  if (has_error) {
    msg.RST = true;
  }
  
  return msg;
}

几个疑问

TCP是单工通信还是双工通信?

全双工,“一对流量控制的字节流”。
Sender和Recevier各自独立存在一个字节流,支持双向数据传输。

数据流是如何传输的

留个坑


网站公告

今日签到

点亮在社区的每一天
去签到